Blob Blame History Raw
#! @PERL@
# Copyright (c) 2010-2012 Zmanda, Inc.  All Rights Reserved.
# Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
#
# Contact information: Carbonite Inc., 756 N Pastoria Ave
# Sunnyvale, CA 94086, USA, or: http://www.zmanda.com

use lib '@amperldir@';
use strict;
use warnings;

use Data::Dumper;

my $exit_status = 0;

package Amanda::Ambackupd::Message;
use Amanda::Message;
use vars qw( @ISA );
@ISA = qw( Amanda::Message );

sub local_message {
    my $self = shift;

    if ($self->{'code'} == 5000000) {
        return "DLE $self->{'diskname'}";
    } else {
        return "No mesage for code '$self->{'code'}'";
    }
}

##
# ClientService class

package main::ClientService;
use base 'Amanda::ClientService';

use Symbol;
use IPC::Open3;
use POSIX qw(WIFEXITED WEXITSTATUS strftime);
use FileHandle;
use XML::Simple;
use Fcntl;
use Sys::Hostname;
use JSON -convert_blessed_universally;

use Amanda::MainLoop qw( :GIOCondition );
use Amanda::MainLoop;
use Amanda::Debug qw( debug info warning );
use Amanda::Util qw( :constants );
use Amanda::Feature;
use Amanda::Constants;
use Amanda::Config qw( :init :getconf config_dir_relative );
use Amanda::Cmdline;
use Amanda::Paths;
use Amanda::Disklist;
use Amanda::Util qw( match_disk match_host quote_string );
use Amanda::Xfer qw( :constants );
use Amanda::Logfile qw( :logtype_t log_add log_add_full make_chunker_stats log_start_multiline log_end_multiline);
use Amanda::Chunker::Scribe;
use Amanda::Holding;
use Amanda::Curinfo;

# Note that this class performs its control IO synchronously.  This is adequate
# for this service, as it never receives unsolicited input from the remote
# system.

sub run {
    my $self = shift;
    my $finished_cb = \&Amanda::MainLoop::quit;

    $self->{'my_features'} = Amanda::Feature::Set->mine();
    $self->{'their_features'} = Amanda::Feature::Set->old();

    $self->setup_streams($finished_cb);
}

sub setup_streams {
    my $self = shift;
    my $finished_cb = shift;

    my $backup = 0;
    my $disk;

    # always started from amandad.
    my $req = $self->get_req();

    my $peer = $ENV{'AMANDA_AUTHENTICATED_PEER'};
    # make some sanity checks
    my $errors = [];
    if (defined $req->{'options'}{'auth'} and defined $self->amandad_auth()
	    and $req->{'options'}{'auth'} ne $self->amandad_auth()) {
	my $reqauth = $req->{'options'}{'auth'};
	my $amauth = $self->amandad_auth();
	push @$errors, "ambackup program requested auth '$reqauth', " .
		       "but amandad is using auth '$amauth'";
	$main::exit_status = 1;
    }

    if (!defined $req->{'options'}{'config'}) {
	push @$errors, "no config";
	$main::exit_status = 1;
    } else {
	$self->{'config'} = $req->{'options'}{'config'};
	config_init($CONFIG_INIT_EXPLICIT_NAME, $self->{'config'});
	my ($cfgerr_level, @cfgerr_errors) = config_errors();
	if ($cfgerr_level >= $CFGERR_ERRORS) {
	    push @$errors, "configuration errors; aborting connection";
	    $main::exit_status = 1;
	}
	Amanda::Util::finish_setup($RUNNING_AS_DUMPUSER_PREFERRED);

	# and the disklist
	my $diskfile = Amanda::Config::config_dir_relative(getconf($CNF_DISKFILE));
	$cfgerr_level = Amanda::Disklist::read_disklist('filename' => $diskfile);
	if ($cfgerr_level >= $CFGERR_ERRORS) {
	    push @$errors, "Errors processing disklist";
	    $main::exit_status = 1;
	}
    }

    # and pull out the features, if given
    if (defined($req->{'features'})) {
	$self->{'their_features'} = $req->{'features'};
    }

    my $rep = '';
    my @result_messages;
    my $hostname = $peer;
    my $cmd_line = $req->{'lines'}[1];
  if (!@$errors) {
    if (!$cmd_line) {
	push @$errors, "No command specified in REQ packet";
    } else {
	my ($cmd, $data) = split(' ', $cmd_line);
	if ($cmd eq "LISTDLE") {
	    debug("LISTDLE command for host '$hostname'");
	    my $host = Amanda::Disklist::get_host($hostname);
	    if ($host) {
		my $matched = 0;
		for my $diskname (@{$host->{'disks'}}) {
		    my $disk = Amanda::Disklist::get_disk($hostname, $diskname);
		    if ($disk and
			dumptype_getconf($disk->{'config'}, $DUMPTYPE_STRATEGY) != $DS_SKIP and
			dumptype_getconf($disk->{'config'}, $DUMPTYPE_PROGRAM) eq "APPLICATION") {
			push @result_messages, Amanda::Ambackupd::Message->new(
				source_filename => __FILE__,
				source_line     => __LINE__,
				code            => 5000000,
				severity        => $Amanda::Message::INFO,
				host            => $hostname,
				diskname        => $diskname,
				device          => $disk->{'device'});
			$rep .= "DLE " . Amanda::Util::quote_string($diskname) . "\n";
			$matched = 1;
		    }
		}
		if (!$matched) {
		    push @$errors, "No dle for host '$hostname'";
		}
	    } else {
		push @$errors, "No host '$hostname'";
	    }
	} elsif ($cmd eq "CHECK") {
	    debug("CHECK command for host '$hostname'");
	    my $host = Amanda::Disklist::get_host($hostname);
	    if ($host) {
		my $matched = 0;
		for my $diskname (@{$host->{'disks'}}) {
		    my $disk = Amanda::Disklist::get_disk($hostname, $diskname);
		    if ($disk and
			dumptype_getconf($disk->{'config'}, $DUMPTYPE_STRATEGY) != $DS_SKIP and
			dumptype_getconf($disk->{'config'}, $DUMPTYPE_PROGRAM) eq "APPLICATION") {
			if ($self->{'their_features'}->has($Amanda::Feature::fe_req_xml)) {
			    my $program = dumptype_getconf($disk->{'config'},
							   $DUMPTYPE_PROGRAM);
			    my $o = $disk->xml_optionstr();
			    $rep .= "<dle>\n";
			    $rep .= "  <program>APPLICATION</program>\n";
			    $rep .= $disk->xml_application($self->{'their_features'});
			    $rep .= $disk->xml_estimate($self->{'their_features'}) . "\n";
			    $rep .= "  " . Amanda::Util::amxml_format_tag("disk", $disk->{'name'}) . "\n";
			    if ($disk->{'device'}) {
				$rep .= "  " . Amanda::Util::amxml_format_tag("diskdevice", $disk->{'device'}) . "\n";
			    }
			    $rep .= $o;
			    $rep .= "</dle>";
			} else {
			    #$rep .= "DLE " . Amanda::Util::quote_string($diskname) . "\n";
			}
			$matched = 1;
		    }
		}
		if ($matched) {
		    $rep = "GOPTIONS features=" . $self->{'my_features'}->as_string() .
			   ";maxdumps=$host->{'maxdumps'}" .
			   ";hostname=$hostname" .
			   ";config=$req->{'options'}{'config'}\n" .
			   $rep;
		} else {
		    push @$errors, "No dle for host '$hostname'";
		}
	    } else {
		push @$errors, "No host '$hostname'";
	    }
	} elsif ($cmd eq "BACKUP") {
	    my $diskname = $data;
	    debug("BACKUP command for host: $hostname disk: $diskname");
	    $self->{'hostname'} = $hostname;
	    $self->{'diskname'} = $diskname;
	    $self->{'qdiskname'} = Amanda::Util::quote_string($diskname);
	    my $host = Amanda::Disklist::get_host($hostname);
	    if ($host) {
		my $matched = 0;
		$disk = Amanda::Disklist::get_disk($hostname, $diskname);
		if ($disk) {
		    my $program = dumptype_getconf($disk->{'config'}, $DUMPTYPE_PROGRAM);
		    if ($program eq "APPLICATION") {
			my $strategy = dumptype_getconf($disk->{'config'}, $DUMPTYPE_STRATEGY);
			if ($strategy != $DS_SKIP) {
			    if ($self->{'their_features'}->has($Amanda::Feature::fe_req_xml)) {
				my $skip_full = dumptype_getconf($disk->{'config'}, $DUMPTYPE_SKIP_FULL);
				my $skip_incr = dumptype_getconf($disk->{'config'}, $DUMPTYPE_SKIP_INCR);
				my $dumpcycle = 0+dumptype_getconf($disk->{'config'}, $DUMPTYPE_DUMPCYCLE);
				my $infodir = getconf($CNF_INFOFILE);
				my $ci = Amanda::Curinfo->new($infodir);
				my $info = $ci->get_info($hostname, $diskname);
				my $level;
debug("info " . Data::Dumper::Dumper($info));
				if (!$info) {
				    $level = 0;
				} else {
				    my $next_level0;
				    my $last_level = $info->{'last_level'};
				    $last_level = 0 if !defined $last_level;
				    $last_level += 0;
				    if ($info->isset($Amanda::Curinfo::Info::FORCE_FULL)) {
					$level = 0;
				    }
				    if ($info->isset($Amanda::Curinfo::Info::FORCE_LEVEL_1)) {
					$level = 1;
				    }
				    if ($strategy == $DS_NOFULL){
					$level = 1;
				    } elsif($strategy == $DS_INCRONLY) {
					if ($info->isset($Amanda::Curinfo::Info::FORCE_FULL)) {
					    $level = 1;
					    $self->{"remove_force_full"} = 1;
					} else {
					    $next_level0 = 1;
					}
				    } elsif ($strategy == $DS_NOINC) {
					$level = 0;
				    } else { #$strategy == $DS_STANDARD
					if (!defined $info->{'inf'}->[0]->{'date'} || $info->{'inf'}->[0]->{'date'} < 0) {
					    $next_level0 = 0;
					} else {
					    my $time0 = $info->{'inf'}->[0]->{'date'};
					    my $today = time;
					    my $days_diff = int((($today - $time0) + (86400/2)) / 86400);
					    $next_level0 = $dumpcycle - $days_diff;
					}
				    }
				    if (!defined $level) {
					if ($next_level0 > 0) {
					    if ($info->isset($Amanda::Curinfo::Info::FORCE_BUMP)) {
						$level = $last_level + 1;
					    } elsif ($info->isset($Amanda::Curinfo::Info::FORCE_NO_BUMP)) {
						$level = $last_level;
					    } else {
						# should check server estimate to find if we must bump
						$level = $last_level;
					    }
					} else {
					    $level = 0;
					}
				    }
				}
				$self->{'level'} = $level;
				if ($level == 0 && $skip_full) {
				    push @$errors, "Skipping because of SKIP-FULL";
				} elsif ($level != 0 && $skip_incr) {
				    push @$errors, "Skipping because of SKIP-INCR";
				} else {
				    my $o = $disk->xml_optionstr();
				    my $connline;
				    if ($self->{'their_features'}->has($Amanda::Feature::fe_sendbackup_stream_state)) {
					$connline = $self->connect_streams('DATA' => 'rw',
								       'MESG' => 'rw',
								       'INDEX' => 'rw',
								       'STATE' => 'rw');
				    } else {
					$connline = $self->connect_streams('DATA' => 'rw',
								       'MESG' => 'rw',
								       'INDEX' => 'rw');
				    }
				    $rep .= "$connline\n";
				    $rep .= "GOPTIONS features=" . $self->{'my_features'}->as_string() .
					";maxdumps=$host->{'maxdumps'}" .
					";hostname=$hostname" .
					";config=$req->{'options'}{'config'}\n";
				    my $dle .= "<dle>\n";
				    $dle .= "  <program>APPLICATION</program>\n";
				    $dle .= $disk->xml_application($self->{'their_features'});
				    $dle .= "  " . Amanda::Util::amxml_format_tag("disk", $disk->{'name'}) . "\n";
				    if ($disk->{'device'}) {
					$dle .= "  " . Amanda::Util::amxml_format_tag("diskdevice", $disk->{'device'}) . "\n";
				    }
				    $dle .= "  <level>$self->{'level'}</level>\n";
				    $dle .= $o;
				    $dle .= "</dle>";
				    $rep .= $dle;
				    $self->{'dle_str'} = $dle;
				    $backup = 1;
				}
			    } else {
				push @$errors, "ambackupd wortks only with newer amanda client";
			    }
			} else {
			    push @$errors, "No backup do to, strategy SKIP is set";
			}
		    } else {
			push @$errors, "ambackup works only with application, it doesn't works with GNUTAR or DUMP";
			if ($program eq "GNUTAR") {
			    push @$errors, "Use the 'amgtar' application instead og the 'GNUTAR' program";
			}
		    }
		    $matched = 1;
		}
		if (!$matched) {
		    push @$errors, "No '$diskname' dle for host '$hostname'";
		}
	    } else {
		push @$errors, "No host '$hostname'";
	    }
	} else {
	    push @$errors, "Invalid command '$cmd' specified in REQ packet";
	}
    }
  }
  if (@result_messages) {
    if (@$errors || @{$req->{'errors'}}) {
	for my $err (@$errors) {
	    $rep .= "ERROR $err\n";
	}
	for my $err (@{$req->{'errors'}}) {
	    $rep .= "ERROR $err\n";
	}
	$rep .= "\n";
    }
  } else {
    if (@$errors || @{$req->{'errors'}}) {
	for my $err (@$errors) {
	    $rep .= "ERROR $err\n";
	}
	for my $err (@{$req->{'errors'}}) {
	    $rep .= "ERROR $err\n";
	}
	$rep .= "\n";
    }
  }

  if (@result_messages) {
    my $json = JSON->new->allow_nonref->convert_blessed;
    my $reply = $json->pretty->encode(\@result_messages);;
    $self->senddata('main', $reply);
  } else {
    $self->senddata('main', $rep);
  }
    $self->close('main', 'w');

    if ($backup) {
	return $self->do_backup($rep, $disk, $finished_cb);
    } else {
	$finished_cb->();
    }
}

sub do_backup {
    my $self = shift;
    my $rep = shift;
    my $disk = shift;
    my $finished_cb = shift;
    my $file_to_close = 0;

    my $src_mesg;
    my $mesg_buf;
    my ($xfer_mesg, $xfer_src_mesg, $xfer_dest_mesg);
    my ($xfer_data, $xfer_src_data, $xfer_dest_data);
    my ($xfer_index, $xfer_src_index, $xfer_dest_index);
    my ($xfer_state, $xfer_src_state, $xfer_dest_state);
    my $xfer_compress_data;
    my $xfer_encrypt_data;
    my @now = localtime;
    my $timestamp = strftime "%Y%m%d%H%M%S", @now;
    my $trace_log_filename;
    my $amdump_log_pathname;
    my $amdump_log;
    my $logdir = getconf($CNF_LOGDIR);
    my $index;
    my $state;
    $self->{'mesg_fh'} = FileHandle->new();
    $self->{'mesg_fh'}->fdopen($self->wfd('MESG'), "w");
    $self->{'mesg_fh'}->autoflush(1);

    my $steps = define_steps
	cb_ref => \$finished_cb;

    step start => sub {
	$timestamp = Amanda::Logfile::make_logname("ambackupd", $timestamp);
	$self->{'timestamp'} = $timestamp;
	$trace_log_filename = Amanda::Logfile::get_logname();
	debug("beginning trace log: $trace_log_filename");
	log_add($L_START, "date $timestamp");
	my $hostname = hostname;
	log_add($L_STATS, "hostname $hostname");
	log_add($L_DISK, "$self->{'hostname'} $self->{'qdiskname'}");

	$self->{'hdr'} = Amanda::Header->new();
	$self->{'hdr'}->{'type'} = $Amanda::Header::F_DUMPFILE;
	$self->{'hdr'}->{'datestamp'} = $timestamp;
	$self->{'hdr'}->{'name'} = $self->{'hostname'};
	$self->{'hdr'}->{'disk'} = $self->{'diskname'};
	$self->{'hdr'}->{'blocksize'} = Amanda::Holding::DISK_BLOCK_BYTES;
	$self->{'hdr'}->{'dumplevel'} = $self->{'level'};
	#$self->{'hdr'}->{'compressed'} =
	#$self->{'hdr'}->{'encrypted'} =
	$self->{'hdr'}->{'dle_str'} = $self->{'dle_str'};

	my $mesg_handle = FileHandle->new();
	$mesg_handle->fdopen($self->rfd('MESG'), "r");

	$src_mesg = Amanda::MainLoop::fd_source($mesg_handle,
				 $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
#	$file_to_close++;
	$src_mesg->set_callback(sub {
	    my $buf;
	    my $blocksize = -1;
	    $blocksize = sysread($mesg_handle, $buf, 32768);
	    if ($blocksize) {
		$self->{'mesg'} .= $buf;
		$mesg_buf .= $buf;
		while ($mesg_buf =~ /\n/gm) {
		    my ($line, $b) = split ("\n", $mesg_buf, 2);
		    debug("mesg: $line");
		    if ($line =~ /^sendbackup: /) {
			if ($line =~ /^sendbackup: info/) {
			    if ($line eq "sendbackup: info end") {
				$steps->{'start_backup'}->();
			    } elsif ($line =~ /^sendbackup: info BACKUP=(.*)/) {
				$self->{'hdr'}->{'program'} = $1;
			    } elsif ($line =~ /^sendbackup: info APPLICATION=(.*)/) {
				$self->{'hdr'}->{'application'} = $1;
			    } elsif ($line =~ /^sendbackup: info RECOVER_CMD=(.*)/) {
				$self->{'hdr'}->{'recover_cmd'} = $1;
			    } elsif ($line =~ /^sendbackup: info COMPRESS_SUFFIX=(.*)/) {
				$self->{'hdr'}->{'comp_suffix'} = $1;
			    } elsif ($line =~ /^sendbackup: info SERVER_CUSTOM_COMPRESS=(.*)/) {
				$self->{'hdr'}->{'srvcompprog'} = $1;
			    } elsif ($line =~ /^sendbackup: info CLIENT_CUSTOM_COMPRESS=(.*)/) {
				$self->{'hdr'}->{'clntcompprog'} = $1;
			    } elsif ($line =~ /^sendbackup: info SERVER_ENCRYPT=(.*)/) {
				$self->{'hdr'}->{'srv_encrypt'} = $1;
			    } elsif ($line =~ /^sendbackup: info CLIENT_ENCRYPT=(.*)/) {
				$self->{'hdr'}->{'clnt_encrypt'} = $1;
			    } elsif ($line =~ /^sendbackup: info SERVER_DECRYPT_OPTION=(.*)/) {
				$self->{'hdr'}->{'srv_decrypt_opt'} = $1;
			    } elsif ($line =~ /^sendbackup: info CLIENT_DECRYPT_OPTION=(.*)/) {
				$self->{'hdr'}->{'clnt_decrypt_opt'} = $1;
			    }
			} elsif ($line =~ /^sendbackup: size (.*)/) {
			    $self->{'orig_size'} = 0+$1;
			    $self->{'got_size'} = 1;
			} elsif ($line =~ /^sendbackup: end/) {
			    $self->{'got_end'} = 1;
			} elsif ($line =~ /^sendbackup: native-CRC (.*)/) {
			    $self->{'native_crc'} = $1;
			} elsif ($line =~ /^sendbackup: client-CRC (.*)/) {
			    $self->{'client_crc'} = $1;
			} elsif ($line =~ /^sendbackup: start/) {
			} elsif ($line =~ /^sendbackup: no-op/) {
			} elsif ($line =~ /^sendbackup: state/) {
			    # TODO
			    die("inline state is Unsupported");
			} elsif ($line =~ /^sendbackup: retry/) {
			    $line =~ /.*delay (\d*)/;
			    $self->{'retry_delay'} = $1;
			    $line =~ /.*level (\d*)/;
			    $self->{'retry_level'} = $1;
			    $line =~ /.*message (.*)/;
			    $self->{'retry_message'} = $1;
			    if ($self->{'retry_level'}) {
				# TODO Add a command to force the level on next run
			    }
			    if ($self->{'retry_message'}) {
				print {$self->{'mesg_fh'}} "$self->{'retry_message'}\n";
			    }
			    if ($self->{'retry_delay'} &&
				$self->{'retry_level'}) {
				print {$self->{'mesg_fh'}} "Retry the backup at level $self->{'retry_level'} in $self->{'retry_delay'} seconds\n";
			    } elsif ($self->{'retry_delay'}) {
				print {$self->{'mesg_fh'}} "Retry the backup in $self->{'retry_delay'} seconds\n";
			    } elsif ($self->{'retry_level'}) {
				print {$self->{'mesg_fh'}} "Retry the backup at level $self->{'retry_level'}\n";
			    }
			} elsif ($line =~ /^sendbackup: warning/) {
			    $self->{'strange'}++;
			} elsif ($line =~ /^sendbackup: error \[(.*)\]/) {
			    $self->{'error_message'} = $1;
			    $self->{'backup_failed'} = 1;
			} elsif ($line =~ /^sendbackup: error (.*)/) {
			    $self->{'error_message'} = $1;
			    $self->{'backup_failed'} = 1;
			} else {
			    debug("XX: $line");
			}
		    } elsif ($line =~ /^\|/) {
			$self->{'normal'}++;
		    } elsif ($line =~ /^\? (.*)/) {
			$self->{'strange'}++;
			$self->{'strange_line'} = $1 if !defined $self->{'strange_line'};
		    } else {
			$self->{'error'}++;
		    }
		    $mesg_buf = $b;
		}
	    } else {
		if (!defined $blocksize) {
		    debug("read from mesg: $!");
		}
		if ($mesg_buf) {
		    debug("unprocessed MESG stream: $mesg_buf");
		}

		close $mesg_handle;
		return $steps->{'abort_mesg_close'}->() if !$self->{'got_end'};
		$steps->{'mesg_close'};
	    }
	});
    };

    step start_backup => sub {
	my $xml = XML::Simple->new();
	my $dle_xml = $xml->XMLin($self->{'dle_str'});
	$self->{'dle_xml'} = $dle_xml;
	my @data_compress;
	my @data_encrypt;

	print {$self->{'mesg_fh'}} "start backup: " . $self->{'qdiskname'} . "\n";
	$index = dumptype_getconf($disk->{'config'}, $DUMPTYPE_INDEX);

	my $compress = dumptype_getconf($disk->{'config'}, $DUMPTYPE_COMPRESS);
	if ($compress != $COMP_NONE) {
	    $self->{'hdr'}->{'compressed'} = 1;
	    if ($compress == $COMP_SERVER_CUST) {
		$self->{'hdr'}->{'srvcompprog'} = dumptype_getconf($disk->{'config'}, $DUMPTYPE_SRVCOMPPROG);
		$self->{'hdr'}->{'uncompress_cmd'} = " " . $self->{'hdr'}->{'srvcompprog'} . " -d |";
		$self->{'hdr'}->{'comp_suffix'} = "cust";
		push @data_compress, $self->{'hdr'}->{'srvcompprog'};
	    } elsif ($compress == $COMP_CUST) {
		$self->{'hdr'}->{'clntcompprog'} = dumptype_getconf($disk->{'config'}, $DUMPTYPE_CLNTCOMPPROG);
		$self->{'hdr'}->{'uncompress_cmd'} = " " . $self->{'hdr'}->{'clntcompprog'} . " -d |";
		$self->{'hdr'}->{'comp_suffix'} = "cust";
	    } elsif ($compress == $COMP_SERVER_BEST) {
		$self->{'hdr'}->{'uncompress_cmd'} = " $Amanda::Constants::UNCOMPRESS_PATH $Amanda::Constants::UNCOMPRESS_OPT |";
		$self->{'hdr'}->{'comp_suffix'} = $Amanda::Constants::COMPRESS_SUFFIX;
		push @data_compress, $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT;
	    } elsif ($compress == $COMP_SERVER_FAST) {
		$self->{'hdr'}->{'uncompress_cmd'} = " $Amanda::Constants::UNCOMPRESS_PATH $Amanda::Constants::UNCOMPRESS_OPT |";
		$self->{'hdr'}->{'comp_suffix'} = $Amanda::Constants::COMPRESS_SUFFIX;
		push @data_compress, $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_FAST_OPT;
	    }
	} elsif ($self->{'hdr'}->{'comp_suffix'}) {
	    $self->{'hdr'}->{'compressed'} = 1;
	} else {
	    $self->{'hdr'}->{'compressed'} = 0;
	    $self->{'hdr'}->{'comp_suffix'} = "N";
	}

	my $encrypt = dumptype_getconf($disk->{'config'}, $DUMPTYPE_ENCRYPT);
	if ($encrypt != $ENCRYPT_NONE) {
	    $self->{'hdr'}->{'encrypted'} = 1;
	    $self->{'hdr'}->{'encrypt_suffix'} = "enc";
	    if ($encrypt ==  $ENCRYPT_SERV_CUST) {
		$self->{'hdr'}->{'srv_encrypt'} = dumptype_getconf($disk->{'config'}, $DUMPTYPE_SRV_ENCRYPT);
		$self->{'hdr'}->{'srv_decrypt_opt'} = dumptype_getconf($disk->{'config'}, $DUMPTYPE_SRV_DECRYPT_OPT);
		$self->{'hdr'}->{'decrypt_cmd'} = " " . $self->{'hdr'}->{'srv_encrypt'} . " " . $self->{'hdr'}->{'srv_decrypt_opt'} . " |";
		push @data_encrypt, $self->{'hdr'}->{'srv_encrypt'};
	    } elsif ($encrypt == $ENCRYPT_CUST) {
		$self->{'hdr'}->{'clnt_encrypt'} = dumptype_getconf($disk->{'config'}, $DUMPTYPE_CLNT_ENCRYPT);
		$self->{'hdr'}->{'clnt_decrypt_opt'} = dumptype_getconf($disk->{'config'}, $DUMPTYPE_CLNT_DECRYPT_OPT);
		$self->{'hdr'}->{'decrypt_cmd'} = " " . $self->{'hdr'}->{'clnt_encrypt'} . " " . $self->{'hdr'}->{'clnt_decrypt_opt'} . " |";
	    }
	} else {
	    $self->{'hdr'}->{'encrypted'} = 0;
	    $self->{'hdr'}->{'encrypt_suffix'} = "N";
	}
	$self->write_header_to_index();

	$xfer_src_data = Amanda::Xfer::Source::Fd->new($self->rfd('DATA'));
	POSIX::close($self->rfd('DATA'));
	$self->{'chunker_scribe'} = Amanda::Chunker::Scribe->new(
				feedback => $self,
				debug => $Amanda::Config::debug_chunker);
	$self->{'chunker_scribe'}->start(write_timestamp => $self->{'timestamp'});
	my @xfer_link_data;
	push @xfer_link_data, $xfer_src_data;

	if (@data_compress) {
	    $xfer_compress_data = Amanda::Xfer::Filter::Process->new(\@data_compress, 0, 0, 0, 0);
	    push @xfer_link_data, $xfer_compress_data;
	}

	if (@data_encrypt) {
	    $xfer_encrypt_data = Amanda::Xfer::Filter::Process->new(\@data_encrypt, 0, 0, 0, 0);
	    push @xfer_link_data, $xfer_encrypt_data;
	}

	$xfer_dest_data = $self->{'chunker_scribe'}->get_xfer_dest(max_memory => 1048576);
	push @xfer_link_data, $xfer_dest_data;
	$xfer_data = Amanda::Xfer->new(\@xfer_link_data);

	my $xfer_compress_index;
	if ($index) {
	    if (getconf($CNF_COMPRESS_INDEX)) {
		$self->{'index_filename'} = Amanda::Logfile::getindex_unsorted_gz_fname(
						$self->{'hostname'},
						$self->{'diskname'},
						$self->{'timestamp'},
						$self->{'hdr'}->{'dumplevel'});
	    } else {
		$self->{'index_filename'} = Amanda::Logfile::getindex_unsorted_fname(
						$self->{'hostname'},
						$self->{'diskname'},
						$self->{'timestamp'},
						$self->{'hdr'}->{'dumplevel'});
	    }
	    $self->{'index_filename_tmp'} = $self->{'index_filename'} . ".tmp";
	    open(INDEX, ">$self->{'index_filename_tmp'}") || die("C");
	    $xfer_src_index = Amanda::Xfer::Source::Fd->new($self->rfd('INDEX'));
	    POSIX::close($self->rfd('INDEX'));
	    $xfer_dest_index = Amanda::Xfer::Dest::Fd->new(fileno(INDEX));
	    if (getconf($CNF_COMPRESS_INDEX)) {
		$xfer_compress_index = Amanda::Xfer::Filter::Process->new(
			[$Amanda::Constants::COMPRESS_PATH,
			 $Amanda::Constants::COMPRESS_BEST_OPT],
			 0, 0, 0, 0);
		$xfer_index = Amanda::Xfer->new([$xfer_src_index, $xfer_compress_index, $xfer_dest_index]);
	    } else {
		$xfer_index = Amanda::Xfer->new([$xfer_src_index, $xfer_dest_index]);
	    }
	}

	my $xfer_compress_state;
	if ($self->{'their_features'}->has($Amanda::Feature::fe_sendbackup_stream_state)) {
	    $self->{'state_filename'} = Amanda::Logfile::getstatefname(
						$self->{'hostname'},
						$self->{'diskname'},
						$self->{'timestamp'},
						$self->{'hdr'}->{'dumplevel'});
	    $self->{'state_filename_gz'} = $self->{'state_filename'} . $Amanda::Constants::COMPRESS_SUFFIX;
	    open(STATE, ">$self->{'state_filename_gz'}") || die("C");
	    $xfer_src_state = Amanda::Xfer::Source::Fd->new($self->rfd('STATE'));
	    POSIX::close($self->rfd('STATE'));
	    $xfer_compress_state = Amanda::Xfer::Filter::Process->new(
		[$Amanda::Constants::COMPRESS_PATH,
		 $Amanda::Constants::COMPRESS_BEST_OPT],
		0, 0, 0, 0);
	    $xfer_dest_state = Amanda::Xfer::Dest::Fd->new(fileno(STATE));
	    $xfer_state = Amanda::Xfer->new([$xfer_src_state, $xfer_compress_state, $xfer_dest_state]);
	}


	$file_to_close++;
	$xfer_data->start(sub {
	    my ($src, $msg, $xfer) = @_;

	    $self->{'chunker_scribe'}->handle_xmsg($src, $msg, $xfer);
	    if ($msg->{'type'} == $XMSG_DONE) {
		$file_to_close--;
		return $steps->{'backup_done'}->() if $file_to_close == 0;
	    } elsif ($msg->{'type'} == $XMSG_ERROR) {
		$self->{'backup_failed'} = 1;
		$steps->{'add_to_mesg'}("$msg->{'message'}");
	    } elsif ($msg->{'type'} == $XMSG_CANCEL) {
		$self->{'backup_failed'} = 1;
	    } elsif ($msg->{'type'} == $XMSG_CRC) {
		if ($msg->{'elt'} == $xfer_dest_data) {
		    $self->{'server_crc'} = $msg->{'crc'}.":".$msg->{'size'};
		}
	    }
	});

	if ($index) {
	    if (defined $xfer_compress_index && $xfer_compress_index->can('get_stderr_fd')) {
		$file_to_close++;
		my $fd = $xfer_compress_index->get_stderr_fd();
		$fd.="";
		$fd = int($fd);
		my $src = Amanda::MainLoop::fd_source($fd,
						$G_IO_IN|$G_IO_HUP|$G_IO_ERR);
		$src->set_callback( sub {
		    my $b;
		    my $n_read = POSIX::read($fd, $b, 1);
		    if (!defined $n_read) {
			return;
		    } elsif ($n_read == 0) {
			$src->remove();
			POSIX::close($fd);
			$file_to_close--;
			return $steps->{'backup_done'}->() if $file_to_close == 0;
		    } else {
			$self->{'index_buffer'} .= $b;
			if ($b eq "\n") {
			    my $line = $self->{'index_buffer'};
			    chomp $line;
			    if (length($line) > 1) {
				$steps->{'add_to_mesg'}("index compression stderr: $line");
				debug("index compression stderr: $line");
			    }
			    $self->{'index_buffer'} = "";
			}
		    }
		});
	    }

	    $file_to_close++;
	    $xfer_index->start(sub {
		my ($src, $msg, $xfer) = @_;

		if ($msg->{'type'} == $XMSG_DONE) {
		    $file_to_close--;
		    return $steps->{'backup_done'}->() if $file_to_close == 0;
		}
	    });
	}
	if ($self->{'their_features'}->has($Amanda::Feature::fe_sendbackup_stream_state)) {
	    if (defined $xfer_compress_state && $xfer_compress_state->can('get_stderr_fd')) {
		$file_to_close++;
		my $fd = $xfer_compress_state->get_stderr_fd();
		$fd.="";
		$fd = int($fd);
		my $src = Amanda::MainLoop::fd_source($fd,
						$G_IO_IN|$G_IO_HUP|$G_IO_ERR);
		$src->set_callback( sub {
		    my $b;
		    my $n_read = POSIX::read($fd, $b, 1);
		    if (!defined $n_read) {
			return;
		    } elsif ($n_read == 0) {
			$src->remove();
			POSIX::close($fd);
			$file_to_close--;
			return $steps->{'backup_done'}->() if $file_to_close == 0;
		    } else {
			$self->{'state_buffer'} .= $b;
			if ($b eq "\n") {
			    my $line = $self->{'state_buffer'};
			    chomp $line;
			    if (length($line) > 1) {
				$steps->{'add_to_mesg'}("state compression stderr: $line");
				debug("state compression stderr: $line");
			    }
			    $self->{'state_buffer'} = "";
			}
		    }
		});
	    }

	    $file_to_close++;
	    $xfer_state->start(sub {
		my ($src, $msg, $xfer) = @_;

		if ($msg->{'type'} == $XMSG_DONE) {
		    $file_to_close--;
		    # empty compressed file have a sizeof 20
		    if (-s $self->{'state_filename_gz'} == 20) {
			unlink($self->{'state_filename_gz'});
		    }
		    return $steps->{'backup_done'}->() if $file_to_close == 0;
		}
	    });
	}

	if ($xfer_compress_data) {
	    if ($xfer_compress_data->can('get_stderr_fd')) {
		$file_to_close++;
		my $fd = $xfer_compress_data->get_stderr_fd();
		$fd.="";
		$fd = int($fd);
		my $src = Amanda::MainLoop::fd_source($fd,
						$G_IO_IN|$G_IO_HUP|$G_IO_ERR);
		$src->set_callback( sub {
		    my $b;
		    my $n_read = POSIX::read($fd, $b, 1);
		    if (!defined $n_read) {
			return;
		    } elsif ($n_read == 0) {
			$src->remove();
			POSIX::close($fd);
			$file_to_close--;
			return $steps->{'backup_done'}->() if $file_to_close == 0;
		    } else {
			$self->{'compress_buffer'} .= $b;
			if ($b eq "\n") {
			    my $line = $self->{'compress_buffer'};
			    chomp $line;
			    if (length($line) > 1) {
				$steps->{'add_to_mesg'}("server data compression stderr: $line");
				debug("server data compression stderr: $line");
			    }
			    $self->{'compress_buffer'} = "";
			}
		    }
		});
	    }
	}

	if ($xfer_encrypt_data) {
	    if ($xfer_encrypt_data->can('get_stderr_fd')) {
		$file_to_close++;
		my $fd = $xfer_encrypt_data->get_stderr_fd();
		$fd.="";
		$fd = int($fd);
		my $src = Amanda::MainLoop::fd_source($fd,
						$G_IO_IN|$G_IO_HUP|$G_IO_ERR);
		$src->set_callback( sub {
		    my $b;
		    my $n_read = POSIX::read($fd, $b, 1);
		    if (!defined $n_read) {
			return;
		    } elsif ($n_read == 0) {
			$src->remove();
			POSIX::close($fd);
			$file_to_close--;
			return $steps->{'backup_done'}->() if $file_to_close == 0;
		    } else {
			$self->{'encrypt_buffer'} .= $b;
			if ($b eq "\n") {
			    my $line = $self->{'encrypt_buffer'};
			    chomp $line;
			    if (length($line) > 1) {
				$steps->{'add_to_mesg'}->("server data encryption stderr: $line");
				debug("server data encryption stderr: $line");
			    }
			    $self->{'encrypt_buffer'} = "";
			}
		    }
		});
	    }
	}

	$self->{'first_holding_file'} = $self->{'holding_file'} = $self->get_holding_file();
	$self->{'chunker_scribe'}->start_dump(
		xfer        => $xfer_data,
		dump_header => $self->{'hdr'},
		dump_cb     => $steps->{'dump_cb'},
		filename    => $self->{'holding_file'},
		use_bytes   => $self->{'hdisk'}->{'avail'} * 1024,
		chunk_size  => $self->{'hdisk'}->{'chunksize'} * 1024);
    };

    step backup_done => sub {
	$self->{'backup_done'} = 1;
	$steps->{'backup_done1'}->();
    };

    step backup_done1 => sub {

	if ($self->{'backup_failed'}) {
	    $steps->{'remove'}->();
	} else {
	    return if !$self->{'backup_done'} || !$self->{'got_end'};
	    $steps->{'backup_good'}->();
	}
    };

    step backup_good => sub {
	$self->write_header_to_index();
	if ($index) {
	    rename $self->{'index_filename_tmp'}, $self->{'index_filename'};
	}

	Amanda::Holding::rename_tmp($self->{'first_holding_file'}, 1);
	$self->rewrite_holding_header();

	# set infofile
	my $infodir = getconf($CNF_INFOFILE);
	my $ci = Amanda::Curinfo->new($infodir);
	my $info = $ci->get_info($self->{'hostname'}, $self->{'diskname'});
	$info->update_dumper($self->{'orig_size'}, $self->{'dump_size'}, $self->{'dump_time'}, $self->{'hdr'}->{'dumplevel'}, $self->{'timestamp'});
	$ci->put_info($self->{'hostname'}, $self->{'diskname'}, $info);
	$self->{'dump_time'} = 1 if !$self->{'dump_time'};
	my $kb = $self->{'dump_size'} / 1024;
	my $time = $self->{'dump_time'};
	my $kps = "$kb" / $time;
	if ($self->{'strange'}) {
	    print {$self->{'mesg_fh'}} "backup strange " . Amanda::Util::quote_string($self->{'diskname'}) . ": " . $self->{'strange_line'} . "\n";
	    log_start_multiline();
	    log_add_full($L_STRANGE, "dumper", "$self->{'hostname'} $self->{'qdiskname'} $self->{'hdr'}->{'dumplevel'} $self->{'native_crc'} $self->{'client_crc'} [sec $self->{'dump_time'} kb $kb kps $kps orig-kb $self->{'orig_size'}]");
	    my @lines = split /\n/, $self->{'mesg'};
	    foreach my $line (@lines) {
		log_add_full($L_STRANGE, "dumper", $line);
	    }
	    log_end_multiline();
	} else {
	    print {$self->{'mesg_fh'}} "backup done: " . Amanda::Util::quote_string($self->{'diskname'}) . "\n";
	    log_add_full($L_SUCCESS, "dumper", "$self->{'hostname'} $self->{'qdiskname'} $self->{'timestamp'} $self->{'hdr'}->{'dumplevel'} $self->{'native_crc'} $self->{'client_crc'} [sec $self->{'dump_time'} kb $kb kps $kps orig-kb $self->{'orig_size'}]");
	}
	log_add_full($L_SUCCESS, "chunker", "$self->{'hostname'} $self->{'qdiskname'} $self->{'timestamp'} $self->{'hdr'}->{'dumplevel'} $self->{'server_crc'} [sec $self->{'dump_time'} kb $kb kps $kps]");
	$steps->{'quit'}->();
	print {$self->{'mesg_fh'}} "MESG END\n";
	close $self->{'mesg_fh'};
    };

    step dump_cb => sub {
	my %params = @_;

	debug("dump_cb: " . Data::Dumper::Dumper(\%params));
	if ($params{'result'} eq 'DONE') {
	    $self->{'dump_size'} = $params{'data_size'};
	    $self->{'dump_time'} = $params{'total_duration'};
	} elsif ($params{'result'} eq 'PARTIAL') {
	} elsif ($params{'result'} eq 'FAILED') {
	    $self->{'chunker_failed'} = "CHUNKER ERROR MESSAGE" if defined $self->{'chunker_failed'};  #JLM
	}
    };

    step abort_mesg_close => sub {
	debug("mesg closed prematurely");
	$steps->{'remove'}->();
    };

    step mesg_close => sub {
	debug("mesg closed");
	if ($self->{'got_end'} and !$self->{'backup_failed'}) {
	    $steps->{'backup_done1'}->();
	} else {
	    $steps->{'remove'}->();
	}
    };

    step remove => sub {
	# remove index file
	unlink $self->{'index_filename_tmp'};

	# remove header in index
	my $header_filename = Amanda::Logfile::getheaderfname(
						$self->{'hostname'},
						$self->{'diskname'},
						$self->{'timestamp'},
						$self->{'hdr'}->{'dumplevel'});
	unlink $header_filename;

	# holding files
	Amanda::Holding::filetmp_unlink($self->{'first_holding_file'}.".tmp");
	Amanda::Holding::dir_unlink(); # should handle 'pid' files.

	log_start_multiline();
	log_add_full($L_FAIL, "dumper", "$self->{'hostname'} $self->{'qdiskname'} $self->{'timestamp'} $self->{'hdr'}->{'dumplevel'} [$self->{'error_message'}]");
	my @lines = split /\n/, $self->{'mesg'};
	foreach my $line (@lines) {
	    log_add_full($L_STRANGE, "dumper", $line);
	}
	log_end_multiline();

	$self->{'dump_time'} = 1 if !$self->{'dump_time'};
	my $kb = $self->{'dump_size'} / 1024;
	my $time = $self->{'dump_time'};
	my $kps = "$kb" / $time;
	if (defined $self->{'chunker_failed'}) {
	    log_add_full($L_FAIL, "chunker", sprintf("%s %s %s %s %s",
		quote_string($self->{'hostname'}.""), # " is required for SWIG..
		$self->{'qdiskname'},
		$self->{'timestamp'},
		$self->{'hdr'}->{'dumplevel'},
		"[$self->{'chunker_failed'}]"));
	} else {
	    log_add_full($L_SUCCESS, "chunker",
		sprintf("%s%s %s %s %s [sec %s kb %s kps %s]",
			quote_string($self->{'hostname'}.""),
			$self->{'qdiskname'},
			$self->{'timestamp'},
			$self->{'hdr'}->{'dumplevel'},
			$self->{'server_crc'},
			$self->{'dump_time'},
			$kb,
			$kps));
	}

	$exit_status = 1;
	my $errmsg = $self->{'error_message'};
	$errmsg .= ", ".$self->{'chunker_failed'} if defined $self->{'chunker_failed'};
	print {$self->{'mesg_fh'}} "backup failed: " . Amanda::Util::quote_string($self->{'diskname'}) . ": " . $errmsg . "\n";
	print {$self->{'mesg_fh'}} "MESG END\n";
	close $self->{'mesg_fh'};
	$steps->{'quit'}->();
    };

    step quit => sub {
	log_add($L_FINISH, "date $self->{'timestamp'} time $self->{'dump_time'}");
	log_add($L_INFO, "pid-done $$");
	my @report = ("$sbindir/amreport", $self->{'config'}, '--from-amdump', '-l', $trace_log_filename);
	system(@report);
	$finished_cb->();
    };

    step add_to_mesg => sub {
	my $msg = shift;

	debug("add_to_mesg: $msg");
	print {$self->{'mesg_fh'}} "$msg\n";
    };

}

sub get_holding_file {
    my $self = shift;
    debug("get_holding_file");

    if (!$self->{'hdisks'}) {
	for my $hdname (@{getconf($CNF_HOLDINGDISK)}) {
	    my $cfg = lookup_holdingdisk($hdname);
	    next unless defined $cfg;

	    my $dir = holdingdisk_getconf($cfg, $HOLDING_DISKDIR);
	    next unless defined $dir;
	    next unless -d $dir;

	    my $hdisk;
	    $hdisk->{'dir'} = $dir;
	    $hdisk->{'disksize'} = holdingdisk_getconf($cfg, $HOLDING_DISKSIZE);
	    $hdisk->{'chunksize'} = holdingdisk_getconf($cfg, $HOLDING_CHUNKSIZE);

	    $hdisk->{'avail'} = Amanda::Util::get_fsusage($dir);
	    if ($hdisk->{'disksize'} >= 0) {
		if ($hdisk->{'disksize'} < $hdisk->{'avail'}) {
		    $hdisk->{'avail'} = $hdisk->{'disksize'};
		}
	    } else {
		$hdisk->{'avail'} += $hdisk->{'disksize'};
	    }

	    push @{$self->{'hdisks'}}, $hdisk;
	 }
    }
    debug("hdisks: " . Data::Dumper::Dumper($self->{'hdisks'}));

    foreach my $hdisk (@{$self->{'hdisks'}}) {
	if ($hdisk->{'avail'} > 0) {
	    $self->{'hdisk'} = $hdisk;
	    return $hdisk->{'dir'} . "/" .
		   $self->{'timestamp'} . "/" .
		   $self->{'hostname'} . "." .
		   Amanda::Util::sanitise_filename($self->{'diskname'}) . "." .
		   $self->{'hdr'}->{'dumplevel'};
	}
    }
    delete $self->{'hdisk'};
    return undef;
}

##
## Chunker Scribe feedback

sub request_more_disk {
    my $self = shift;
    debug("request_more_disk");

    $self->{'hdisk'}->{'avail'} = 0;
    $self->{'holding_file'} = $self->get_holding_file();

    if ($self->{'holding_file'}) {
	$self->{'chunker_scribe'}->continue_chunk(
				filename => $self->{'holding_file'},
				use_bytes => $self->{'hdisk'}->{'avail'} * 1024,
				chunk_size => $self->{'hdisk'}->{'chunksize'} * 1024);
    } else {
	$self->{'chunker_failed'} = "No more holding disk space";
	debug($self->{'chunker_failed'});
	print {$self->{'mesg_fh'}} "$self->{'chunker_failed'}\n";
	$self->{'error_message'} = "chunker failed" if !defined $self->{'error_message'};
	$self->{'backup_failed'} = 1;
	my $quit_cb = sub {
	};
	$self->{'chunker_scribe'}->quit(finished_cb => $quit_cb);
    }
}

sub notify_no_room {
    my $self = shift;
    my $use_bytes = shift;
    my $mesg = shift;

    debug("notify_no_room $use_bytes");
    $self->{'hdisk'}->{'avail'} = 0;
}

sub scribe_notif_log_info {
    my $self = shift;
    my %params = @_;

    log_add_full($L_INFO, "chunker", $params{'message'});
    debug("scribe_notif_log_info: $params{'message'}");
}

sub scribe_notif_done {
}

sub rewrite_holding_header {
    my $self = shift;

    my $holding_fh;
    open $holding_fh, "+<$self->{'first_holding_file'}";
    my $block;
    sysread($holding_fh, $block, 31768);
    my $hdr = Amanda::Header->new($block);

    $hdr->{'orig_size'} = $self->{'orig_size'};
    $hdr->{'native_crc'} = $self->{'native_crc'};
    $hdr->{'client_crc'} = $self->{'client_crc'};
    $hdr->{'server_crc'} = $self->{'server_crc'};

    $block = $hdr->to_string(0, 32768);
    seek $holding_fh, 0, Fcntl::SEEK_SET;
    print $holding_fh $block;
    close $holding_fh;
}

sub write_header_to_index {
    my $self = shift;

    my $header_filename = Amanda::Logfile::getheaderfname($self->{'hostname'},
					     $self->{'diskname'},
					     $self->{'timestamp'},
					     $self->{'hdr'}->{'dumplevel'});
    my $header_fh;
    open $header_fh, ">$header_filename";
    my $block = $self->{'hdr'}->to_string(0, 32768);
    print $header_fh $block;
    close $header_fh;
}


sub get_req {
    my $self = shift;

    my $req_str = '';
    while (1) {
	my $buf = Amanda::Util::full_read($self->rfd('main'), 1024);
	last unless $buf;
	$req_str .= $buf;
    }
    # we've read main to EOF, so close it
    $self->close('main', 'r');

    return $self->{'req'} = $self->parse_req($req_str);
}


# helper function to write a data to a stream.  This does not add newline characters.
sub senddata {
    my $self = shift;
    my ($stream, $data) = @_;
    my $fd = $self->wfd($stream);

    Amanda::Util::full_write($fd, $data, length($data))
	    or die "writing to $stream: $!";
}

# send a line on the control stream, or just log it if the ctl stream is gone;
# async callback is just like for senddata
sub sendctlline {
    my $self = shift;
    my ($msg) = @_;

    if ($self->{'ctl_stream'}) {
	debug("CTL >> $msg");
	return $self->senddata($self->{'ctl_stream'}, $msg . "\n");
    } else {
	debug("not sending CTL message as CTL is closed >> $msg");
    }
}

##
# main driver

package main;
use Amanda::Debug qw( debug );
use Amanda::Util qw( :constants );
use Amanda::Config qw( :init );
use Getopt::Long;

my $fd1_in;
my $fd1_out;
my $fd2_in;
my $fd2_out;
my $fd3_in;
my $fd3_out;

Amanda::Util::setup_application("ambackupd", "server", $CONTEXT_DAEMON, "amanda", "amanda");
config_init(0, undef);

Amanda::Debug::debug_dup_stderr_to_debug();

debug("Arguments: " . join(' ', @ARGV));
Getopt::Long::Configure(qw(bundling));
GetOptions(
    'version'   => \&Amanda::Util::version_opt,
    'fd1-in=s'  => \$fd1_in,
    'fd1-out=s' => \$fd1_out,
    'fd2-in=s'  => \$fd2_in,
    'fd2-out=s' => \$fd2_out,
    'fd3-in=s'  => \$fd3_in,
    'fd3-out=s' => \$fd3_out
) or usage();

sub main {

    my $cs = main::ClientService->new();

    $cs->run();
}

Amanda::MainLoop::call_later(\&main);
Amanda::MainLoop::run();
debug("exiting with $exit_status");
Amanda::Util::finish_application();
exit($exit_status);