/* * lftp - file transfer program * * Copyright (c) 1996-2016 by Alexander V. Lukyanov (lav@yars.free.net) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include #include #include #include "pgetJob.h" #include "url.h" #include "misc.h" #include "log.h" ResType pget_vars[] = { {"pget:save-status", "10s", ResMgr::TimeIntervalValidate,ResMgr::NoClosure}, {"pget:default-n", "5", ResMgr::UNumberValidate,ResMgr::NoClosure}, {"pget:min-chunk-size", "1M", ResMgr::UNumberValidate,ResMgr::NoClosure}, {0} }; ResDecls pget_vars_register(pget_vars); #undef MIN #define MIN(a,b) ((a)<(b)?(a):(b)) #define super CopyJob int pgetJob::Do() { int m=STALL; if(Done()) return m; if(status_timer.Stopped()) { SaveStatus(); status_timer.Reset(); } if(c->Done()) { if(status_file) { remove(status_file); status_file.set(0); } } if(no_parallel || max_chunks<2) { c->Resume(); return super::Do(); } if(chunks_done && chunks && c->GetPos()>=limit0) { c->SetRangeLimit(limit0); // make it stop. c->Resume(); c->Do(); free_chunks(); m=MOVED; } if(chunks==0 || c->GetPos()Resume(); m|=super::Do(); } else if(chunks.count()>0) { if(chunks[0]->Error()) { Log::global->Format(0,"pget: chunk[%d] error: %s\n",0,chunks[0]->ErrorText()); no_parallel=true; c->Resume(); } else if(!chunks[0]->Done() && chunks[0]->GetBytesCount()Resume(); if(chunks.count()==1) { free_chunks(); no_parallel=true; } else { limit0=chunks[0]->c->GetRangeLimit(); chunks.remove(0); } m=MOVED; } else c->Suspend(); } if(Done()) return m; off_t offset=c->GetPos(); off_t size=c->GetSize(); if(chunks==0 && !chunks_done) { if(size==NO_SIZE_YET) return m; if(size==NO_SIZE || (c->put && c->put->GetLocal()==0)) { Log::global->Write(0,_("pget: falling back to plain get")); Log::global->Write(0," ("); if(c->put && c->put->GetLocal()==0) { Log::global->Write(0,_("the target file is remote")); if(size==NO_SIZE) Log::global->Write(0,", "); } if(size==NO_SIZE) Log::global->Write(0,_("the source file size is unknown")); Log::global->Write(0,")\n"); no_parallel=true; return m; } // Make sure the destination file is open before starting chunks, // it disables temp-name creation in the chunk's Init. if(c->put->GetLocal()->getfd()==-1) return m; c->put->NeedSeek(); // seek before writing if(pget_cont) LoadStatus(); else if(status_file) remove(status_file); if(!chunks) InitChunks(offset,size); m=MOVED; if(!chunks) { no_parallel=true; return m; } if(!pget_cont) { SaveStatus(); status_timer.Reset(); if(ResMgr::QueryBool("file:use-fallocate",0)) { // allocate space after creating *.lftp-pget-status file, // so that the incomplete status is more obvious. const Ref& local=c->put->GetLocal(); if(lftp_fallocate(local->getfd(),size)==-1 && errno!=ENOSYS && errno!=EOPNOTSUPP) { eprintf(_("pget: warning: space allocation for %s (%lld bytes) failed: %s\n"), local->name.get(),(long long)size,strerror(errno)); } } } } /* cycle through the chunks */ chunks_done=true; total_xferred=MIN(offset,limit0); off_t got_already=c->GetSize()-limit0; total_xfer_rate=c->GetRate(); off_t rem=limit0-c->GetPos(); if(rem<=0) total_eta=0; else total_eta=c->GetETA(rem); for(int i=0; iError()) { Log::global->Format(0,"pget: chunk[%d] error: %s\n",i,chunks[i]->ErrorText()); no_parallel=true; break; } if(!chunks[i]->Done()) { if(chunks[i]->GetPos()>=chunks[i]->start) total_xferred+=MIN(chunks[i]->GetPos(),chunks[i]->limit) -chunks[i]->start; if(total_eta>=0) { long eta=chunks[i]->GetETA(); if(eta<0) total_eta=-1; else if(eta>total_eta) total_eta=eta; // total eta is the maximum. } total_xfer_rate+=chunks[i]->GetRate(); chunks_done=false; } else // done { total_xferred+=chunks[i]->limit-chunks[i]->start; } got_already-=chunks[i]->limit-chunks[i]->start; } total_xferred+=got_already; if(no_parallel) { free_chunks(); return MOVED; } return m; } // xgettext:c-format static const char pget_status_format[]=N_("`%s', got %lld of %lld (%d%%) %s%s"); #define PGET_STATUS _(pget_status_format),name, \ (long long)total_xferred,(long long)size, \ percent(total_xferred,size),Speedometer::GetStrS(total_xfer_rate), \ c->GetETAStrSFromTime(total_eta) void pgetJob::ShowRunStatus(const SMTaskRef& s) { if(Done() || no_parallel || max_chunks<2 || !chunks) { super::ShowRunStatus(s); return; } const char *name=SqueezeName(s->GetWidthDelayed()-58); off_t size=GetSize(); StringSet status; status.AppendFormat(PGET_STATUS); int w=s->GetWidthDelayed(); char *bar=string_alloca(w--); memset(bar,'+',w); bar[w]=0; int i; int p=c->GetPos()*w/size; for(i=start0*w/size; iDone()?chunks[chunk]->limit:chunks[chunk]->GetPos())*w/size; for(i=chunks[chunk]->start*w/size; ilimit*w/size; for( ; iShow(status); } // list subjobs (chunk xfers) only when verbose xstring& pgetJob::FormatJobs(xstring& s,int verbose,int indent) { indent--; if(!chunks) return Job::FormatJobs(s,verbose,indent); if(verbose>1) { if(c->GetPos()SetRangeLimit(limit0); // to see right ETA. CopyJob::FormatStatus(s,verbose,"\t"); c->SetRangeLimit(FILE_END); } Job::FormatJobs(s,verbose,indent); } return s; } xstring& pgetJob::FormatStatus(xstring& s,int verbose,const char *prefix) { if(Done() || no_parallel || max_chunks<2 || !chunks) return super::FormatStatus(s,verbose,prefix); s.append(prefix); const char *name=GetDispName(); off_t size=GetSize(); s.appendf(PGET_STATUS); return s.append('\n'); } void pgetJob::free_chunks() { if(chunks) { for(int i=0; iGetBytesCount(); chunks.unset(); } } pgetJob::pgetJob(FileCopy *c1,const char *n,int m) : CopyJob(c1,n,"pget") { chunks_bytes=0; start0=limit0=0; total_xferred=0; total_xfer_rate=0; no_parallel=false; chunks_done=false; pget_cont=c->SetContinue(false); max_chunks=m?m:ResMgr::Query("pget:default-n",0); total_eta=-1; status_timer.SetResource("pget:save-status",0); const Ref& local=c->put->GetLocal(); if(local && local->full_name) { status_file.vset(local->full_name.get(),".lftp-pget-status",NULL); if(pget_cont) LoadStatus0(); } } void pgetJob::PrepareToDie() { free_chunks(); super::PrepareToDie(); } pgetJob::~pgetJob() { } pgetJob::ChunkXfer *pgetJob::NewChunk(const char *remote,off_t start,off_t limit) { const Ref& local=c->put->GetLocal(); FileCopyPeerFDStream *dst_peer=new FileCopyPeerFDStream(local,FileCopyPeer::PUT); dst_peer->NeedSeek(); // seek before writing dst_peer->SetBase(0); FileCopy *c1=FileCopy::New(c->get->Clone(),dst_peer,false); c1->SetRange(start,limit); c1->SetSize(GetSize()); c1->DontCopyDate(); c1->DontVerify(); c1->FailIfCannotSeek(); ChunkXfer *chunk=new ChunkXfer(c1,remote,start,limit); chunk->cmdline.setf("\\chunk %lld-%lld",(long long)start,(long long)(limit-1)); return chunk; } pgetJob::ChunkXfer::ChunkXfer(FileCopy *c1,const char *name, off_t s,off_t lim) : CopyJob(c1,name,"pget-chunk") { start=s; limit=lim; } void pgetJob::SaveStatus() { if(!status_file) return; FILE *f=fopen(status_file,"w"); if(!f) return; off_t size=GetSize(); fprintf(f,"size=%lld\n",(long long)size); int i=0; fprintf(f,"%d.pos=%lld\n",i,(long long)GetPos()); if(!chunks) goto out_close; fprintf(f,"%d.limit=%lld\n",i,(long long)limit0); for(int chunk=0; chunkDone()) continue; i++; fprintf(f,"%d.pos=%lld\n",i,(long long)chunks[chunk]->GetPos()); fprintf(f,"%d.limit=%lld\n",i,(long long)chunks[chunk]->limit); } out_close: fclose(f); } void pgetJob::LoadStatus0() { if(!status_file) return; FILE *f=fopen(status_file,"r"); if(!f) { int saved_errno=errno; // Probably the file is already complete // or it was previously downloaded by plain get. struct stat st; if(stat(c->put->GetLocal()->full_name,&st)==-1) return; Log::global->Format(0,"pget: %s: cannot open (%s), resuming at the file end\n", status_file.get(),strerror(saved_errno)); c->SetRange(st.st_size,FILE_END); return; } long long size; if(fscanf(f,"size=%lld\n",&size)<1) goto out_close; long long pos; int j; if(fscanf(f,"%d.pos=%lld\n",&j,&pos)<2 || j!=0) goto out_close; Log::global->Format(10,"pget: got chunk[%d] pos=%lld\n",j,pos); c->SetRange(pos,FILE_END); out_close: fclose(f); } void pgetJob::LoadStatus() { if(!status_file) return; FILE *f=fopen(status_file,"r"); if(!f) return; struct stat st; if(fstat(fileno(f),&st)<0) { out_close: fclose(f); return; } long long size; if(fscanf(f,"size=%lld\n",&size)<1) goto out_close; int i=0; int max_chunks=st.st_size/20; // highest estimate - min 20 bytes per chunk in status file. long long *pos=(long long *)alloca(2*max_chunks*sizeof(*pos)); long long *limit=pos+max_chunks; for(;;) { int j; if(fscanf(f,"%d.pos=%lld\n",&j,pos+i)<2 || j!=i) break; if(fscanf(f,"%d.limit=%lld\n",&j,limit+i)<2 || j!=i) goto out_close; if(i>0 && pos[i]>=limit[i]) continue; Log::global->Format(10,"pget: got chunk[%d] pos=%lld\n",j,pos[i]); Log::global->Format(10,"pget: got chunk[%d] limit=%lld\n",j,limit[i]); i++; } if(i<1) goto out_close; if(sizeGetSize()) // file grew? { if(limit[i-1]==size) limit[i-1]=c->GetSize(); else { pos[i]=size; limit[i]=c->GetSize(); i++; } } int num_of_chunks=i-1; start0=pos[0]; limit0=limit[0]; c->SetRange(pos[0],FILE_END); if(num_of_chunks<1) goto out_close; for(i=0; iSetParentFg(this,false); chunks.append(c); } goto out_close; } void pgetJob::InitChunks(off_t offset,off_t size) { /* initialize chunks */ off_t chunk_size=(size-offset)/max_chunks; int min_chunk_size=ResMgr::Query("pget:min-chunk-size",0); if(chunk_sizeSetParentFg(this,false); chunks.append(c); curr_offs+=chunk_size; } assert(curr_offs==size); }