Blame src/OutputJob.cc

Packit Service a2489d
/*
Packit Service a2489d
 * lftp - file transfer program
Packit Service a2489d
 *
Packit Service a2489d
 * Copyright (c) 1996-2013 by Alexander V. Lukyanov (lav@yars.free.net)
Packit Service a2489d
 *
Packit Service a2489d
 * This program is free software; you can redistribute it and/or modify
Packit Service a2489d
 * it under the terms of the GNU General Public License as published by
Packit Service a2489d
 * the Free Software Foundation; either version 3 of the License, or
Packit Service a2489d
 * (at your option) any later version.
Packit Service a2489d
 *
Packit Service a2489d
 * This program is distributed in the hope that it will be useful,
Packit Service a2489d
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service a2489d
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service a2489d
 * GNU General Public License for more details.
Packit Service a2489d
 *
Packit Service a2489d
 * You should have received a copy of the GNU General Public License
Packit Service a2489d
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
Packit Service a2489d
 */
Packit Service a2489d
Packit Service a2489d
/* Usage notes:
Packit Service a2489d
 *
Packit Service a2489d
 * Call PreFilter() to add a filter to the beginning of the chain; these
Packit Service a2489d
 * filters are initialized only once for all data.  For example,
Packit Service a2489d
 * PreFilter("wc -l")
Packit Service a2489d
 *
Packit Service a2489d
 */
Packit Service a2489d
Packit Service a2489d
/*
Packit Service a2489d
 * Implementation notes:
Packit Service a2489d
 *
Packit Service a2489d
 * Background things we can't get around:
Packit Service a2489d
 * We must buffer (via FileCopy) output to a filter, since it might block.
Packit Service a2489d
 *
Packit Service a2489d
 * We must buffer the output from the filter to an output FileCopyPeer (ie.
Packit Service a2489d
 * a URL), for the same reason.
Packit Service a2489d
 *
Packit Service a2489d
 * So, we're stuck with having up to two FileCopy's.  (One to send, one to filter.)
Packit Service a2489d
 *
Packit Service a2489d
 * In some cases, we only need one: if the output is an FD, the filter can
Packit Service a2489d
 * hook up directly and we can forget about that stage.
Packit Service a2489d
 *
Packit Service a2489d
 * In the case where we're outputting to a URL, we set up a FileCopy from a
Packit Service a2489d
 * pipe to the URL, and then pretend we're just outputting to an FD (the
Packit Service a2489d
 * pipe.) This means in the simple case of having no filters at all, writing
Packit Service a2489d
 * to a URL or file, we send the data an extra time through a FileCopy and a
Packit Service a2489d
 * pipe.  That's a bit inefficient, but that's "cat file1 > file2"; that's
Packit Service a2489d
 * normally done with "get file1 -o file2", so this shouldn't happen often.
Packit Service a2489d
 *
Packit Service a2489d
 * It's very important that if the output is stdout, any filters point directly
Packit Service a2489d
 * at it, not through an extra copy: a pager, for example, will expect the output
Packit Service a2489d
 * to be a TTY.
Packit Service a2489d
 *
Packit Service a2489d
 */
Packit Service a2489d
#include <config.h>
Packit Service a2489d
Packit Service a2489d
#include "OutputJob.h"
Packit Service a2489d
#include "ArgV.h"
Packit Service a2489d
#include "FileCopy.h"
Packit Service a2489d
#include "CopyJob.h"
Packit Service a2489d
#include "url.h"
Packit Service a2489d
#include "misc.h"
Packit Service a2489d
#include "StatusLine.h"
Packit Service a2489d
#include "DummyProto.h"
Packit Service a2489d
Packit Service a2489d
#include <assert.h>
Packit Service a2489d
#include <unistd.h>
Packit Service a2489d
#include <errno.h>
Packit Service a2489d
#include <sys/ioctl.h>
Packit Service a2489d
#include <fcntl.h>
Packit Service a2489d
#include <stddef.h>
Packit Service a2489d
Packit Service a2489d
#define super Job
Packit Service a2489d
Packit Service a2489d
void OutputJob::InitCopy()
Packit Service a2489d
{
Packit Service a2489d
   if(error)
Packit Service a2489d
      return;
Packit Service a2489d
Packit Service a2489d
   if(initialized)
Packit Service a2489d
      return;
Packit Service a2489d
Packit Service a2489d
   if(fa)
Packit Service a2489d
   {
Packit Service a2489d
      /* Set up a pipe sending data at the peer, so we can act like the FDStream
Packit Service a2489d
       * constructor. */
Packit Service a2489d
      int filter_pipe[2];
Packit Service a2489d
Packit Service a2489d
      if(pipe(filter_pipe) == -1) {
Packit Service a2489d
	 // retry later
Packit Service a2489d
	 current->TimeoutS(1);
Packit Service a2489d
	 return;
Packit Service a2489d
      }
Packit Service a2489d
Packit Service a2489d
      FileCopyPeerFA *dst_peer = FileCopyPeerFA::New(fa.borrow(), fa_path, FA::STORE);
Packit Service a2489d
Packit Service a2489d
      /* Status only for remote outputs. */
Packit Service a2489d
      if(!strcmp(dst_peer->GetProto(), "file"))
Packit Service a2489d
	 no_status=true;
Packit Service a2489d
Packit Service a2489d
      fcntl(filter_pipe[0],F_SETFL,O_NONBLOCK);
Packit Service a2489d
      fcntl(filter_pipe[1],F_SETFL,O_NONBLOCK);
Packit Service a2489d
Packit Service a2489d
      /* The output of the pipe (0) goes to the output FileCopy. */
Packit Service a2489d
      FDStream *pipe_output = new FDStream(filter_pipe[0],"<filter-out>");
Packit Service a2489d
Packit Service a2489d
      FileCopy *output_fc=FileCopy::New(new FileCopyPeerFDStream(pipe_output, FileCopyPeer::GET), dst_peer,false);
Packit Service a2489d
      output=new CopyJob(output_fc, fa_path, a0);
Packit Service a2489d
      output->NoStatus(no_status);
Packit Service a2489d
      output_fd=new FDStream(filter_pipe[1],"<filter-in>");
Packit Service a2489d
Packit Service a2489d
      pipe_output->CloseWhenDone();
Packit Service a2489d
      output_fd->CloseWhenDone();
Packit Service a2489d
Packit Service a2489d
      fa_path.set(0);
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   initialized=true;
Packit Service a2489d
Packit Service a2489d
   if(Error())
Packit Service a2489d
      return;
Packit Service a2489d
Packit Service a2489d
   /* Clear the statusline, since we might change the pgrp if we create filters. */
Packit Service a2489d
   ClearStatus();
Packit Service a2489d
Packit Service a2489d
   /* Some legitimate uses produce broken pipe condition (cat|head).
Packit Service a2489d
    * We still want to produce broken pipe if we're not piping, eg
Packit Service a2489d
    * cat > pipe. */
Packit Service a2489d
   if(IsFiltered())
Packit Service a2489d
      fail_if_broken=false;
Packit Service a2489d
Packit Service a2489d
   if(filter)
Packit Service a2489d
   {
Packit Service a2489d
      /* Create the global filter: */
Packit Service a2489d
      OutputFilter *global = new OutputFilter(filter, output_fd.borrow());
Packit Service a2489d
      output_fd=global;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   /* Use a FileCopy to buffer our output to the filter: */
Packit Service a2489d
   FileCopyPeerFDStream *out = new FileCopyPeerFDStream(output_fd.borrow(), FileCopyPeer::PUT);
Packit Service a2489d
   FileCopy *input_fc = FileCopy::New(new FileCopyPeer(FileCopyPeer::GET), out, false);
Packit Service a2489d
Packit Service a2489d
   if(!fail_if_broken)
Packit Service a2489d
      input_fc->DontFailIfBroken();
Packit Service a2489d
Packit Service a2489d
   input=new CopyJob(input_fc, xstring::format(_("%s (filter)"),a0.get()), filter?filter:a0);
Packit Service a2489d
Packit Service a2489d
   if(!output)
Packit Service a2489d
      output=input;
Packit Service a2489d
Packit Service a2489d
   input->SetParentFg(this);
Packit Service a2489d
   InputPeer()->SetDate(NO_DATE);
Packit Service a2489d
   InputPeer()->SetSize(NO_SIZE);
Packit Service a2489d
   input->GetCopy()->DontCopyDate();
Packit Service a2489d
   input->NoStatus();
Packit Service a2489d
Packit Service a2489d
   if(input != output)
Packit Service a2489d
   {
Packit Service a2489d
      output->SetParentFg(this);
Packit Service a2489d
      OutputPeer()->SetDate(NO_DATE);
Packit Service a2489d
      OutputPeer()->SetSize(NO_SIZE);
Packit Service a2489d
      output->GetCopy()->DontCopyDate();
Packit Service a2489d
      output->NoStatus();
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   if(is_stdout)
Packit Service a2489d
   {
Packit Service a2489d
      output->ClearStatusOnWrite();
Packit Service a2489d
      output->GetCopy()->LineBuffered();
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   Timeout(0);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void OutputJob::Init(const char *_a0)
Packit Service a2489d
{
Packit Service a2489d
   input=output=0;
Packit Service a2489d
   initialized=false;
Packit Service a2489d
   error=false;
Packit Service a2489d
   no_status=false;
Packit Service a2489d
   a0.set(_a0);
Packit Service a2489d
   is_stdout=false;
Packit Service a2489d
   fail_if_broken=true;
Packit Service a2489d
   is_a_tty=false;
Packit Service a2489d
   width=-1;
Packit Service a2489d
   statusbar_redisplay=true;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* Local (fd) output. */
Packit Service a2489d
OutputJob::OutputJob(FDStream *output_, const char *a0)
Packit Service a2489d
   : output_fd(output_ ? output_ : new FDStream(1,"<stdout>"))
Packit Service a2489d
{
Packit Service a2489d
   Init(a0);
Packit Service a2489d
Packit Service a2489d
   if(output_)
Packit Service a2489d
   {
Packit Service a2489d
      /* We don't want to produce broken pipe when we're actually
Packit Service a2489d
       * piping, since some legitimate uses produce broken pipe, eg
Packit Service a2489d
       * cat|head.  However, that's actually handled in InitCopy().
Packit Service a2489d
       * User pipes aren't handled by us yet: instead of being set with
Packit Service a2489d
       * SetFilter, they're being set up ahead of time and passed to
Packit Service a2489d
       * us as an FDStream, so we don't really know if we're being filtered.
Packit Service a2489d
       *
Packit Service a2489d
       * So, until we handle pipes directly, disable broken pipe whenever
Packit Service a2489d
       * we're being sent anywhere but stdout. */
Packit Service a2489d
      fail_if_broken=false;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   is_stdout=output_fd->usesfd(1);
Packit Service a2489d
   is_a_tty=isatty(output_fd->fd);
Packit Service a2489d
   width=fd_width(output_fd->fd);
Packit Service a2489d
Packit Service a2489d
   /* We don't output status when outputting locally. */
Packit Service a2489d
   no_status=true;
Packit Service a2489d
Packit Service a2489d
   /* Make sure that if the output is going to fail, it fails early, so
Packit Service a2489d
    * the parent doesn't start anything expensive (like begin downloading
Packit Service a2489d
    * a file.) */
Packit Service a2489d
   if(output_fd->getfd()==-1 && output_fd->error())
Packit Service a2489d
   {
Packit Service a2489d
      eprintf("%s: %s\n", a0, output_fd->error_text.get());
Packit Service a2489d
      error=true;
Packit Service a2489d
   }
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
OutputJob::OutputJob(const char *path, const char *a0, FileAccess *fa0)
Packit Service a2489d
   : fa(fa0 ? fa0->Clone() : FileAccess::New("file")), fa_path(path)
Packit Service a2489d
{
Packit Service a2489d
   Init(a0);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void OutputJob::PrepareToDie()
Packit Service a2489d
{
Packit Service a2489d
   Bg();
Packit Service a2489d
   AcceptSig(SIGTERM);
Packit Service a2489d
Packit Service a2489d
   Delete(input);
Packit Service a2489d
   if(input != output)
Packit Service a2489d
      Delete(output);
Packit Service a2489d
   input=0;
Packit Service a2489d
   output=0;
Packit Service a2489d
Packit Service a2489d
   super::PrepareToDie();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* This is called to ask us "permission" to display a status line. */
Packit Service a2489d
bool OutputJob::ShowStatusLine(const SMTaskRef<StatusLine>& s)
Packit Service a2489d
{
Packit Service a2489d
   /* If our output file is gone, or isn't stdout, we don't care. */
Packit Service a2489d
   if(!output || !is_stdout)
Packit Service a2489d
      return true;
Packit Service a2489d
Packit Service a2489d
   /* If we're filtered, we never display at all.  (We don't know anything about
Packit Service a2489d
    * the filter's output; the only time we do is when we're outputting to a URL
Packit Service a2489d
    * or a file, and that doesn't apply here.) */
Packit Service a2489d
   if(IsFiltered())
Packit Service a2489d
      return false;
Packit Service a2489d
Packit Service a2489d
   /* If we're not line buffered, display only if the output CopyJob says to. */
Packit Service a2489d
   if(!output->GetCopy()->IsLineBuffered())
Packit Service a2489d
      return output->HasStatus();
Packit Service a2489d
Packit Service a2489d
   /* We're line buffered, so we can output a status line without stomping
Packit Service a2489d
    * on a partially output line.
Packit Service a2489d
    *
Packit Service a2489d
    * If we've output something recently, only send the output to the title,
Packit Service a2489d
    * to avoid flickering status for no reason.
Packit Service a2489d
    */
Packit Service a2489d
   if(!update_timer.Stopped()) {
Packit Service a2489d
      s->NextUpdateTitleOnly();
Packit Service a2489d
      return true;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   /* If we're not reenabling the status bar, and the statusbar has
Packit Service a2489d
    * been turned off (due to output being reenabled), only send to
Packit Service a2489d
    * the title. */
Packit Service a2489d
   if(!statusbar_redisplay && output->GetCopy()->WriteAllowed())
Packit Service a2489d
   {
Packit Service a2489d
      s->NextUpdateTitleOnly();
Packit Service a2489d
      return true;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   /* Don't disable write if there are data to be written in buffer */
Packit Service a2489d
   if(output->GetCopy()->WriteAllowed() && output->GetCopy()->WritePending())
Packit Service a2489d
      return false;
Packit Service a2489d
Packit Service a2489d
   /* There hasn't been output in a while.  Stop the output again,
Packit Service a2489d
    * so the FileCopy will clear the StatusLine when there's more data. */
Packit Service a2489d
   output->GetCopy()->AllowWrite(false);
Packit Service a2489d
Packit Service a2489d
   return true;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* Get our contribution to the status line, which is just the output
Packit Service a2489d
 * status, if any.  Input status is the job of the user object. */
Packit Service a2489d
const char *OutputJob::Status(const StatusLine *s)
Packit Service a2489d
{
Packit Service a2489d
   if(no_status)
Packit Service a2489d
      return "";
Packit Service a2489d
Packit Service a2489d
   /* Never show anything if we havn't even received any data yet; it won't
Packit Service a2489d
    * start connecting until then, so it's not interesting. */
Packit Service a2489d
   if(!initialized)
Packit Service a2489d
      return "";
Packit Service a2489d
Packit Service a2489d
   /* Use the status from the output CopyJob.  It'll be the one that's connecting
Packit Service a2489d
    * to a host, if applicable. */
Packit Service a2489d
   return output->Status(s,true);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void OutputJob::PutEOF()
Packit Service a2489d
{
Packit Service a2489d
   if(Error())
Packit Service a2489d
      return;
Packit Service a2489d
Packit Service a2489d
   /* Make sure we've sent at least one (empty) block.  This ensures
Packit Service a2489d
    * that we always start the input->output code path. */
Packit Service a2489d
   Put("", 0);
Packit Service a2489d
Packit Service a2489d
   /* Send an EOF to the input peer; it'll send an EOF to the output peer
Packit Service a2489d
    * when all of its data is actually sent. */
Packit Service a2489d
   if(InputPeer())
Packit Service a2489d
      InputPeer()->PutEOF();
Packit Service a2489d
   else if(tmp_buf)
Packit Service a2489d
      tmp_buf->PutEOF();
Packit Service a2489d
   else
Packit Service a2489d
      abort();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* add a filter to the beginning of the list */
Packit Service a2489d
void OutputJob::PreFilter(const char *newfilter)
Packit Service a2489d
{
Packit Service a2489d
   if(!filter)
Packit Service a2489d
      filter.set(newfilter);
Packit Service a2489d
   else
Packit Service a2489d
   {
Packit Service a2489d
      char *old_filter=alloca_strdup(filter);
Packit Service a2489d
      filter.vset(newfilter," | ",old_filter,NULL);
Packit Service a2489d
   }
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* Return the width of the output.  If there's a filter, we can either
Packit Service a2489d
 * return -1 (we might be piping through "sed", changing the width),
Packit Service a2489d
 * or the width we know (which is sane for most pagers.)  I'm not sure
Packit Service a2489d
 * which is better. */
Packit Service a2489d
int OutputJob::GetWidth() const
Packit Service a2489d
{
Packit Service a2489d
   if(IsFiltered())
Packit Service a2489d
      return -1;
Packit Service a2489d
   return width;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* Return true if the output is going directly to a TTY. */
Packit Service a2489d
bool OutputJob::IsTTY() const
Packit Service a2489d
{
Packit Service a2489d
   if(IsFiltered())
Packit Service a2489d
      return false;
Packit Service a2489d
   return is_a_tty;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
static const SMTaskRef<FileCopyPeer> null_peer;
Packit Service a2489d
Packit Service a2489d
/* Get the input FileCopyPeer; this is the buffer we write to. */
Packit Service a2489d
const SMTaskRef<FileCopyPeer>& OutputJob::InputPeer() const
Packit Service a2489d
{
Packit Service a2489d
   if(input)
Packit Service a2489d
      return input->GetGet();
Packit Service a2489d
   return null_peer;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* Get the output FileCopyPeer (the FileCopyPeer that's doing the final output). */
Packit Service a2489d
const SMTaskRef<FileCopyPeer>& OutputJob::OutputPeer() const
Packit Service a2489d
{
Packit Service a2489d
   if(output)
Packit Service a2489d
      return output->GetPut();
Packit Service a2489d
   return null_peer;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* We're done if the output is finished, or on error. */
Packit Service a2489d
int OutputJob::Done()
Packit Service a2489d
{
Packit Service a2489d
   if(Error())
Packit Service a2489d
      return true;
Packit Service a2489d
Packit Service a2489d
   if(!initialized)
Packit Service a2489d
      return false;
Packit Service a2489d
Packit Service a2489d
   if(input && !input->Done())
Packit Service a2489d
     return false;
Packit Service a2489d
   if(output && !output->Done())
Packit Service a2489d
     return false;
Packit Service a2489d
Packit Service a2489d
   return true;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
int OutputJob::Do()
Packit Service a2489d
{
Packit Service a2489d
   if(!initialized && tmp_buf)
Packit Service a2489d
      InitCopy();
Packit Service a2489d
   return STALL;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* Don't register errors until they're actually printed by
Packit Service a2489d
 * the sub-job (ie. it's also Done()). */
Packit Service a2489d
bool OutputJob::Error()
Packit Service a2489d
{
Packit Service a2489d
   if(error)
Packit Service a2489d
      return true;
Packit Service a2489d
   if(input && input->Error() && input->Done())
Packit Service a2489d
      error=true;
Packit Service a2489d
   if(output && input != output && output->Error() && output->Done())
Packit Service a2489d
      error=true;
Packit Service a2489d
   return error;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void OutputJob::Fg()
Packit Service a2489d
{
Packit Service a2489d
   super::Fg();
Packit Service a2489d
   if(input)
Packit Service a2489d
      input->Fg();
Packit Service a2489d
   if(output && input != output)
Packit Service a2489d
      output->Fg();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void OutputJob::Bg()
Packit Service a2489d
{
Packit Service a2489d
   if(output && input != output)
Packit Service a2489d
      output->Bg();
Packit Service a2489d
   if(input)
Packit Service a2489d
      input->Bg();
Packit Service a2489d
   super::Bg();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void OutputJob::SuspendInternal()
Packit Service a2489d
{
Packit Service a2489d
   super::SuspendInternal();
Packit Service a2489d
   if(input)
Packit Service a2489d
      input->SuspendSlave();
Packit Service a2489d
   if(output && input != output)
Packit Service a2489d
      output->SuspendSlave();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void OutputJob::ResumeInternal()
Packit Service a2489d
{
Packit Service a2489d
   if(input)
Packit Service a2489d
      input->ResumeSlave();
Packit Service a2489d
   if(output && input != output)
Packit Service a2489d
      output->ResumeSlave();
Packit Service a2489d
   super::ResumeInternal();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
bool OutputJob::Full()
Packit Service a2489d
{
Packit Service a2489d
   /* It'd be nicer to just check copy->GetGet()->IsSuspended(), since
Packit Service a2489d
    * the FileCopy will suspend the Get end if the Put end gets filled.
Packit Service a2489d
    * However, it won't do that until it actually tries to send something. */
Packit Service a2489d
   int size = 0;
Packit Service a2489d
   if(input)
Packit Service a2489d
   {
Packit Service a2489d
      if(input->GetPut())
Packit Service a2489d
	 size += input->GetPut()->Buffered();
Packit Service a2489d
      if(input->GetGet())
Packit Service a2489d
	 size += input->GetGet()->Buffered();
Packit Service a2489d
      if(input != output)
Packit Service a2489d
      {
Packit Service a2489d
	 if(output->GetPut())
Packit Service a2489d
	    size += output->GetPut()->Buffered();
Packit Service a2489d
	 if(output->GetGet())
Packit Service a2489d
	    size += output->GetGet()->Buffered();
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   if(tmp_buf)
Packit Service a2489d
      size += tmp_buf->Size();
Packit Service a2489d
Packit Service a2489d
   return size >= 0x10000;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* We'll actually go over the buffer limit here; that's OK; it's not a
Packit Service a2489d
 * strict value.  (It's not convenient to prevent that completely with
Packit Service a2489d
 * Format(), either.) */
Packit Service a2489d
void OutputJob::Put(const char *buf,int size)
Packit Service a2489d
{
Packit Service a2489d
   InitCopy();
Packit Service a2489d
   if(Error())
Packit Service a2489d
      return;
Packit Service a2489d
   if(!InputPeer())
Packit Service a2489d
   {
Packit Service a2489d
      if(!tmp_buf)
Packit Service a2489d
	 tmp_buf=new Buffer;
Packit Service a2489d
      tmp_buf->Put(buf,size);
Packit Service a2489d
      return;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   // InputPeer was inited, flush tmp_buf.
Packit Service a2489d
   if(InputPeer() && tmp_buf)
Packit Service a2489d
   {
Packit Service a2489d
      Ref<Buffer> saved_buf(tmp_buf.borrow());
Packit Service a2489d
      const char *b=0;
Packit Service a2489d
      int s=0;
Packit Service a2489d
      saved_buf->Get(&b,&s);
Packit Service a2489d
      if(b && s>0)
Packit Service a2489d
	 Put(b,s);
Packit Service a2489d
      if(saved_buf->Eof())
Packit Service a2489d
	 PutEOF();
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   update_timer.SetResource("cmd:status-interval",0);
Packit Service a2489d
Packit Service a2489d
   off_t oldpos = InputPeer()->GetPos();
Packit Service a2489d
   InputPeer()->Put(buf, size);
Packit Service a2489d
   InputPeer()->SetPos(oldpos);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void OutputJob::Format(const char *f,...)
Packit Service a2489d
{
Packit Service a2489d
   va_list v;
Packit Service a2489d
   va_start(v,f);
Packit Service a2489d
   const xstring& str=xstring::vformat(f,v);
Packit Service a2489d
   va_end(v);
Packit Service a2489d
   Put(str);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
/* Propagate signals down to our child processes. */
Packit Service a2489d
int OutputJob::AcceptSig(int sig)
Packit Service a2489d
{
Packit Service a2489d
   int m=MOVED;
Packit Service a2489d
   if(sig == SIGTERM || sig == SIGINT)
Packit Service a2489d
      m=WANTDIE;
Packit Service a2489d
Packit Service a2489d
   /* If we have an input copier right now, it'll contain the top filter
Packit Service a2489d
    * (which is linked to all other filters), so send it the signal. */
Packit Service a2489d
   if(input)
Packit Service a2489d
      input->AcceptSig(sig);
Packit Service a2489d
   /* Otherwise, the only filters we have running are in output_fd. */
Packit Service a2489d
   else if(output_fd)
Packit Service a2489d
      output_fd->Kill(sig);
Packit Service a2489d
   if(sig!=SIGCONT)
Packit Service a2489d
      AcceptSig(SIGCONT);
Packit Service a2489d
   return m;
Packit Service a2489d
}