Blame src/OutputJob.cc

Packit 8f70b4
/*
Packit 8f70b4
 * lftp - file transfer program
Packit 8f70b4
 *
Packit 8f70b4
 * Copyright (c) 1996-2013 by Alexander V. Lukyanov (lav@yars.free.net)
Packit 8f70b4
 *
Packit 8f70b4
 * This program is free software; you can redistribute it and/or modify
Packit 8f70b4
 * it under the terms of the GNU General Public License as published by
Packit 8f70b4
 * the Free Software Foundation; either version 3 of the License, or
Packit 8f70b4
 * (at your option) any later version.
Packit 8f70b4
 *
Packit 8f70b4
 * This program is distributed in the hope that it will be useful,
Packit 8f70b4
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit 8f70b4
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit 8f70b4
 * GNU General Public License for more details.
Packit 8f70b4
 *
Packit 8f70b4
 * You should have received a copy of the GNU General Public License
Packit 8f70b4
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
Packit 8f70b4
 */
Packit 8f70b4
Packit 8f70b4
/* Usage notes:
Packit 8f70b4
 *
Packit 8f70b4
 * Call PreFilter() to add a filter to the beginning of the chain; these
Packit 8f70b4
 * filters are initialized only once for all data.  For example,
Packit 8f70b4
 * PreFilter("wc -l")
Packit 8f70b4
 *
Packit 8f70b4
 */
Packit 8f70b4
Packit 8f70b4
/*
Packit 8f70b4
 * Implementation notes:
Packit 8f70b4
 *
Packit 8f70b4
 * Background things we can't get around:
Packit 8f70b4
 * We must buffer (via FileCopy) output to a filter, since it might block.
Packit 8f70b4
 *
Packit 8f70b4
 * We must buffer the output from the filter to an output FileCopyPeer (ie.
Packit 8f70b4
 * a URL), for the same reason.
Packit 8f70b4
 *
Packit 8f70b4
 * So, we're stuck with having up to two FileCopy's.  (One to send, one to filter.)
Packit 8f70b4
 *
Packit 8f70b4
 * In some cases, we only need one: if the output is an FD, the filter can
Packit 8f70b4
 * hook up directly and we can forget about that stage.
Packit 8f70b4
 *
Packit 8f70b4
 * In the case where we're outputting to a URL, we set up a FileCopy from a
Packit 8f70b4
 * pipe to the URL, and then pretend we're just outputting to an FD (the
Packit 8f70b4
 * pipe.) This means in the simple case of having no filters at all, writing
Packit 8f70b4
 * to a URL or file, we send the data an extra time through a FileCopy and a
Packit 8f70b4
 * pipe.  That's a bit inefficient, but that's "cat file1 > file2"; that's
Packit 8f70b4
 * normally done with "get file1 -o file2", so this shouldn't happen often.
Packit 8f70b4
 *
Packit 8f70b4
 * It's very important that if the output is stdout, any filters point directly
Packit 8f70b4
 * at it, not through an extra copy: a pager, for example, will expect the output
Packit 8f70b4
 * to be a TTY.
Packit 8f70b4
 *
Packit 8f70b4
 */
Packit 8f70b4
#include <config.h>
Packit 8f70b4
Packit 8f70b4
#include "OutputJob.h"
Packit 8f70b4
#include "ArgV.h"
Packit 8f70b4
#include "FileCopy.h"
Packit 8f70b4
#include "CopyJob.h"
Packit 8f70b4
#include "url.h"
Packit 8f70b4
#include "misc.h"
Packit 8f70b4
#include "StatusLine.h"
Packit 8f70b4
#include "DummyProto.h"
Packit 8f70b4
Packit 8f70b4
#include <assert.h>
Packit 8f70b4
#include <unistd.h>
Packit 8f70b4
#include <errno.h>
Packit 8f70b4
#include <sys/ioctl.h>
Packit 8f70b4
#include <fcntl.h>
Packit 8f70b4
#include <stddef.h>
Packit 8f70b4
Packit 8f70b4
#define super Job
Packit 8f70b4
Packit 8f70b4
void OutputJob::InitCopy()
Packit 8f70b4
{
Packit 8f70b4
   if(error)
Packit 8f70b4
      return;
Packit 8f70b4
Packit 8f70b4
   if(initialized)
Packit 8f70b4
      return;
Packit 8f70b4
Packit 8f70b4
   if(fa)
Packit 8f70b4
   {
Packit 8f70b4
      /* Set up a pipe sending data at the peer, so we can act like the FDStream
Packit 8f70b4
       * constructor. */
Packit 8f70b4
      int filter_pipe[2];
Packit 8f70b4
Packit 8f70b4
      if(pipe(filter_pipe) == -1) {
Packit 8f70b4
	 // retry later
Packit 8f70b4
	 current->TimeoutS(1);
Packit 8f70b4
	 return;
Packit 8f70b4
      }
Packit 8f70b4
Packit 8f70b4
      FileCopyPeerFA *dst_peer = FileCopyPeerFA::New(fa.borrow(), fa_path, FA::STORE);
Packit 8f70b4
Packit 8f70b4
      /* Status only for remote outputs. */
Packit 8f70b4
      if(!strcmp(dst_peer->GetProto(), "file"))
Packit 8f70b4
	 no_status=true;
Packit 8f70b4
Packit 8f70b4
      fcntl(filter_pipe[0],F_SETFL,O_NONBLOCK);
Packit 8f70b4
      fcntl(filter_pipe[1],F_SETFL,O_NONBLOCK);
Packit 8f70b4
Packit 8f70b4
      /* The output of the pipe (0) goes to the output FileCopy. */
Packit 8f70b4
      FDStream *pipe_output = new FDStream(filter_pipe[0],"<filter-out>");
Packit 8f70b4
Packit 8f70b4
      FileCopy *output_fc=FileCopy::New(new FileCopyPeerFDStream(pipe_output, FileCopyPeer::GET), dst_peer,false);
Packit 8f70b4
      output=new CopyJob(output_fc, fa_path, a0);
Packit 8f70b4
      output->NoStatus(no_status);
Packit 8f70b4
      output_fd=new FDStream(filter_pipe[1],"<filter-in>");
Packit 8f70b4
Packit 8f70b4
      pipe_output->CloseWhenDone();
Packit 8f70b4
      output_fd->CloseWhenDone();
Packit 8f70b4
Packit 8f70b4
      fa_path.set(0);
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   initialized=true;
Packit 8f70b4
Packit 8f70b4
   if(Error())
Packit 8f70b4
      return;
Packit 8f70b4
Packit 8f70b4
   /* Clear the statusline, since we might change the pgrp if we create filters. */
Packit 8f70b4
   ClearStatus();
Packit 8f70b4
Packit 8f70b4
   /* Some legitimate uses produce broken pipe condition (cat|head).
Packit 8f70b4
    * We still want to produce broken pipe if we're not piping, eg
Packit 8f70b4
    * cat > pipe. */
Packit 8f70b4
   if(IsFiltered())
Packit 8f70b4
      fail_if_broken=false;
Packit 8f70b4
Packit 8f70b4
   if(filter)
Packit 8f70b4
   {
Packit 8f70b4
      /* Create the global filter: */
Packit 8f70b4
      OutputFilter *global = new OutputFilter(filter, output_fd.borrow());
Packit 8f70b4
      output_fd=global;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   /* Use a FileCopy to buffer our output to the filter: */
Packit 8f70b4
   FileCopyPeerFDStream *out = new FileCopyPeerFDStream(output_fd.borrow(), FileCopyPeer::PUT);
Packit 8f70b4
   FileCopy *input_fc = FileCopy::New(new FileCopyPeer(FileCopyPeer::GET), out, false);
Packit 8f70b4
Packit 8f70b4
   if(!fail_if_broken)
Packit 8f70b4
      input_fc->DontFailIfBroken();
Packit 8f70b4
Packit 8f70b4
   input=new CopyJob(input_fc, xstring::format(_("%s (filter)"),a0.get()), filter?filter:a0);
Packit 8f70b4
Packit 8f70b4
   if(!output)
Packit 8f70b4
      output=input;
Packit 8f70b4
Packit 8f70b4
   input->SetParentFg(this);
Packit 8f70b4
   InputPeer()->SetDate(NO_DATE);
Packit 8f70b4
   InputPeer()->SetSize(NO_SIZE);
Packit 8f70b4
   input->GetCopy()->DontCopyDate();
Packit 8f70b4
   input->NoStatus();
Packit 8f70b4
Packit 8f70b4
   if(input != output)
Packit 8f70b4
   {
Packit 8f70b4
      output->SetParentFg(this);
Packit 8f70b4
      OutputPeer()->SetDate(NO_DATE);
Packit 8f70b4
      OutputPeer()->SetSize(NO_SIZE);
Packit 8f70b4
      output->GetCopy()->DontCopyDate();
Packit 8f70b4
      output->NoStatus();
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   if(is_stdout)
Packit 8f70b4
   {
Packit 8f70b4
      output->ClearStatusOnWrite();
Packit 8f70b4
      output->GetCopy()->LineBuffered();
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   Timeout(0);
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void OutputJob::Init(const char *_a0)
Packit 8f70b4
{
Packit 8f70b4
   input=output=0;
Packit 8f70b4
   initialized=false;
Packit 8f70b4
   error=false;
Packit 8f70b4
   no_status=false;
Packit 8f70b4
   a0.set(_a0);
Packit 8f70b4
   is_stdout=false;
Packit 8f70b4
   fail_if_broken=true;
Packit 8f70b4
   is_a_tty=false;
Packit 8f70b4
   width=-1;
Packit 8f70b4
   statusbar_redisplay=true;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* Local (fd) output. */
Packit 8f70b4
OutputJob::OutputJob(FDStream *output_, const char *a0)
Packit 8f70b4
   : output_fd(output_ ? output_ : new FDStream(1,"<stdout>"))
Packit 8f70b4
{
Packit 8f70b4
   Init(a0);
Packit 8f70b4
Packit 8f70b4
   if(output_)
Packit 8f70b4
   {
Packit 8f70b4
      /* We don't want to produce broken pipe when we're actually
Packit 8f70b4
       * piping, since some legitimate uses produce broken pipe, eg
Packit 8f70b4
       * cat|head.  However, that's actually handled in InitCopy().
Packit 8f70b4
       * User pipes aren't handled by us yet: instead of being set with
Packit 8f70b4
       * SetFilter, they're being set up ahead of time and passed to
Packit 8f70b4
       * us as an FDStream, so we don't really know if we're being filtered.
Packit 8f70b4
       *
Packit 8f70b4
       * So, until we handle pipes directly, disable broken pipe whenever
Packit 8f70b4
       * we're being sent anywhere but stdout. */
Packit 8f70b4
      fail_if_broken=false;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   is_stdout=output_fd->usesfd(1);
Packit 8f70b4
   is_a_tty=isatty(output_fd->fd);
Packit 8f70b4
   width=fd_width(output_fd->fd);
Packit 8f70b4
Packit 8f70b4
   /* We don't output status when outputting locally. */
Packit 8f70b4
   no_status=true;
Packit 8f70b4
Packit 8f70b4
   /* Make sure that if the output is going to fail, it fails early, so
Packit 8f70b4
    * the parent doesn't start anything expensive (like begin downloading
Packit 8f70b4
    * a file.) */
Packit 8f70b4
   if(output_fd->getfd()==-1 && output_fd->error())
Packit 8f70b4
   {
Packit 8f70b4
      eprintf("%s: %s\n", a0, output_fd->error_text.get());
Packit 8f70b4
      error=true;
Packit 8f70b4
   }
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* Construct a job writing to `path' through a FileAccess session: a clone
 * of fa0 when one is given, otherwise a new "file" session. */
OutputJob::OutputJob(const char *path, const char *a0, FileAccess *fa0)
   : fa(fa0 ? fa0->Clone() : FileAccess::New("file")),
     fa_path(path)
{
   Init(a0);
}
Packit 8f70b4
Packit 8f70b4
void OutputJob::PrepareToDie()
Packit 8f70b4
{
Packit 8f70b4
   Bg();
Packit 8f70b4
   AcceptSig(SIGTERM);
Packit 8f70b4
Packit 8f70b4
   Delete(input);
Packit 8f70b4
   if(input != output)
Packit 8f70b4
      Delete(output);
Packit 8f70b4
   input=0;
Packit 8f70b4
   output=0;
Packit 8f70b4
Packit 8f70b4
   super::PrepareToDie();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* This is called to ask us "permission" to display a status line. */
Packit 8f70b4
bool OutputJob::ShowStatusLine(const SMTaskRef<StatusLine>& s)
Packit 8f70b4
{
Packit 8f70b4
   /* If our output file is gone, or isn't stdout, we don't care. */
Packit 8f70b4
   if(!output || !is_stdout)
Packit 8f70b4
      return true;
Packit 8f70b4
Packit 8f70b4
   /* If we're filtered, we never display at all.  (We don't know anything about
Packit 8f70b4
    * the filter's output; the only time we do is when we're outputting to a URL
Packit 8f70b4
    * or a file, and that doesn't apply here.) */
Packit 8f70b4
   if(IsFiltered())
Packit 8f70b4
      return false;
Packit 8f70b4
Packit 8f70b4
   /* If we're not line buffered, display only if the output CopyJob says to. */
Packit 8f70b4
   if(!output->GetCopy()->IsLineBuffered())
Packit 8f70b4
      return output->HasStatus();
Packit 8f70b4
Packit 8f70b4
   /* We're line buffered, so we can output a status line without stomping
Packit 8f70b4
    * on a partially output line.
Packit 8f70b4
    *
Packit 8f70b4
    * If we've output something recently, only send the output to the title,
Packit 8f70b4
    * to avoid flickering status for no reason.
Packit 8f70b4
    */
Packit 8f70b4
   if(!update_timer.Stopped()) {
Packit 8f70b4
      s->NextUpdateTitleOnly();
Packit 8f70b4
      return true;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   /* If we're not reenabling the status bar, and the statusbar has
Packit 8f70b4
    * been turned off (due to output being reenabled), only send to
Packit 8f70b4
    * the title. */
Packit 8f70b4
   if(!statusbar_redisplay && output->GetCopy()->WriteAllowed())
Packit 8f70b4
   {
Packit 8f70b4
      s->NextUpdateTitleOnly();
Packit 8f70b4
      return true;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   /* Don't disable write if there are data to be written in buffer */
Packit 8f70b4
   if(output->GetCopy()->WriteAllowed() && output->GetCopy()->WritePending())
Packit 8f70b4
      return false;
Packit 8f70b4
Packit 8f70b4
   /* There hasn't been output in a while.  Stop the output again,
Packit 8f70b4
    * so the FileCopy will clear the StatusLine when there's more data. */
Packit 8f70b4
   output->GetCopy()->AllowWrite(false);
Packit 8f70b4
Packit 8f70b4
   return true;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* Get our contribution to the status line, which is just the output
Packit 8f70b4
 * status, if any.  Input status is the job of the user object. */
Packit 8f70b4
const char *OutputJob::Status(const StatusLine *s)
Packit 8f70b4
{
Packit 8f70b4
   if(no_status)
Packit 8f70b4
      return "";
Packit 8f70b4
Packit 8f70b4
   /* Never show anything if we havn't even received any data yet; it won't
Packit 8f70b4
    * start connecting until then, so it's not interesting. */
Packit 8f70b4
   if(!initialized)
Packit 8f70b4
      return "";
Packit 8f70b4
Packit 8f70b4
   /* Use the status from the output CopyJob.  It'll be the one that's connecting
Packit 8f70b4
    * to a host, if applicable. */
Packit 8f70b4
   return output->Status(s,true);
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void OutputJob::PutEOF()
Packit 8f70b4
{
Packit 8f70b4
   if(Error())
Packit 8f70b4
      return;
Packit 8f70b4
Packit 8f70b4
   /* Make sure we've sent at least one (empty) block.  This ensures
Packit 8f70b4
    * that we always start the input->output code path. */
Packit 8f70b4
   Put("", 0);
Packit 8f70b4
Packit 8f70b4
   /* Send an EOF to the input peer; it'll send an EOF to the output peer
Packit 8f70b4
    * when all of its data is actually sent. */
Packit 8f70b4
   if(InputPeer())
Packit 8f70b4
      InputPeer()->PutEOF();
Packit 8f70b4
   else if(tmp_buf)
Packit 8f70b4
      tmp_buf->PutEOF();
Packit 8f70b4
   else
Packit 8f70b4
      abort();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* add a filter to the beginning of the list */
Packit 8f70b4
void OutputJob::PreFilter(const char *newfilter)
Packit 8f70b4
{
Packit 8f70b4
   if(!filter)
Packit 8f70b4
      filter.set(newfilter);
Packit 8f70b4
   else
Packit 8f70b4
   {
Packit 8f70b4
      char *old_filter=alloca_strdup(filter);
Packit 8f70b4
      filter.vset(newfilter," | ",old_filter,NULL);
Packit 8f70b4
   }
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* Return the width of the output.  If there's a filter, we can either
Packit 8f70b4
 * return -1 (we might be piping through "sed", changing the width),
Packit 8f70b4
 * or the width we know (which is sane for most pagers.)  I'm not sure
Packit 8f70b4
 * which is better. */
Packit 8f70b4
int OutputJob::GetWidth() const
Packit 8f70b4
{
Packit 8f70b4
   if(IsFiltered())
Packit 8f70b4
      return -1;
Packit 8f70b4
   return width;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* Return true if the output is going directly to a TTY. */
Packit 8f70b4
bool OutputJob::IsTTY() const
Packit 8f70b4
{
Packit 8f70b4
   if(IsFiltered())
Packit 8f70b4
      return false;
Packit 8f70b4
   return is_a_tty;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* Empty reference handed out by InputPeer()/OutputPeer() when the
 * corresponding job does not exist. */
static const SMTaskRef<FileCopyPeer> null_peer;

/* The input FileCopyPeer, i.e. the buffer we write into; an empty
 * reference while there is no input job. */
const SMTaskRef<FileCopyPeer>& OutputJob::InputPeer() const
{
   if(!input)
      return null_peer;
   return input->GetGet();
}
Packit 8f70b4
Packit 8f70b4
/* Get the output FileCopyPeer (the FileCopyPeer that's doing the final output). */
Packit 8f70b4
const SMTaskRef<FileCopyPeer>& OutputJob::OutputPeer() const
Packit 8f70b4
{
Packit 8f70b4
   if(output)
Packit 8f70b4
      return output->GetPut();
Packit 8f70b4
   return null_peer;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* We're done if the output is finished, or on error. */
Packit 8f70b4
int OutputJob::Done()
Packit 8f70b4
{
Packit 8f70b4
   if(Error())
Packit 8f70b4
      return true;
Packit 8f70b4
Packit 8f70b4
   if(!initialized)
Packit 8f70b4
      return false;
Packit 8f70b4
Packit 8f70b4
   if(input && !input->Done())
Packit 8f70b4
     return false;
Packit 8f70b4
   if(output && !output->Done())
Packit 8f70b4
     return false;
Packit 8f70b4
Packit 8f70b4
   return true;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
int OutputJob::Do()
Packit 8f70b4
{
Packit 8f70b4
   if(!initialized && tmp_buf)
Packit 8f70b4
      InitCopy();
Packit 8f70b4
   return STALL;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* Don't register errors until they're actually printed by
Packit 8f70b4
 * the sub-job (ie. it's also Done()). */
Packit 8f70b4
bool OutputJob::Error()
Packit 8f70b4
{
Packit 8f70b4
   if(error)
Packit 8f70b4
      return true;
Packit 8f70b4
   if(input && input->Error() && input->Done())
Packit 8f70b4
      error=true;
Packit 8f70b4
   if(output && input != output && output->Error() && output->Done())
Packit 8f70b4
      error=true;
Packit 8f70b4
   return error;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void OutputJob::Fg()
Packit 8f70b4
{
Packit 8f70b4
   super::Fg();
Packit 8f70b4
   if(input)
Packit 8f70b4
      input->Fg();
Packit 8f70b4
   if(output && input != output)
Packit 8f70b4
      output->Fg();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void OutputJob::Bg()
Packit 8f70b4
{
Packit 8f70b4
   if(output && input != output)
Packit 8f70b4
      output->Bg();
Packit 8f70b4
   if(input)
Packit 8f70b4
      input->Bg();
Packit 8f70b4
   super::Bg();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void OutputJob::SuspendInternal()
Packit 8f70b4
{
Packit 8f70b4
   super::SuspendInternal();
Packit 8f70b4
   if(input)
Packit 8f70b4
      input->SuspendSlave();
Packit 8f70b4
   if(output && input != output)
Packit 8f70b4
      output->SuspendSlave();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void OutputJob::ResumeInternal()
Packit 8f70b4
{
Packit 8f70b4
   if(input)
Packit 8f70b4
      input->ResumeSlave();
Packit 8f70b4
   if(output && input != output)
Packit 8f70b4
      output->ResumeSlave();
Packit 8f70b4
   super::ResumeInternal();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
bool OutputJob::Full()
Packit 8f70b4
{
Packit 8f70b4
   /* It'd be nicer to just check copy->GetGet()->IsSuspended(), since
Packit 8f70b4
    * the FileCopy will suspend the Get end if the Put end gets filled.
Packit 8f70b4
    * However, it won't do that until it actually tries to send something. */
Packit 8f70b4
   int size = 0;
Packit 8f70b4
   if(input)
Packit 8f70b4
   {
Packit 8f70b4
      if(input->GetPut())
Packit 8f70b4
	 size += input->GetPut()->Buffered();
Packit 8f70b4
      if(input->GetGet())
Packit 8f70b4
	 size += input->GetGet()->Buffered();
Packit 8f70b4
      if(input != output)
Packit 8f70b4
      {
Packit 8f70b4
	 if(output->GetPut())
Packit 8f70b4
	    size += output->GetPut()->Buffered();
Packit 8f70b4
	 if(output->GetGet())
Packit 8f70b4
	    size += output->GetGet()->Buffered();
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   if(tmp_buf)
Packit 8f70b4
      size += tmp_buf->Size();
Packit 8f70b4
Packit 8f70b4
   return size >= 0x10000;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* We'll actually go over the buffer limit here; that's OK; it's not a
Packit 8f70b4
 * strict value.  (It's not convenient to prevent that completely with
Packit 8f70b4
 * Format(), either.) */
Packit 8f70b4
void OutputJob::Put(const char *buf,int size)
Packit 8f70b4
{
Packit 8f70b4
   InitCopy();
Packit 8f70b4
   if(Error())
Packit 8f70b4
      return;
Packit 8f70b4
   if(!InputPeer())
Packit 8f70b4
   {
Packit 8f70b4
      if(!tmp_buf)
Packit 8f70b4
	 tmp_buf=new Buffer;
Packit 8f70b4
      tmp_buf->Put(buf,size);
Packit 8f70b4
      return;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   // InputPeer was inited, flush tmp_buf.
Packit 8f70b4
   if(InputPeer() && tmp_buf)
Packit 8f70b4
   {
Packit 8f70b4
      Ref<Buffer> saved_buf(tmp_buf.borrow());
Packit 8f70b4
      const char *b=0;
Packit 8f70b4
      int s=0;
Packit 8f70b4
      saved_buf->Get(&b,&s);
Packit 8f70b4
      if(b && s>0)
Packit 8f70b4
	 Put(b,s);
Packit 8f70b4
      if(saved_buf->Eof())
Packit 8f70b4
	 PutEOF();
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   update_timer.SetResource("cmd:status-interval",0);
Packit 8f70b4
Packit 8f70b4
   off_t oldpos = InputPeer()->GetPos();
Packit 8f70b4
   InputPeer()->Put(buf, size);
Packit 8f70b4
   InputPeer()->SetPos(oldpos);
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void OutputJob::Format(const char *f,...)
Packit 8f70b4
{
Packit 8f70b4
   va_list v;
Packit 8f70b4
   va_start(v,f);
Packit 8f70b4
   const xstring& str=xstring::vformat(f,v);
Packit 8f70b4
   va_end(v);
Packit 8f70b4
   Put(str);
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
/* Propagate signals down to our child processes. */
Packit 8f70b4
int OutputJob::AcceptSig(int sig)
Packit 8f70b4
{
Packit 8f70b4
   int m=MOVED;
Packit 8f70b4
   if(sig == SIGTERM || sig == SIGINT)
Packit 8f70b4
      m=WANTDIE;
Packit 8f70b4
Packit 8f70b4
   /* If we have an input copier right now, it'll contain the top filter
Packit 8f70b4
    * (which is linked to all other filters), so send it the signal. */
Packit 8f70b4
   if(input)
Packit 8f70b4
      input->AcceptSig(sig);
Packit 8f70b4
   /* Otherwise, the only filters we have running are in output_fd. */
Packit 8f70b4
   else if(output_fd)
Packit 8f70b4
      output_fd->Kill(sig);
Packit 8f70b4
   if(sig!=SIGCONT)
Packit 8f70b4
      AcceptSig(SIGCONT);
Packit 8f70b4
   return m;
Packit 8f70b4
}