Blame src/buffer.cc

Packit Service a2489d
/*
Packit Service a2489d
 * lftp - file transfer program
Packit Service a2489d
 *
Packit Service a2489d
 * Copyright (c) 1996-2016 by Alexander V. Lukyanov (lav@yars.free.net)
Packit Service a2489d
 *
Packit Service a2489d
 * This program is free software; you can redistribute it and/or modify
Packit Service a2489d
 * it under the terms of the GNU General Public License as published by
Packit Service a2489d
 * the Free Software Foundation; either version 3 of the License, or
Packit Service a2489d
 * (at your option) any later version.
Packit Service a2489d
 *
Packit Service a2489d
 * This program is distributed in the hope that it will be useful,
Packit Service a2489d
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service a2489d
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service a2489d
 * GNU General Public License for more details.
Packit Service a2489d
 *
Packit Service a2489d
 * You should have received a copy of the GNU General Public License
Packit Service a2489d
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
Packit Service a2489d
 */
Packit Service a2489d
Packit Service a2489d
#include <config.h>
Packit Service a2489d
#include <errno.h>
Packit Service a2489d
#include "buffer.h"
Packit Service a2489d
#include "FileAccess.h"
Packit Service a2489d
#include "misc.h"
Packit Service a2489d
#include "trio.h"
Packit Service a2489d
#include "Speedometer.h"
Packit Service a2489d
#include "log.h"
Packit Service a2489d
Packit Service a2489d
#define BUFFER_INC	   (8*1024) // should be power of 2
Packit Service a2489d
Packit Service a2489d
const char *Buffer::Get() const
Packit Service a2489d
{
Packit Service a2489d
   if(Size()==0)
Packit Service a2489d
      return eof?0:"";
Packit Service a2489d
   return buffer+buffer_ptr;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::Get(const char **buf,int *size) const
Packit Service a2489d
{
Packit Service a2489d
   *size=Size();
Packit Service a2489d
   *buf=Get();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::GetSaved(const char **buf,int *size) const
Packit Service a2489d
{
Packit Service a2489d
   if(!save)
Packit Service a2489d
   {
Packit Service a2489d
      *size=0;
Packit Service a2489d
      *buf=0;
Packit Service a2489d
      return;
Packit Service a2489d
   }
Packit Service a2489d
   *buf=buffer;
Packit Service a2489d
   *size=buffer.length();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::SaveRollback(off_t p)
Packit Service a2489d
{
Packit Service a2489d
   pos=p;
Packit Service a2489d
   if(buffer_ptr
Packit Service a2489d
      save=false;
Packit Service a2489d
   if(!save)
Packit Service a2489d
      p=0;
Packit Service a2489d
   buffer.truncate(buffer_ptr=p);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::Allocate(int size)
Packit Service a2489d
{
Packit Service a2489d
   if(buffer_ptr>0 && Size()==0 && !save)
Packit Service a2489d
   {
Packit Service a2489d
      buffer.truncate(0);
Packit Service a2489d
      buffer_ptr=0;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   size_t in_buffer_real=Size();
Packit Service a2489d
   /* disable data movement to beginning of the buffer, if:
Packit Service a2489d
      1. we save the data explicitly;
Packit Service a2489d
      2. we add more data than there is space in the beginning of the buffer
Packit Service a2489d
	 (because the probability of realloc is high anyway);
Packit Service a2489d
      3. the gap at beginning is smaller than the amount of data in the buffer
Packit Service a2489d
	 (because the penalty of data movement is high). */
Packit Service a2489d
   if(save || buffer_ptr
Packit Service a2489d
      in_buffer_real+=buffer_ptr;
Packit Service a2489d
Packit Service a2489d
   // could be round-robin, but this is easier
Packit Service a2489d
   if(buffer.length()>in_buffer_real)
Packit Service a2489d
   {
Packit Service a2489d
      buffer.nset(buffer+buffer_ptr,Size());
Packit Service a2489d
      buffer_ptr=0;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   buffer.get_space2(in_buffer_real+size,BUFFER_INC);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::SaveMaxCheck(int size)
Packit Service a2489d
{
Packit Service a2489d
   if(save && buffer_ptr+size>save_max)
Packit Service a2489d
      save=false;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::Append(const char *buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   if(size==0)
Packit Service a2489d
      return;
Packit Service a2489d
Packit Service a2489d
   SaveMaxCheck(size);
Packit Service a2489d
   if(Size()==0 && buffer_ptr>0 && !save)
Packit Service a2489d
   {
Packit Service a2489d
      buffer.truncate(0);
Packit Service a2489d
      buffer_ptr=0;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   memmove(GetSpace(size),buf,size);
Packit Service a2489d
   SpaceAdd(size);
Packit Service a2489d
}
Packit Service a2489d
void Buffer::Put(const char *buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   Append(buf,size);
Packit Service a2489d
   pos+=size;
Packit Service a2489d
}
Packit Service a2489d
void Buffer::Prepend(const char *buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   if(size==0)
Packit Service a2489d
      return;
Packit Service a2489d
   save=false;
Packit Service a2489d
   if(Size()==0)
Packit Service a2489d
   {
Packit Service a2489d
      memmove(GetSpace(size),buf,size);
Packit Service a2489d
      SpaceAdd(size);
Packit Service a2489d
      return;
Packit Service a2489d
   }
Packit Service a2489d
   if(buffer_ptr
Packit Service a2489d
   {
Packit Service a2489d
      Allocate(size-buffer_ptr);
Packit Service a2489d
      memmove(buffer.get_non_const()+size,buffer+buffer_ptr,Size());
Packit Service a2489d
      SpaceAdd(size-buffer_ptr);
Packit Service a2489d
      buffer_ptr=size;
Packit Service a2489d
   }
Packit Service a2489d
   memmove(buffer.get_non_const()+buffer_ptr-size,buf,size);
Packit Service a2489d
   buffer_ptr-=size;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::Format(const char *f,...)
Packit Service a2489d
{
Packit Service a2489d
   va_list v;
Packit Service a2489d
   va_start(v,f);
Packit Service a2489d
   vFormat(f, v);
Packit Service a2489d
   va_end(v);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::vFormat(const char *f, va_list v)
Packit Service a2489d
{
Packit Service a2489d
   int size=64;
Packit Service a2489d
   for(;;)
Packit Service a2489d
   {
Packit Service a2489d
      va_list tmp;
Packit Service a2489d
      VA_COPY(tmp,v);
Packit Service a2489d
      int res=vsnprintf(GetSpace(size), size, f, tmp);
Packit Service a2489d
      va_end(tmp);
Packit Service a2489d
      if(res>=0 && res
Packit Service a2489d
      {
Packit Service a2489d
	 SpaceAdd(res);
Packit Service a2489d
	 return;
Packit Service a2489d
      }
Packit Service a2489d
      if(res>size)   // some vsnprintf's return desired buffer size.
Packit Service a2489d
	 size=res+1;
Packit Service a2489d
      else
Packit Service a2489d
	 size*=2;
Packit Service a2489d
   }
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::Skip(int len)
Packit Service a2489d
{
Packit Service a2489d
   if(len>Size())
Packit Service a2489d
      len=Size();
Packit Service a2489d
   buffer_ptr+=len;
Packit Service a2489d
   pos+=len;
Packit Service a2489d
}
Packit Service a2489d
void Buffer::UnSkip(int len)
Packit Service a2489d
{
Packit Service a2489d
   if(len>buffer_ptr)
Packit Service a2489d
      len=buffer_ptr;
Packit Service a2489d
   buffer_ptr-=len;
Packit Service a2489d
   pos-=len;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::Empty()
Packit Service a2489d
{
Packit Service a2489d
   buffer.truncate(0);
Packit Service a2489d
   buffer_ptr=0;
Packit Service a2489d
   if(save_max>0)
Packit Service a2489d
      save=true;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
// move data from other buffer, prepare for SpaceAdd.
Packit Service a2489d
int Buffer::MoveDataHere(Buffer *o,int max_len)
Packit Service a2489d
{
Packit Service a2489d
   const char *b;
Packit Service a2489d
   int size;
Packit Service a2489d
   o->Get(&b,&size);
Packit Service a2489d
   if(size>max_len)
Packit Service a2489d
      size=max_len;
Packit Service a2489d
   if(size>0) {
Packit Service a2489d
      if(size>=64 && Size()==0 && o->Size()==size && !save && !o->save) {
Packit Service a2489d
	 // optimization by swapping buffers
Packit Service a2489d
	 buffer.swap(o->buffer);
Packit Service a2489d
	 buffer_ptr=replace_value(o->buffer_ptr,buffer_ptr);
Packit Service a2489d
	 buffer.set_length_no_z(buffer_ptr);
Packit Service a2489d
	 o->pos+=size;
Packit Service a2489d
      } else {
Packit Service a2489d
	 memcpy(GetSpace(size),b,size);
Packit Service a2489d
	 o->Skip(size);
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   return size;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
Buffer::Buffer()
Packit Service a2489d
{
Packit Service a2489d
   saved_errno=0;
Packit Service a2489d
   error_fatal=false;
Packit Service a2489d
   buffer_ptr=0;
Packit Service a2489d
   eof=false;
Packit Service a2489d
   broken=false;
Packit Service a2489d
   save=false;
Packit Service a2489d
   save_max=0;
Packit Service a2489d
   pos=0;
Packit Service a2489d
}
Packit Service a2489d
Buffer::~Buffer()
Packit Service a2489d
{
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
const char *Buffer::GetRateStrS()
Packit Service a2489d
{
Packit Service a2489d
   if(!rate || !rate->Valid())
Packit Service a2489d
      return "";
Packit Service a2489d
   return rate->GetStrS();
Packit Service a2489d
}
Packit Service a2489d
void Buffer::RateAdd(int n)
Packit Service a2489d
{
Packit Service a2489d
   if(!rate)
Packit Service a2489d
      return;
Packit Service a2489d
   rate->Add(n);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void Buffer::SetError(const char *e,bool fatal)
Packit Service a2489d
{
Packit Service a2489d
   error_text.set(e);
Packit Service a2489d
   error_fatal=fatal;
Packit Service a2489d
}
Packit Service a2489d
void Buffer::SetErrorCached(const char *e)
Packit Service a2489d
{
Packit Service a2489d
   SetError(e,false);
Packit Service a2489d
   error_text.append(_(" [cached]"));
Packit Service a2489d
}
Packit Service a2489d
const char *Buffer::Dump() const
Packit Service a2489d
{
Packit Service a2489d
   if(buffer_ptr==0)
Packit Service a2489d
      return buffer.dump();
Packit Service a2489d
   return xstring::get_tmp(Get(),Size()).dump();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DataTranslator::AppendTranslated(Buffer *target,const char *put_buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   off_t old_pos=target->GetPos();
Packit Service a2489d
   PutTranslated(target,put_buf,size);
Packit Service a2489d
   target->SetPos(old_pos);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DirectedBuffer::SetTranslator(DataTranslator *t)
Packit Service a2489d
{
Packit Service a2489d
   if(mode==GET && !translator && Size()>0) {
Packit Service a2489d
      // translate unread data
Packit Service a2489d
      const char *data;
Packit Service a2489d
      int len;
Packit Service a2489d
      Get(&data,&len;;
Packit Service a2489d
      t->Put(data,len);
Packit Service a2489d
      buffer.truncate(buffer_ptr);
Packit Service a2489d
      t->AppendTranslated(this,0,0);
Packit Service a2489d
   }
Packit Service a2489d
   translator=t;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
#ifdef HAVE_ICONV
Packit Service a2489d
void DataRecoder::PutTranslated(Buffer *target,const char *put_buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   bool from_untranslated=false;
Packit Service a2489d
   if(Size()>0)
Packit Service a2489d
   {
Packit Service a2489d
      Put(put_buf,size);
Packit Service a2489d
      Get(&put_buf,&size);
Packit Service a2489d
      from_untranslated=true;
Packit Service a2489d
   }
Packit Service a2489d
   if(size<=0)
Packit Service a2489d
      return;
Packit Service a2489d
   if(!backend_translate)
Packit Service a2489d
   {
Packit Service a2489d
      target->Put(put_buf,size);
Packit Service a2489d
      if(from_untranslated)
Packit Service a2489d
	 Skip(size);
Packit Service a2489d
      return;
Packit Service a2489d
   }
Packit Service a2489d
   size_t put_size=size;
Packit Service a2489d
Packit Service a2489d
   int size_coeff=6;
Packit Service a2489d
try_again:
Packit Service a2489d
   if(put_size==0)
Packit Service a2489d
      return;
Packit Service a2489d
   size_t store_size=size_coeff*put_size;
Packit Service a2489d
   char *store_space=target->GetSpace(store_size);
Packit Service a2489d
   char *store_buf=store_space;
Packit Service a2489d
   const char *base_buf=put_buf;
Packit Service a2489d
   // do the translation
Packit Service a2489d
   ICONV_CONST char **put_buf_ptr=const_cast<ICONV_CONST char**>(&put_buf);
Packit Service a2489d
   size_t res=iconv(backend_translate,put_buf_ptr,&put_size,&store_buf,&store_size);
Packit Service a2489d
   target->SpaceAdd(store_buf-store_space);
Packit Service a2489d
   if(from_untranslated)
Packit Service a2489d
      Skip(put_buf-base_buf);
Packit Service a2489d
   if(res==(size_t)-1)
Packit Service a2489d
   {
Packit Service a2489d
      switch(errno)
Packit Service a2489d
      {
Packit Service a2489d
      case EINVAL: // incomplete character
Packit Service a2489d
	 if(!from_untranslated)
Packit Service a2489d
	    Put(put_buf,put_size);
Packit Service a2489d
	 break;
Packit Service a2489d
      case EILSEQ: // invalid character
Packit Service a2489d
	 target->Put("?");
Packit Service a2489d
	 put_buf++;
Packit Service a2489d
	 put_size--;
Packit Service a2489d
	 goto try_again;
Packit Service a2489d
      case E2BIG:  // no room to store result, allocate more.
Packit Service a2489d
	 size_coeff*=2;
Packit Service a2489d
	 goto try_again;
Packit Service a2489d
      default:
Packit Service a2489d
	 break;
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   return;
Packit Service a2489d
}
Packit Service a2489d
void DataRecoder::ResetTranslation()
Packit Service a2489d
{
Packit Service a2489d
   Empty();
Packit Service a2489d
   if(!backend_translate)
Packit Service a2489d
      return;
Packit Service a2489d
   iconv(backend_translate,0,0,0,0);
Packit Service a2489d
}
Packit Service a2489d
DataRecoder::~DataRecoder()
Packit Service a2489d
{
Packit Service a2489d
   if(backend_translate)
Packit Service a2489d
      iconv_close(backend_translate);
Packit Service a2489d
}
Packit Service a2489d
DataRecoder::DataRecoder(const char *from_code,const char *to_code,bool translit)
Packit Service a2489d
{
Packit Service a2489d
   backend_translate=0;
Packit Service a2489d
Packit Service a2489d
   if(translit) {
Packit Service a2489d
      const char *to_code_translit=xstring::cat(to_code,"//TRANSLIT",NULL);
Packit Service a2489d
      backend_translate=iconv_open(to_code_translit,from_code);
Packit Service a2489d
      if(backend_translate!=(iconv_t)-1) {
Packit Service a2489d
	 Log::global->Format(9,"initialized translation from %s to %s\n",from_code,to_code_translit);
Packit Service a2489d
	 return;
Packit Service a2489d
      }
Packit Service a2489d
      backend_translate=0;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   backend_translate=iconv_open(to_code,from_code);
Packit Service a2489d
   if(backend_translate!=(iconv_t)-1) {
Packit Service a2489d
      Log::global->Format(9,"initialized translation from %s to %s\n",from_code,to_code);
Packit Service a2489d
      return;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   Log::global->Format(0,"iconv_open(%s,%s) failed: %s\n",
Packit Service a2489d
			      to_code,from_code,strerror(errno));
Packit Service a2489d
   backend_translate=0;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DirectedBuffer::SetTranslation(const char *enc,bool translit)
Packit Service a2489d
{
Packit Service a2489d
   if(!enc || !*enc)
Packit Service a2489d
      return;
Packit Service a2489d
   const char *local_code=ResMgr::Query("file:charset",0);
Packit Service a2489d
   if(!local_code || !*local_code)
Packit Service a2489d
      return;
Packit Service a2489d
   const char *from_code=(mode==GET?enc:local_code);
Packit Service a2489d
   const char *to_code  =(mode==GET?local_code:enc);
Packit Service a2489d
   if(!strcasecmp(from_code,to_code))
Packit Service a2489d
      return;
Packit Service a2489d
   SetTranslator(new DataRecoder(from_code,to_code,translit));
Packit Service a2489d
}
Packit Service a2489d
#endif //HAVE_ICONV
Packit Service a2489d
Packit Service a2489d
void DirectedBuffer::ResetTranslation()
Packit Service a2489d
{
Packit Service a2489d
   if(translator)
Packit Service a2489d
      translator->ResetTranslation();
Packit Service a2489d
}
Packit Service a2489d
void DirectedBuffer::Put(const char *buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   if(mode==PUT && translator)
Packit Service a2489d
      translator->PutTranslated(this,buf,size);
Packit Service a2489d
   else
Packit Service a2489d
      Buffer::Put(buf,size);
Packit Service a2489d
}
Packit Service a2489d
void DirectedBuffer::PutTranslated(const char *buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   if(translator)
Packit Service a2489d
      translator->PutTranslated(this,buf,size);
Packit Service a2489d
   else
Packit Service a2489d
      Buffer::Put(buf,size);
Packit Service a2489d
}
Packit Service a2489d
int DirectedBuffer::MoveDataHere(Buffer *buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   if(size>buf->Size())
Packit Service a2489d
      size=buf->Size();
Packit Service a2489d
   if(mode==PUT && translator)
Packit Service a2489d
      translator->PutTranslated(this,buf->Get(),size);
Packit Service a2489d
   else
Packit Service a2489d
      return Buffer::MoveDataHere(buf,size);
Packit Service a2489d
   return size;
Packit Service a2489d
}
Packit Service a2489d
void DirectedBuffer::PutEOF()
Packit Service a2489d
{
Packit Service a2489d
   if(mode==PUT && translator)
Packit Service a2489d
      translator->PutTranslated(this,0,0);
Packit Service a2489d
   Buffer::PutEOF();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DirectedBuffer::EmbraceNewData(int len)
Packit Service a2489d
{
Packit Service a2489d
   if(len<=0)
Packit Service a2489d
      return;
Packit Service a2489d
   RateAdd(len);
Packit Service a2489d
   if(translator)
Packit Service a2489d
   {
Packit Service a2489d
      // copy the data to free room for translated data
Packit Service a2489d
      translator->Put(buffer+buffer.length(),len);
Packit Service a2489d
      translator->AppendTranslated(this,0,0);
Packit Service a2489d
   }
Packit Service a2489d
   else
Packit Service a2489d
      SpaceAdd(len);
Packit Service a2489d
   SaveMaxCheck(0);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
Packit Service a2489d
IOBuffer::IOBuffer(dir_t m)
Packit Service a2489d
   : DirectedBuffer(m), event_time(now),
Packit Service a2489d
     max_buf(0), get_size(GET_BUFSIZE)
Packit Service a2489d
{
Packit Service a2489d
}
Packit Service a2489d
IOBuffer::~IOBuffer()
Packit Service a2489d
{
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void IOBuffer::Put(const char *buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   if(size>=PUT_LL_MIN && Size()==0 && mode==PUT && !save && !translator)
Packit Service a2489d
   {
Packit Service a2489d
      int res=Put_LL(buf,size);
Packit Service a2489d
      if(res>=0)
Packit Service a2489d
      {
Packit Service a2489d
	 buf+=res;
Packit Service a2489d
	 size-=res;
Packit Service a2489d
	 pos+=res;
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   if(size<=0)
Packit Service a2489d
      return;
Packit Service a2489d
   if(Size()==0)
Packit Service a2489d
      current->Timeout(0);
Packit Service a2489d
   DirectedBuffer::Put(buf,size);
Packit Service a2489d
}
Packit Service a2489d
void IOBuffer::Put(const char *buf)
Packit Service a2489d
{
Packit Service a2489d
   Put(buf,strlen(buf));
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
int IOBuffer::TuneGetSize(int res)
Packit Service a2489d
{
Packit Service a2489d
   if(res>0)
Packit Service a2489d
   {
Packit Service a2489d
      // buffer size tuning depending on data rate
Packit Service a2489d
      const int max_get_size=(max_buf?max_buf:0x100000);
Packit Service a2489d
      if(res>get_size/2 && Size()+get_size*2<=max_get_size)
Packit Service a2489d
	 get_size*=2;
Packit Service a2489d
   }
Packit Service a2489d
   return res;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
int IOBuffer::Do()
Packit Service a2489d
{
Packit Service a2489d
   if(Done() || Error())
Packit Service a2489d
      return STALL;
Packit Service a2489d
   int res=0;
Packit Service a2489d
   switch(mode)
Packit Service a2489d
   {
Packit Service a2489d
   case PUT:
Packit Service a2489d
      if(Size()==0)
Packit Service a2489d
	 return STALL;
Packit Service a2489d
      res=Put_LL(buffer+buffer_ptr,Size());
Packit Service a2489d
      if(res>0)
Packit Service a2489d
      {
Packit Service a2489d
	 RateAdd(res);
Packit Service a2489d
	 buffer_ptr+=res;
Packit Service a2489d
	 event_time=now;
Packit Service a2489d
	 if(eof)
Packit Service a2489d
	    PutEOF_LL();
Packit Service a2489d
	 return MOVED;
Packit Service a2489d
      }
Packit Service a2489d
      break;
Packit Service a2489d
Packit Service a2489d
   case GET:
Packit Service a2489d
      if(eof)
Packit Service a2489d
	 return STALL;
Packit Service a2489d
      res=TuneGetSize(Get_LL(get_size));
Packit Service a2489d
      if(res>0)
Packit Service a2489d
      {
Packit Service a2489d
	 EmbraceNewData(res);
Packit Service a2489d
	 event_time=now;
Packit Service a2489d
	 return MOVED;
Packit Service a2489d
      }
Packit Service a2489d
      if(eof)
Packit Service a2489d
      {
Packit Service a2489d
	 event_time=now;
Packit Service a2489d
	 return MOVED;
Packit Service a2489d
      }
Packit Service a2489d
      break;
Packit Service a2489d
   }
Packit Service a2489d
   if(res<0)
Packit Service a2489d
   {
Packit Service a2489d
      event_time=now;
Packit Service a2489d
      return MOVED;
Packit Service a2489d
   }
Packit Service a2489d
   return STALL;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
// IOBufferStacked implementation
Packit Service a2489d
#undef super
Packit Service a2489d
#define super IOBuffer
Packit Service a2489d
int IOBufferStacked::Do()
Packit Service a2489d
{
Packit Service a2489d
   int m=STALL;
Packit Service a2489d
   if(Done() || Error())
Packit Service a2489d
      return m;
Packit Service a2489d
   int res=0;
Packit Service a2489d
   switch(mode)
Packit Service a2489d
   {
Packit Service a2489d
   case PUT:
Packit Service a2489d
      if(down->Broken() && !broken)
Packit Service a2489d
      {
Packit Service a2489d
	 broken=true;
Packit Service a2489d
	 return MOVED;
Packit Service a2489d
      }
Packit Service a2489d
      if(down->Error())
Packit Service a2489d
      {
Packit Service a2489d
	 SetError(down->ErrorText(),down->ErrorFatal());
Packit Service a2489d
	 m=MOVED;
Packit Service a2489d
      }
Packit Service a2489d
      if(Size()==0)
Packit Service a2489d
	 return m;
Packit Service a2489d
      res=Put_LL(buffer+buffer_ptr,Size());
Packit Service a2489d
      if(res>0)
Packit Service a2489d
      {
Packit Service a2489d
	 buffer_ptr+=res;
Packit Service a2489d
	 m=MOVED;
Packit Service a2489d
      }
Packit Service a2489d
      break;
Packit Service a2489d
Packit Service a2489d
   case GET:
Packit Service a2489d
      if(eof)
Packit Service a2489d
	 return m;
Packit Service a2489d
      res=Get_LL(/*unused*/0);
Packit Service a2489d
      if(res>0)
Packit Service a2489d
      {
Packit Service a2489d
	 EmbraceNewData(res);
Packit Service a2489d
	 m=MOVED;
Packit Service a2489d
      }
Packit Service a2489d
      if(eof)
Packit Service a2489d
	 m=MOVED;
Packit Service a2489d
      if(down->Error())
Packit Service a2489d
      {
Packit Service a2489d
	 SetError(down->ErrorText(),down->ErrorFatal());
Packit Service a2489d
	 m=MOVED;
Packit Service a2489d
      }
Packit Service a2489d
      break;
Packit Service a2489d
   }
Packit Service a2489d
   if(res<0)
Packit Service a2489d
      return MOVED;
Packit Service a2489d
   return m;
Packit Service a2489d
}
Packit Service a2489d
int IOBufferStacked::Put_LL(const char *buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   if(down->Broken())
Packit Service a2489d
   {
Packit Service a2489d
      broken=true;
Packit Service a2489d
      return -1;
Packit Service a2489d
   }
Packit Service a2489d
   down->Put(buf,size);
Packit Service a2489d
   return size;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
int IOBufferStacked::Get_LL(int)
Packit Service a2489d
{
Packit Service a2489d
   if(max_buf && Size()>=max_buf) {
Packit Service a2489d
      down->SuspendSlave();
Packit Service a2489d
      return 0;
Packit Service a2489d
   }
Packit Service a2489d
   down->ResumeSlave();
Packit Service a2489d
   int size=MoveDataHere(down,down->Size());
Packit Service a2489d
   if(down->Size()==0 && down->Eof())
Packit Service a2489d
      PutEOF();
Packit Service a2489d
   return size;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
bool IOBufferStacked::Done()
Packit Service a2489d
{
Packit Service a2489d
   if(super::Done())
Packit Service a2489d
      return down->Done();
Packit Service a2489d
   return false;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void IOBufferStacked::SuspendInternal()
Packit Service a2489d
{
Packit Service a2489d
   super::SuspendInternal();
Packit Service a2489d
   down->SuspendSlave();
Packit Service a2489d
}
Packit Service a2489d
void IOBufferStacked::ResumeInternal()
Packit Service a2489d
{
Packit Service a2489d
   if(!max_buf || Size()
Packit Service a2489d
      down->ResumeSlave();
Packit Service a2489d
   super::ResumeInternal();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
// IOBufferFDStream implementation
Packit Service a2489d
#include <fcntl.h>
Packit Service a2489d
#include <unistd.h>
Packit Service a2489d
#undef super
Packit Service a2489d
#define super IOBuffer
Packit Service a2489d
int IOBufferFDStream::Put_LL(const char *buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   if(put_ll_timer && !eof && Size()
Packit Service a2489d
   && !put_ll_timer->Stopped())
Packit Service a2489d
      return 0;
Packit Service a2489d
   if(stream->broken())
Packit Service a2489d
   {
Packit Service a2489d
      broken=true;
Packit Service a2489d
      return -1;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   int res=0;
Packit Service a2489d
Packit Service a2489d
   int fd=stream->getfd();
Packit Service a2489d
   if(fd==-1)
Packit Service a2489d
   {
Packit Service a2489d
      if(stream->error())
Packit Service a2489d
	 goto stream_err;
Packit Service a2489d
      TimeoutS(1);
Packit Service a2489d
      event_time=now;
Packit Service a2489d
      return 0;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   res=write(fd,buf,size);
Packit Service a2489d
   if(res==-1)
Packit Service a2489d
   {
Packit Service a2489d
      saved_errno=errno;
Packit Service a2489d
      if(E_RETRY(saved_errno))
Packit Service a2489d
      {
Packit Service a2489d
	 Block(fd,POLLOUT);
Packit Service a2489d
	 return 0;
Packit Service a2489d
      }
Packit Service a2489d
      if(NonFatalError(saved_errno))
Packit Service a2489d
	 return 0;
Packit Service a2489d
      if(errno==EPIPE)
Packit Service a2489d
      {
Packit Service a2489d
	 broken=true;
Packit Service a2489d
	 return -1;
Packit Service a2489d
      }
Packit Service a2489d
      stream->MakeErrorText(saved_errno);
Packit Service a2489d
      goto stream_err;
Packit Service a2489d
   }
Packit Service a2489d
   if(put_ll_timer)
Packit Service a2489d
      put_ll_timer->Reset();
Packit Service a2489d
   return res;
Packit Service a2489d
Packit Service a2489d
stream_err:
Packit Service a2489d
   SetError(stream->error_text,!TemporaryNetworkError(saved_errno));
Packit Service a2489d
   return -1;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
int IOBufferFDStream::Get_LL(int size)
Packit Service a2489d
{
Packit Service a2489d
   if(max_buf && Size()>=max_buf)
Packit Service a2489d
      return 0;
Packit Service a2489d
Packit Service a2489d
   int res=0;
Packit Service a2489d
Packit Service a2489d
   int fd=stream->getfd();
Packit Service a2489d
   if(fd==-1)
Packit Service a2489d
   {
Packit Service a2489d
      if(stream->error())
Packit Service a2489d
	 goto stream_err;
Packit Service a2489d
      TimeoutS(1);
Packit Service a2489d
      return 0;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   if(!Ready(fd,POLLIN))
Packit Service a2489d
   {
Packit Service a2489d
      Block(fd,POLLIN);
Packit Service a2489d
      return 0;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   res=read(fd,GetSpace(size),size);
Packit Service a2489d
   if(res==-1)
Packit Service a2489d
   {
Packit Service a2489d
      saved_errno=errno;
Packit Service a2489d
      if(E_RETRY(saved_errno))
Packit Service a2489d
      {
Packit Service a2489d
	 SetNotReady(fd,POLLIN);
Packit Service a2489d
	 Block(fd,POLLIN);
Packit Service a2489d
	 return 0;
Packit Service a2489d
      }
Packit Service a2489d
      if(NonFatalError(saved_errno))
Packit Service a2489d
	 return 0;
Packit Service a2489d
      stream->MakeErrorText(saved_errno);
Packit Service a2489d
      goto stream_err;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   if(res==0) {
Packit Service a2489d
      Log::global->Format(10,"buffer: EOF on FD %d\n",fd);
Packit Service a2489d
      eof=true;
Packit Service a2489d
   }
Packit Service a2489d
   return res;
Packit Service a2489d
Packit Service a2489d
stream_err:
Packit Service a2489d
   SetError(stream->error_text,!TemporaryNetworkError(saved_errno));
Packit Service a2489d
   return -1;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
FgData *IOBufferFDStream::GetFgData(bool fg)
Packit Service a2489d
{
Packit Service a2489d
   if(stream->getfd()!=-1)
Packit Service a2489d
      return new FgData(stream->GetProcGroup(),fg);
Packit Service a2489d
   return 0;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
bool IOBufferFDStream::Done()
Packit Service a2489d
{
Packit Service a2489d
   if(put_ll_timer)
Packit Service a2489d
      put_ll_timer->Stop();
Packit Service a2489d
   if(super::Done())
Packit Service a2489d
      return stream->Done(); // stream->Done indicates if sub-process finished
Packit Service a2489d
   return false;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
IOBufferFDStream::~IOBufferFDStream() {}
Packit Service a2489d
Packit Service a2489d
Packit Service a2489d
// IOBufferFileAccess implementation
Packit Service a2489d
#undef super
Packit Service a2489d
#define super IOBuffer
Packit Service a2489d
int IOBufferFileAccess::Get_LL(int size)
Packit Service a2489d
{
Packit Service a2489d
   if(max_buf && Size()>=max_buf) {
Packit Service a2489d
      session->SuspendSlave();
Packit Service a2489d
      return 0;
Packit Service a2489d
   }
Packit Service a2489d
   session->ResumeSlave();
Packit Service a2489d
Packit Service a2489d
   int res=0;
Packit Service a2489d
Packit Service a2489d
   res=session->Read(this,size);
Packit Service a2489d
   if(res<0)
Packit Service a2489d
   {
Packit Service a2489d
      if(res==FA::DO_AGAIN)
Packit Service a2489d
	 return 0;
Packit Service a2489d
      SetError(session->StrError(res));
Packit Service a2489d
      return -1;
Packit Service a2489d
   }
Packit Service a2489d
   if(res==0)
Packit Service a2489d
      eof=true;
Packit Service a2489d
   return res;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void IOBufferFileAccess::SuspendInternal()
Packit Service a2489d
{
Packit Service a2489d
   super::SuspendInternal();
Packit Service a2489d
   session->SuspendSlave();
Packit Service a2489d
}
Packit Service a2489d
void IOBufferFileAccess::ResumeInternal()
Packit Service a2489d
{
Packit Service a2489d
   if(!max_buf || Size()
Packit Service a2489d
      session->ResumeSlave();
Packit Service a2489d
   super::ResumeInternal();
Packit Service a2489d
}
Packit Service a2489d
const char *IOBufferFileAccess::Status()
Packit Service a2489d
{
Packit Service a2489d
   return session->CurrentStatus();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
unsigned long long Buffer::UnpackUINT64BE(int offset) const
Packit Service a2489d
{
Packit Service a2489d
   if(Size()-offset<8)
Packit Service a2489d
      return 0;
Packit Service a2489d
   unsigned long long res=UnpackUINT32BE(offset);
Packit Service a2489d
   res=(res<<32)|UnpackUINT32BE(offset+4);
Packit Service a2489d
   return res;
Packit Service a2489d
}
Packit Service a2489d
long long Buffer::UnpackINT64BE(int offset) const
Packit Service a2489d
{
Packit Service a2489d
   unsigned long long n=UnpackUINT64BE(offset);
Packit Service a2489d
   if(n&0x8000000000000000ULL)
Packit Service a2489d
      return -(long long)(n^0xFFFFFFFFFFFFFFFFULL)-1;
Packit Service a2489d
   return (long long)n;
Packit Service a2489d
}
Packit Service a2489d
unsigned Buffer::UnpackUINT32BE(int offset) const
Packit Service a2489d
{
Packit Service a2489d
   if(Size()-offset<4)
Packit Service a2489d
      return 0;
Packit Service a2489d
   unsigned char *b=(unsigned char*)buffer.get()+buffer_ptr+offset;
Packit Service a2489d
   return (b[0]<<24)|(b[1]<<16)|(b[2]<<8)|b[3];
Packit Service a2489d
}
Packit Service a2489d
int Buffer::UnpackINT32BE(int offset) const
Packit Service a2489d
{
Packit Service a2489d
   unsigned n=UnpackUINT32BE(offset);
Packit Service a2489d
   if(n&0x80000000U)
Packit Service a2489d
      return -(int)(n^0xFFFFFFFFU)-1;
Packit Service a2489d
   return (int)n;
Packit Service a2489d
}
Packit Service a2489d
unsigned Buffer::UnpackUINT16BE(int offset) const
Packit Service a2489d
{
Packit Service a2489d
   if(Size()-offset<2)
Packit Service a2489d
      return 0;
Packit Service a2489d
   unsigned char *b=(unsigned char*)buffer.get()+buffer_ptr+offset;
Packit Service a2489d
   return (b[0]<<8)|b[1];
Packit Service a2489d
}
Packit Service a2489d
unsigned Buffer::UnpackUINT8(int offset) const
Packit Service a2489d
{
Packit Service a2489d
   if(Size()-offset<1)
Packit Service a2489d
      return 0;
Packit Service a2489d
   unsigned char *b=(unsigned char*)buffer.get()+buffer_ptr+offset;
Packit Service a2489d
   return b[0];
Packit Service a2489d
}
Packit Service a2489d
void Buffer::PackUINT64BE(unsigned long long data)
Packit Service a2489d
{
Packit Service a2489d
#ifndef NDEBUG
Packit Service a2489d
   Log::global->Format(11,"PackUINT64BE(0x%016llX)\n",data);
Packit Service a2489d
#endif
Packit Service a2489d
   Allocate(8);
Packit Service a2489d
   PackUINT32BE((unsigned)(data>>32));
Packit Service a2489d
   PackUINT32BE((unsigned)(data&0xFFFFFFFFU));
Packit Service a2489d
}
Packit Service a2489d
void Buffer::PackINT64BE(long long data)
Packit Service a2489d
{
Packit Service a2489d
   unsigned long long n;
Packit Service a2489d
   if(data<0)
Packit Service a2489d
      n=((unsigned long long)(-data)^0xFFFFFFFFFFFFFFFFULL)+1;
Packit Service a2489d
   else
Packit Service a2489d
      n=(unsigned long long)data;
Packit Service a2489d
   PackUINT64BE(n);
Packit Service a2489d
}
Packit Service a2489d
void Buffer::PackUINT32BE(unsigned data)
Packit Service a2489d
{
Packit Service a2489d
#ifndef NDEBUG
Packit Service a2489d
   Log::global->Format(11,"PackUINT32BE(0x%08X)\n",data);
Packit Service a2489d
#endif
Packit Service a2489d
   char *b=GetSpace(4);
Packit Service a2489d
   b[0]=(data>>24)&255;
Packit Service a2489d
   b[1]=(data>>16)&255;
Packit Service a2489d
   b[2]=(data>>8)&255;
Packit Service a2489d
   b[3]=(data)&255;
Packit Service a2489d
   SpaceAdd(4);
Packit Service a2489d
}
Packit Service a2489d
void Buffer::PackINT32BE(int data)
Packit Service a2489d
{
Packit Service a2489d
   unsigned n;
Packit Service a2489d
   if(data<0)
Packit Service a2489d
      n=((unsigned)(-data)^0xFFFFFFFFU)+1;
Packit Service a2489d
   else
Packit Service a2489d
      n=(unsigned)data;
Packit Service a2489d
   PackUINT32BE(n);
Packit Service a2489d
}
Packit Service a2489d
void Buffer::PackUINT16BE(unsigned data)
Packit Service a2489d
{
Packit Service a2489d
   char *b=GetSpace(2);
Packit Service a2489d
   b[0]=(data>>8)&255;
Packit Service a2489d
   b[1]=(data)&255;
Packit Service a2489d
   SpaceAdd(2);
Packit Service a2489d
}
Packit Service a2489d
void Buffer::PackUINT8(unsigned data)
Packit Service a2489d
{
Packit Service a2489d
#ifndef NDEBUG
Packit Service a2489d
   Log::global->Format(11,"PackUINT8(0x%02X)\n",data);
Packit Service a2489d
#endif
Packit Service a2489d
   char *b=GetSpace(1);
Packit Service a2489d
   b[0]=(data)&255;
Packit Service a2489d
   SpaceAdd(1);
Packit Service a2489d
}