# Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
# Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Contact information: Zmanda Inc., 465 S Mathlida Ave, Suite 300
# Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
use strict;
use warnings;
=head1 PROTOCOl
server (amfetchdump) client (restore)
<= FEATURES string
TARGET target =>
HEADER-SEND-SIZE size => # fe_restore_header_send_size
<= HEADER-READY # fe_restore_header_ready
header (size:data) =>
<= HEADER-DONE # fe_restore_header_done
run application support command
INCLUDE-SEND size => # fe_restore_include
<= INCLUDE-READY # fe_restore_include
include (size:data) # fe_restore_include
<= INCLUDE-DONE # fe_restore_include
INCLUDE-GLOB-SEND size => # fe_restore_include_glob
<= INCLUDE-GLOB-READY # fe_restore_include_glob
include-glob (size:data) # fe_restore_include_glob
<= INCLUDE-GLOB-DONE # fe_restore_include_glob
EXCLUDE-SEND size => # fe_restore_exclude
<= EXCLUDE-READY # fe_restore_exclude
exclude (size:data) # fe_restore_exclude
<= EXCLUDE-DONE # fe_restore_exclude
EXCLUDE-GLOB-SEND size => # fe_restore_exclude_glob
<= EXCLUDE-GLOB-READY # fe_restore_exclude_glob
exclude-glob (size:data) # fe_restore_exclude_glob
<= EXCLUDE-GLOB-DONE # fe_restore_exclude_glob
INCLUDE-EXCLUDE-DONE =>
PREV-NEXT-LEVEL p n => # fe_restore_prev_next_level
STATE-SEND => # fe_restore_state_send
<= STATE-READY # fe_restore_state_ready
statefile (EOF:state) =>
<= STATE-DONE # fe_restore_state_done
USE-DAR (YES|NO) => # fe_restore_dar
<= USE-DAR (YES|NO) # fe_restore_dar
AVAIL-DATAPATH AMANDA => # fe_restore_datapath
<= USE-DATAPATH AMANDA # fe_restore_datapath
DATAPATH-OK => # fe_restore_datapath
DATA-SEND => # fe_restore_data_send
run application restore command
<= DATA-READY # fe_restore_data_ready
<= DAR x:y # fe_restore_dar and USE-DAR is YES
data (EOF:data) =>
<= DAR-DONE # fe_restore_dar and USE-DAR is YES
#<= DATA-DONE # fe_restore_data_done
=head1 FEATURES
=cut
package Amanda::Service::Restore;
##
# ClientService class
use vars qw( @ISA );
use Amanda::ClientService;
use Amanda::Recovery::Clerk;
use parent -norequire, qw( Amanda::ClientService Amanda::Recovery::Clerk::Feedback);
use Sys::Hostname;
use IPC::Open2;
use JSON -convert_blessed_universally;
use Amanda::Debug qw( debug info warning );
use Amanda::Paths;
use Amanda::MainLoop qw( :GIOCondition );
use Amanda::Util qw( :constants match_disk match_host );
use Amanda::Feature;
use Amanda::Config qw( :init :getconf );
use Amanda::Storage;
use Amanda::Changer;
use Amanda::Xfer qw( :constants );
use Amanda::Cmdline;
use Amanda::Recovery::Clerk;
use Amanda::Disklist;
use Amanda::Restore;
use Amanda::FetchDump; # for Amanda::FetchDump::Message
# Note that this class performs its control IO synchronously. This is adequate
# for this service, as it never receives unsolicited input from the remote
# system.
sub run {
my $self = shift;
$self->{'my_features'} = Amanda::Feature::Set->mine();
$self->{'their_features'} = Amanda::Feature::Set->old();
$self->{'all_filter'} = {};
$self->setup_streams();
}
sub set_feedback {
}
sub user_message {
my $self = shift;
my $message = shift;
debug("user_message feedback: $message");
$self->sendmessage($message);
}
sub setup_streams {
my $self = shift;
my $req = $self->get_req();
# make some sanity checks
my $errors = [];
if (defined $req->{'options'}{'auth'} and defined $self->amandad_auth()
and $req->{'options'}{'auth'} ne $self->amandad_auth()) {
my $reqauth = $req->{'options'}{'auth'};
my $amauth = $self->amandad_auth();
push @$errors, "recover program requested auth '$reqauth', " .
"but amandad is using auth '$amauth'";
$main::exit_status = 1;
}
# and pull out the features, if given
if (defined($req->{'options'}{'features'})) {
$self->{'their_features'} = Amanda::Feature::Set->from_string($req->{'options'}{'features'});
}
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_state_stream)) {
$self->send_rep(['CTL' => 'rw', 'DATA' => 'r', 'MESG' => 'w', 'STATE' => 'r'], $errors);
$self->{'state_stream' } = 'STATE';
} else {
$self->send_rep(['CTL' => 'rw', 'DATA' => 'r', 'MESG' => 'w'], $errors);
}
return $self->quit() if (@$errors);
$self->{'ctl_stream' } = 'CTL';
$self->{'data_stream'} = 'DATA';
$self->{'mesg_stream'} = 'MESG';
if ($req->{'options'}->{'config'}) {
config_init($CONFIG_INIT_CLIENT | $CONFIG_INIT_EXPLICIT_NAME | $CONFIG_INIT_OVERLAY,
$req->{'options'}->{'config'});
my ($cfgerr_level, @cfgerr_errors) = config_errors();
if ($cfgerr_level >= $CFGERR_ERRORS) {
die "configuration errors; aborting connection";
}
Amanda::Util::finish_setup($RUNNING_AS_DUMPUSER_PREFERRED);
}
$self->sendctlline("FEATURES " . $self->{'my_features'}->as_string() . "\r\n");
$self->read_command();
}
sub read_command {
my $self = shift;
my $ctl_stream = $self->{'ctl_stream'};
my $command = $self->{'command'} = {};
my $line = $self->getline($ctl_stream);
$line =~ s/\r?\n$//g;
if ($line !~ /^TARGET (.*)/) {
chomp $line;
chop $line;
$self->user_message(Amanda::FetchDump::Message->new(
source_filename => __FILE__,
source_line => __LINE__,
code => 3300064,
severity => $Amanda::Message::ERROR,
expect => "TARGET",
line => $line));
exit 1;
}
$self->{'target'} = Amanda::Util::unquote_string($1);
($self->{'restore'}, my $result_message) = Amanda::Restore->new();
$self->{'restore'}->restore(
'source-fd' => $self->rfd($self->{'data_stream'}),
'target' => $self->{'target'},
'extract' => 1,
'decompress' => 1,
'decrypt' => 1,
'feedback' => $self,
'their_features' => $self->{'their_features'},
'finished_cb' => sub {
$main::exit_status = shift;
$self->quit();
});
return;
}
sub get_header {
my $self = shift;
$self->{'header-size'} = 32768;
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_header_send_size)) {
my $line = $self->getline('CTL');
$line =~ s/\r?\n$//g;
if ($line !~ /^HEADER-SEND-SIZE (.*)/) {
chomp $line;
chop $line;
return Amanda::FetchDump::Message->new(
source_filename => __FILE__,
source_line => __LINE__,
code => 3300064,
severity => $Amanda::Message::ERROR,
expect => "HEADER-SEND-SIZE",
line => $line);
}
$self->{'header-size'} = 0 + $1;
}
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_header_ready)) {
$self->sendctlline("HEADER-READY\r\n");
}
# read header from DATA
my $header = Amanda::Util::full_read($self->rfd($self->{'data_stream'}), $self->{'header-size'});
# print HEADER-DONE to CTL
$self->sendctlline("HEADER-DONE\r\n");
# parse header
$self->{'hdr'} = Amanda::Header->from_string($header);
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_include) ||
$self->{'their_features'}->has($Amanda::Feature::fe_restore_include_glob) ||
$self->{'their_features'}->has($Amanda::Feature::fe_restore_exclude) ||
$self->{'their_features'}->has($Amanda::Feature::fe_restore_exclude_glob)) {
my $line = $self->getline('CTL');
$line =~ s/\r?\n$//g;
while ($line ne "INCLUDE-EXCLUDE-DONE") {
if ($line =~ /^INCLUDE-SEND (\d*)$/) {
my $size = $1;
$self->sendctlline("INCLUDE-READY\r\n");
my $include = Amanda::Util::full_read($self->rfd($self->{'data_stream'}), $size);
$self->{'include-name'} = "$AMANDA_TMPDIR/restore-include-$$";
$self->{'include-list'} = [ $self->{'include-name'} ];
open INCLUDE, ">$self->{'include-name'}";
print INCLUDE $include;
close INCLUDE;
$self->sendctlline("INCLUDE-DONE\r\n");
} elsif ($line =~ /^INCLUDE-GLOB-SEND (\d*)$/) {
my $size = $1;
$self->sendctlline("INCLUDE-GLOB-READY\r\n");
my $include_glob = Amanda::Util::full_read($self->rfd($self->{'data_stream'}), $size);
$self->{'include-glob-name'} = "$AMANDA_TMPDIR/restore-include-globi-$$";
$self->{'include-glob-list'} = [ $self->{'include-glob-name'} ];
open INCLUDE, ">$self->{'include-glob-name'}";
print INCLUDE $include_glob;
close INCLUDE;
$self->sendctlline("INCLUDE-GLOB-DONE\r\n");
} elsif ($line =~ /^EXCLUDE-SEND (\d*)$/) {
my $size = $1;
$self->sendctlline("EXCLUDE-READY\r\n");
my $exclude = Amanda::Util::full_read($self->rfd($self->{'data_stream'}), $size);
$self->{'exclude-name'} = "$AMANDA_TMPDIR/restore-exclude-$$";
$self->{'exclude-list'} = [ $self->{'exclude-name'} ];
open EXCLUDE, ">$self->{'exclude-name'}";
print EXCLUDE $exclude;
close EXCLUDE;
$self->sendctlline("EXCLUDE-DONE\r\n");
} elsif ($line =~ /^EXCLUDE-GLOB-SEND (\d*)$/) {
my $size = $1;
$self->sendctlline("EXCLUDE-GLOB-READY\r\n");
my $exclude_glob = Amanda::Util::full_read($self->rfd($self->{'data_stream'}), $size);
$self->{'exclude-glob-name'} = "$AMANDA_TMPDIR/restore-exclude-glob-$$";
$self->{'exclude-glob-list'} = [ $self->{'exclude-glob-name'} ];
open EXCLUDE, ">$self->{'exclude-glob-name'}";
print EXCLUDE $exclude_glob;
close EXCLUDE;
$self->sendctlline("EXCLUDE-GLOB-DONE\r\n");
}
$line = $self->getline('CTL');
$line =~ s/\r?\n$//g;
}
}
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_prev_next_level)) {
my $line = $self->getline('CTL');
if ($line !~ /^PREV-NEXT-LEVEL (\-?\d*) (\-?\d*)\r\n/) {
chomp $line;
chop $line;
return Amanda::FetchDump::Message->new(
source_filename => __FILE__,
source_line => __LINE__,
code => 3300064,
severity => $Amanda::Message::ERROR,
expect => "PREV-NEXT-LEVEL",
line => $line);
}
$self->{'prev-level'} = $1 if $1 >= 0;
$self->{'next-level'} = $2 if $2 >= 0;
}
return $self->{'hdr'};
}
sub send_dar_data {
my $self = shift;
my $line = shift;
chomp $line;
$self->sendctlline("$line\r\n");
return undef;
}
sub transmit_state_file {
my $self = shift;
my $header = shift;
return if !$self->{'their_features'}->has($Amanda::Feature::fe_restore_state_stream);
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_state_send)) {
my $line = $self->getline('CTL');
$line =~ s/\r?\n$//g;
if ($line =~ /^NO-STATE-SEND/) {
$self->close($self->{'state_stream'}, 'r');
return;
}
if ($line !~ /^STATE-SEND/) {
chomp $line;
chop $line;
return Amanda::FetchDump::Message->new(
source_filename => __FILE__,
source_line => __LINE__,
code => 3300064,
severity => $Amanda::Message::ERROR,
expect => "STATE-SEND",
line => $line);
}
}
# print STATE-READY to CTL
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_state_ready)) {
$self->sendctlline("STATE-READY\r\n");
}
my $host = Amanda::Util::sanitise_filename("" . $header->{'name'});
my $disk = Amanda::Util::sanitise_filename("" . $header->{'disk'});
my $state_filename = getconf($CNF_TMPDIR) . '/' . $host .
'-' . $disk . '-' . $header->{'datestamp'} . '_' .
$header->{'dumplevel'} . '.state';
open (STATEFILE, '>', $state_filename) || die("ERR");
my $block;
my $length;
while ($block = Amanda::Util::full_read($self->rfd($self->{'state_stream'}),
32768)) {
Amanda::Util::full_write(fileno(STATEFILE),
$block, length($block))
or die "writing to $state_filename: $!";
}
close(STATEFILE);
$self->close($self->{'state_stream'}, 'r');
$self->{'state_filename'} = $state_filename;
# print STATE-DONE to CTL
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_state_done)) {
$self->sendctlline("STATE-DONE\r\n");
}
return undef;
}
sub set {
my $self = shift;
my $hdr = shift;;
my $dle = shift;;
my $application_property = shift;;
$self->{'hdr'} = $hdr;
$self->{'dle'} = $dle;
$self->{'application_property'} = $application_property;
$self->{'extract'} = Amanda::Extract->new(
hdr => $hdr,
dle => $dle,
'include-list' => $self->{'include-list'},
'include-list-glob' => $self->{'include-list-glob'},
'exclude-list' => $self->{'exclude-list'},
'exclude-list-glob' => $self->{'exclude-list-glob'});
die("$self->{'extract'}") if $self->{'extract'}->isa('Amanda::Message');
($self->{'bsu'}, my $err) = $self->{'extract'}->BSU();
if (@$err) {
die("BSU err " . join("\n", @$err));
}
return undef;
}
sub run_pre_scripts {
my $self = shift;
if (!defined $self->{'prev-level'}) {
$self->{'extract'}->run_scripts($Amanda::Config::EXECUTE_ON_PRE_RECOVER);
} else {
$self->{'extract'}->run_scripts($Amanda::Config::EXECUTE_ON_INTER_LEVEL_RECOVER,
'prev-level' => $self->{'prev-level'});
}
$self->{'extract'}->run_scripts($Amanda::Config::EXECUTE_ON_PRE_LEVEL_RECOVER);
}
sub run_post_scripts {
my $self = shift;
# run post-level-recover scripts
$self->{'extract'}->run_scripts($Amanda::Config::EXECUTE_ON_POST_LEVEL_RECOVER);
if (!defined $self->{'next-level'}) {
$self->{'extract'}->run_scripts($Amanda::Config::EXECUTE_ON_POST_RECOVER);
}
}
sub get_xfer_dest {
my $self = shift;
$self->{'extract'}->set_restore_argv(
target => $self->{'target'},
use_dar => $self->{'use_dar'},
state_filename => $self->{'state_filename'},
application_property => $self->{'application_property'});
if ($self->{'use_directtcp'}) {
$self->{'xfer_dest'} = Amanda::Xfer::Dest::DirectTCPListen->new();
} else {
$self->{'xfer_dest'} = Amanda::Xfer::Dest::Application->new($self->{'extract'}->{'restore_argv'}, 0, 0, 0, 1);
}
return $self->{'xfer_dest'};
}
sub new_dest_fh {
my $self = shift;
my $new_dest_fh= \*STDOUT;
return $new_dest_fh;
}
sub transmit_dar {
my $self = shift;
my $use_dar = shift;
return 0 if !$self->{'their_features'}->has($Amanda::Feature::fe_restore_dar);
my $line = $self->getline($self->{'ctl_stream'});
$line =~ /^USE-DAR (.*)\r\n$/;
my $darspec = $1;
if ($darspec ne "YES" && $darspec ne "NO") {
chomp $line;
chop $line;
return Amanda::FetchDump::Message->new(
source_filename => __FILE__,
source_line => __LINE__,
code => 3300064,
severity => $Amanda::Message::ERROR,
expect => "USE-DAR [YES|NO]",
line => $line);
}
$use_dar &= ($darspec eq 'YES');
$use_dar &= $self->{'bsu'}->{'dar'};
if ($use_dar) {
$self->sendctlline("USE-DAR YES\r\n");
} else {
$self->sendctlline("USE-DAR NO\r\n");
}
$self->{'use_dar'} = $use_dar;
return $use_dar;
}
sub notify_start_backup {
my $self = shift;
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_data_send)) {
my $line = $self->getline($self->{'ctl_stream'});
if ($line ne "DATA-SEND\r\n") {
chomp $line;
chop $line;
return Amanda::FetchDump::Message->new(
source_filename => __FILE__,
source_line => __LINE__,
code => 3300064,
severity => $Amanda::Message::ERROR,
expect => "DATA-SEND",
line => $line);
}
}
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_data_ready)) {
$self->sendctlline("DATA-READY\r\n");
}
return undef;
}
sub start_read_dar {
my $self = shift;
my $xfer_dest = shift;
my $cb_data = shift;
my $cb_done = shift;
my $text = shift;
my $fd = $xfer_dest->get_dar_fd();
$fd.="";
$fd = int($fd);
my $src = Amanda::MainLoop::fd_source($fd,
$G_IO_IN|$G_IO_HUP|$G_IO_ERR);
my $buffer = "";
$self->{'fetchdump'}->{'all_filter'}->{$src} = 1;
$src->set_callback( sub {
my $b;
my $n_read = POSIX::read($fd, $b, 1);
if (!defined $n_read) {
return;
} elsif ($n_read == 0) {
delete $self->{'fetchdump'}->{'all_filter'}->{$src};
$cb_data->("DAR -1:0");
$src->remove();
POSIX::close($fd);
if (!%{$self->{'fetchdump'}->{'all_filter'}} and $self->{'recovery_done'}) {
$cb_done->();
}
} else {
$buffer .= $b;
if ($b eq "\n") {
my $line = $buffer;
chomp $line;
if (length($line) > 1) {
$cb_data->($line);
}
$buffer = "";
}
}
});
return undef;
}
sub get_datapath {
my $self = shift;
$self->{'datapath'} = 'none';
if (!$self->{'their_features'}->has($Amanda::Feature::fe_restore_datapath)) {
$self->{'datapath'} = 'amanda';
return;
}
my $line = $self->getline($self->{'ctl_stream'});
my ($dpspec) = ($line =~ /^AVAIL-DATAPATH (.*)\r\n$/);
if (!defined $dpspec) {
chomp $line;
chop $line;
return Amanda::FetchDump::Message->new(
source_filename => __FILE__,
source_line => __LINE__,
code => 3300064,
severity => $Amanda::Message::ERROR,
expect => "AVAIL-DATAPATH",
line => $line);
}
my @avail_dps = split / /, $dpspec;
if (grep /^DIRECT-TCP$/, @avail_dps) {
# remote can handle a directtcp transfer .. can we?
# BUG: Must check application BSU
if ($self->{'xfer_src_supports_directtcp'}) {
$self->{'datapath'} = 'directtcp';
} else {
$self->{'datapath'} = 'amanda';
}
} else {
# remote can at least handle AMANDA
die "remote cannot handle AMANDA datapath??"
unless grep /^AMANDA$/, @avail_dps;
$self->{'datapath'} = 'amanda';
}
return undef;
}
sub send_amanda_datapath {
my $self = shift;
return if !$self->{'their_features'}->has($Amanda::Feature::fe_restore_datapath);
if ($self->{'datapath'} eq 'amanda') {
$self->sendctlline("USE-DATAPATH AMANDA\r\n");
my $line = $self->getline($self->{'ctl_stream'});
if ($line !~ /^DATAPATH-OK\r?$/) {
chomp $line;
chop $line;
return Amanda::FetchDump::Message->new(
source_filename => __FILE__,
source_line => __LINE__,
code => 3300064,
severity => $Amanda::Message::ERROR,
expect => "DATAPATH-OK",
line => $line);
}
}
return undef;
}
sub send_directtcp_datapath {
my $self = shift;
return if !$self->{'their_features'}->has($Amanda::Feature::fe_restore_datapath);
# send the data-path response, if we have a datapath
if ($self->{'datapath'} eq 'directtcp') {
my $addrs = $self->{'fetchdump'}->{'xfer_dest'}->get_addrs();
$addrs = [ map { $_->[0] . ":" . $_->[1] } @$addrs ];
$addrs = join(" ", @$addrs);
$self->sendctlline("USE-DATAPATH DIRECT-TCP $addrs\r\n");
my $line = $self->getline($self->{'ctl_stream'});
if ($line !~ /^DATAPATH-OK\r?$/) {
chomp $line;
chop $line;
return Amanda::FetchDump::Message->new(
source_filename => __FILE__,
source_line => __LINE__,
code => 3300064,
severity => $Amanda::Message::ERROR,
expect => "DATAPATH-OK",
line => $line);
}
}
return undef;
}
sub quit {
my $self = shift;
if ($self->{'clerk'}) {
$self->{'clerk'}->quit(finished_cb => sub {
my ($err) = @_;
$self->{'chg'}->quit() if defined $self->{'chg'};
if ($err) {
# it's *way* too late to report this to amrecover now!
warning("while quitting clerk: $err");
}
$self->quit1();
});
} else {
$self->{'scan'}->quit() if defined $self->{'scan'};
$self->{'chg'}->quit() if defined $self->{'chg'};
$self->quit1();
}
}
sub quit1 {
my $self = shift;
$self->{'storage'}->quit() if defined($self->{'storage'});
$self->{'fetch_done'} = 1;
if (!%{$self->{'all_filter'}}) {
Amanda::MainLoop::quit();
}
}
## utilities
sub get_req {
my $self = shift;
my $req_str = '';
while (1) {
my $buf = Amanda::Util::full_read($self->rfd('main'), 1024);
last unless $buf;
$req_str .= $buf;
}
# we've read main to EOF, so close it
$self->close('main', 'r');
$self->{'req'} = $self->parse_req($req_str);
return $self->{'req'};
}
sub send_rep {
my $self = shift;
my ($streams, $errors) = @_;
my $rep = '';
# first, if there were errors in the REQ, report them
if (@$errors) {
for my $err (@$errors) {
$rep .= "ERROR $err\n";
}
} else {
my $connline = $self->connect_streams(@$streams);
$rep .= "$connline\n";
}
$rep .= "OPTIONS ";
if ($self->{'their_features'}->has($Amanda::Feature::fe_rep_options_features)) {
$rep .= 'features=' . $self->{'my_features'}->as_string() . ';';
}
if ($self->{'their_features'}->has($Amanda::Feature::fe_rep_options_hostname)) {
$rep .= 'hostname=' . $self->{'req'}->{'hostsname'} . ';';
}
if (!$self->{'their_features'}->has($Amanda::Feature::fe_rep_options_features) ||
!$self->{'their_features'}->has($Amanda::Feature::fe_rep_options_hostname)) {
$rep .= ";";
}
# rep needs a empty-line terminator, I think
$rep .= "\n";
# write the whole rep packet, and close main to signal the end of the packet
$self->senddata('main', $rep);
$self->close('main', 'w');
}
# helper function to get a line, including the trailing '\n', from a stream. This
# reads a character at a time to ensure that no extra characters are consumed. This
# could certainly be more efficient! (TODO)
sub getline {
my $self = shift;
my ($stream) = @_;
my $fd = $self->rfd($stream);
my $line = '';
while (1) {
my $c;
my $bytes = POSIX::read($fd, $c, 1)
or last;
last if $bytes == 0;
$line .= $c;
last if $c eq "\n";
}
$line =~ /^(.*)$/;
my $chopped = $1;
$chopped =~ s/[\r\n]*$//g;
debug("CTL << $chopped");
return $line;
}
# like getline, but async; TODO:
# - make all uses of getline async
# - use buffering to read more than one character at a time
sub getline_async {
my $self = shift;
my ($stream, $async_read_cb) = @_;
my $fd = $self->rfd($stream);
my $data_in;
my $buf = '';
$data_in = sub {
my ($err, $data) = @_;
return $async_read_cb->($err, undef) if $err;
$buf .= $data;
if ($buf =~ /^(.*\r\n)$/) {
my $chopped = $1;
$chopped =~ s/[\r\n]*$//g;
debug("CTL << $chopped");
$async_read_cb->(undef, $buf);
} else {
Amanda::MainLoop::async_read(fd => $fd, size => 1, async_read_cb => $data_in);
}
};
Amanda::MainLoop::async_read(fd => $fd, size => 1, async_read_cb => $data_in);
}
# helper function to write a data to a stream. This does not add newline characters.
# If the callback is given, this is async (TODO: all calls should be async)
sub senddata {
my $self = shift;
my ($stream, $data, $async_write_cb) = @_;
my $fd = $self->wfd($stream);
if (defined $async_write_cb) {
return Amanda::MainLoop::async_write(
fd => $fd,
data => $data,
async_write_cb => $async_write_cb);
} else {
Amanda::Util::full_write($fd, $data, length($data))
or die "writing to $stream: $!";
}
}
# send a line on the control stream, or just log it if the ctl stream is gone;
# async callback is just like for senddata
sub sendctlline {
my $self = shift;
my ($msg, $async_write_cb) = @_;
my $chopped = $msg;
$chopped =~ s/[\r\n]*$//g;
if ($self->{'ctl_stream'}) {
debug("CTL >> $chopped");
return $self->senddata($self->{'ctl_stream'}, $msg, $async_write_cb);
} else {
debug("not sending CTL message as CTL is closed >> $chopped");
if (defined $async_write_cb) {
$async_write_cb->(undef, length($msg));
}
}
}
# send a MESSAGE on the MESG stream, but only if the remote has
# fe_amrecover_message
sub sendmessage {
my $self = shift;
my $msg = shift;
my $async_write_cb = shift;
if (!defined $self->{'json'}) {
$self->{'json'} = JSON->new->allow_nonref->convert_blessed;
}
if ($self->{'mesg_stream'}) {
debug("MESG >> $msg");
if ($self->{'their_features'}->has($Amanda::Feature::fe_restore_mesg_json)) {
return $self->senddata($self->{'mesg_stream'}, $self->{'json'}->pretty->encode($msg), $async_write_cb);
} else {
return $self->senddata($self->{'mesg_stream'}, "$msg\n", $async_write_cb);
}
} else {
debug("not sending MESG message as MESG is closed >> $msg");
if (defined $async_write_cb) {
$async_write_cb->(undef, length($msg));
}
}
}
1;