# Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
# Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Contact information: Zmanda Inc., 465 S Mathlida Ave, Suite 300
# Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
use strict;
use warnings;
package Amanda::FetchDump::Application;
use base 'Amanda::FetchDump';
use Amanda::MainLoop qw( :GIOCondition );
use Amanda::Xfer qw( :constants );
use IPC::Open2;
sub DESTROY {
my $self = shift;
if ($self->{'uncompressed_state_filename'}) {
# unlink $self->{'state_filename'};
}
}
sub set {
my $self = shift;
my $hdr = shift;
my $dle = shift;
my $application_property = shift;
$self->{'hdr'} = $hdr;
$self->{'dle'} = $dle;
$self->{'application_property'} = $application_property;
$self->{'extract'} = Amanda::Extract->new(
hdr => $hdr,
dle => $dle,
'include-file' => $self->{'include-file'},
'include-list' => $self->{'include-list'},
'include-list-glob' => $self->{'include-list-glob'},
'exclude-file' => $self->{'exclude-file'},
'exclude-list' => $self->{'exclude-list'},
'exclude-list-glob' => $self->{'exclude-list-glob'});
die("$self->{'extract'}") if $self->{'extract'}->isa('Amanda::Message');
($self->{'bsu'}, my $err) = $self->{'extract'}->BSU();
if ($err && @$err) {
die("BSU err " . join("\n", @$err));
}
return undef;
}
sub start_read_dar
{
my $self = shift;
my $xfer_dest = shift;
my $cb_data = shift;
my $cb_done = shift;
my $text = shift;
my $dar_end = 0;
my $fd = $xfer_dest->get_dar_fd();
$fd.="";
$fd = int($fd);
my $src = Amanda::MainLoop::fd_source($fd,
$G_IO_IN|$G_IO_HUP|$G_IO_ERR);
my $buffer = "";
$self->{'fetchdump'}->{'all_filter'}->{$src} = 1;
$src->set_callback( sub {
my $b;
my $n_read = POSIX::read($fd, $b, 1);
if (!defined $n_read) {
return;
} elsif ($n_read == 0) {
delete $self->{'fetchdump'}->{'all_filter'}->{$src};
$cb_data->("DAR -1:0") if $dar_end == 0;;
$dar_end = 1;
$src->remove();
POSIX::close($fd);
if (!%{$self->{'fetchdump'}->{'all_filter'}} and $self->{'recovery_done'}) {
$cb_done->();
}
} else {
$buffer .= $b;
if ($b eq "\n") {
my $line = $buffer;
chomp $line;
if (length($line) > 1) {
$dar_end = 1 if $line eq "DAR -1:0";
$cb_data->($line);
}
$buffer = "";
}
}
});
return undef;
}
sub transmit_dar {
my $self = shift;
my $use_dar = shift;
$self->{'use_dar'} = $use_dar && $self->{'bsu'}->{'dar'};
return $self->{'use_dar'};
}
sub get_datapath {
my $self = shift;
my $directtcp_supported = shift;
$self->{'use_directtcp'} = $directtcp_supported && !$self->{'bsu'}->{'data-path-directtcp'};
return $self->{'use_directtcp'};
}
sub get_xfer_dest {
my $self = shift;
$self->{'extract'}->set_restore_argv(
target => $self->{'target'},
use_dar => $self->{'use_dar'},
state_filename => $self->{'state_filename'},
application_property => $self->{'application_property'});
if ($self->{'use_directtcp'}) {
$self->{'xfer_dest'} = Amanda::Xfer::Dest::DirectTCPListen->new();
} elsif ($self->{'extract'}->{'restore_argv'}) {
$self->{'xfer_dest'} = Amanda::Xfer::Dest::Application->new($self->{'extract'}->{'restore_argv'}, 0, 0, 0, 1);
} else {
return Amanda::FetchDump::Message->new(
source_filename => __FILE__,
source_line => __LINE__,
code => 3300005,
severity => $Amanda::Message::ERROR);
}
return $self->{'xfer_dest'};
}
sub transmit_state_file {
my $self = shift;
my $header = shift;
my $state_filename = Amanda::Logfile::getstatefname(
"".$header->{'name'}, "".$header->{'disk'},
$header->{'datestamp'}, $header->{'dumplevel'});
my $state_filename_gz = $state_filename . $Amanda::Constants::COMPRESS_SUFFIX;
if (-e $state_filename) {
$self->{'state_filename'} = $state_filename;
} elsif (-e $state_filename_gz) {
my $pid;
open STATEFILE, '>', $state_filename;
$pid = open2(">&STATEFILE", undef,
$Amanda::Constants::UNCOMPRESS_PATH,
$Amanda::Constants::UNCOMPRESS_OPT,
$state_filename_gz);
close STATEFILE;
waitpid($pid, 0);
$self->{'state_filename'} = $state_filename;
$self->{'uncompressed_state_filename'} = 1;
}
return undef;
}
sub new_dest_fh {
my $self = shift;
if (!$self->{'use_directtcp'}) {
my $new_dest_fh = \*STDOUT;
return $new_dest_fh;
}
return;
}
sub run_directtcp_application {
my $self = shift;
my $xfer = shift;
return if !$self->{'use_directtcp'};
my $addr = $self->{'xfer_dest'}->get_addrs();
my @directtcp_command = $self->{'extract'}->{'restore_argv'};
push @directtcp_command, "--data-path", "DIRECTTCP";
push @directtcp_command, "--direct-tcp", "$addr->[0]->[0]:$addr->[0]->[1]";
debug("Running: ". join(' ', @directtcp_command));
my ($wtr, $rdr);
my $err = Symbol::gensym;
my $amndmp_pid = open3($wtr, $rdr, $err, @directtcp_command);
$amndmp_pid = $amndmp_pid;
my $file_to_close = 2;
my $amndmp_stdout_src = Amanda::MainLoop::fd_source($rdr,
$G_IO_IN|$G_IO_HUP|$G_IO_ERR);
my $amndmp_stderr_src = Amanda::MainLoop::fd_source($err,
$G_IO_IN|$G_IO_HUP|$G_IO_ERR);
$amndmp_stdout_src->set_callback( sub {
my $line = <$rdr>;
if (!defined $line) {
$file_to_close--;
$amndmp_stdout_src->remove();
if ($file_to_close == 0) {
#abort the xfer
$xfer->cancel() if $xfer->get_status != $XFER_DONE;
}
return;
}
chomp $line;
debug("amndmp stdout: $line");
print "$line\n";
});
$amndmp_stderr_src->set_callback( sub {
my $line = <$err>;
if (!defined $line) {
$file_to_close--;
$amndmp_stderr_src->remove();
if ($file_to_close == 0) {
#abort the xfer
$xfer->cancel() if $xfer->get_status != $XFER_DONE;
}
return;
}
chomp $line;
debug("amndmp stderr: $line");
print STDERR "$line\n";
$self->{'last_is_size'} = 0;
});
}
1;