& s)
{
if(Done() || no_parallel || max_chunks<2 || !chunks)
{
super::ShowRunStatus(s);
return;
}
const char *name=SqueezeName(s->GetWidthDelayed()-58);
off_t size=GetSize();
StringSet status;
status.AppendFormat(PGET_STATUS);
int w=s->GetWidthDelayed();
char *bar=string_alloca(w--);
memset(bar,'+',w);
bar[w]=0;
int i;
int p=c->GetPos()*w/size;
for(i=start0*w/size; iDone()?chunks[chunk]->limit:chunks[chunk]->GetPos())*w/size;
for(i=chunks[chunk]->start*w/size; i
limit*w/size;
for( ; i
Show(status);
}
// list subjobs (chunk xfers) only when verbose
xstring& pgetJob::FormatJobs(xstring& s,int verbose,int indent)
{
   indent--;
   if(!chunks)
      return Job::FormatJobs(s,verbose,indent);
   if(verbose>1)
   {
      // FIX: restored text lost to markup stripping (the condition tail,
      // the opening brace and `c->' were swallowed from `<limit0)' to the
      // next `>').  Brace counting shows the three statements form a block
      // guarded by the position check.
      if(c->GetPos()<limit0)
      {
         // Temporarily clamp the main transfer's range to the first chunk
         // boundary so its ETA reflects only this chunk's remaining bytes.
         c->SetRangeLimit(limit0); // to see right ETA.
         CopyJob::FormatStatus(s,verbose,"\t");
         c->SetRangeLimit(FILE_END);
      }
      Job::FormatJobs(s,verbose,indent);
   }
   return s;
}
// Append a one-line status for the whole pget job to `s'.
// Falls back to the plain CopyJob status when the transfer is not
// actually split into parallel chunks (done, parallelism disabled,
// fewer than two chunks requested, or chunks not yet created).
xstring& pgetJob::FormatStatus(xstring& s,int verbose,const char *prefix)
{
if(Done() || no_parallel || max_chunks<2 || !chunks)
return super::FormatStatus(s,verbose,prefix);
s.append(prefix);
// `name' and `size' look unused here but are presumably expanded by the
// PGET_STATUS format macro (defined elsewhere in this file) -- do not
// rename or remove them.  TODO(review): confirm against the macro.
const char *name=GetDispName();
off_t size=GetSize();
s.appendf(PGET_STATUS);
return s.append('\n');
}
void pgetJob::free_chunks()
{
if(chunks)
{
for(int i=0; iGetBytesCount();
chunks.unset();
}
}
/* Construct a pget job wrapping the main FileCopy `c1'.
 * `n' is the display name; `m' is the requested number of chunks
 * (0 means use the pget:default-n resource). */
pgetJob::pgetJob(FileCopy *c1,const char *n,int m)
   : CopyJob(c1,n,"pget")
{
   chunks_bytes=0;
   start0=limit0=0;
   total_xferred=0;
   total_xfer_rate=0;
   no_parallel=false;
   chunks_done=false;
   // pget manages continuation itself; SetContinue(false) presumably
   // returns the previous setting -- TODO(review): confirm.
   pget_cont=c->SetContinue(false);
   max_chunks=m?m:ResMgr::Query("pget:default-n",0);
   total_eta=-1;
   status_timer.SetResource("pget:save-status",0);
   // FIX: restored `<FDStream>' template argument lost to markup stripping.
   const Ref<FDStream>& local=c->put->GetLocal();
   if(local && local->full_name)
   {
      // status file sits next to the local file: <name>.lftp-pget-status
      status_file.vset(local->full_name.get(),".lftp-pget-status",NULL);
      if(pget_cont)
         LoadStatus0();
   }
}
// Called before the job is destroyed: fold the chunk statistics into
// this job and drop the chunk jobs (free_chunks) before the base class
// tears down the copy engine.  Order matters.
void pgetJob::PrepareToDie()
{
free_chunks();
super::PrepareToDie();
}
// Intentionally empty: all cleanup is done in PrepareToDie() above.
pgetJob::~pgetJob()
{
}
/* Create a new chunk transfer covering byte range [start,limit) of the
 * file.  The chunk gets its own FileCopy built from a clone of the main
 * source peer and a seekable peer on the same local destination. */
pgetJob::ChunkXfer *pgetJob::NewChunk(const char *remote,off_t start,off_t limit)
{
   // FIX: restored `<FDStream>' template argument lost to markup stripping.
   const Ref<FDStream>& local=c->put->GetLocal();
   FileCopyPeerFDStream
      *dst_peer=new FileCopyPeerFDStream(local,FileCopyPeer::PUT);
   dst_peer->NeedSeek(); // seek before writing
   dst_peer->SetBase(0);
   FileCopy *c1=FileCopy::New(c->get->Clone(),dst_peer,false);
   c1->SetRange(start,limit);
   c1->SetSize(GetSize());
   c1->DontCopyDate();   // the main transfer handles the file date
   c1->DontVerify();
   c1->FailIfCannotSeek();
   ChunkXfer *chunk=new ChunkXfer(c1,remote,start,limit);
   // command line shown in job listings, e.g. "\chunk 100-199"
   chunk->cmdline.setf("\\chunk %lld-%lld",(long long)start,(long long)(limit-1));
   return chunk;
}
// A single chunk transfer: a CopyJob restricted to one byte range of
// the target file.  `s'/`lim' delimit the half-open range [s,lim).
pgetJob::ChunkXfer::ChunkXfer(FileCopy *c1,const char *name,
		     off_t s,off_t lim)
   : CopyJob(c1,name,"pget-chunk")
{
   // remember the byte range this chunk is responsible for
   limit=lim;
   start=s;
}
void pgetJob::SaveStatus()
{
if(!status_file)
return;
FILE *f=fopen(status_file,"w");
if(!f)
return;
off_t size=GetSize();
fprintf(f,"size=%lld\n",(long long)size);
int i=0;
fprintf(f,"%d.pos=%lld\n",i,(long long)GetPos());
if(!chunks)
goto out_close;
fprintf(f,"%d.limit=%lld\n",i,(long long)limit0);
for(int chunk=0; chunkDone())
continue;
i++;
fprintf(f,"%d.pos=%lld\n",i,(long long)chunks[chunk]->GetPos());
fprintf(f,"%d.limit=%lld\n",i,(long long)chunks[chunk]->limit);
}
out_close:
fclose(f);
}
/* Minimal resume: read only the header and the main-transfer record
 * (chunk 0) from the status file and restart the copy from that
 * position.  If the status file cannot be opened, fall back to
 * resuming at the end of the existing local file. */
void pgetJob::LoadStatus0()
{
   if(!status_file)
      return;
   FILE *f=fopen(status_file,"r");
   if(!f) {
      int saved_errno=errno;
      // Probably the file is already complete
      // or it was previously downloaded by plain get.
      struct stat st;
      if(stat(c->put->GetLocal()->full_name,&st)==-1)
	 return;
      Log::global->Format(0,"pget: %s: cannot open (%s), resuming at the file end\n",
	 status_file.get(),strerror(saved_errno));
      c->SetRange(st.st_size,FILE_END);
      return;
   }
   long long size;
   long long pos;
   int j;
   // Accept only a well-formed header followed by record 0.
   if(fscanf(f,"size=%lld\n",&size)>=1
   && fscanf(f,"%d.pos=%lld\n",&j,&pos)>=2
   && j==0)
   {
      Log::global->Format(10,"pget: got chunk[%d] pos=%lld\n",j,pos);
      c->SetRange(pos,FILE_END);
   }
   fclose(f);
}
void pgetJob::LoadStatus()
{
if(!status_file)
return;
FILE *f=fopen(status_file,"r");
if(!f)
return;
struct stat st;
if(fstat(fileno(f),&st)<0)
{
out_close:
fclose(f);
return;
}
long long size;
if(fscanf(f,"size=%lld\n",&size)<1)
goto out_close;
int i=0;
int max_chunks=st.st_size/20; // highest estimate - min 20 bytes per chunk in status file.
long long *pos=(long long *)alloca(2*max_chunks*sizeof(*pos));
long long *limit=pos+max_chunks;
for(;;)
{
int j;
if(fscanf(f,"%d.pos=%lld\n",&j,pos+i)<2 || j!=i)
break;
if(fscanf(f,"%d.limit=%lld\n",&j,limit+i)<2 || j!=i)
goto out_close;
if(i>0 && pos[i]>=limit[i])
continue;
Log::global->Format(10,"pget: got chunk[%d] pos=%lld\n",j,pos[i]);
Log::global->Format(10,"pget: got chunk[%d] limit=%lld\n",j,limit[i]);
i++;
}
if(i<1)
goto out_close;
if(sizeGetSize()) // file grew?
{
if(limit[i-1]==size)
limit[i-1]=c->GetSize();
else
{
pos[i]=size;
limit[i]=c->GetSize();
i++;
}
}
int num_of_chunks=i-1;
start0=pos[0];
limit0=limit[0];
c->SetRange(pos[0],FILE_END);
if(num_of_chunks<1)
goto out_close;
for(i=0; iSetParentFg(this,false);
chunks.append(c);
}
goto out_close;
}
/* Split the byte range [offset,size) into roughly max_chunks pieces of
 * at least pget:min-chunk-size bytes and create a ChunkXfer job for
 * every piece except the first, which the main transfer handles. */
void pgetJob::InitChunks(off_t offset,off_t size)
{
   /* initialize chunks */
   off_t chunk_size=(size-offset)/max_chunks;
   int min_chunk_size=ResMgr::Query("pget:min-chunk-size",0);
   if(chunk_size<min_chunk_size)
      chunk_size=min_chunk_size;
   // NOTE(review): the lines from here to the loop body were swallowed by
   // markup stripping and are reconstructed so that the surviving
   // `assert(curr_offs==size)' holds -- verify against upstream lftp.
   int num_of_chunks=(size-offset)/chunk_size-1;  // chunks besides the main one
   if(num_of_chunks<1)
      return;  // too small to parallelize
   // the main transfer covers [offset,limit0); chunks cover [limit0,size)
   limit0=size-chunk_size*num_of_chunks;
   off_t curr_offs=limit0;
   for(int i=0; i<num_of_chunks; i++)
   {
      ChunkXfer *c=NewChunk(GetName(),curr_offs,curr_offs+chunk_size);
      c->SetParentFg(this,false);
      chunks.append(c);
      curr_offs+=chunk_size;
   }
   assert(curr_offs==size);
}