/*
* lftp - file transfer program
*
* Copyright (c) 1996-2013 by Alexander V. Lukyanov (lav@yars.free.net)
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
/* Usage notes:
*
* Call PreFilter() to add a filter to the beginning of the chain; these
* filters are initialized only once for all data. For example,
* PreFilter("wc -l")
*
*/
/*
* Implementation notes:
*
* Background things we can't get around:
* We must buffer (via FileCopy) output to a filter, since it might block.
*
* We must buffer the output from the filter to an output FileCopyPeer (ie.
* a URL), for the same reason.
*
* So, we're stuck with having up to two FileCopy's. (One to send, one to filter.)
*
* In some cases, we only need one: if the output is an FD, the filter can
* hook up directly and we can forget about that stage.
*
* In the case where we're outputting to a URL, we set up a FileCopy from a
* pipe to the URL, and then pretend we're just outputting to an FD (the
* pipe.) This means in the simple case of having no filters at all, writing
* to a URL or file, we send the data an extra time through a FileCopy and a
* pipe. That's a bit inefficient, but that's "cat file1 > file2"; that's
* normally done with "get file1 -o file2", so this shouldn't happen often.
*
* It's very important that if the output is stdout, any filters point directly
* at it, not through an extra copy: a pager, for example, will expect the output
* to be a TTY.
*
*/
#include
#include "OutputJob.h"
#include "ArgV.h"
#include "FileCopy.h"
#include "CopyJob.h"
#include "url.h"
#include "misc.h"
#include "StatusLine.h"
#include "DummyProto.h"
#include
#include
#include
#include
#include
#include
#define super Job
void OutputJob::InitCopy()
{
if(error)
return;
if(initialized)
return;
if(fa)
{
/* Set up a pipe sending data at the peer, so we can act like the FDStream
* constructor. */
int filter_pipe[2];
if(pipe(filter_pipe) == -1) {
// retry later
current->TimeoutS(1);
return;
}
FileCopyPeerFA *dst_peer = FileCopyPeerFA::New(fa.borrow(), fa_path, FA::STORE);
/* Status only for remote outputs. */
if(!strcmp(dst_peer->GetProto(), "file"))
no_status=true;
fcntl(filter_pipe[0],F_SETFL,O_NONBLOCK);
fcntl(filter_pipe[1],F_SETFL,O_NONBLOCK);
/* The output of the pipe (0) goes to the output FileCopy. */
FDStream *pipe_output = new FDStream(filter_pipe[0],"");
FileCopy *output_fc=FileCopy::New(new FileCopyPeerFDStream(pipe_output, FileCopyPeer::GET), dst_peer,false);
output=new CopyJob(output_fc, fa_path, a0);
output->NoStatus(no_status);
output_fd=new FDStream(filter_pipe[1],"");
pipe_output->CloseWhenDone();
output_fd->CloseWhenDone();
fa_path.set(0);
}
initialized=true;
if(Error())
return;
/* Clear the statusline, since we might change the pgrp if we create filters. */
ClearStatus();
/* Some legitimate uses produce broken pipe condition (cat|head).
* We still want to produce broken pipe if we're not piping, eg
* cat > pipe. */
if(IsFiltered())
fail_if_broken=false;
if(filter)
{
/* Create the global filter: */
OutputFilter *global = new OutputFilter(filter, output_fd.borrow());
output_fd=global;
}
/* Use a FileCopy to buffer our output to the filter: */
FileCopyPeerFDStream *out = new FileCopyPeerFDStream(output_fd.borrow(), FileCopyPeer::PUT);
FileCopy *input_fc = FileCopy::New(new FileCopyPeer(FileCopyPeer::GET), out, false);
if(!fail_if_broken)
input_fc->DontFailIfBroken();
input=new CopyJob(input_fc, xstring::format(_("%s (filter)"),a0.get()), filter?filter:a0);
if(!output)
output=input;
input->SetParentFg(this);
InputPeer()->SetDate(NO_DATE);
InputPeer()->SetSize(NO_SIZE);
input->GetCopy()->DontCopyDate();
input->NoStatus();
if(input != output)
{
output->SetParentFg(this);
OutputPeer()->SetDate(NO_DATE);
OutputPeer()->SetSize(NO_SIZE);
output->GetCopy()->DontCopyDate();
output->NoStatus();
}
if(is_stdout)
{
output->ClearStatusOnWrite();
output->GetCopy()->LineBuffered();
}
Timeout(0);
}
void OutputJob::Init(const char *_a0)
{
input=output=0;
initialized=false;
error=false;
no_status=false;
a0.set(_a0);
is_stdout=false;
fail_if_broken=true;
is_a_tty=false;
width=-1;
statusbar_redisplay=true;
}
/* Local (fd) output. */
OutputJob::OutputJob(FDStream *output_, const char *a0)
: output_fd(output_ ? output_ : new FDStream(1,""))
{
Init(a0);
if(output_)
{
/* We don't want to produce broken pipe when we're actually
* piping, since some legitimate uses produce broken pipe, eg
* cat|head. However, that's actually handled in InitCopy().
* User pipes aren't handled by us yet: instead of being set with
* SetFilter, they're being set up ahead of time and passed to
* us as an FDStream, so we don't really know if we're being filtered.
*
* So, until we handle pipes directly, disable broken pipe whenever
* we're being sent anywhere but stdout. */
fail_if_broken=false;
}
is_stdout=output_fd->usesfd(1);
is_a_tty=isatty(output_fd->fd);
width=fd_width(output_fd->fd);
/* We don't output status when outputting locally. */
no_status=true;
/* Make sure that if the output is going to fail, it fails early, so
* the parent doesn't start anything expensive (like begin downloading
* a file.) */
if(output_fd->getfd()==-1 && output_fd->error())
{
eprintf("%s: %s\n", a0, output_fd->error_text.get());
error=true;
}
}
OutputJob::OutputJob(const char *path, const char *a0, FileAccess *fa0)
: fa(fa0 ? fa0->Clone() : FileAccess::New("file")), fa_path(path)
{
Init(a0);
}
void OutputJob::PrepareToDie()
{
Bg();
AcceptSig(SIGTERM);
Delete(input);
if(input != output)
Delete(output);
input=0;
output=0;
super::PrepareToDie();
}
/* This is called to ask us "permission" to display a status line. */
bool OutputJob::ShowStatusLine(const SMTaskRef& s)
{
/* If our output file is gone, or isn't stdout, we don't care. */
if(!output || !is_stdout)
return true;
/* If we're filtered, we never display at all. (We don't know anything about
* the filter's output; the only time we do is when we're outputting to a URL
* or a file, and that doesn't apply here.) */
if(IsFiltered())
return false;
/* If we're not line buffered, display only if the output CopyJob says to. */
if(!output->GetCopy()->IsLineBuffered())
return output->HasStatus();
/* We're line buffered, so we can output a status line without stomping
* on a partially output line.
*
* If we've output something recently, only send the output to the title,
* to avoid flickering status for no reason.
*/
if(!update_timer.Stopped()) {
s->NextUpdateTitleOnly();
return true;
}
/* If we're not reenabling the status bar, and the statusbar has
* been turned off (due to output being reenabled), only send to
* the title. */
if(!statusbar_redisplay && output->GetCopy()->WriteAllowed())
{
s->NextUpdateTitleOnly();
return true;
}
/* Don't disable write if there are data to be written in buffer */
if(output->GetCopy()->WriteAllowed() && output->GetCopy()->WritePending())
return false;
/* There hasn't been output in a while. Stop the output again,
* so the FileCopy will clear the StatusLine when there's more data. */
output->GetCopy()->AllowWrite(false);
return true;
}
/* Get our contribution to the status line, which is just the output
* status, if any. Input status is the job of the user object. */
const char *OutputJob::Status(const StatusLine *s)
{
if(no_status)
return "";
/* Never show anything if we havn't even received any data yet; it won't
* start connecting until then, so it's not interesting. */
if(!initialized)
return "";
/* Use the status from the output CopyJob. It'll be the one that's connecting
* to a host, if applicable. */
return output->Status(s,true);
}
void OutputJob::PutEOF()
{
if(Error())
return;
/* Make sure we've sent at least one (empty) block. This ensures
* that we always start the input->output code path. */
Put("", 0);
/* Send an EOF to the input peer; it'll send an EOF to the output peer
* when all of its data is actually sent. */
if(InputPeer())
InputPeer()->PutEOF();
else if(tmp_buf)
tmp_buf->PutEOF();
else
abort();
}
/* add a filter to the beginning of the list */
void OutputJob::PreFilter(const char *newfilter)
{
if(!filter)
filter.set(newfilter);
else
{
char *old_filter=alloca_strdup(filter);
filter.vset(newfilter," | ",old_filter,NULL);
}
}
/* Return the width of the output. If there's a filter, we can either
* return -1 (we might be piping through "sed", changing the width),
* or the width we know (which is sane for most pagers.) I'm not sure
* which is better. */
int OutputJob::GetWidth() const
{
if(IsFiltered())
return -1;
return width;
}
/* Return true if the output is going directly to a TTY. */
bool OutputJob::IsTTY() const
{
if(IsFiltered())
return false;
return is_a_tty;
}
static const SMTaskRef null_peer;
/* Get the input FileCopyPeer; this is the buffer we write to. */
const SMTaskRef& OutputJob::InputPeer() const
{
if(input)
return input->GetGet();
return null_peer;
}
/* Get the output FileCopyPeer (the FileCopyPeer that's doing the final output). */
const SMTaskRef& OutputJob::OutputPeer() const
{
if(output)
return output->GetPut();
return null_peer;
}
/* We're done if the output is finished, or on error. */
int OutputJob::Done()
{
if(Error())
return true;
if(!initialized)
return false;
if(input && !input->Done())
return false;
if(output && !output->Done())
return false;
return true;
}
int OutputJob::Do()
{
if(!initialized && tmp_buf)
InitCopy();
return STALL;
}
/* Don't register errors until they're actually printed by
* the sub-job (ie. it's also Done()). */
bool OutputJob::Error()
{
if(error)
return true;
if(input && input->Error() && input->Done())
error=true;
if(output && input != output && output->Error() && output->Done())
error=true;
return error;
}
void OutputJob::Fg()
{
super::Fg();
if(input)
input->Fg();
if(output && input != output)
output->Fg();
}
void OutputJob::Bg()
{
if(output && input != output)
output->Bg();
if(input)
input->Bg();
super::Bg();
}
void OutputJob::SuspendInternal()
{
super::SuspendInternal();
if(input)
input->SuspendSlave();
if(output && input != output)
output->SuspendSlave();
}
void OutputJob::ResumeInternal()
{
if(input)
input->ResumeSlave();
if(output && input != output)
output->ResumeSlave();
super::ResumeInternal();
}
bool OutputJob::Full()
{
/* It'd be nicer to just check copy->GetGet()->IsSuspended(), since
* the FileCopy will suspend the Get end if the Put end gets filled.
* However, it won't do that until it actually tries to send something. */
int size = 0;
if(input)
{
if(input->GetPut())
size += input->GetPut()->Buffered();
if(input->GetGet())
size += input->GetGet()->Buffered();
if(input != output)
{
if(output->GetPut())
size += output->GetPut()->Buffered();
if(output->GetGet())
size += output->GetGet()->Buffered();
}
}
if(tmp_buf)
size += tmp_buf->Size();
return size >= 0x10000;
}
/* We'll actually go over the buffer limit here; that's OK; it's not a
* strict value. (It's not convenient to prevent that completely with
* Format(), either.) */
void OutputJob::Put(const char *buf,int size)
{
InitCopy();
if(Error())
return;
if(!InputPeer())
{
if(!tmp_buf)
tmp_buf=new Buffer;
tmp_buf->Put(buf,size);
return;
}
// InputPeer was inited, flush tmp_buf.
if(InputPeer() && tmp_buf)
{
Ref saved_buf(tmp_buf.borrow());
const char *b=0;
int s=0;
saved_buf->Get(&b,&s);
if(b && s>0)
Put(b,s);
if(saved_buf->Eof())
PutEOF();
}
update_timer.SetResource("cmd:status-interval",0);
off_t oldpos = InputPeer()->GetPos();
InputPeer()->Put(buf, size);
InputPeer()->SetPos(oldpos);
}
void OutputJob::Format(const char *f,...)
{
va_list v;
va_start(v,f);
const xstring& str=xstring::vformat(f,v);
va_end(v);
Put(str);
}
/* Propagate signals down to our child processes. */
int OutputJob::AcceptSig(int sig)
{
int m=MOVED;
if(sig == SIGTERM || sig == SIGINT)
m=WANTDIE;
/* If we have an input copier right now, it'll contain the top filter
* (which is linked to all other filters), so send it the signal. */
if(input)
input->AcceptSig(sig);
/* Otherwise, the only filters we have running are in output_fd. */
else if(output_fd)
output_fd->Kill(sig);
if(sig!=SIGCONT)
AcceptSig(SIGCONT);
return m;
}