# Blob Blame History Raw
# Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
# Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
#
# Contact information: Zmanda Inc., 465 S Mathlida Ave, Suite 300
# Sunnyvale, CA 94086, USA, or: http://www.zmanda.com

use strict;
use warnings;

=head1 PROTOCOL
  server (amfetchdump)	       client (restore)

                            <= FEATURES string
  TARGET target	 =>
  HEADER-SEND-SIZE size	 =>				# fe_restore_header_send_size
			    <= HEADER-READY		# fe_restore_header_ready
  header (size:data)	 =>
			    <= HEADER-DONE		# fe_restore_header_done
			       run application support command

  INCLUDE-SEND size	 =>				# fe_restore_include
			    <= INCLUDE-READY		# fe_restore_include
  include (size:data)					# fe_restore_include
			    <= INCLUDE-DONE		# fe_restore_include
  INCLUDE-GLOB-SEND size =>				# fe_restore_include_glob
			    <= INCLUDE-GLOB-READY	# fe_restore_include_glob
  include-glob (size:data)				# fe_restore_include_glob
			    <= INCLUDE-GLOB-DONE	# fe_restore_include_glob
  EXCLUDE-SEND size	 =>				# fe_restore_exclude
			    <= EXCLUDE-READY		# fe_restore_exclude
  exclude (size:data)					# fe_restore_exclude
			    <= EXCLUDE-DONE		# fe_restore_exclude
  EXCLUDE-GLOB-SEND size =>				# fe_restore_exclude_glob
			    <= EXCLUDE-GLOB-READY	# fe_restore_exclude_glob
  exclude-glob (size:data)				# fe_restore_exclude_glob
			    <= EXCLUDE-GLOB-DONE	# fe_restore_exclude_glob
  INCLUDE-EXCLUDE-DONE   =>
  PREV-NEXT-LEVEL p n	 =>				# fe_restore_prev_next_level

  STATE-SEND		 =>				# fe_restore_state_send
			    <= STATE-READY		# fe_restore_state_ready
  statefile (EOF:state)	 =>
			    <= STATE-DONE		# fe_restore_state_done
  USE-DAR (YES|NO)	 =>				# fe_restore_dar
			    <= USE-DAR (YES|NO)		# fe_restore_dar
  AVAIL-DATAPATH AMANDA	 =>				# fe_restore_datapath
			    <= USE-DATAPATH AMANDA	# fe_restore_datapath
  DATAPATH-OK		 =>				# fe_restore_datapath
  DATA-SEND		 =>				# fe_restore_data_send
			       run application restore command
			    <= DATA-READY		# fe_restore_data_ready
			    <= DAR x:y			# fe_restore_dar and USE-DAR is YES
  data (EOF:data)	 =>
			    <= DAR-DONE			# fe_restore_dar and USE-DAR is YES
			    #<= DATA-DONE		# fe_restore_data_done

=head1 FEATURES

=cut

package Amanda::Service::Restore;

##
# ClientService class

use vars qw( @ISA );
use Amanda::ClientService;
use Amanda::Recovery::Clerk;

use parent -norequire, qw( Amanda::ClientService Amanda::Recovery::Clerk::Feedback);

use Sys::Hostname;
use IPC::Open2;
use JSON -convert_blessed_universally;

use Amanda::Debug qw( debug info warning );
use Amanda::Paths;
use Amanda::MainLoop qw( :GIOCondition );
use Amanda::Util qw( :constants match_disk match_host );
use Amanda::Feature;
use Amanda::Config qw( :init :getconf );
use Amanda::Storage;
use Amanda::Changer;
use Amanda::Xfer qw( :constants );
use Amanda::Cmdline;
use Amanda::Recovery::Clerk;
use Amanda::Disklist;
use Amanda::Restore;
use Amanda::FetchDump;  # for Amanda::FetchDump::Message

# Note that this class performs its control IO synchronously.  This is adequate
# for this service, as it never receives unsolicited input from the remote
# system.

# Service entry point: initialise the feature sets and the filter registry,
# then start the REQ/REP handshake.
#
# 'their_features' starts out as Amanda::Feature::Set->old() and is replaced
# in setup_streams() when the REQ carries a features option.
sub run {
    my ($self) = @_;

    $self->{'all_filter'} = {};
    $self->{'my_features'} = Amanda::Feature::Set->mine();
    $self->{'their_features'} = Amanda::Feature::Set->old();

    $self->setup_streams();
}

# Feedback hook; this service does nothing here and returns nothing.
sub set_feedback {
    return;
}

# Feedback callback: log $message and forward it to the peer via
# sendmessage().  Returns whatever sendmessage() returns.
sub user_message {
    my ($self, $message) = @_;

    debug("user_message feedback: $message");
    return $self->sendmessage($message);
}

# Read the REQ packet, validate it, answer with the REP packet (opening the
# CTL/DATA/MESG streams, plus STATE when the peer supports it), load the
# configuration named in the request, and finally announce our features on
# CTL before handing over to read_command().
#
# On a REQ error the REP carries the error lines and the service quits.
# Dies when the requested configuration has errors.
sub setup_streams {
    my ($self) = @_;

    my $req = $self->get_req();

    # make some sanity checks
    my @problems;
    my $reqauth = $req->{'options'}{'auth'};
    if (defined $reqauth) {
	my $amauth = $self->amandad_auth();
	if (defined $amauth and $reqauth ne $amauth) {
	    push @problems, "recover program requested auth '$reqauth', " .
			    "but amandad is using auth '$amauth'";
	    $main::exit_status = 1;
	}
    }

    # and pull out the features, if given
    my $feature_string = $req->{'options'}{'features'};
    if (defined($feature_string)) {
	$self->{'their_features'} = Amanda::Feature::Set->from_string($feature_string);
    }

    # the STATE stream is only offered to peers that know about it
    my @streams = ('CTL' => 'rw', 'DATA' => 'r', 'MESG' => 'w');
    my $with_state = $self->{'their_features'}->has($Amanda::Feature::fe_restore_state_stream);
    push @streams, 'STATE' => 'r' if $with_state;

    $self->send_rep(\@streams, \@problems);
    $self->{'state_stream' } = 'STATE' if $with_state;

    return $self->quit() if (@problems);

    $self->{'ctl_stream' } = 'CTL';
    $self->{'data_stream'} = 'DATA';
    $self->{'mesg_stream'} = 'MESG';

    my $config_name = $req->{'options'}->{'config'};
    if ($config_name) {
	config_init($CONFIG_INIT_CLIENT | $CONFIG_INIT_EXPLICIT_NAME | $CONFIG_INIT_OVERLAY,
                            $config_name);
	my ($cfgerr_level) = config_errors();
	die "configuration errors; aborting connection"
	    if $cfgerr_level >= $CFGERR_ERRORS;
	Amanda::Util::finish_setup($RUNNING_AS_DUMPUSER_PREFERRED);
    }

    $self->sendctlline("FEATURES " . $self->{'my_features'}->as_string() . "\r\n");
    $self->read_command();
}

# Read the TARGET line from the control stream and start the restore.
#
# The first CTL line must be "TARGET <quoted target>".  Anything else is
# reported to the peer through user_message() and the process exits with
# status 1.  On success an Amanda::Restore object is created and its
# restore() is started with this object as the feedback; its finished_cb
# records the exit status and calls quit().
#
# NOTE(review): the second value returned by Amanda::Restore->new() is not
# inspected here -- confirm whether construction can fail.
sub read_command {
    my $self = shift;
    my $ctl_stream = $self->{'ctl_stream'};
    $self->{'command'} = {};

    my $line = $self->getline($ctl_stream);
    $line =~ s/\r?\n$//g;
    if ($line !~ /^TARGET (.*)/) {
	# $line has already lost its line terminator above; chopping it
	# again would cut a real character off the reported text.
	$self->user_message(Amanda::FetchDump::Message->new(
			source_filename => __FILE__,
			source_line     => __LINE__,
			code            => 3300064,
			severity        => $Amanda::Message::ERROR,
			expect          => "TARGET",
			line            => $line));
	exit 1;
    }
    $self->{'target'} = Amanda::Util::unquote_string($1);

    ($self->{'restore'}, my $result_message) = Amanda::Restore->new();
    $self->{'restore'}->restore(
		'source-fd'	 => $self->rfd($self->{'data_stream'}),
		'target'	 => $self->{'target'},
		'extract'	 => 1,
		'decompress'     => 1,
		'decrypt'        => 1,
		'feedback'	 => $self,
		'their_features' => $self->{'their_features'},
		'finished_cb'	 => sub {
					$main::exit_status = shift;
					$self->quit();
				       });

    return;
}

# Receive the dump header, then the optional include/exclude lists and the
# optional PREV-NEXT-LEVEL line, as laid out in this file's protocol POD.
#
# Side effects on $self:
#   'header-size'                  size of the header read from DATA
#   'hdr'                          the parsed header
#   '<kind>-name', '<kind>-list'   temporary file holding each received list,
#                                  <kind> being include, include-glob,
#                                  exclude or exclude-glob
#   'prev-level', 'next-level'     set only for non-negative levels
#
# Returns an Amanda::FetchDump::Message when the peer sends an unexpected
# line (or the control stream ends early); otherwise returns the parsed
# header.  Dies when a list file cannot be written.
sub get_header {
    my $self = shift;
    my $their_features = $self->{'their_features'};

    $self->{'header-size'} = 32768;

    if ($their_features->has($Amanda::Feature::fe_restore_header_send_size)) {
	my $line = $self->getline('CTL');
	$line =~ s/\r?\n$//g;
	if ($line !~ /^HEADER-SEND-SIZE (.*)/) {
	    # the terminator is already stripped; report the line untouched
	    return Amanda::FetchDump::Message->new(
			source_filename => __FILE__,
			source_line     => __LINE__,
			code            => 3300064,
			severity        => $Amanda::Message::ERROR,
			expect          => "HEADER-SEND-SIZE",
			line            => $line);
	}
	$self->{'header-size'} = 0 + $1;
    }
    if ($their_features->has($Amanda::Feature::fe_restore_header_ready)) {
	$self->sendctlline("HEADER-READY\r\n");
    }

    # read header from DATA
    my $header = Amanda::Util::full_read($self->rfd($self->{'data_stream'}), $self->{'header-size'});

    # print HEADER-DONE to CTL
    $self->sendctlline("HEADER-DONE\r\n");

    # parse header
    $self->{'hdr'} = Amanda::Header->from_string($header);

    if ($their_features->has($Amanda::Feature::fe_restore_include) ||
	$their_features->has($Amanda::Feature::fe_restore_include_glob) ||
	$their_features->has($Amanda::Feature::fe_restore_exclude) ||
	$their_features->has($Amanda::Feature::fe_restore_exclude_glob)) {

	# CTL keyword => [ prefix of the $self keys, temporary file basename ]
	my %list_for = (
	    'INCLUDE'      => [ 'include',      'restore-include' ],
	    'INCLUDE-GLOB' => [ 'include-glob', 'restore-include-globi' ],
	    'EXCLUDE'      => [ 'exclude',      'restore-exclude' ],
	    'EXCLUDE-GLOB' => [ 'exclude-glob', 'restore-exclude-glob' ],
	);

	while (1) {
	    my $line = $self->getline('CTL');
	    if ($line eq '') {
		# getline() only returns an empty string when nothing could
		# be read (EOF or error); without this check the loop would
		# spin forever waiting for INCLUDE-EXCLUDE-DONE.
		return Amanda::FetchDump::Message->new(
			source_filename => __FILE__,
			source_line     => __LINE__,
			code            => 3300064,
			severity        => $Amanda::Message::ERROR,
			expect          => "INCLUDE-EXCLUDE-DONE",
			line            => $line);
	    }
	    $line =~ s/\r?\n$//g;
	    last if $line eq "INCLUDE-EXCLUDE-DONE";

	    # lines that are not one of the four SEND commands are skipped
	    next unless $line =~ /^(INCLUDE|INCLUDE-GLOB|EXCLUDE|EXCLUDE-GLOB)-SEND (\d*)$/;
	    my ($keyword, $size) = ($1, $2);
	    my ($key, $basename) = @{ $list_for{$keyword} };

	    $self->sendctlline("$keyword-READY\r\n");
	    my $content = Amanda::Util::full_read($self->rfd($self->{'data_stream'}), $size);

	    my $filename = "$AMANDA_TMPDIR/$basename-$$";
	    $self->{"$key-name"} = $filename;
	    $self->{"$key-list"} = [ $filename ];

	    open(my $list_fh, '>', $filename)
		or die "opening $filename: $!";
	    print {$list_fh} $content
		or die "writing to $filename: $!";
	    close($list_fh)
		or die "closing $filename: $!";

	    $self->sendctlline("$keyword-DONE\r\n");
	}
    }
    if ($their_features->has($Amanda::Feature::fe_restore_prev_next_level)) {
	my $line = $self->getline('CTL');
	$line =~ s/\r?\n\z//;
	# require real digits so the numeric comparisons below are safe
	if ($line !~ /^PREV-NEXT-LEVEL (-?\d+) (-?\d+)$/) {
	    return Amanda::FetchDump::Message->new(
			source_filename => __FILE__,
			source_line     => __LINE__,
			code            => 3300064,
			severity        => $Amanda::Message::ERROR,
			expect          => "PREV-NEXT-LEVEL",
			line            => $line);
	}
	my ($prev_level, $next_level) = ($1, $2);
	# a negative level means "no such level"
	$self->{'prev-level'} = $prev_level if $prev_level >= 0;
	$self->{'next-level'} = $next_level if $next_level >= 0;
    }

    return $self->{'hdr'};
}

# Forward one DAR line to the peer on the control stream, normalising its
# terminator to CRLF.  Always returns undef.
sub send_dar_data {
    my ($self, $line) = @_;

    chomp $line;
    $self->sendctlline($line . "\r\n");
    return undef;
}

# Receive the application state file from the peer over the STATE stream.
#
# $header supplies the name, disk, datestamp and dumplevel used to build the
# state file name under the configured tmpdir.
#
# Returns nothing when the peer lacks the state stream feature or answers
# NO-STATE-SEND, an Amanda::FetchDump::Message when the peer sends an
# unexpected line, and undef once the file has been stored (its path is
# then kept in $self->{'state_filename'}).  Dies when the state file cannot
# be opened, written or closed.
sub transmit_state_file {
    my $self = shift;
    my $header = shift;

    return if !$self->{'their_features'}->has($Amanda::Feature::fe_restore_state_stream);

    if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_state_send)) {
	my $line = $self->getline('CTL');
	$line =~ s/\r?\n$//g;
	if ($line =~ /^NO-STATE-SEND/) {
	    $self->close($self->{'state_stream'}, 'r');
	    return;
	}
	if ($line !~ /^STATE-SEND/) {
	    # the terminator is already stripped; report the line untouched
	    return Amanda::FetchDump::Message->new(
			source_filename => __FILE__,
			source_line     => __LINE__,
			code            => 3300064,
			severity        => $Amanda::Message::ERROR,
			expect          => "STATE-SEND",
			line            => $line);
	}
    }

    # print STATE-READY to CTL
    if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_state_ready)) {
	$self->sendctlline("STATE-READY\r\n");
    }

    my $host = Amanda::Util::sanitise_filename("" . $header->{'name'});
    my $disk = Amanda::Util::sanitise_filename("" . $header->{'disk'});
    my $state_filename = getconf($CNF_TMPDIR) . '/' . $host .
                '-' . $disk . '-' . $header->{'datestamp'} . '_' .
                $header->{'dumplevel'} . '.state';
    open(my $state_fh, '>', $state_filename)
	or die "opening $state_filename: $!";

    # copy the STATE stream to the file until EOF; test the block's length,
    # not its truth, so that a block consisting of "0" is not taken for EOF
    my $state_fd = $self->rfd($self->{'state_stream'});
    while (1) {
	my $block = Amanda::Util::full_read($state_fd, 32768);
	last unless defined $block and length $block;
	Amanda::Util::full_write(fileno($state_fh),
				 $block, length($block))
		or die "writing to $state_filename: $!";
    }
    close($state_fh)
	or die "closing $state_filename: $!";
    $self->close($self->{'state_stream'}, 'r');
    $self->{'state_filename'} = $state_filename;

    # print STATE-DONE to CTL
    if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_state_done)) {
	$self->sendctlline("STATE-DONE\r\n");
    }
    return undef;
}

# Record the header, DLE and application properties for this restore and
# build the Amanda::Extract object that will run the application.
#
# Parameters: $hdr, $dle, $application_property (stored on $self under the
# same names).
#
# Returns undef.  Dies when Amanda::Extract->new() hands back an
# Amanda::Message or when BSU() reports errors.
sub set {
    my $self = shift;
    my $hdr = shift;
    my $dle = shift;
    my $application_property = shift;

    $self->{'hdr'} = $hdr;
    $self->{'dle'} = $dle;
    $self->{'application_property'} = $application_property;

    # get_header() stores the received glob lists under 'include-glob-list'
    # and 'exclude-glob-list'; the '*-list-glob' keys read here before were
    # never set in this file, so the glob lists were silently dropped.
    # Accept either spelling.
    my $include_list_glob = $self->{'include-list-glob'} || $self->{'include-glob-list'};
    my $exclude_list_glob = $self->{'exclude-list-glob'} || $self->{'exclude-glob-list'};

    $self->{'extract'} = Amanda::Extract->new(
		hdr => $hdr,
		dle => $dle,
		'include-list' => $self->{'include-list'},
		'include-list-glob' => $include_list_glob,
		'exclude-list' => $self->{'exclude-list'},
		'exclude-list-glob' => $exclude_list_glob);
    die("$self->{'extract'}") if $self->{'extract'}->isa('Amanda::Message');
    ($self->{'bsu'}, my $err) = $self->{'extract'}->BSU();
    # tolerate BSU() returning no error list at all
    if ($err && @$err) {
        die("BSU err " . join("\n", @$err));
    }
    return undef;
}

# Run the scripts that precede the restore of one level: the
# inter-level-recover scripts when a previous level was announced, the
# pre-recover scripts otherwise, and then always the pre-level-recover
# scripts.
sub run_pre_scripts {
    my ($self) = @_;
    my $extract = $self->{'extract'};

    if (defined $self->{'prev-level'}) {
	$extract->run_scripts($Amanda::Config::EXECUTE_ON_INTER_LEVEL_RECOVER,
			'prev-level' => $self->{'prev-level'});
    } else {
	$extract->run_scripts($Amanda::Config::EXECUTE_ON_PRE_RECOVER);
    }

    $extract->run_scripts($Amanda::Config::EXECUTE_ON_PRE_LEVEL_RECOVER);
}

# Run the scripts that follow the restore of one level: always the
# post-level-recover scripts, then the post-recover scripts when no further
# level was announced.
sub run_post_scripts {
    my ($self) = @_;
    my $extract = $self->{'extract'};

    # run post-level-recover scripts
    $extract->run_scripts($Amanda::Config::EXECUTE_ON_POST_LEVEL_RECOVER);

    $extract->run_scripts($Amanda::Config::EXECUTE_ON_POST_RECOVER)
	if !defined $self->{'next-level'};
}

# Prepare the application's restore command line and create the transfer
# destination: a DirectTCP listener when $self->{'use_directtcp'} is set, an
# application destination built from the extract's restore_argv otherwise.
# The destination is stored in $self->{'xfer_dest'} and returned.
sub get_xfer_dest {
    my ($self) = @_;
    my $extract = $self->{'extract'};

    $extract->set_restore_argv(
		target => $self->{'target'},
		use_dar   => $self->{'use_dar'},
		state_filename => $self->{'state_filename'},
		application_property => $self->{'application_property'});

    my $dest = $self->{'use_directtcp'}
	? Amanda::Xfer::Dest::DirectTCPListen->new()
	: Amanda::Xfer::Dest::Application->new($extract->{'restore_argv'}, 0, 0, 0, 1);

    $self->{'xfer_dest'} = $dest;
    return $dest;
}

# Return the destination file handle for this service: a reference to the
# STDOUT glob.
sub new_dest_fh {
    my ($self) = @_;

    return \*STDOUT;
}

# Negotiate the use of DAR (direct access recovery) with the peer.
#
# $use_dar is the caller's own preference.  DAR is used only when the caller
# wants it, the peer sent "USE-DAR YES" and the application's BSU reports
# dar support; the outcome is sent back as "USE-DAR YES" or "USE-DAR NO" and
# stored in $self->{'use_dar'}.
#
# Returns 0 without any exchange when the peer lacks the dar feature, an
# Amanda::FetchDump::Message when the peer's line is not a valid USE-DAR
# line, and otherwise the negotiated value (1 or 0).
sub transmit_dar {
    my $self = shift;
    my $use_dar = shift;

    return 0 if !$self->{'their_features'}->has($Amanda::Feature::fe_restore_dar);

    my $line = $self->getline($self->{'ctl_stream'});
    $line =~ s/\r?\n\z//;

    # Take the capture from the match itself: after a failed match $1 still
    # holds whatever an earlier successful match left there.
    my ($darspec) = ($line =~ /^USE-DAR (.*)$/);
    if (!defined $darspec || ($darspec ne "YES" && $darspec ne "NO")) {
	return Amanda::FetchDump::Message->new(
			source_filename => __FILE__,
			source_line     => __LINE__,
			code            => 3300064,
			severity        => $Amanda::Message::ERROR,
			expect          => "USE-DAR [YES|NO]",
			line            => $line);
    }

    # logical, not bitwise, conjunction of the three conditions
    $use_dar = ($use_dar && $darspec eq 'YES' && $self->{'bsu'}->{'dar'}) ? 1 : 0;

    if ($use_dar) {
	$self->sendctlline("USE-DAR YES\r\n");
    } else {
	$self->sendctlline("USE-DAR NO\r\n");
    }

    $self->{'use_dar'} = $use_dar;
    return $use_dar;
}

# Wait for the peer's DATA-SEND (when it has that feature) and answer with
# DATA-READY (when it has that feature).
#
# Returns an Amanda::FetchDump::Message when the peer sends anything other
# than DATA-SEND, undef otherwise.
sub notify_start_backup {
    my $self = shift;

    if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_data_send)) {
	my $line = $self->getline($self->{'ctl_stream'});
	# strip exactly one line terminator; a blind chomp+chop would eat a
	# real character whenever the line does not end in CRLF
	$line =~ s/\r?\n\z//;
	if ($line ne "DATA-SEND") {
	    return Amanda::FetchDump::Message->new(
			source_filename => __FILE__,
			source_line     => __LINE__,
			code            => 3300064,
			severity        => $Amanda::Message::ERROR,
			expect          => "DATA-SEND",
			line            => $line);
	}
    }

    if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_data_ready)) {
	$self->sendctlline("DATA-READY\r\n");
    }

    return undef;
}

# Start watching the DAR file descriptor of $xfer_dest and forward every
# complete line read from it to $cb_data.
#
# Parameters:
#   $xfer_dest  object providing get_dar_fd()
#   $cb_data    called with each line (newline removed), and with
#               "DAR -1:0" once the descriptor reaches EOF
#   $cb_done    called at EOF when no filter is registered any more and
#               $self->{'recovery_done'} is set
#   $text       accepted but not used in this sub
#
# Returns undef.
#
# NOTE(review): the source is registered in $self->{'fetchdump'}->{'all_filter'},
# whereas quit1() in this file consults $self->{'all_filter'}; nothing in this
# file sets $self->{'fetchdump'} or $self->{'recovery_done'} -- confirm which
# hash is meant.
sub start_read_dar {
    my $self = shift;
    my $xfer_dest = shift;
    my $cb_data = shift;
    my $cb_done = shift;
    my $text = shift;

    my $fd = $xfer_dest->get_dar_fd();
    # force the descriptor into a plain integer (stringify, then int)
    $fd.="";
    $fd = int($fd);
    my $src = Amanda::MainLoop::fd_source($fd,
                                          $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
    # bytes of the line currently being assembled
    my $buffer = "";
    $self->{'fetchdump'}->{'all_filter'}->{$src} = 1;
    $src->set_callback( sub {
	my $b;
	# one byte per callback invocation
	my $n_read = POSIX::read($fd, $b, 1);
	if (!defined $n_read) {
	    # read error: nothing is consumed and the source stays installed
	    return;
	} elsif ($n_read == 0) {
	    # EOF: unregister, send the final DAR marker and release the fd
	    delete $self->{'fetchdump'}->{'all_filter'}->{$src};
	    $cb_data->("DAR -1:0");
	    $src->remove();
	    POSIX::close($fd);
	    if (!%{$self->{'fetchdump'}->{'all_filter'}} and $self->{'recovery_done'}) {
		$cb_done->();
	    }
	} else {
	    $buffer .= $b;
	    if ($b eq "\n") {
		my $line = $buffer;
		chomp $line;
		# lines of at most one character are dropped
		if (length($line) > 1) {
		    $cb_data->($line);
		}
		$buffer = "";
	    }
	}
    });
    return undef;
}

# Read the peer's AVAIL-DATAPATH line and choose the datapath, stored in
# $self->{'datapath'} as 'amanda' or 'directtcp'.
#
# Peers without the datapath feature always get 'amanda' (and the sub
# returns nothing).  Returns an Amanda::FetchDump::Message when the line is
# not an AVAIL-DATAPATH line and undef once a datapath has been chosen.
# Dies when the peer offers neither DIRECT-TCP nor AMANDA.
sub get_datapath {
    my $self = shift;

    $self->{'datapath'} = 'none';

    if (!$self->{'their_features'}->has($Amanda::Feature::fe_restore_datapath)) {
	$self->{'datapath'} = 'amanda';
	return;
    }

    my $line = $self->getline($self->{'ctl_stream'});
    # strip exactly one line terminator; a blind chomp+chop would eat a real
    # character whenever the line does not end in CRLF
    $line =~ s/\r?\n\z//;
    my ($dpspec) = ($line =~ /^AVAIL-DATAPATH (.*)$/);
    if (!defined $dpspec) {
	return Amanda::FetchDump::Message->new(
			source_filename => __FILE__,
			source_line     => __LINE__,
			code            => 3300064,
			severity        => $Amanda::Message::ERROR,
			expect          => "AVAIL-DATAPATH",
			line            => $line);
    }
    my @avail_dps = split / /, $dpspec;

    if (grep /^DIRECT-TCP$/, @avail_dps) {
	# remote can handle a directtcp transfer .. can we?
	# BUG: Must check application BSU
	if ($self->{'xfer_src_supports_directtcp'}) {
	    $self->{'datapath'} = 'directtcp';
	} else {
	    $self->{'datapath'} = 'amanda';
	}
    } else {
	# remote can at least handle AMANDA
	die "remote cannot handle AMANDA datapath??"
	    unless grep /^AMANDA$/, @avail_dps;
	$self->{'datapath'} = 'amanda';
    }
    return undef;
}

# When the chosen datapath is 'amanda', announce it to the peer and wait for
# its DATAPATH-OK.
#
# Returns nothing when the peer lacks the datapath feature, an
# Amanda::FetchDump::Message when the peer answers with anything other than
# DATAPATH-OK, and undef otherwise.
sub send_amanda_datapath {
    my $self = shift;

    return if !$self->{'their_features'}->has($Amanda::Feature::fe_restore_datapath);

    if ($self->{'datapath'} eq 'amanda') {
	$self->sendctlline("USE-DATAPATH AMANDA\r\n");
	my $line = $self->getline($self->{'ctl_stream'});
	if ($line !~ /^DATAPATH-OK\r?$/) {
	    # strip exactly one line terminator for the report; a blind
	    # chomp+chop would eat a real character on a bare "\n" line
	    $line =~ s/\r?\n\z//;
	    return Amanda::FetchDump::Message->new(
			source_filename => __FILE__,
			source_line     => __LINE__,
			code            => 3300064,
			severity        => $Amanda::Message::ERROR,
			expect          => "DATAPATH-OK",
			line            => $line);
	}
    }
    return undef;
}

# When the chosen datapath is 'directtcp', send the listening addresses to
# the peer and wait for its DATAPATH-OK.
#
# Returns nothing when the peer lacks the datapath feature, an
# Amanda::FetchDump::Message when the peer answers with anything other than
# DATAPATH-OK, and undef otherwise.  Dies when no transfer destination is
# available to take the addresses from.
sub send_directtcp_datapath {
    my $self = shift;

    return if !$self->{'their_features'}->has($Amanda::Feature::fe_restore_datapath);

    # send the data-path response, if we have a datapath
    if ($self->{'datapath'} eq 'directtcp') {
	# Nothing in this file sets $self->{'fetchdump'}; get_xfer_dest()
	# stores the destination in $self->{'xfer_dest'}.  Keep honouring the
	# former when present and fall back to the latter.
	my $xfer_dest = $self->{'fetchdump'} ? $self->{'fetchdump'}->{'xfer_dest'} : undef;
	$xfer_dest = $self->{'xfer_dest'} if !defined $xfer_dest;
	die "no transfer destination available for the DIRECT-TCP datapath"
	    unless defined $xfer_dest;

	my $addrs = join(" ", map { $_->[0] . ":" . $_->[1] } @{ $xfer_dest->get_addrs() });
	$self->sendctlline("USE-DATAPATH DIRECT-TCP $addrs\r\n");
	my $line = $self->getline($self->{'ctl_stream'});
	if ($line !~ /^DATAPATH-OK\r?$/) {
	    # strip exactly one line terminator for the report; a blind
	    # chomp+chop would eat a real character on a bare "\n" line
	    $line =~ s/\r?\n\z//;
	    return Amanda::FetchDump::Message->new(
			source_filename => __FILE__,
			source_line     => __LINE__,
			code            => 3300064,
			severity        => $Amanda::Message::ERROR,
			expect          => "DATAPATH-OK",
			line            => $line);
	}
    }
    return undef;
}

# Shut the service down.  With a clerk, quit it first and finish from its
# callback (quitting the changer, then quit1()); without one, quit the scan
# and the changer directly and go straight to quit1().
sub quit {
    my ($self) = @_;

    unless ($self->{'clerk'}) {
	$self->{'scan'}->quit() if defined $self->{'scan'};
	$self->{'chg'}->quit() if defined $self->{'chg'};
	return $self->quit1();
    }

    my $clerk_finished = sub {
	my ($err) = @_;
	$self->{'chg'}->quit() if defined $self->{'chg'};
	# it's *way* too late to report this to amrecover now!
	warning("while quitting clerk: $err") if $err;
	$self->quit1();
    };
    return $self->{'clerk'}->quit(finished_cb => $clerk_finished);
}

# Final shutdown step: quit the storage if there is one, flag the fetch as
# done, and stop the main loop unless $self->{'all_filter'} still has
# entries.
sub quit1 {
    my ($self) = @_;

    my $storage = $self->{'storage'};
    $storage->quit() if defined($storage);

    $self->{'fetch_done'} = 1;

    Amanda::MainLoop::quit() if !%{$self->{'all_filter'}};
}

## utilities

# Read the whole REQ packet from the 'main' stream, close that stream for
# reading, and return the parsed request (also kept in $self->{'req'}).
sub get_req {
    my $self = shift;

    my $main_fd = $self->rfd('main');
    my $req_str = '';
    while (1) {
	my $buf = Amanda::Util::full_read($main_fd, 1024);
	# stop on an empty read only: a chunk that happens to be "0" is data,
	# not EOF
	last unless defined $buf and length $buf;
	$req_str .= $buf;
    }
    # we've read main to EOF, so close it
    $self->close('main', 'r');

    $self->{'req'} = $self->parse_req($req_str);
    return $self->{'req'};
}

# Build and send the REP packet on the 'main' stream, then close it for
# writing.
#
# $streams is the list handed to connect_streams(); $errors is an array ref
# of REQ problems.  With errors, the packet carries one ERROR line per
# problem instead of the connect line.  An OPTIONS line follows in either
# case, listing our features and the hostname when the peer supports those
# options.
#
# NOTE(review): the hostname is read from $self->{'req'}->{'hostsname'};
# confirm that parse_req() really provides a key with that spelling.
sub send_rep {
    my ($self, $streams, $errors) = @_;

    # first, if there were errors in the REQ, report them
    my $rep = @$errors
	? join('', map { "ERROR $_\n" } @$errors)
	: $self->connect_streams(@$streams) . "\n";

    my $their_features = $self->{'their_features'};
    my $wants_features = $their_features->has($Amanda::Feature::fe_rep_options_features);
    my $wants_hostname = $their_features->has($Amanda::Feature::fe_rep_options_hostname);

    $rep .= "OPTIONS ";
    $rep .= 'features=' . $self->{'my_features'}->as_string() . ';'
	if $wants_features;
    $rep .= 'hostname=' . $self->{'req'}->{'hostsname'} . ';'
	if $wants_hostname;
    # a lone ';' is appended unless both options were written
    $rep .= ";" unless $wants_features && $wants_hostname;

    # rep needs a empty-line terminator, I think
    $rep .= "\n";

    # write the whole rep packet, and close main to signal the end of the packet
    $self->senddata('main', $rep);
    $self->close('main', 'w');
}

# helper function to get a line, including the trailing '\n', from a stream.  This
# reads a character at a time to ensure that no extra characters are consumed.  This
# could certainly be more efficient! (TODO)
#
# Returns the line as read; the result is shorter than a full line (possibly
# the empty string) when EOF or a read error is hit before a '\n'.  The line,
# without its terminator, is also written to the debug log.
sub getline {
    my $self = shift;
    my ($stream) = @_;
    my $fd = $self->rfd($stream);
    my $line = '';

    while (1) {
	my $c;
	my $bytes = POSIX::read($fd, $c, 1);
	if (!defined $bytes) {
	    # a read interrupted by a signal is not the end of the line
	    next if $!{EINTR};
	    last;
	}
	last if $bytes == 0;
	$line .= $c;
	last if $c eq "\n";
    }

    (my $chopped = $line) =~ s/[\r\n]*$//g;
    debug("CTL << $chopped");

    return $line;
}

# like getline, but async; TODO:
#  - make all uses of getline async
#  - use buffering to read more than one character at a time
#
# Reads $stream one byte at a time through Amanda::MainLoop::async_read and
# calls $async_read_cb->(undef, $line) once the accumulated text ends in
# "\r\n" (the line is passed with its terminator), or
# $async_read_cb->($err, undef) as soon as a read reports an error.
#
# NOTE(review): a read that delivers no data (presumably EOF) neither
# completes the line nor stops the re-arming below -- confirm how
# async_read reports EOF.
sub getline_async {
    my $self = shift;
    my ($stream, $async_read_cb) = @_;
    my $fd = $self->rfd($stream);

    # declared before assignment so the closure can re-arm itself
    my $data_in;
    my $buf = '';

    $data_in = sub {
	my ($err, $data) = @_;

	return $async_read_cb->($err, undef) if $err;

	$buf .= $data;
	if ($buf =~ /^(.*\r\n)$/) {
	    # complete line: log it without the terminator, deliver it with
	    my $chopped = $1;
	    $chopped =~ s/[\r\n]*$//g;
	    debug("CTL << $chopped");

	    $async_read_cb->(undef, $buf);
	} else {
	    # line not complete yet: ask for the next byte
	    Amanda::MainLoop::async_read(fd => $fd, size => 1, async_read_cb => $data_in);
	}
    };
    Amanda::MainLoop::async_read(fd => $fd, size => 1, async_read_cb => $data_in);
}

# Write $data to $stream exactly as given; no newline is appended.
# With $async_write_cb the write is queued through the main loop and the
# result of async_write is returned; without it the data is written
# synchronously and a failed write is fatal.
# (TODO: all calls should be async)
sub senddata {
    my ($self, $stream, $data, $async_write_cb) = @_;
    my $fd = $self->wfd($stream);

    return Amanda::MainLoop::async_write(
		fd => $fd,
		data => $data,
		async_write_cb => $async_write_cb)
	if defined $async_write_cb;

    Amanda::Util::full_write($fd, $data, length($data))
	or die "writing to $stream: $!";
}

# Send $msg on the control stream.  When the control stream is gone the
# message is only logged, and the callback (if any) is invoked as though the
# whole message had been written.  $async_write_cb works as for senddata.
sub sendctlline {
    my ($self, $msg, $async_write_cb) = @_;

    # copy of the message without trailing CR/LF, for the debug log
    (my $chopped = $msg) =~ s/[\r\n]*$//g;

    if ($self->{'ctl_stream'}) {
	debug("CTL >> $chopped");
	return $self->senddata($self->{'ctl_stream'}, $msg, $async_write_cb);
    }

    debug("not sending CTL message as CTL is closed >> $chopped");
    if (defined $async_write_cb) {
	$async_write_cb->(undef, length($msg));
    }
}

# Send $msg on the MESG stream: JSON-encoded when the remote has
# fe_restore_mesg_json, as its plain string form followed by a newline
# otherwise.  When the MESG stream is gone the message is only logged, and
# the callback (if any) is invoked as though it had been written.
# $async_write_cb works as for senddata.
sub sendmessage {
    my ($self, $msg, $async_write_cb) = @_;

    # the JSON encoder is created on first use and kept on $self
    $self->{'json'} = JSON->new->allow_nonref->convert_blessed
	if !defined $self->{'json'};

    if ($self->{'mesg_stream'}) {
	debug("MESG >> $msg");
	my $payload = $self->{'their_features'}->has($Amanda::Feature::fe_restore_mesg_json)
	    ? $self->{'json'}->pretty->encode($msg)
	    : "$msg\n";
	return $self->senddata($self->{'mesg_stream'}, $payload, $async_write_cb);
    }

    debug("not sending MESG message as MESG is closed >> $msg");
    if (defined $async_write_cb) {
	$async_write_cb->(undef, length($msg));
    }
}

1;