/*
* lftp - file transfer program
*
* Copyright (c) 1996-2017 by Alexander V. Lukyanov (lav@yars.free.net)
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
/* FileCopyPeer behaviour:
1) when suspended, does nothing
2) tries to read some data at seek_pos, sets pos to position of Get (get).
2.5) tries to position to seek_pos and gets ready to write (put).
3) if it cannot seek to seek_pos, changes pos to what it can seek.
4) if it knows that it cannot seek to pos>0, CanSeek()==false
5) if it knows that it cannot seek to pos==0, CanSeek0()==false
6) it tries to get date/size if told to. (get)
7) it sets date on the file if eof is reached and date is known (put).
8) if put needs size/date before it writes data, NeedSizeDateBeforehand()==true.
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "FileCopy.h"
#include "url.h"
#include "log.h"
#include "misc.h"
#include "LsCache.h"
#include "plural.h"
#include "ArgV.h"
#define skip_threshold 0x1000
ResDecl rate_period ("xfer:rate-period","15", ResMgr::UNumberValidate,ResMgr::NoClosure);
ResDecl eta_period ("xfer:eta-period", "120",ResMgr::UNumberValidate,ResMgr::NoClosure);
ResDecl max_redir ("xfer:max-redirections", "5",ResMgr::UNumberValidate,ResMgr::NoClosure);
ResDecl buffer_size ("xfer:buffer-size","0x10000",ResMgr::UNumberValidate,ResMgr::NoClosure);
// It's bad when lftp receives data in small chunks, try to accumulate
// data in a kernel buffer using a delay and slurp it at once:
enum {
// Delays in microseconds
MAX_DELAY=30000,
DELAY_STEP=30,
// This size is related to socket buffer size.
// When it is too large, tcp slowdown happens.
// SSL has packet size 0x4000, so we have to use a lower threshold.
MAX_READ_TO_DELAY=0x3F00,
};
// FileCopy
#define super SMTask
#define set_state(s) do { state=(s); \
Log::global->Format(11,"FileCopy(%p) enters state %s\n", this, #s); } while(0)
int FileCopy::Do()
{
int m=STALL;
const char *b;
int s;
int rate_add;
if(Error() || Done())
return m;
switch(state)
{
pre_INITIAL:
set_state(INITIAL);
m=MOVED;
case(INITIAL):
if(remove_target_first && !put->FileRemoved())
return m;
remove_target_first=false;
if(cont && put->CanSeek())
put->WantSize();
if(put->NeedSizeDateBeforehand() || (cont && put->CanSeek() && put->GetSize()==NO_SIZE_YET))
{
if(get->GetSize()==NO_SIZE_YET || get->GetDate()==NO_DATE_YET)
{
put->Suspend();
get->DontStartTransferYet();
get->Resume();
get->WantSize();
if(put->NeedDate())
get->WantDate();
goto pre_GET_INFO_WAIT;
}
}
if(get->GetSize()==NO_SIZE_YET)
get->WantSize();
if(get->GetSize()!=NO_SIZE && get->GetSize()!=NO_SIZE_YET)
put->SetEntitySize(get->GetSize());
if(get->GetDate()!=NO_DATE && get->GetDate()!=NO_DATE_YET)
put->SetDate(get->GetDate());
else if(get->GetDate()==NO_DATE_YET)
{
if(put->NeedDate())
get->WantDate();
}
if(cont && put->CanSeek())
put->Seek(FILE_END);
else
{
if(put->range_start>0 && put->CanSeek())
put->Seek(put->range_start);
if(get->range_start>0 && get->CanSeek())
get->Seek(get->range_start);
goto pre_DO_COPY;
}
get->Suspend();
put->Resume();
set_state(PUT_WAIT);
m=MOVED;
/* fallthrough */
case(PUT_WAIT):
if(put->Error())
goto put_error;
if(put->GetSeekPos()!=FILE_END && get->GetSize()>=0
&& put->GetSeekPos()>=get->GetSize())
{
debug((9,_("copy: destination file is already complete\n")));
if(get->GetDate()!=NO_DATE)
goto pre_CONFIRM_WAIT; // have to set the date.
goto pre_GET_DONE_WAIT;
}
if(!put->IOReady())
return m;
/* now we know if put's seek failed. Seek get accordingly. */
if(get->CanSeek())
get->Seek(put->GetRealPos());
pre_DO_COPY:
get->Resume();
get->StartTransfer();
RateReset();
set_state(DO_COPY);
m=MOVED;
/* fallthrough */
case(DO_COPY): {
if(put->Error())
{
put_error:
SetError(put->ErrorText());
return MOVED;
}
if(get->Error() && get->Size()==0)
{
put->DontVerify();
if(put->GetPos()>0)
{
put->PutEOF();
put->Roll();
}
get_error:
SetError(get->ErrorText());
return MOVED;
}
if(put->Broken())
{
get->Suspend();
if(!put->Done())
return m;
debug((9,_("copy: put is broken\n")));
if(fail_if_broken)
{
SetError(strerror(EPIPE));
return MOVED;
}
goto pre_GET_DONE_WAIT;
}
put->Resume();
if(put->GetSeekPos()==FILE_END) // put position is not known yet.
{
get->Suspend();
return m;
}
get->Resume();
if(fail_if_cannot_seek && (get->GetRealPos()range_start
|| put->GetRealPos()range_start
|| get->GetRealPos()!=put->GetRealPos()))
{
SetError(_("seek failed"));
return MOVED;
}
if(high_watermark_timeout.Stopped())
{
SetError(_("no progress timeout"));
return MOVED;
}
if(get->GetSize()>0 && get->GetRealPos()>get->GetSize())
{
get->SetSize(NO_SIZE_YET);
get->SetDate(NO_DATE_YET);
}
long lbsize=0;
if(line_buffer)
lbsize=line_buffer->Size();
/* check if positions are correct */
off_t get_pos=get->GetRealPos()-get->range_start;
off_t put_pos=put->GetRealPos()-put->range_start;
if(get_pos-lbsize!=put_pos)
{
if(line_buffer)
line_buffer->Empty();
if(get_pos==put_pos)
{ // rare case.
return MOVED;
}
if(put_posCanSeek(put->GetRealPos()))
{
// we lose... How about a large buffer?
SetError(_("cannot seek on data source"));
return MOVED;
}
debug((9,_("copy: put rolled back to %lld, seeking get accordingly\n"),
(long long)put->GetRealPos()));
debug((10,"copy: get position was %lld\n",
(long long)get->GetRealPos()));
get->Seek(put->GetRealPos());
return MOVED;
}
else // put_pos > get_pos
{
off_t size=get->GetSize();
if(size>=0 && put->GetRealPos()>=size)
{
// simulate eof, as we have already have the whole file.
debug((9,_("copy: all data received, but get rolled back\n")));
goto eof;
}
off_t skip=put->GetRealPos()-get->GetRealPos();
if(!put->CanSeek(get->GetRealPos()) || skipGet(&b,&s);
if(skip>s)
skip=s;
if(skip==0)
return m;
get->Skip(skip);
bytes_count+=skip;
return MOVED;
}
debug((9,_("copy: get rolled back to %lld, seeking put accordingly\n"),
(long long)get->GetRealPos()));
put->Seek(get->GetRealPos());
return MOVED;
}
}
if(put->IsFull())
get->Suspend(); // stall the get.
get->Get(&b,&s);
if(b==0) // eof
{
debug((10,"copy: get hit eof\n"));
goto eof;
}
rate_add=put_buf;
if(s==0)
{
put_buf=put->Buffered();
rate_add-=put_buf;
RateAdd(rate_add);
if(put->Size()==0)
put->Suspend();
return m;
}
m=MOVED;
if(get->range_limit!=FILE_END && get->range_limitGetRealPos()+s)
{
s=get->range_limit-get->GetRealPos();
if(s<0)
s=0;
}
if(line_buffer)
{
const char *lb;
int ls;
if(line_buffer->Size()>line_buffer_max)
{
line_buffer->Get(&lb,&ls);
put->Put(lb,ls);
line_buffer->Skip(ls);
}
line_buffer->Put(b,s);
get->Skip(s);
bytes_count+=s;
// now find eol in line_buffer.
line_buffer->Get(&lb,&ls);
const char *eol=0;
if(get->Eof() || get->Error())
eol=lb+ls-1;
else
eol=memrchr(lb,'\n',ls);
if(eol)
{
put->Put(lb,eol-lb+1);
line_buffer->Skip(eol-lb+1);
}
}
else
{
put->Put(b,s);
get->Skip(s);
bytes_count+=s;
}
put_buf=put->Buffered();
rate_add-=put_buf-s;
RateAdd(rate_add);
if(high_watermarkrange_limit!=FILE_END && get->range_limit<=get->GetRealPos())
{
debug((10,"copy: get reached range limit\n"));
goto eof;
}
return m;
}
eof:
if(line_buffer)
{
line_buffer->Get(&b,&s);
put->Put(b,s);
line_buffer->Skip(s);
}
if(!CheckFileSizeAtEOF())
{
SetError(_("file size decreased during transfer"));
return MOVED;
}
pre_CONFIRM_WAIT:
put->SetSuggestedFileName(get->GetSuggestedFileName());
put->SetDate(get->GetDate());
if(get->GetSize()!=NO_SIZE && get->GetSize()!=NO_SIZE_YET)
put->SetEntitySize(get->GetSize());
put->PutEOF();
get->Suspend();
put->Resume();
put_eof_pos=put->GetRealPos();
debug((10,"copy: waiting for put confirmation\n"));
set_state(CONFIRM_WAIT);
m=MOVED;
case(CONFIRM_WAIT):
if(put->Error())
goto put_error;
/* check if put position is correct */
if(put_eof_pos!=put->GetRealPos() || put->GetSeekPos()==FILE_END)
{
set_state(DO_COPY);
return MOVED;
}
rate_add=put_buf;
put_buf=put->Buffered();
rate_add-=put_buf;
RateAdd(rate_add);
if(!put->Done())
return m;
debug((10,"copy: put confirmed store\n"));
pre_GET_DONE_WAIT:
get->Empty();
get->PutEOF();
get->Resume();
set_state(GET_DONE_WAIT);
m=MOVED;
end_time=now;
put->Suspend();
/* fallthrough */
case(GET_DONE_WAIT):
if(get->Error())
goto get_error;
if(remove_source_later)
{
get->RemoveFile();
remove_source_later=false;
}
if(!get->Done())
return m;
debug((10,"copy: get is finished - all done\n"));
set_state(ALL_DONE);
get->Suspend();
LogTransfer();
return MOVED;
pre_GET_INFO_WAIT:
set_state(GET_INFO_WAIT);
m=MOVED;
case(GET_INFO_WAIT):
if(get->Error())
goto get_error;
if(get->GetSize()==NO_SIZE_YET || get->GetDate()==NO_DATE_YET)
return m;
goto pre_INITIAL;
case(ALL_DONE):
return m;
}
return m;
}
FileCopy::FileCopy(FileCopyPeer *s,FileCopyPeer *d,bool c)
: get(s), put(d), cont(c),
rate("xfer:rate-period"),
rate_for_eta("xfer:eta-period"),
high_watermark_timeout("xfer:timeout",0)
{
set_state(INITIAL);
int max_buf=buffer_size.Query(0);
if(max_buf<1)
max_buf=1;
s->SetMaxBuffered(max_buf);
d->SetMaxBuffered(max_buf);
put_buf=0;
put_eof_pos=0;
high_watermark=0;
bytes_count=0;
fail_if_cannot_seek=false;
fail_if_broken=true;
remove_source_later=false;
remove_target_first=false;
line_buffer_max=0;
}
FileCopy::~FileCopy()
{
}
FileCopy *FileCopy::New(FileCopyPeer *s,FileCopyPeer *d,bool c)
{
FileCopy *res=0;
if(fxp_create)
res=fxp_create(s,d,c);
if(res)
return res;
return new FileCopy(s,d,c);
}
void FileCopy::SuspendInternal()
{
super::SuspendInternal();
if(get) get->SuspendSlave();
if(put) put->SuspendSlave();
}
void FileCopy::ResumeInternal()
{
if(get) get->ResumeSlave();
if(put) put->ResumeSlave();
super::ResumeInternal();
}
void FileCopy::Fg()
{
if(get) get->Fg();
if(put) put->Fg();
}
void FileCopy::Bg()
{
if(get) get->Bg();
if(put) put->Bg();
}
void FileCopy::SetError(const char *str)
{
error_text.set(str);
get=0;
}
void FileCopy::LineBuffered(int s)
{
if(!line_buffer)
line_buffer=new Buffer();
line_buffer_max=s;
}
off_t FileCopy::GetPos() const
{
if(put) {
off_t pos = put->GetRealPos() - put->Buffered();
// sometimes Buffered overestimates the amount of buffered data
if(pos<0)
pos=0;
return pos;
}
if(get)
return get->GetRealPos();
return 0;
}
off_t FileCopy::GetSize() const
{
if(get)
return get->GetSize();
return NO_SIZE;
}
int FileCopy::GetPercentDone() const
{
if(!get || !put)
return 100;
off_t size=get->GetSize();
if(size==NO_SIZE || size==NO_SIZE_YET)
return -1;
if(size==0)
return 0;
off_t ppos=put->GetRealPos() - put->Buffered() - put->range_start;
if(ppos<0)
return 0;
off_t psize=size-put->range_start;
if(put->range_limit!=FILE_END)
psize=put->range_limit-put->range_start;
if(psize<0)
return 100;
if(ppos>psize)
return -1;
return percent(ppos,psize);
}
const char *FileCopy::GetPercentDoneStr() const
{
int pct=GetPercentDone();
if(pct==-1)
return "";
static char buf[8];
snprintf(buf,8,"(%d%%) ",pct);
return buf;
}
void FileCopy::RateAdd(int a)
{
rate.Add(a);
rate_for_eta.Add(a);
}
void FileCopy::RateReset()
{
start_time=now;
rate.Reset();
rate_for_eta.Reset();
}
float FileCopy::GetRate()
{
if(!rate.Valid() || !put)
return 0;
return rate.Get();
}
const char *FileCopy::GetRateStr()
{
if(!rate.Valid() || !put)
return "";
return rate.GetStrS();
}
off_t FileCopy::GetBytesRemaining()
{
if(!get)
return 0;
if(get->range_limit==FILE_END)
{
off_t size=get->GetSize();
if(size<=0 || sizeGetRealPos() || !rate_for_eta.Valid())
return -1;
return(size-GetPos());
}
return get->range_limit-GetPos();
}
const char *FileCopy::GetETAStr()
{
off_t b=GetBytesRemaining();
if(b<0 || !put)
return "";
return rate_for_eta.GetETAStrSFromSize(b);
}
long FileCopy::GetETA(off_t b)
{
if(b<0 || !rate_for_eta.Valid())
return -1;
return (long)(double(b) / rate_for_eta.Get() + 0.5);
}
const char *FileCopy::GetStatus()
{
static xstring buf;
const char *get_st=get?get->GetStatus():0;
const char *put_st=put?put->GetStatus():0;
if(get_st && get_st[0] && put_st && put_st[0])
buf.vset("[",get_st,"->",put_st,"]",NULL);
else if(get_st && get_st[0])
buf.vset("[",get_st,"]",NULL);
else if(put_st && put_st[0])
buf.vset("[",put_st,"]",NULL);
else
return "";
return buf;
}
double FileCopy::GetTimeSpent()
{
if(end_timeGetFgData(fg);
if(f) return f;
if(put) f=put->GetFgData(fg);
return f;
}
pid_t FileCopy::GetProcGroup()
{
pid_t p=0;
if(get) p=get->GetProcGroup();
if(p) return p;
if(put) p=put->GetProcGroup();
return p;
}
void FileCopy::Kill(int sig)
{
if(get) get->Kill(sig);
if(put) put->Kill(sig);
}
Ref FileCopy::transfer_log;
void FileCopy::LogTransfer()
{
const char *log_ctx="xfer";
if(!ResMgr::QueryBool("log:enabled",log_ctx))
return;
const char *src=get->GetURL();
if(!src)
return;
src=alloca_strdup(src);
const char *dst=put->GetURL();
if(!dst)
return;
dst=alloca_strdup(dst);
if(!transfer_log)
transfer_log=new Log(log_ctx);
long long range_limit=GetRangeLimit();
if(range_limit==FILE_END)
range_limit=get->GetPos();
transfer_log->Format(0,"%s -> %s %lld-%lld %s\n",
url::remove_password(src),url::remove_password(dst),
(long long)GetRangeStart(),range_limit,
Speedometer::GetStrProper(GetBytesCount()/GetTimeSpent()).get());
}
void FileCopy::SetRange(off_t s,off_t lim)
{
get->SetRange(s,lim);
put->SetRange(s,lim);
}
bool FileCopy::CheckFileSizeAtEOF() const
{
long long range_limit=GetRangeLimit();
if(range_limit==FILE_END)
{
const long long size=GetSize();
if(size==NO_SIZE || size==NO_SIZE_YET)
return true; // nothing to compare with.
range_limit=size;
}
const long long get_pos=get->GetRealPos();
const long long put_pos=put->GetRealPos();
const long long pos=(get_pos>put_pos ? get_pos : put_pos);
if(pos<=0 || pos>=range_limit)
return true;
debug((0,"expected pos=%lld, actual pos=%lld\n",range_limit,pos));
return false;
}
// FileCopyPeer implementation
#undef super
#define super Buffer
off_t FileCopyPeer::GetSize()
{
if(size>=0 && pos>size)
WantSize();
return size;
}
void FileCopyPeer::SetSize(off_t s)
{
size=s;
if(seek_pos==FILE_END)
{
if(size!=NO_SIZE && size!=NO_SIZE_YET)
seek_pos=size;
else
seek_pos=0;
}
}
void FileCopyPeer::SetDate(time_t d,int p)
{
date.set(d,p);
if(d==NO_DATE || d==NO_DATE_YET)
date_set=true;
else
date_set=false;
}
void FileCopyPeer::SetRange(const off_t s,const off_t lim)
{
range_start=s;
range_limit=lim;
if(mode==PUT || range_start>GetPos()+0x4000)
Seek(range_start);
}
bool FileCopyPeer::Done()
{
if(Error())
return true;
if(eof && Size()==0)
{
if(removing)
return false;
if(mode==PUT)
return done;
return true;
}
if(broken)
return true;
return false;
}
void FileCopyPeer::Seek(off_t offs)
{
seek_pos=offs;
if(mode==PUT)
pos-=Size();
Empty();
eof=false;
broken=false;
}
const char *FileCopy::TempFileName(const char *file)
{
if(!ResMgr::QueryBool("xfer:use-temp-file",0))
return file;
xstring &temp=xstring::get_tmp(ResMgr::Query("xfer:temp-file-name",0));
if(temp.length()==0 || temp.eq("*"))
return file;
const char *name=basename_ptr(file);
int subst_pos=temp.instr('*');
if(subst_pos>=0)
temp.set_substr(subst_pos,1,name);
else {
if(temp.last_char()=='.')
temp.append(name);
else if(temp[0]=='.')
temp.set_substr(0,0,name);
else
temp.append('.').append(name);
}
return dir_file(dirname(file),temp);
}
const char *FileCopyPeer::UseTempFile(const char *file)
{
const char *temp=FileCopy::TempFileName(file);
if(temp==file)
return file;
auto_rename=true;
temp_file=true;
SetSuggestedFileName(basename_ptr(file));
return temp;
}
FileCopyPeer::FileCopyPeer(dir_t m) : IOBuffer(m)
{
want_size=false;
want_date=false;
start_transfer=true;
size=NO_SIZE_YET;
date=NO_DATE_YET;
e_size=NO_SIZE;
seek_pos=0;
can_seek=false;
can_seek0=false;
date_set=false;
do_set_date=true;
do_verify=true;
ascii=false;
range_start=0;
range_limit=FILE_END;
removing=false;
file_removed=false;
temp_file=false;
do_mkdir=false;
use_cache=true;
write_allowed=true;
done=false;
auto_rename=false;
Suspend(); // don't do anything too early
}
// FileCopyPeerFA implementation
#undef super
#define super FileCopyPeer
int FileCopyPeerFA::Do()
{
int m=STALL;
int res;
if(session->OpenMode()==FA::MAKE_DIR)
{
// doing mkdir
int res=session->Done();
if(res==FA::IN_PROGRESS)
return m;
if(res<0)
debug((3,"mkdir failed: %s\n",session->StrError(res)));
session->Close();
m=MOVED;
}
else if(session->OpenMode()==FA::RENAME)
{
int res=session->Done();
if(res==FA::IN_PROGRESS)
return m;
if(res<0) {
if(temp_file)
SetError(session->StrError(res));
else
debug((3,"rename failed: %s\n",session->StrError(res)));
}
session->Close();
done=true;
return MOVED;
}
if(do_mkdir) {
// do mkdir just once
do_mkdir=false;
assert(!session->IsOpen());
const xstring& dir=dirname(file);
if(dir.length()>0 && dir.ne("/") && dir.ne(".") && dir.ne("..")) {
// FIXME: .././.. should be also excluded
session->Mkdir(dirname(file),true);
return MOVED;
}
}
if(removing)
{
res=session->Done();
if(res<=0)
{
removing=false;
file_removed=true;
session->Close();
Suspend();
m=MOVED;
}
return m;
}
if(Done() || Error())
return m;
if(verify)
{
if(verify->Error())
{
SetError(verify->ErrorText());
m=MOVED;
}
else if(verify->Done())
{
if(suggested_filename && auto_rename)
{
const char *new_name=dir_file(dirname(file),suggested_filename);
bool clobber=temp_file;
session->Rename(file,new_name,clobber);
return MOVED;
}
done=true;
m=MOVED;
}
return m;
}
// if we need some info and cannot start the transfer (yet),
// then use ARRAY_INFO to fetch the file information.
if(((want_size && size==NO_SIZE_YET) || (want_date && date==NO_DATE_YET))
&& (mode==PUT || !start_transfer) && session->IsClosed())
{
FileInfo *fi=new FileInfo(file);
if(want_size)
fi->Need(fi->SIZE);
if(want_date)
fi->Need(fi->DATE);
info.Empty();
info.Add(fi);
session->GetInfoArray(&info);
m=MOVED;
}
if(session->OpenMode()==FA::ARRAY_INFO)
{
res=session->Done();
if(res==FA::IN_PROGRESS)
return m;
if(res<0)
{
session->Close();
SetSize(NO_SIZE);
SetDate(NO_DATE);
return MOVED;
}
FileInfo *fi=info[0];
if(want_size)
SetSize(fi->size);
if(want_date)
SetDate(fi->date);
session->Close();
return MOVED;
}
switch(mode)
{
case PUT:
if(fxp)
{
if(eof)
goto fxp_eof;
return m;
}
res=Put_LL(buffer+buffer_ptr,Size());
if(res>0)
{
buffer_ptr+=res;
m=MOVED;
}
else if(res<0)
return MOVED;
if(Size()==0)
{
if(eof)
{
if(date!=NO_DATE && date!=NO_DATE_YET)
session->SetDate(date);
if(e_size!=NO_SIZE && e_size!=NO_SIZE_YET)
session->SetSize(e_size);
res=session->StoreStatus();
if(res==FA::OK)
{
session->Close();
fxp_eof:
// FIXME: set date for real.
date_set=true;
if(!verify && do_verify)
verify=new FileVerificator(session,file);
else
done=true;
return MOVED;
}
else if(res==FA::IN_PROGRESS)
return m;
else
{
if(res==FA::DO_AGAIN)
return m;
if(res==FA::STORE_FAILED)
{
upload_state.Save(session);
session->Close();
if(can_seek && seek_pos>0)
Seek(FILE_END);
else
Seek(0);
return MOVED;
}
SetError(session->StrError(res));
return MOVED;
}
return m;
}
}
break;
case GET:
if(eof)
return m;
if(fxp)
return m;
res=TuneGetSize(Get_LL(get_size));
if(res>0)
{
EmbraceNewData(res);
SaveMaxCheck(0);
return MOVED;
}
if(res<0)
return MOVED;
if(eof)
{
session->Close();
return MOVED;
}
break;
}
return m;
}
bool FileCopyPeerFA::IOReady()
{
if(seek_pos==0)
return true;
if(seek_pos==FILE_END && size==NO_SIZE_YET)
return false;
return session->IOReady();
}
void FileCopyPeerFA::SuspendInternal()
{
if(fxp && mode==PUT)
return;
if(session->IsOpen())
session->SuspendSlave();
super::SuspendInternal();
}
void FileCopyPeerFA::ResumeInternal()
{
super::ResumeInternal();
session->ResumeSlave();
}
const char *FileCopyPeerFA::GetStatus()
{
if(verify)
return verify->Status();
if(!session->IsOpen())
return 0;
return session->CurrentStatus();
}
void FileCopyPeerFA::Seek(off_t new_pos)
{
if(pos==new_pos)
return;
super::Seek(new_pos);
session->Close();
if(seek_pos==FILE_END)
WantSize();
else
pos=new_pos;
}
void FileCopyPeerFA::OpenSession()
{
current->Timeout(0); // mark it MOVED.
if(mode==GET)
{
if(size!=NO_SIZE && size!=NO_SIZE_YET && !ascii
&& (seek_pos>size || (seek_pos==size && size>0)))
{
past_eof:
debug((10,"copy src: seek past eof (seek_pos=%lld, size=%lld)\n",
(long long)seek_pos,(long long)size));
pos=seek_pos;
eof=true;
return;
}
const char *b;
int s;
int err;
if(use_cache && FileAccess::cache->Find(session,file,FAmode,&err,&b,&s))
{
if(err)
{
SetError(b);
return;
}
size=s;
if(seek_pos>=s)
goto past_eof;
b+=seek_pos;
s-=seek_pos;
Save(0);
Put(b,s);
pos=seek_pos;
eof=true;
return;
}
}
else // mode==PUT
{
if(e_size>=0 && size>=0 && seek_pos>=e_size)
{
debug((10,"copy dst: seek past eof (seek_pos=%lld, size=%lld)\n",
(long long)seek_pos,(long long)e_size));
eof=true;
if(date==NO_DATE || date==NO_DATE_YET)
return;
}
}
session->Open(file,FAmode,seek_pos);
session->SetFileURL(orig_url);
session->SetLimit(range_limit);
if(mode==PUT) {
upload_state.Restore(session);
if(e_size!=NO_SIZE && e_size!=NO_SIZE_YET)
session->SetSize(e_size);
if(date!=NO_DATE && date!=NO_DATE_YET)
session->SetDate(date);
} else {
if(size!=NO_SIZE && size!=NO_SIZE_YET)
session->SetSize(size);
}
session->RereadManual();
if(ascii)
session->AsciiTransfer();
if(want_size && size==NO_SIZE_YET)
session->WantSize(&size);
if(want_date && (date==NO_DATE_YET || date.ts_prec>0))
session->WantDate(&date);
if(mode==GET)
SaveRollback(seek_pos);
else
pos=seek_pos+Size();
}
void FileCopyPeerFA::WantSize()
{
struct stat st;
if(!strcmp(session->GetProto(),"file")
&& stat(dir_file(session->GetCwd(),file),&st)!=-1)
SetSize(S_ISREG(st.st_mode)?st.st_size:NO_SIZE);
else
super::WantSize();
}
void FileCopyPeerFA::RemoveFile()
{
session->Open(file,FA::REMOVE);
removing=true;
}
int FileCopyPeerFA::Get_LL(int len)
{
if(get_delay>0)
{
if(!get_ll_timer.Stopped())
return 0;
session->ResumeSlave();
}
int res=0;
if(session->IsClosed())
OpenSession();
if(eof) // OpenSession can set eof=true.
return 0;
off_t io_at=pos;
if(GetRealPos()!=io_at) // GetRealPos can alter pos.
return 0;
res=session->Read(this,len);
if(res<0)
{
if(res==FA::DO_AGAIN)
return 0;
if(res==FA::FILE_MOVED)
{
// handle redirection.
assert(!fxp);
const char *loc_c=session->GetNewLocation();
int max_redirections=max_redir.Query(0);
if(loc_c && loc_c[0] && max_redirections>0)
{
Log::global->Format(3,_("copy: received redirection to `%s'\n"),loc_c);
if(++redirections>max_redirections)
{
SetError(_("Too many redirections"));
return -1;
}
if(FAmode==FA::QUOTE_CMD)
FAmode=FA::RETRIEVE;
xstring loc(loc_c);
session->Close(); // loc_c is no longer valid.
loc_c=0;
ParsedURL u(loc,true);
if(u.proto)
{
my_session=FileAccess::New(&u);
session=my_session;
file.set(u.path?u.path.get():"");
orig_url.set(loc);
}
else // !proto
{
if(orig_url)
{
int p_ind=url::path_index(orig_url);
const char *s=strrchr(orig_url,'/');
int s_ind=s?s-orig_url:-1;
if(p_ind==-1 || s_ind==-1 || s_indTimeout(0); // retry with new location.
return 0;
}
}
SetError(session->StrError(res));
return -1;
}
else if(res==0)
{
debug((10,"copy-peer: EOF on %s\n",session->GetFileURL(session->GetFile()).get()));
eof=true;
FileAccess::cache->Add(session,file,FAmode,FA::OK,this);
SetSuggestedFileName(session->GetSuggestedFileName());
session->Close();
}
else if(res<=MAX_READ_TO_DELAY)
{
if(get_delay<=MAX_DELAY-DELAY_STEP)
get_delay+=DELAY_STEP;
get_ll_timer.SetMicroSeconds(get_delay);
session->SuspendSlave();
}
else if(res>MAX_READ_TO_DELAY && get_delay>=DELAY_STEP)
get_delay-=DELAY_STEP;
return res;
}
int FileCopyPeerFA::Put_LL(const char *buf,int len)
{
if(do_mkdir)
return 0; // can't write yet
if(session->IsClosed())
OpenSession();
off_t io_at=pos; // GetRealPos can alter pos, save it.
if(GetRealPos()!=io_at)
return 0;
if(len==0 && eof)
return 0;
int res=session->Write(buf,len);
if(res<0)
{
if(res==FA::DO_AGAIN)
return 0;
if(res==FA::STORE_FAILED)
{
upload_state.Save(session);
session->Close();
if(can_seek && seek_pos>0)
Seek(FILE_END);
else
Seek(0);
return 0;
}
SetError(session->StrError(res));
return -1;
}
seek_pos+=res; // mainly to indicate that there was some output.
return res;
}
int FileCopyPeerFA::PutEOF_LL()
{
if(mode==GET && session)
session->Close();
return 0;
}
off_t FileCopyPeerFA::GetRealPos()
{
if(session->OpenMode()!=FAmode || fxp)
return pos;
if(mode==PUT)
{
if(pos-Size()!=session->GetPos())
{
Empty();
can_seek=false;
pos=session->GetPos();
}
}
else
{
if(eof)
return pos;
if(session->GetRealPos()==0 && session->GetPos()>0)
{
can_seek=false;
session->SeekReal();
}
if(pos+Size()!=session->GetPos())
SaveRollback(session->GetPos());
}
return pos;
}
void FileCopyPeerFA::Init()
{
get_delay=0;
fxp=false;
redirections=0;
can_seek=true;
can_seek0=true;
if(FAmode==FA::LIST || FAmode==FA::LONG_LIST)
Save(FileAccess::cache->SizeLimit());
if(mode==PUT)
file.set(UseTempFile(file));
}
FileCopyPeerFA::FileCopyPeerFA(FileAccess *s,const char *f,int m)
: FileCopyPeer(m==FA::STORE ? PUT : GET), file(f),
my_session(s), session(my_session), FAmode(m)
{
Init();
}
FileCopyPeerFA::FileCopyPeerFA(const FileAccessRef& s,const char *f,int m)
: FileCopyPeer(m==FA::STORE ? PUT : GET), file(f),
session(s), FAmode(m)
{
Init();
}
FileCopyPeerFA::FileCopyPeerFA(const ParsedURL *u,int m)
: FileCopyPeer(m==FA::STORE ? PUT : GET), file(u->path), orig_url(u->orig_url),
my_session(FileAccess::New(u)), session(my_session), FAmode(m)
{
Init();
if(!file)
SetError(_("file name missed in URL"));
}
void FileCopyPeerFA::PrepareToDie()
{
if(session)
session->Close();
}
FileCopyPeerFA::~FileCopyPeerFA() {}
FileCopyPeerFA *FileCopyPeerFA::New(FileAccess *s,const char *url,int m)
{
ParsedURL u(url,true);
if(u.proto)
{
SessionPool::Reuse(s);
return new FileCopyPeerFA(&u,m);
}
return new FileCopyPeerFA(s,url,m);
}
FileCopyPeerFA *FileCopyPeerFA::New(const FileAccessRef& s,const char *url,int m)
{
ParsedURL u(url,true);
if(u.proto)
return new FileCopyPeerFA(&u,m);
return new FileCopyPeerFA(s,url,m);
}
FileCopyPeer *FileCopyPeerFA::Clone()
{
FileCopyPeerFA *c=new FileCopyPeerFA(session->Clone(),file,FAmode);
c->orig_url.set(orig_url);
return c;
}
// FileCopyPeerFDStream
#undef super
#define super FileCopyPeer
FileCopyPeerFDStream::FileCopyPeerFDStream(FDStream *o,dir_t m)
: FileCopyPeer(m), my_stream(o?o:new FDStream(1,"")), stream(my_stream), close_when_done(o!=0)
{
Init();
}
FileCopyPeerFDStream::FileCopyPeerFDStream(const Ref& o,dir_t m)
: FileCopyPeer(m), stream(o), close_when_done(false)
{
Init();
}
void FileCopyPeerFDStream::Init()
{
seek_base=0;
create_fg_data=true;
need_seek=false;
can_seek = can_seek0 = stream->can_seek();
if(can_seek && stream->fd!=-1)
{
seek_base=lseek(stream->fd,0,SEEK_CUR);
if(seek_base==-1)
{
can_seek=false;
can_seek0=false;
seek_base=0;
}
}
if(stream->usesfd(1))
write_allowed=false;
if(mode==PUT)
put_ll_timer=new Timer(0,200);
if(mode==PUT && stream->fd==-1 && stream->can_setmtime())
stream->full_name.set(UseTempFile(stream->full_name));
}
void FileCopyPeerFDStream::Seek_LL()
{
int fd=stream->fd;
assert(fd!=-1);
if(CanSeek(seek_pos))
{
if(seek_pos==FILE_END)
{
seek_pos=lseek(fd,0,SEEK_END);
if(seek_pos==-1)
{
can_seek=false;
can_seek0=false;
seek_pos=0;
}
else
{
SetSize(seek_pos);
if(seek_pos>seek_base)
seek_pos-=seek_base;
else
seek_pos=0;
}
pos=seek_pos;
}
else
{
if(lseek(fd,seek_pos+seek_base,SEEK_SET)==-1)
{
can_seek=false;
can_seek0=false;
seek_pos=0;
}
pos=seek_pos;
}
if(mode==PUT)
pos+=Size();
}
else
{
seek_pos=pos;
}
}
int FileCopyPeerFDStream::getfd()
{
if(!stream)
return -1;
if(stream->fd!=-1)
return stream->fd;
int fd=stream->getfd();
if(fd==-1)
{
if(stream->error())
{
SetError(stream->error_text);
current->Timeout(0);
}
else
{
current->TimeoutS(1);
}
return -1;
}
stream->clear_status();
pos=0;
if(mode==PUT)
pos+=Size();
Seek_LL();
return fd;
}
int FileCopyPeerFDStream::Do()
{
int m=STALL;
if(Done() || Error())
return m;
if(do_mkdir) {
do_mkdir=false;
create_directories(dirname(stream->full_name).get_non_const());
}
if(verify)
{
if(verify->Error())
{
SetError(verify->ErrorText());
m=MOVED;
}
else if(verify->Done())
{
if(suggested_filename && stream && stream->full_name && auto_rename)
{
const char *new_name=dir_file(dirname(stream->full_name),suggested_filename);
struct stat st;
if(temp_file || (lstat(new_name,&st)==-1 && errno==ENOENT) || ResMgr::QueryBool("xfer:clobber",0)) {
debug((5,"copy: renaming `%s' to `%s'\n",stream->full_name.get(),suggested_filename.get()));
int res=rename(stream->full_name,new_name);
if(res==-1 && errno==EIO) {
// FUSE with HadoopFS workaround
unlink(new_name);
res=rename(stream->full_name,new_name);
}
if(res==-1) {
const char *err=xstring::format("rename(%s, %s): %s\n",stream->full_name.get(),new_name,strerror(errno));
if(temp_file)
SetError(err);
else
debug((3,"%s\n",err));
}
}
}
done=true;
m=MOVED;
}
return m;
}
bool check_min_size=true;
#ifndef NATIVE_CRLF
if(ascii)
check_min_size=false;
#endif
int res;
switch(mode)
{
case PUT:
if(Size()==0)
{
if(eof)
{
// make sure the stream is open - it may create an empty file.
if(stream && !stream->is_closed() && getfd()==-1)
return m;
if(!date_set && date!=NO_DATE && do_set_date)
{
if(date==NO_DATE_YET)
return m;
stream->setmtime(date);
date_set=true;
m=MOVED;
}
if(stream && close_when_done && !stream->Done())
return m;
if(!verify && do_verify)
verify=new FileVerificator(stream);
else
done=true;
return MOVED;
}
if(seek_pos==0)
return m;
}
if(!write_allowed)
return m;
if(getfd()==-1)
return m;
if(check_min_size && !eof && Size()Stopped())
break;
res=Put_LL(buffer+buffer_ptr,Size());
if(res>0)
buffer_ptr+=res;
if(res!=0)
m=MOVED;
break;
case GET:
if(eof)
return m;
res=TuneGetSize(Get_LL(get_size));
if(res>0)
{
EmbraceNewData(res);
SaveMaxCheck(0);
}
if(res!=0 || eof)
m=MOVED;
break;
}
return m;
}
bool FileCopyPeerFDStream::IOReady()
{
return seek_pos==pos || stream->fd!=-1;
}
void FileCopyPeerFDStream::Seek(off_t new_pos)
{
if(pos==new_pos)
return;
#ifndef NATIVE_CRLF
if(ascii && new_pos!=0)
{
// it is possible to read file to determine right position,
// but it is costly.
can_seek=false;
// can_seek0 is still true.
return;
}
#endif
super::Seek(new_pos);
int fd=stream->fd;
if(fd==-1)
{
if(seek_pos!=FILE_END)
{
pos=seek_pos;
if(mode==PUT)
pos+=Size();
return;
}
else
{
off_t s=stream->get_size();
if(s!=-1)
{
SetSize(s);
pos=seek_pos+((mode==PUT)?Size():0);
return;
}
else
{
// ok, have to try getfd.
fd=getfd();
}
}
if(fd==-1)
return;
}
Seek_LL();
}
int FileCopyPeerFDStream::Get_LL(int len)
{
int res=0;
int fd=getfd();
if(fd==-1)
return 0;
if((want_date && date==NO_DATE_YET)
|| (want_size && size==NO_SIZE_YET))
{
struct stat st;
if(fstat(fd,&st)==-1)
{
SetDate(NO_DATE);
SetSize(NO_SIZE);
}
else
{
SetDate(st.st_mtime);
SetSize(S_ISREG(st.st_mode)?st.st_size:NO_SIZE);
#ifndef NATIVE_CRLF
if(ascii)
SetSize(NO_SIZE);
#endif
}
}
if(need_seek) // this does not combine with ascii.
lseek(fd,seek_base+pos,SEEK_SET);
char *p=GetSpace(ascii?len*2:len);
res=read(fd,p,len);
if(res==-1)
{
if(E_RETRY(errno))
{
Block(fd,POLLIN);
return 0;
}
if(stream->NonFatalError(errno))
return 0;
stream->MakeErrorText();
SetError(stream->error_text);
return -1;
}
stream->clear_status();
#ifndef NATIVE_CRLF
if(ascii)
{
for(int i=res; i>0; i--)
{
if(*p=='\n')
{
memmove(p+1,p,i);
*p++='\r';
res++;
}
p++;
}
}
#endif
if(res==0) {
debug((10,"copy-peer: EOF on FD %d\n",fd));
eof=true;
}
return res;
}
int FileCopyPeerFDStream::Put_LL(const char *buf,int len)
{
if(len==0)
return 0;
int fd=getfd();
if(fd==-1)
return 0;
int skip_cr=0;
#ifndef NATIVE_CRLF
if(ascii)
{
// find where line ends.
const char *cr=buf;
for(;;)
{
cr=(const char *)memchr(cr,'\r',len-(cr-buf));
if(!cr)
break;
if(cr-bufNonFatalError(errno))
{
// in case of full disk, check file correctness.
if(errno==ENOSPC && can_seek)
{
struct stat st;
if(fstat(fd,&st)!=-1)
{
if(st.st_size=seek_base+pos-Size()-buffer_ptr-st.st_size)
UnSkip(seek_base+pos-Size()-st.st_size);
else
{
Empty();
pos=st.st_size;
}
}
}
}
return 0;
}
stream->MakeErrorText();
SetError(stream->error_text);
return -1;
}
stream->clear_status();
if(res==len && skip_cr)
{
res+=skip_cr;
// performance gets worse because of writing a single char,
// but leaving uncomplete line on screen allows mixing it with debug text.
if(write(fd,"\n",1)==1)
res+=1;
}
if(put_ll_timer)
put_ll_timer->Reset();
return res;
}
FgData *FileCopyPeerFDStream::GetFgData(bool fg)
{
if(!my_stream || !create_fg_data)
return 0; // if we don't own the stream, don't create FgData.
if(stream->GetProcGroup())
return new FgData(stream->GetProcGroup(),fg);
return 0;
}
void FileCopyPeerFDStream::WantSize()
{
struct stat st;
int res=-1;
if(stream->fd!=-1)
res=fstat(stream->fd,&st);
else if(stream->full_name)
res=stat(stream->full_name,&st);
if(res!=-1)
SetSize(S_ISREG(st.st_mode)?st.st_size:NO_SIZE);
else
super::WantSize();
}
void FileCopyPeerFDStream::RemoveFile()
{
stream->remove();
removing=false; // it is instant.
file_removed=true;
Suspend();
current->Timeout(0);
}
const char *FileCopyPeerFDStream::GetStatus()
{
if(verify)
return verify->Status();
return stream->status;
}
void FileCopyPeerFDStream::Kill(int sig)
{
stream->Kill(sig);
}
FileCopyPeerFDStream *FileCopyPeerFDStream::NewPut(const char *file,bool cont)
{
int flags=O_WRONLY|O_CREAT;
if(!cont) {
flags|=O_TRUNC;
if(!ResMgr::QueryBool("xfer:clobber",0))
flags|=O_EXCL;
}
return new FileCopyPeerFDStream(new FileStream(file,flags),
FileCopyPeer::PUT);
}
FileCopyPeerFDStream *FileCopyPeerFDStream::NewGet(const char *file)
{
return new FileCopyPeerFDStream(new FileStream(file,O_RDONLY),
FileCopyPeer::GET);
}
FileCopyPeer *FileCopyPeerFDStream::Clone()
{
NeedSeek();
FileCopyPeerFDStream *peer=new FileCopyPeerFDStream(stream,mode);
peer->NeedSeek();
peer->SetBase(0);
return peer;
}
// FileCopyPeerDirList
FileCopyPeerDirList::FileCopyPeerDirList(FA *s,ArgV *v)
: FileCopyPeer(GET), session(s)
{
dl=session->MakeDirList(v);
if(dl==0)
eof=true;
can_seek=false;
can_seek0=false;
}
int FileCopyPeerDirList::Do()
{
if(Done())
return STALL;
if(dl->Error())
{
SetError(dl->ErrorText());
return MOVED;
}
const char *b;
int s;
dl->Get(&b,&s);
if(b==0) // eof
{
eof=true;
return MOVED;
}
if(s==0)
return STALL;
memcpy(GetSpace(s),b,s);
SpaceAdd(s);
dl->Skip(s);
return MOVED;
}
// FileCopyPeerMemory
int FileCopyPeerMemory::Do()
{
int m=STALL;
if(mode==PUT) {
max_buf=max_size+1;
if(Size()>max_size) {
SetError("buffer limit exceeded");
broken=true;
return MOVED;
}
}
return m;
}
// FileVerificator
void FileVerificator::Init0()
{
done=false;
if(!ResMgr::QueryBool("xfer:verify",0)
|| ResMgr::Query("xfer:verify-command",0).is_empty())
done=true;
}
void FileVerificator::InitVerify(const char *f)
{
if(done)
return;
ArgV *args=new ArgV(ResMgr::Query("xfer:verify-command",0));
args->Append(f);
Log::global->Format(9,"running %s %s\n",args->a0(),f);
verify_process=new InputFilter(args);
verify_process->StderrToStdout();
verify_buffer=new IOBufferFDStream(verify_process.Cast(),IOBuffer::GET);
}
FileVerificator::FileVerificator(const char *f)
{
Init0();
InitVerify(f);
}
FileVerificator::FileVerificator(const FDStream *stream)
{
Init0();
if(done)
return;
const char *f=stream->full_name;
if(!f)
{
done=true;
return;
}
const char *cwd=stream->GetCwd();
int cwd_len=xstrlen(cwd);
if(cwd && cwd_len>0 && !strncmp(f,cwd,cwd_len))
{
f+=cwd_len;
while(*f=='/')
f++;
if(*f==0)
f=".";
}
InitVerify(f);
if(verify_process)
{
verify_process->SetProcGroup(stream->GetProcGroup());
verify_process->SetCwd(cwd);
}
}
FileVerificator::FileVerificator(const FileAccess *session,const char *f)
{
Init0();
if(done)
return;
if(strcmp(session->GetProto(),"file"))
{
done=true;
return;
}
InitVerify(f);
verify_process->SetCwd(session->GetCwd());
}
FileVerificator::~FileVerificator() {}
int FileVerificator::Do()
{
int m=STALL;
if(done)
return m;
verify_process->Kill(SIGCONT);
if(!verify_buffer->Eof())
return m;
if(verify_process->GetProcState()!=ProcWait::TERMINATED)
return m;
done=true;
m=MOVED;
if(verify_process->GetProcExitCode()!=0)
{
error_text.set(verify_buffer->Get());
error_text.rtrim('\n');
if(error_text.length()==0)
error_text.set(_("Verify command failed without a message"));
const char *nl=strrchr(error_text,'\n');
if(nl)
error_text.set(nl+1);
}
return m;
}
// special pointer to creator of ftp/ftp copier. It is init'ed in Ftp class.
FileCopy *(*FileCopy::fxp_create)(FileCopyPeer *src,FileCopyPeer *dst,bool cont);