// Blob Blame History Raw  (navigation text left over from a web source viewer; not part of the program)
/*
 * lftp - file transfer program
 *
 * Copyright (c) 1996-2016 by Alexander V. Lukyanov (lav@yars.free.net)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <config.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <stddef.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include "pgetJob.h"
#include "url.h"
#include "misc.h"
#include "log.h"

// Resource (setting) table for pget.  Each row names one setting that is
// read elsewhere in this file:
//   pget:save-status    - drives status_timer (see the constructor),
//   pget:default-n      - used for max_chunks when the caller passes 0,
//   pget:min-chunk-size - lower bound for the chunk size in InitChunks.
// The second field looks like the default value and the third one like the
// value validator -- NOTE(review): confirm against the ResType declaration.
// The {0} row terminates the table.
ResType pget_vars[] = {
   {"pget:save-status",	"10s",   ResMgr::TimeIntervalValidate,ResMgr::NoClosure},
   {"pget:default-n",   "5",	 ResMgr::UNumberValidate,ResMgr::NoClosure},
   {"pget:min-chunk-size", "1M", ResMgr::UNumberValidate,ResMgr::NoClosure},
   {0}
};
// Passes the table above to a ResDecls object constructed at static
// initialization time (presumably this registers the settings).
ResDecls pget_vars_register(pget_vars);

// Local MIN: drop any definition inherited from the included headers first.
// Being a macro, it evaluates the selected argument twice.
#undef MIN
#define MIN(a,b) ((a)<(b)?(a):(b))

// Shorthand for the base class, used as super::Method() below.
#define super CopyJob

/*
 * Perform one step of the parallel download.
 *
 * In order, the function:
 *  - saves the status file whenever status_timer has expired and removes
 *    the status file once the main copy `c' is done;
 *  - behaves like a plain CopyJob when parallel mode is off
 *    (no_parallel set or max_chunks<2);
 *  - runs or suspends the main copy depending on whether it has reached
 *    limit0, the end of the range it is responsible for;
 *  - creates the chunks (LoadStatus or InitChunks) once the file size is
 *    known and the local destination file is open;
 *  - recomputes total_xferred, total_xfer_rate, total_eta and chunks_done
 *    from the main copy and all chunks.
 *
 * Returns STALL or MOVED, possibly combined with the result of super::Do().
 */
int pgetJob::Do()
{
   int m=STALL;

   if(Done())
      return m;

   // Write the status file each time status_timer expires.
   if(status_timer.Stopped())
   {
      SaveStatus();
      status_timer.Reset();
   }

   // The main copy is done: delete the status file and clear its name
   // so that it is not written or removed again.
   if(c->Done())
   {
      if(status_file)
      {
	 remove(status_file);
	 status_file.set(0);
      }
   }

   // Parallel mode is off: make sure the main copy runs and let the base
   // class do all the work.
   if(no_parallel || max_chunks<2)
   {
      c->Resume();
      return super::Do();
   }

   // Every chunk was found done on the previous pass (chunks_done is
   // computed at the end of this function) and the main copy has reached
   // limit0: cap its range there, run it once and drop the chunks.
   if(chunks_done && chunks && c->GetPos()>=limit0)
   {
      c->SetRangeLimit(limit0);    // make it stop.
      c->Resume();
      c->Do();
      free_chunks();
      m=MOVED;
   }

   // No chunks, or the main copy is still below limit0: keep it running.
   if(chunks==0 || c->GetPos()<limit0)
   {
      c->Resume();
      m|=super::Do();
   }
   else if(chunks.count()>0)
   {
      // The main copy has reached limit0 while chunks still exist.
      if(chunks[0]->Error())
      {
	 Log::global->Format(0,"pget: chunk[%d] error: %s\n",0,chunks[0]->ErrorText());
	 no_parallel=true;
	 c->Resume();
      }
      else if(!chunks[0]->Done() && chunks[0]->GetBytesCount()<limit0/16)
      {
	 // Chunk 0 is unfinished and its byte count is below limit0/16:
	 // resume the main copy and let it take over chunk 0's range.
	 c->Resume();
	 if(chunks.count()==1)
	 {
	    // It was the only chunk: continue as a non-parallel transfer.
	    free_chunks();
	    no_parallel=true;
	 }
	 else
	 {
	    // Extend the main range to the end of chunk 0 and discard it.
	    limit0=chunks[0]->c->GetRangeLimit();
	    chunks.remove(0);
	 }
	 m=MOVED;
      }
      else
	 c->Suspend();
   }

   if(Done())
      return m;

   off_t offset=c->GetPos();
   off_t size=c->GetSize();

   // Chunks have not been created yet.
   if(chunks==0 && !chunks_done)
   {
      // The size is not known yet: try again on a later call.
      if(size==NO_SIZE_YET)
	 return m;

      // Unknown source size or a non-local target: log the reason(s)
      // and fall back to a plain, non-parallel transfer.
      if(size==NO_SIZE || (c->put && c->put->GetLocal()==0))
      {
	 Log::global->Write(0,_("pget: falling back to plain get"));
	 Log::global->Write(0," (");
	 if(c->put && c->put->GetLocal()==0)
	 {
	    Log::global->Write(0,_("the target file is remote"));
	    if(size==NO_SIZE)
	       Log::global->Write(0,", ");
	 }
	 if(size==NO_SIZE)
	    Log::global->Write(0,_("the source file size is unknown"));
	 Log::global->Write(0,")\n");

	 no_parallel=true;
	 return m;
      }

      // Make sure the destination file is open before starting chunks,
      // it disables temp-name creation in the chunk's Init.
      if(c->put->GetLocal()->getfd()==-1)
	 return m;

      c->put->NeedSeek(); // seek before writing

      // When continuing, try to restore the chunk layout from the status
      // file; otherwise remove a status file left from an earlier run.
      if(pget_cont)
	 LoadStatus();
      else if(status_file)
	 remove(status_file);
      // Nothing was restored: split the remaining range into new chunks.
      if(!chunks)
	 InitChunks(offset,size);

      m=MOVED;

      // Still no chunks (InitChunks decided not to split): plain transfer.
      if(!chunks)
      {
	 no_parallel=true;
	 return m;
      }
      if(!pget_cont)
      {
	 // Fresh download: write the initial status file right away.
	 SaveStatus();
	 status_timer.Reset();
	 if(ResMgr::QueryBool("file:use-fallocate",0)) {
	    // allocate space after creating *.lftp-pget-status file,
	    // so that the incomplete status is more obvious.
	    const Ref<FDStream>& local=c->put->GetLocal();
	    // ENOSYS and EOPNOTSUPP are not reported as failures.
	    if(lftp_fallocate(local->getfd(),size)==-1 && errno!=ENOSYS && errno!=EOPNOTSUPP) {
	       eprintf(_("pget: warning: space allocation for %s (%lld bytes) failed: %s\n"),
		  local->name.get(),(long long)size,strerror(errno));
	    }
	 }
      }
   }

   /* cycle through the chunks */
   chunks_done=true;
   // Contribution of the main copy, counted up to limit0 only.
   total_xferred=MIN(offset,limit0);
   // Starts as the number of bytes past limit0; the span of every chunk
   // visited in the loop is subtracted, so what is left is the part of the
   // file beyond limit0 that no current chunk covers.  It is added to
   // total_xferred after the loop -- presumably those ranges were completed
   // earlier (NOTE(review): confirm).
   off_t got_already=c->GetSize()-limit0;
   total_xfer_rate=c->GetRate();

   // ETA of the main copy for the bytes remaining up to limit0.
   off_t rem=limit0-c->GetPos();
   if(rem<=0)
      total_eta=0;
   else
      total_eta=c->GetETA(rem);

   for(int i=0; i<chunks.count(); i++)
   {
      // Any chunk error turns parallel mode off; the chunks are freed below.
      if(chunks[i]->Error())
      {
	 Log::global->Format(0,"pget: chunk[%d] error: %s\n",i,chunks[i]->ErrorText());
	 no_parallel=true;
	 break;
      }
      if(!chunks[i]->Done())
      {
	 // Count the part of the chunk already passed, capped at its limit.
	 if(chunks[i]->GetPos()>=chunks[i]->start)
	    total_xferred+=MIN(chunks[i]->GetPos(),chunks[i]->limit)
			   -chunks[i]->start;
	 // A negative chunk ETA makes the total ETA negative as well and
	 // stops further ETA updates.
	 if(total_eta>=0)
	 {
	    long eta=chunks[i]->GetETA();
	    if(eta<0)
	       total_eta=-1;
	    else if(eta>total_eta)
	       total_eta=eta;	// total eta is the maximum.
	 }
	 total_xfer_rate+=chunks[i]->GetRate();
	 chunks_done=false;
      }
      else  // done
      {
	 total_xferred+=chunks[i]->limit-chunks[i]->start;
      }
      got_already-=chunks[i]->limit-chunks[i]->start;
   }
   total_xferred+=got_already;

   // Parallel mode was switched off above: drop the chunks and report
   // progress so that the job is run again.
   if(no_parallel)
   {
      free_chunks();
      return MOVED;
   }

   return m;
}

// Format of the one-line parallel transfer status.
// PGET_STATUS below expands to the translated format followed by its
// arguments; it relies on locals called `name' and `size' being in scope at
// the point of use, plus the members total_xferred, total_xfer_rate,
// total_eta and c.
// xgettext:c-format
static const char pget_status_format[]=N_("`%s', got %lld of %lld (%d%%) %s%s");
#define PGET_STATUS _(pget_status_format),name, \
   (long long)total_xferred,(long long)size, \
   percent(total_xferred,size),Speedometer::GetStrS(total_xfer_rate), \
   c->GetETAStrSFromTime(total_eta)

// Map file offset `offs' to a column of a progress bar `width' columns wide.
// `size' must be positive.  The result is clamped to [0,width], so a caller
// that fills columns [from,to) with both ends produced by this function
// never writes outside a bar of `width' characters, even when an offset lies
// outside [0,size] (e.g. limits restored from a stale status file).
static int pget_bar_column(off_t offs,off_t size,int width)
{
   if(offs<=0)
      return 0;
   if(offs>=size)
      return width;
   return offs*width/size;
}

/*
 * Show the interactive status of a parallel transfer on status line `s':
 * the textual summary (PGET_STATUS) followed by a progress bar in which
 * 'o' marks data already passed by the main copy or a chunk, '.' marks the
 * rest of a range that is assigned to one of them and '+' everything else.
 * When no parallel transfer is in progress the base class output is used.
 */
void pgetJob::ShowRunStatus(const SMTaskRef<StatusLine>& s)
{
   if(Done() || no_parallel || max_chunks<2 || !chunks)
   {
      super::ShowRunStatus(s);
      return;
   }

   // `name' and `size' are referenced by the PGET_STATUS macro.
   const char *name=SqueezeName(s->GetWidthDelayed()-58);
   off_t size=GetSize();
   StringSet status;
   status.AppendFormat(PGET_STATUS);

   int w=s->GetWidthDelayed();
   if(w<1 || size<=0)
   {
      // No room for a bar, or nothing to scale it by (it would divide by
      // zero): show the textual summary alone.
      s->Show(status);
      return;
   }
   char *bar=string_alloca(w--);   // w+1 bytes: w columns and the terminator
   memset(bar,'+',w);
   bar[w]=0;

   // Main copy: 'o' from start0 to the current position, '.' up to limit0.
   // `i' deliberately carries over from one loop to the next.
   int i=pget_bar_column(start0,size,w);
   int p=pget_bar_column(c->GetPos(),size,w);
   for( ; i<p; i++)
      bar[i]='o';
   p=pget_bar_column(limit0,size,w);
   for( ; i<p; i++)
      bar[i]='.';

   // Chunks: a finished chunk is drawn as completely transferred.
   for(int chunk=0; chunk<chunks.count(); chunk++)
   {
      p=pget_bar_column(chunks[chunk]->Done()?chunks[chunk]->limit:chunks[chunk]->GetPos(),size,w);
      for(i=pget_bar_column(chunks[chunk]->start,size,w); i<p; i++)
         bar[i]='o';
      p=pget_bar_column(chunks[chunk]->limit,size,w);
      for( ; i<p; i++)
         bar[i]='.';
   }

   status.Append(bar);

   s->Show(status);
}

// Append the description of the sub-jobs (the chunk transfers) to `s'.
// Without chunks the generic Job listing is used; with chunks they are
// listed only when verbose>1, preceded by a pseudo-entry for the range
// handled by the main copy while it is still below limit0.
xstring& pgetJob::FormatJobs(xstring& s,int verbose,int indent)
{
   --indent;

   if(!chunks)
      return Job::FormatJobs(s,verbose,indent);

   if(verbose<=1)
      return s;

   if(c->GetPos()<limit0)
   {
      s.appendf("%*s\\chunk %lld-%lld\n",indent,"",(long long)start0,(long long)limit0);
      // Temporarily cap the range at limit0 so the ETA printed by
      // CopyJob::FormatStatus refers to the main range only.
      c->SetRangeLimit(limit0);
      CopyJob::FormatStatus(s,verbose,"\t");
      c->SetRangeLimit(FILE_END);
   }
   Job::FormatJobs(s,verbose,indent);
   return s;
}

// Append the job status to `s'.  While a parallel transfer is in progress
// this is `prefix' followed by the PGET_STATUS summary and a newline;
// in every other case the base class output is used.
xstring& pgetJob::FormatStatus(xstring& s,int verbose,const char *prefix)
{
   if(!Done() && !no_parallel && max_chunks>=2 && chunks)
   {
      s.append(prefix);
      // `name' and `size' are referenced by the PGET_STATUS macro.
      const char *name=GetDispName();
      off_t size=GetSize();
      s.appendf(PGET_STATUS);
      return s.append('\n');
   }
   return super::FormatStatus(s,verbose,prefix);
}

// Drop all chunk jobs.  The byte count of every chunk is added to
// chunks_bytes before the array is emptied.
void pgetJob::free_chunks()
{
   if(!chunks)
      return;

   const int total=chunks.count();
   for(int k=0; k<total; k++)
      chunks_bytes+=chunks[k]->GetBytesCount();
   chunks.unset();
}

// Create a parallel-get job around copy `c1' with display name `n'.
// `m' is the requested number of chunks; 0 selects the pget:default-n
// setting.  When the destination is a local file with a known full name,
// the status file name is derived from it and, if continuation was
// requested, the saved position of the main range is loaded at once.
pgetJob::pgetJob(FileCopy *c1,const char *n,int m)
   : CopyJob(c1,n,"pget")
{
   // Counters and flags start from a clean state.
   chunks_bytes=0;
   total_xferred=0;
   total_xfer_rate=0;
   total_eta=-1;
   start0=0;
   limit0=0;
   chunks_done=false;
   no_parallel=false;

   // Switch the continue flag of the copy off and keep the value returned
   // by SetContinue (presumably the previous setting): pget handles
   // continuation itself through the status file.
   pget_cont=c->SetContinue(false);
   max_chunks=m?m:ResMgr::Query("pget:default-n",0);
   status_timer.SetResource("pget:save-status",0);

   const Ref<FDStream>& dst=c->put->GetLocal();
   if(!dst || !dst->full_name)
      return;

   status_file.vset(dst->full_name.get(),".lftp-pget-status",NULL);
   if(pget_cont)
      LoadStatus0();
}
// Release the chunk jobs (accumulating their byte counts, see free_chunks)
// before handing over to the base class.
void pgetJob::PrepareToDie()
{
   free_chunks();
   super::PrepareToDie();
}
// Nothing to do here: the chunks are released in PrepareToDie().
pgetJob::~pgetJob()
{
}

// Build the transfer job for one chunk covering [start,limit) of the file.
// The chunk reads through a clone of the main source peer and writes
// through a new peer on the same local destination stream.  `remote' is
// passed on as the name of the new job.  The caller receives the new object.
pgetJob::ChunkXfer *pgetJob::NewChunk(const char *remote,off_t start,off_t limit)
{
   // Destination side: same local stream as the main copy.
   const Ref<FDStream>& out_stream=c->put->GetLocal();
   FileCopyPeerFDStream *writer=new FileCopyPeerFDStream(out_stream,FileCopyPeer::PUT);
   writer->NeedSeek(); // seek before writing
   writer->SetBase(0);

   // The copy itself, restricted to the chunk's range.
   FileCopy *chunk_copy=FileCopy::New(c->get->Clone(),writer,false);
   chunk_copy->SetRange(start,limit);
   chunk_copy->SetSize(GetSize());
   chunk_copy->DontCopyDate();
   chunk_copy->DontVerify();
   chunk_copy->FailIfCannotSeek();

   ChunkXfer *xfer=new ChunkXfer(chunk_copy,remote,start,limit);
   // The job listing shows an inclusive range, hence limit-1.
   xfer->cmdline.setf("\\chunk %lld-%lld",(long long)start,(long long)(limit-1));
   return xfer;
}

// A chunk transfer is a CopyJob (operation name "pget-chunk") that also
// remembers the range [s,lim) it was created for.
pgetJob::ChunkXfer::ChunkXfer(FileCopy *c1,const char *name,
                              off_t s,off_t lim)
   : CopyJob(c1,name,"pget-chunk")
{
   this->start=s;
   this->limit=lim;
}

// Write the current transfer state to the status file (if one is set).
// File layout:
//    size=<file size>
//    0.pos=<position of the main copy>
//    0.limit=<limit0>                  -- this and the rest only with chunks
//    <k>.pos=<position of chunk>       -- one pair per unfinished chunk,
//    <k>.limit=<limit of chunk>           numbered 1,2,... without gaps
// Failure to open the file is silently ignored.
void pgetJob::SaveStatus()
{
   if(!status_file)
      return;

   FILE *out=fopen(status_file,"w");
   if(!out)
      return;

   fprintf(out,"size=%lld\n",(long long)GetSize());
   fprintf(out,"%d.pos=%lld\n",0,(long long)GetPos());

   if(chunks)
   {
      fprintf(out,"%d.limit=%lld\n",0,(long long)limit0);

      int entry=0;   // index of the entry written last; 0 is the main copy
      for(int k=0; k<chunks.count(); k++)
      {
         if(chunks[k]->Done())
            continue;   // finished chunks are not recorded
         ++entry;
         fprintf(out,"%d.pos=%lld\n",entry,(long long)chunks[k]->GetPos());
         fprintf(out,"%d.limit=%lld\n",entry,(long long)chunks[k]->limit);
      }
   }

   fclose(out);
}
// Early, partial load of the status file: only the position of the main
// range (entry 0) is read and used as the start of the copy range.
// When the status file cannot be opened but the local file exists, the
// transfer is set to resume at the current end of the local file.
void pgetJob::LoadStatus0()
{
   if(!status_file)
      return;

   FILE *in=fopen(status_file,"r");
   if(in)
   {
      long long total;
      if(fscanf(in,"size=%lld\n",&total)>=1)
      {
         int index;
         long long resume_at;
         if(fscanf(in,"%d.pos=%lld\n",&index,&resume_at)>=2 && index==0)
         {
            Log::global->Format(10,"pget: got chunk[%d] pos=%lld\n",index,resume_at);
            c->SetRange(resume_at,FILE_END);
         }
      }
      fclose(in);
      return;
   }

   // fopen failed; remember why before stat() can overwrite errno.
   const int open_errno=errno;
   // Probably the file is already complete
   // or it was previously downloaded by plain get.
   struct stat st;
   if(stat(c->put->GetLocal()->full_name,&st)==-1)
      return;
   Log::global->Format(0,"pget: %s: cannot open (%s), resuming at the file end\n",
      status_file.get(),strerror(open_errno));
   c->SetRange(st.st_size,FILE_END);
}
void pgetJob::LoadStatus()
{
   if(!status_file)
      return;

   FILE *f=fopen(status_file,"r");
   if(!f)
      return;

   struct stat st;
   if(fstat(fileno(f),&st)<0)
   {
   out_close:
      fclose(f);
      return;
   }

   long long size;
   if(fscanf(f,"size=%lld\n",&size)<1)
      goto out_close;

   int i=0;
   int max_chunks=st.st_size/20; // highest estimate - min 20 bytes per chunk in status file.
   long long *pos=(long long *)alloca(2*max_chunks*sizeof(*pos));
   long long *limit=pos+max_chunks;
   for(;;)
   {
      int j;
      if(fscanf(f,"%d.pos=%lld\n",&j,pos+i)<2 || j!=i)
	 break;
      if(fscanf(f,"%d.limit=%lld\n",&j,limit+i)<2 || j!=i)
	 goto out_close;
      if(i>0 && pos[i]>=limit[i])
	 continue;
      Log::global->Format(10,"pget: got chunk[%d] pos=%lld\n",j,pos[i]);
      Log::global->Format(10,"pget: got chunk[%d] limit=%lld\n",j,limit[i]);
      i++;
   }
   if(i<1)
      goto out_close;
   if(size<c->GetSize())  // file grew?
   {
      if(limit[i-1]==size)
	 limit[i-1]=c->GetSize();
      else
      {
	 pos[i]=size;
	 limit[i]=c->GetSize();
	 i++;
      }
   }
   int num_of_chunks=i-1;
   start0=pos[0];
   limit0=limit[0];
   c->SetRange(pos[0],FILE_END);
   if(num_of_chunks<1)
      goto out_close;
   for(i=0; i<num_of_chunks; i++)
   {
      ChunkXfer *c=NewChunk(GetName(),pos[i+1],limit[i+1]);
      c->SetParentFg(this,false);
      chunks.append(c);
   }
   goto out_close;
}

void pgetJob::InitChunks(off_t offset,off_t size)
{
   /* initialize chunks */
   off_t chunk_size=(size-offset)/max_chunks;
   int min_chunk_size=ResMgr::Query("pget:min-chunk-size",0);
   if(chunk_size<min_chunk_size)
      chunk_size=min_chunk_size;
   int num_of_chunks=(size-offset)/chunk_size-1;
   if(num_of_chunks<1)
      return;
   start0=0;
   limit0=size-chunk_size*num_of_chunks;
   off_t curr_offs=limit0;
   for(int i=0; i<num_of_chunks; i++)
   {
      ChunkXfer *c=NewChunk(GetName(),curr_offs,curr_offs+chunk_size);
      c->SetParentFg(this,false);
      chunks.append(c);
      curr_offs+=chunk_size;
   }
   assert(curr_offs==size);
}