Blob Blame History Raw
/*
 * lftp - file transfer program
 *
 * Copyright (c) 1996-2016 by Alexander V. Lukyanov (lav@yars.free.net)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <config.h>
#include <errno.h>
#include "buffer.h"
#include "FileAccess.h"
#include "misc.h"
#include "trio.h"
#include "Speedometer.h"
#include "log.h"

#define BUFFER_INC	   (8*1024) // should be power of 2

const char *Buffer::Get() const
{
   if(Size()==0)
      return eof?0:"";
   return buffer+buffer_ptr;
}

void Buffer::Get(const char **buf,int *size) const
{
   *size=Size();
   *buf=Get();
}

void Buffer::GetSaved(const char **buf,int *size) const
{
   if(!save)
   {
      *size=0;
      *buf=0;
      return;
   }
   *buf=buffer;
   *size=buffer.length();
}

void Buffer::SaveRollback(off_t p)
{
   pos=p;
   if(buffer_ptr<p)
      save=false;
   if(!save)
      p=0;
   buffer.truncate(buffer_ptr=p);
}

void Buffer::Allocate(int size)
{
   if(buffer_ptr>0 && Size()==0 && !save)
   {
      buffer.truncate(0);
      buffer_ptr=0;
   }

   size_t in_buffer_real=Size();
   /* disable data movement to beginning of the buffer, if:
      1. we save the data explicitly;
      2. we add more data than there is space in the beginning of the buffer
	 (because the probability of realloc is high anyway);
      3. the gap at beginning is smaller than the amount of data in the buffer
	 (because the penalty of data movement is high). */
   if(save || buffer_ptr<size || buffer_ptr<Size())
      in_buffer_real+=buffer_ptr;

   // could be round-robin, but this is easier
   if(buffer.length()>in_buffer_real)
   {
      buffer.nset(buffer+buffer_ptr,Size());
      buffer_ptr=0;
   }

   buffer.get_space2(in_buffer_real+size,BUFFER_INC);
}

void Buffer::SaveMaxCheck(int size)
{
   if(save && buffer_ptr+size>save_max)
      save=false;
}

void Buffer::Append(const char *buf,int size)
{
   if(size==0)
      return;

   SaveMaxCheck(size);
   if(Size()==0 && buffer_ptr>0 && !save)
   {
      buffer.truncate(0);
      buffer_ptr=0;
   }

   memmove(GetSpace(size),buf,size);
   SpaceAdd(size);
}
void Buffer::Put(const char *buf,int size)
{
   Append(buf,size);
   pos+=size;
}
void Buffer::Prepend(const char *buf,int size)
{
   if(size==0)
      return;
   save=false;
   if(Size()==0)
   {
      memmove(GetSpace(size),buf,size);
      SpaceAdd(size);
      return;
   }
   if(buffer_ptr<size)
   {
      Allocate(size-buffer_ptr);
      memmove(buffer.get_non_const()+size,buffer+buffer_ptr,Size());
      SpaceAdd(size-buffer_ptr);
      buffer_ptr=size;
   }
   memmove(buffer.get_non_const()+buffer_ptr-size,buf,size);
   buffer_ptr-=size;
}

void Buffer::Format(const char *f,...)
{
   va_list v;
   va_start(v,f);
   vFormat(f, v);
   va_end(v);
}

void Buffer::vFormat(const char *f, va_list v)
{
   int size=64;
   for(;;)
   {
      va_list tmp;
      VA_COPY(tmp,v);
      int res=vsnprintf(GetSpace(size), size, f, tmp);
      va_end(tmp);
      if(res>=0 && res<size)
      {
	 SpaceAdd(res);
	 return;
      }
      if(res>size)   // some vsnprintf's return desired buffer size.
	 size=res+1;
      else
	 size*=2;
   }
}

void Buffer::Skip(int len)
{
   if(len>Size())
      len=Size();
   buffer_ptr+=len;
   pos+=len;
}
void Buffer::UnSkip(int len)
{
   if(len>buffer_ptr)
      len=buffer_ptr;
   buffer_ptr-=len;
   pos-=len;
}

void Buffer::Empty()
{
   buffer.truncate(0);
   buffer_ptr=0;
   if(save_max>0)
      save=true;
}

// move data from other buffer, prepare for SpaceAdd.
int Buffer::MoveDataHere(Buffer *o,int max_len)
{
   const char *b;
   int size;
   o->Get(&b,&size);
   if(size>max_len)
      size=max_len;
   if(size>0) {
      if(size>=64 && Size()==0 && o->Size()==size && !save && !o->save) {
	 // optimization by swapping buffers
	 buffer.swap(o->buffer);
	 buffer_ptr=replace_value(o->buffer_ptr,buffer_ptr);
	 buffer.set_length_no_z(buffer_ptr);
	 o->pos+=size;
      } else {
	 memcpy(GetSpace(size),b,size);
	 o->Skip(size);
      }
   }
   return size;
}

Buffer::Buffer()
{
   saved_errno=0;
   error_fatal=false;
   buffer_ptr=0;
   eof=false;
   broken=false;
   save=false;
   save_max=0;
   pos=0;
}
Buffer::~Buffer()
{
}

const char *Buffer::GetRateStrS()
{
   if(!rate || !rate->Valid())
      return "";
   return rate->GetStrS();
}
void Buffer::RateAdd(int n)
{
   if(!rate)
      return;
   rate->Add(n);
}

void Buffer::SetError(const char *e,bool fatal)
{
   error_text.set(e);
   error_fatal=fatal;
}
void Buffer::SetErrorCached(const char *e)
{
   SetError(e,false);
   error_text.append(_(" [cached]"));
}
const char *Buffer::Dump() const
{
   if(buffer_ptr==0)
      return buffer.dump();
   return xstring::get_tmp(Get(),Size()).dump();
}

void DataTranslator::AppendTranslated(Buffer *target,const char *put_buf,int size)
{
   off_t old_pos=target->GetPos();
   PutTranslated(target,put_buf,size);
   target->SetPos(old_pos);
}

void DirectedBuffer::SetTranslator(DataTranslator *t)
{
   if(mode==GET && !translator && Size()>0) {
      // translate unread data
      const char *data;
      int len;
      Get(&data,&len);
      t->Put(data,len);
      buffer.truncate(buffer_ptr);
      t->AppendTranslated(this,0,0);
   }
   translator=t;
}

#ifdef HAVE_ICONV
void DataRecoder::PutTranslated(Buffer *target,const char *put_buf,int size)
{
   bool from_untranslated=false;
   if(Size()>0)
   {
      Put(put_buf,size);
      Get(&put_buf,&size);
      from_untranslated=true;
   }
   if(size<=0)
      return;
   if(!backend_translate)
   {
      target->Put(put_buf,size);
      if(from_untranslated)
	 Skip(size);
      return;
   }
   size_t put_size=size;

   int size_coeff=6;
try_again:
   if(put_size==0)
      return;
   size_t store_size=size_coeff*put_size;
   char *store_space=target->GetSpace(store_size);
   char *store_buf=store_space;
   const char *base_buf=put_buf;
   // do the translation
   ICONV_CONST char **put_buf_ptr=const_cast<ICONV_CONST char**>(&put_buf);
   size_t res=iconv(backend_translate,put_buf_ptr,&put_size,&store_buf,&store_size);
   target->SpaceAdd(store_buf-store_space);
   if(from_untranslated)
      Skip(put_buf-base_buf);
   if(res==(size_t)-1)
   {
      switch(errno)
      {
      case EINVAL: // incomplete character
	 if(!from_untranslated)
	    Put(put_buf,put_size);
	 break;
      case EILSEQ: // invalid character
	 target->Put("?");
	 put_buf++;
	 put_size--;
	 goto try_again;
      case E2BIG:  // no room to store result, allocate more.
	 size_coeff*=2;
	 goto try_again;
      default:
	 break;
      }
   }
   return;
}
void DataRecoder::ResetTranslation()
{
   Empty();
   if(!backend_translate)
      return;
   iconv(backend_translate,0,0,0,0);
}
DataRecoder::~DataRecoder()
{
   if(backend_translate)
      iconv_close(backend_translate);
}
DataRecoder::DataRecoder(const char *from_code,const char *to_code,bool translit)
{
   backend_translate=0;

   if(translit) {
      const char *to_code_translit=xstring::cat(to_code,"//TRANSLIT",NULL);
      backend_translate=iconv_open(to_code_translit,from_code);
      if(backend_translate!=(iconv_t)-1) {
	 Log::global->Format(9,"initialized translation from %s to %s\n",from_code,to_code_translit);
	 return;
      }
      backend_translate=0;
   }

   backend_translate=iconv_open(to_code,from_code);
   if(backend_translate!=(iconv_t)-1) {
      Log::global->Format(9,"initialized translation from %s to %s\n",from_code,to_code);
      return;
   }

   Log::global->Format(0,"iconv_open(%s,%s) failed: %s\n",
			      to_code,from_code,strerror(errno));
   backend_translate=0;
}

void DirectedBuffer::SetTranslation(const char *enc,bool translit)
{
   if(!enc || !*enc)
      return;
   const char *local_code=ResMgr::Query("file:charset",0);
   if(!local_code || !*local_code)
      return;
   const char *from_code=(mode==GET?enc:local_code);
   const char *to_code  =(mode==GET?local_code:enc);
   if(!strcasecmp(from_code,to_code))
      return;
   SetTranslator(new DataRecoder(from_code,to_code,translit));
}
#endif //HAVE_ICONV

void DirectedBuffer::ResetTranslation()
{
   if(translator)
      translator->ResetTranslation();
}
void DirectedBuffer::Put(const char *buf,int size)
{
   if(mode==PUT && translator)
      translator->PutTranslated(this,buf,size);
   else
      Buffer::Put(buf,size);
}
void DirectedBuffer::PutTranslated(const char *buf,int size)
{
   if(translator)
      translator->PutTranslated(this,buf,size);
   else
      Buffer::Put(buf,size);
}
int DirectedBuffer::MoveDataHere(Buffer *buf,int size)
{
   if(size>buf->Size())
      size=buf->Size();
   if(mode==PUT && translator)
      translator->PutTranslated(this,buf->Get(),size);
   else
      return Buffer::MoveDataHere(buf,size);
   return size;
}
void DirectedBuffer::PutEOF()
{
   if(mode==PUT && translator)
      translator->PutTranslated(this,0,0);
   Buffer::PutEOF();
}

void DirectedBuffer::EmbraceNewData(int len)
{
   if(len<=0)
      return;
   RateAdd(len);
   if(translator)
   {
      // copy the data to free room for translated data
      translator->Put(buffer+buffer.length(),len);
      translator->AppendTranslated(this,0,0);
   }
   else
      SpaceAdd(len);
   SaveMaxCheck(0);
}


IOBuffer::IOBuffer(dir_t m)
   : DirectedBuffer(m), event_time(now),
     max_buf(0), get_size(GET_BUFSIZE)
{
}
IOBuffer::~IOBuffer()
{
}

void IOBuffer::Put(const char *buf,int size)
{
   if(size>=PUT_LL_MIN && Size()==0 && mode==PUT && !save && !translator)
   {
      int res=Put_LL(buf,size);
      if(res>=0)
      {
	 buf+=res;
	 size-=res;
	 pos+=res;
      }
   }
   if(size<=0)
      return;
   if(Size()==0)
      current->Timeout(0);
   DirectedBuffer::Put(buf,size);
}
void IOBuffer::Put(const char *buf)
{
   Put(buf,strlen(buf));
}

int IOBuffer::TuneGetSize(int res)
{
   if(res>0)
   {
      // buffer size tuning depending on data rate
      const int max_get_size=(max_buf?max_buf:0x100000);
      if(res>get_size/2 && Size()+get_size*2<=max_get_size)
	 get_size*=2;
   }
   return res;
}

int IOBuffer::Do()
{
   if(Done() || Error())
      return STALL;
   int res=0;
   switch(mode)
   {
   case PUT:
      if(Size()==0)
	 return STALL;
      res=Put_LL(buffer+buffer_ptr,Size());
      if(res>0)
      {
	 RateAdd(res);
	 buffer_ptr+=res;
	 event_time=now;
	 if(eof)
	    PutEOF_LL();
	 return MOVED;
      }
      break;

   case GET:
      if(eof)
	 return STALL;
      res=TuneGetSize(Get_LL(get_size));
      if(res>0)
      {
	 EmbraceNewData(res);
	 event_time=now;
	 return MOVED;
      }
      if(eof)
      {
	 event_time=now;
	 return MOVED;
      }
      break;
   }
   if(res<0)
   {
      event_time=now;
      return MOVED;
   }
   return STALL;
}

// IOBufferStacked implementation
#undef super
#define super IOBuffer
int IOBufferStacked::Do()
{
   int m=STALL;
   if(Done() || Error())
      return m;
   int res=0;
   switch(mode)
   {
   case PUT:
      if(down->Broken() && !broken)
      {
	 broken=true;
	 return MOVED;
      }
      if(down->Error())
      {
	 SetError(down->ErrorText(),down->ErrorFatal());
	 m=MOVED;
      }
      if(Size()==0)
	 return m;
      res=Put_LL(buffer+buffer_ptr,Size());
      if(res>0)
      {
	 buffer_ptr+=res;
	 m=MOVED;
      }
      break;

   case GET:
      if(eof)
	 return m;
      res=Get_LL(/*unused*/0);
      if(res>0)
      {
	 EmbraceNewData(res);
	 m=MOVED;
      }
      if(eof)
	 m=MOVED;
      if(down->Error())
      {
	 SetError(down->ErrorText(),down->ErrorFatal());
	 m=MOVED;
      }
      break;
   }
   if(res<0)
      return MOVED;
   return m;
}
int IOBufferStacked::Put_LL(const char *buf,int size)
{
   if(down->Broken())
   {
      broken=true;
      return -1;
   }
   down->Put(buf,size);
   return size;
}

int IOBufferStacked::Get_LL(int)
{
   if(max_buf && Size()>=max_buf) {
      down->SuspendSlave();
      return 0;
   }
   down->ResumeSlave();
   int size=MoveDataHere(down,down->Size());
   if(down->Size()==0 && down->Eof())
      PutEOF();
   return size;
}

bool IOBufferStacked::Done()
{
   if(super::Done())
      return down->Done();
   return false;
}

void IOBufferStacked::SuspendInternal()
{
   super::SuspendInternal();
   down->SuspendSlave();
}
void IOBufferStacked::ResumeInternal()
{
   if(!max_buf || Size()<max_buf)
      down->ResumeSlave();
   super::ResumeInternal();
}

// IOBufferFDStream implementation
#include <fcntl.h>
#include <unistd.h>
#undef super
#define super IOBuffer
int IOBufferFDStream::Put_LL(const char *buf,int size)
{
   if(put_ll_timer && !eof && Size()<PUT_LL_MIN
   && !put_ll_timer->Stopped())
      return 0;
   if(stream->broken())
   {
      broken=true;
      return -1;
   }

   int res=0;

   int fd=stream->getfd();
   if(fd==-1)
   {
      if(stream->error())
	 goto stream_err;
      TimeoutS(1);
      event_time=now;
      return 0;
   }

   res=write(fd,buf,size);
   if(res==-1)
   {
      saved_errno=errno;
      if(E_RETRY(saved_errno))
      {
	 Block(fd,POLLOUT);
	 return 0;
      }
      if(NonFatalError(saved_errno))
	 return 0;
      if(errno==EPIPE)
      {
	 broken=true;
	 return -1;
      }
      stream->MakeErrorText(saved_errno);
      goto stream_err;
   }
   if(put_ll_timer)
      put_ll_timer->Reset();
   return res;

stream_err:
   SetError(stream->error_text,!TemporaryNetworkError(saved_errno));
   return -1;
}

int IOBufferFDStream::Get_LL(int size)
{
   if(max_buf && Size()>=max_buf)
      return 0;

   int res=0;

   int fd=stream->getfd();
   if(fd==-1)
   {
      if(stream->error())
	 goto stream_err;
      TimeoutS(1);
      return 0;
   }

   if(!Ready(fd,POLLIN))
   {
      Block(fd,POLLIN);
      return 0;
   }

   res=read(fd,GetSpace(size),size);
   if(res==-1)
   {
      saved_errno=errno;
      if(E_RETRY(saved_errno))
      {
	 SetNotReady(fd,POLLIN);
	 Block(fd,POLLIN);
	 return 0;
      }
      if(NonFatalError(saved_errno))
	 return 0;
      stream->MakeErrorText(saved_errno);
      goto stream_err;
   }

   if(res==0) {
      Log::global->Format(10,"buffer: EOF on FD %d\n",fd);
      eof=true;
   }
   return res;

stream_err:
   SetError(stream->error_text,!TemporaryNetworkError(saved_errno));
   return -1;
}

FgData *IOBufferFDStream::GetFgData(bool fg)
{
   if(stream->getfd()!=-1)
      return new FgData(stream->GetProcGroup(),fg);
   return 0;
}

bool IOBufferFDStream::Done()
{
   if(put_ll_timer)
      put_ll_timer->Stop();
   if(super::Done())
      return stream->Done(); // stream->Done indicates if sub-process finished
   return false;
}

IOBufferFDStream::~IOBufferFDStream() {}


// IOBufferFileAccess implementation
#undef super
#define super IOBuffer
int IOBufferFileAccess::Get_LL(int size)
{
   if(max_buf && Size()>=max_buf) {
      session->SuspendSlave();
      return 0;
   }
   session->ResumeSlave();

   int res=0;

   res=session->Read(this,size);
   if(res<0)
   {
      if(res==FA::DO_AGAIN)
	 return 0;
      SetError(session->StrError(res));
      return -1;
   }
   if(res==0)
      eof=true;
   return res;
}

void IOBufferFileAccess::SuspendInternal()
{
   super::SuspendInternal();
   session->SuspendSlave();
}
void IOBufferFileAccess::ResumeInternal()
{
   if(!max_buf || Size()<max_buf)
      session->ResumeSlave();
   super::ResumeInternal();
}
const char *IOBufferFileAccess::Status()
{
   return session->CurrentStatus();
}

unsigned long long Buffer::UnpackUINT64BE(int offset) const
{
   if(Size()-offset<8)
      return 0;
   unsigned long long res=UnpackUINT32BE(offset);
   res=(res<<32)|UnpackUINT32BE(offset+4);
   return res;
}
long long Buffer::UnpackINT64BE(int offset) const
{
   unsigned long long n=UnpackUINT64BE(offset);
   if(n&0x8000000000000000ULL)
      return -(long long)(n^0xFFFFFFFFFFFFFFFFULL)-1;
   return (long long)n;
}
unsigned Buffer::UnpackUINT32BE(int offset) const
{
   if(Size()-offset<4)
      return 0;
   unsigned char *b=(unsigned char*)buffer.get()+buffer_ptr+offset;
   return (b[0]<<24)|(b[1]<<16)|(b[2]<<8)|b[3];
}
int Buffer::UnpackINT32BE(int offset) const
{
   unsigned n=UnpackUINT32BE(offset);
   if(n&0x80000000U)
      return -(int)(n^0xFFFFFFFFU)-1;
   return (int)n;
}
unsigned Buffer::UnpackUINT16BE(int offset) const
{
   if(Size()-offset<2)
      return 0;
   unsigned char *b=(unsigned char*)buffer.get()+buffer_ptr+offset;
   return (b[0]<<8)|b[1];
}
unsigned Buffer::UnpackUINT8(int offset) const
{
   if(Size()-offset<1)
      return 0;
   unsigned char *b=(unsigned char*)buffer.get()+buffer_ptr+offset;
   return b[0];
}
void Buffer::PackUINT64BE(unsigned long long data)
{
#ifndef NDEBUG
   Log::global->Format(11,"PackUINT64BE(0x%016llX)\n",data);
#endif
   Allocate(8);
   PackUINT32BE((unsigned)(data>>32));
   PackUINT32BE((unsigned)(data&0xFFFFFFFFU));
}
void Buffer::PackINT64BE(long long data)
{
   unsigned long long n;
   if(data<0)
      n=((unsigned long long)(-data)^0xFFFFFFFFFFFFFFFFULL)+1;
   else
      n=(unsigned long long)data;
   PackUINT64BE(n);
}
void Buffer::PackUINT32BE(unsigned data)
{
#ifndef NDEBUG
   Log::global->Format(11,"PackUINT32BE(0x%08X)\n",data);
#endif
   char *b=GetSpace(4);
   b[0]=(data>>24)&255;
   b[1]=(data>>16)&255;
   b[2]=(data>>8)&255;
   b[3]=(data)&255;
   SpaceAdd(4);
}
void Buffer::PackINT32BE(int data)
{
   unsigned n;
   if(data<0)
      n=((unsigned)(-data)^0xFFFFFFFFU)+1;
   else
      n=(unsigned)data;
   PackUINT32BE(n);
}
void Buffer::PackUINT16BE(unsigned data)
{
   char *b=GetSpace(2);
   b[0]=(data>>8)&255;
   b[1]=(data)&255;
   SpaceAdd(2);
}
void Buffer::PackUINT8(unsigned data)
{
#ifndef NDEBUG
   Log::global->Format(11,"PackUINT8(0x%02X)\n",data);
#endif
   char *b=GetSpace(1);
   b[0]=(data)&255;
   SpaceAdd(1);
}