# Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
# Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Contact information: Carbonite Inc., 756 N Pastoria Ave
# Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
use Test::More tests => 68;
use File::Path;
use Data::Dumper;
use strict;
use warnings;
use lib '@amperldir@';
use Installcheck;
use Installcheck::Run qw( load_vtape_res );
use Installcheck::Mock;
use Amanda::Xfer qw( :constants );
use Amanda::Header;
use Amanda::Debug;
use Amanda::MainLoop;
use Amanda::Paths;
use Amanda::Config qw( :init :getconf );
use Amanda::Constants;
use Fcntl 'SEEK_SET';
# get Amanda::Device only when we're building for server
BEGIN {
use Amanda::Util;
if (Amanda::Util::built_with_component("server")) {
eval "use Amanda::Device;";
die $@ if $@;
eval "use Amanda::Changer;";
die $@ if $@;
}
}
# set up debugging so debug output doesn't interfere with test results
Amanda::Debug::dbopen("installcheck");
Installcheck::log_test_output();
# and disable Debug's die() and warn() overrides
Amanda::Debug::disable_die_override();
my $r;
{
my $RANDOM_SEED = 0xD00D;
my $xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
Amanda::Xfer::Filter::Xor->new(0), # key of 0 -> no change, so random seeds match
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
]);
pass("Creating a transfer doesn't crash"); # hey, it's a start..
my $got_msg = "(not received)";
$xfer->start(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
}
if ($msg->{type} == $XMSG_INFO) {
$got_msg = $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
Amanda::MainLoop::quit();
}
});
Amanda::MainLoop::run();
$xfer = undef;
pass("A simple transfer runs to completion");
is($got_msg, "Is this thing on?",
"XMSG_INFO from Amanda::Xfer::Dest::Null has correct message");
}
my $xfer1;
my $xfer2;
{
my $RANDOM_SEED = 0xDEADBEEF;
$xfer1 = Amanda::Xfer->new([
Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
]);
$xfer2 = Amanda::Xfer->new([
Amanda::Xfer::Source::Random->new(1024*1024*3, $RANDOM_SEED),
Amanda::Xfer::Filter::Xor->new(0xf0),
Amanda::Xfer::Filter::Xor->new(0xf0),
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
]);
my $cb = sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
if ($xfer1->get_status() == $Amanda::Xfer::XFER_DONE
and $xfer2->get_status() == $Amanda::Xfer::XFER_DONE) {
Amanda::MainLoop::quit();
}
}
};
$xfer1->start($cb);
$xfer2->start($cb);
}
# let the already-started transfers go out of scope before they
# complete, as a memory management test..
Amanda::MainLoop::run();
$xfer1 = undef;
$xfer2 = undef;
pass("Two simultaneous transfers run to completion");
{
my $RANDOM_SEED = 0xD0DEEDAA;
my @elts;
# note that, because the Xor filter is flexible, assembling
# long pipelines can take an exponentially long time. A 10-elt
# pipeline exercises the linking algorithm without wasting
# too many CPU cycles
push @elts, Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED);
for my $i (1 .. 4) {
push @elts, Amanda::Xfer::Filter::Xor->new($i);
push @elts, Amanda::Xfer::Filter::Xor->new($i);
}
push @elts, Amanda::Xfer::Dest::Null->new($RANDOM_SEED);
my $xfer = Amanda::Xfer->new(\@elts);
my $cb = sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
Amanda::MainLoop::quit();
}
};
$xfer->start($cb);
Amanda::MainLoop::run();
$xfer = undef;
pass("One 10-element transfer runs to completion");
}
{
my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
my $write_filename = "$Installcheck::TMP/xfer-junk-dest.tmp";
my ($rfh, $wfh);
mkdir($Installcheck::TMP) unless (-e $Installcheck::TMP);
# fill the file with some stuff
open($wfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
for my $i (1 .. 100) { print $wfh "line $i\n"; }
close($wfh);
open($rfh, "<", $read_filename) or die("Could not open '$read_filename' for reading");
open($wfh, ">", "$write_filename") or die("Could not open '$write_filename' for writing");
# now run a transfer out of it
my $xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Fd->new(fileno($rfh)),
Amanda::Xfer::Filter::Xor->new(0xde),
Amanda::Xfer::Filter::Xor->new(0xde),
Amanda::Xfer::Dest::Fd->new(fileno($wfh)),
]);
my $cb = sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
Amanda::MainLoop::quit();
}
};
$xfer->start($cb);
Amanda::MainLoop::run();
$xfer = undef;
close($rfh);
close($wfh);
# now verify the file contents are identical
open($rfh, "<", $read_filename);
my $src = do { local $/; <$rfh> };
close($rfh);
open($rfh, "<", $write_filename);
my $dest = do { local $/; <$rfh> };
close($rfh);
is($src,$dest,"read from Source::Fd identical");
unlink($read_filename);
unlink($write_filename);
}
{
my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
my $write_filename = "$Installcheck::TMP/xfer-junk-dest.tmp";
my ($rfh, $wfh);
mkdir($Installcheck::TMP) unless (-e $Installcheck::TMP);
# fill the file with some stuff
open($wfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
for my $i (1 .. 100) { print $wfh "line $i\n"; }
close($wfh);
open($wfh, ">", "$write_filename") or die("Could not open '$write_filename' for writing");
# now run a transfer out of it
my $xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::File->new($read_filename),
Amanda::Xfer::Filter::Xor->new(0xde),
Amanda::Xfer::Filter::Xor->new(0xde),
Amanda::Xfer::Dest::Fd->new(fileno($wfh)),
]);
my $cb = sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
Amanda::MainLoop::quit();
}
};
$xfer->start($cb);
Amanda::MainLoop::run();
$xfer = undef;
close($wfh);
# now verify the file contents are identical
open($rfh, "<", $read_filename);
my $src = do { local $/; <$rfh> };
close($rfh);
open($rfh, "<", $write_filename);
my $dest = do { local $/; <$rfh> };
close($rfh);
is($src,$dest,"read from Source::File identical");
unlink($read_filename);
unlink($write_filename);
}
{
my $RANDOM_SEED = 0x5EAF00D;
# build a transfer that will keep going forever
my $xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Random->new(0, $RANDOM_SEED),
Amanda::Xfer::Filter::Xor->new(14),
Amanda::Xfer::Filter::Xor->new(14),
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
]);
my $got_timeout = 0;
Amanda::MainLoop::timeout_source(200)->set_callback(sub {
my ($src) = @_;
$got_timeout = 1;
$src->remove();
$xfer->cancel();
});
$xfer->start(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
Amanda::MainLoop::quit();
}
});
Amanda::MainLoop::run();
$xfer = undef;
ok($got_timeout, "A neverending transfer finishes after being cancelled");
# (note that this does not test all of the cancellation possibilities)
}
{
# build a transfer that will write to a read-only fd
my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
my $rfh;
# create the file
open($rfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
# open it for reading
open($rfh, "<", $read_filename) or die("Could not open '$read_filename' for reading");;
my $xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Random->new(0, 1),
Amanda::Xfer::Dest::Fd->new(fileno($rfh)),
]);
my $got_error = 0;
$xfer->start(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
$got_error = 1;
} elsif ($msg->{'type'} == $XMSG_DONE) {
Amanda::MainLoop::quit();
}
});
Amanda::MainLoop::run();
$xfer = undef;
ok($got_error, "A transfer with an error cancels itself after sending an error");
unlink($read_filename);
}
# test the Process filter
{
my $RANDOM_SEED = 0xD00D;
my $xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
Amanda::Xfer::Filter::Process->new(
[ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0, 0, 0, 0),
Amanda::Xfer::Filter::Process->new(
[ $Amanda::Constants::UNCOMPRESS_PATH, $Amanda::Constants::UNCOMPRESS_OPT ], 0, 0, 0, 0),
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
]);
$xfer->get_source()->set_callback(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
$src->remove();
Amanda::MainLoop::quit();
}
});
$xfer->start();
Amanda::MainLoop::run();
$xfer = undef;
pass("compress | uncompress gets back the original stream");
}
# test the Process filter (gzip exit code 2 is not an error)
{
my $RANDOM_SEED = 0xD00D;
# create a compressed file
my $write_filename = "$Installcheck::TMP/xfer-junk-dest.tmp";
my $wfh;
open($wfh, ">", "$write_filename") or die("Could not open '$write_filename' for writing");
my $xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
Amanda::Xfer::Filter::Process->new(
[ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0, 0, 0, 0),
Amanda::Xfer::Dest::Fd->new(fileno($wfh))
]);
$xfer->get_source()->set_callback(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
$src->remove();
Amanda::MainLoop::quit();
}
});
$xfer->start();
Amanda::MainLoop::run();
$xfer = undef;
close($wfh);
# add non-compressed random byte to the compressed file
open($wfh, ">>", "$write_filename") or die("Could not open '$write_filename' for writing");
print $wfh "abcdefghijklmnopqrstuvwxyzA";
close($wfh);
open($wfh, "<", "$write_filename") or die("Could not open '$write_filename' for writing");
# uncompress it.
$xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Fd->new(fileno($wfh)),
Amanda::Xfer::Filter::Process->new(
[ $Amanda::Constants::UNCOMPRESS_PATH, $Amanda::Constants::UNCOMPRESS_OPT ], 0, 0, 0, 0),
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
]);
$xfer->get_source()->set_callback(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
$src->remove();
Amanda::MainLoop::quit();
}
});
$xfer->start();
Amanda::MainLoop::run();
$xfer = undef;
pass("uncompress with trailling bytes succeed");
}
{
my $RANDOM_SEED = 0x5EAF00D;
# build a transfer that will keep going forever, using a source that
# cannot produce an EOF, so Filter::Process is forced to kill the
# compress process
open(my $zerofd, "<", "/dev/zero")
or die("could not open /dev/zero: $!");
my $xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Fd->new($zerofd),
Amanda::Xfer::Filter::Process->new(
[ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0, 0, 0, 0),
Amanda::Xfer::Dest::Null->new(0),
]);
my $got_timeout = 0;
Amanda::MainLoop::timeout_source(200)->set_callback(sub {
my ($src) = @_;
$got_timeout = 1;
$src->remove();
$xfer->cancel();
});
$xfer->get_source()->set_callback(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
$src->remove();
Amanda::MainLoop::quit();
}
});
$xfer->start();
Amanda::MainLoop::run();
$xfer = undef;
ok($got_timeout, "Amanda::Xfer::Filter::Process can be cancelled");
# (note that this does not test all of the cancellation possibilities)
}
# Test Amanda::Xfer::Dest::Buffer
{
my $dest = Amanda::Xfer::Dest::Buffer->new(1025);
my $xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Pattern->new(1024, "ABCDEFGH"),
$dest,
]);
$xfer->get_source()->set_callback(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
$src->remove();
Amanda::MainLoop::quit();
}
});
$xfer->start();
Amanda::MainLoop::run();
$xfer = undef;
is($dest->get(), 'ABCDEFGH' x 128,
"buffer captures the right bytes");
}
# Test that Amanda::Xfer::Dest::Buffer terminates an xfer early
{
my $dest = Amanda::Xfer::Dest::Buffer->new(100);
my $xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Pattern->new(1024, "ABCDEFGH"),
$dest,
]);
my $got_err = 0;
$xfer->get_source()->set_callback(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
$got_err = 1;
} elsif ($msg->{'type'} == $XMSG_DONE) {
$src->remove();
Amanda::MainLoop::quit();
}
});
$xfer->start();
Amanda::MainLoop::run();
$xfer = undef;
ok($got_err, "buffer stops the xfer if it doesn't have space");
}
SKIP: {
skip "not built with server", 45 unless Amanda::Util::built_with_component("server");
my $disk_cache_dir = "$Installcheck::TMP";
my $RANDOM_SEED = 0xFACADE;
# exercise device source and destination
{
my $RANDOM_SEED = 0xFACADE;
my $xfer;
my $quit_cb = make_cb(quit_cb => sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{'type'} == $XMSG_ERROR) {
die $msg->{'elt'} . " failed: " . $msg->{'message'};
} elsif ($msg->{'type'} == $XMSG_DONE) {
Amanda::MainLoop::quit();
}
});
# set up vtapes
my $testconf = Installcheck::Run::setup();
$testconf->write();
config_init($CONFIG_INIT_EXPLICIT_NAME, "TESTCONF");
my ($cfgerr_level, @cfgerr_errors) = config_errors();
if ($cfgerr_level >= $CFGERR_WARNINGS) {
config_print_errors();
BAIL_OUT("config errors");
}
# set up a device for slot 1
my $chg = Amanda::Changer->new();
my $res = load_vtape_res($chg, 1);
my $device = $res->{'device'};
die("Could not open VFS device: " . $device->error())
unless ($device->status() == $Amanda::Device::DEVICE_STATUS_VOLUME_UNLABELED);
# write to it
my $hdr = Amanda::Header->new();
$hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
$hdr->{'name'} = "installcheck";
$hdr->{'disk'} = "/";
$hdr->{'datestamp'} = "20080102030405";
$hdr->{'program'} = "INSTALLCHECK";
$device->finish();
$device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
$device->start_file($hdr);
$xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
Amanda::Xfer::Dest::Device->new($device, 0),
]);
$xfer->start($quit_cb);
Amanda::MainLoop::run();
$xfer = undef;
pass("write to a device (completed succesfully; data may not be correct)");
# finish up the file and device
ok(!$device->in_file(), "not in_file");
ok($device->finish(), "finish");
# now turn around and read from it
$device->start($Amanda::Device::ACCESS_READ, undef, undef);
$device->seek_file(1);
$xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Device->new($device),
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
]);
$xfer->start($quit_cb);
Amanda::MainLoop::run();
$xfer = undef;
$res->release(finished_cb => sub { Amanda::MainLoop::quit() });
Amanda::MainLoop::run();
$xfer = undef;
$chg->quit();
pass("read from a device succeeded, too, and data was correct");
}
# extra params:
# cancel_after_partnum - after this partnum is completed, cancel the xfer
# do_not_retry - do not retry a failed part - cancel the xfer instead
sub test_taper_dest {
my ($src, $dest_sub, $expected_messages, $expected_crcs, $msg_prefix, %params) = @_;
my $xfer;
my $vtape_num = 1;
my @crcs;
my @messages;
# set up vtapes
my $testconf = Installcheck::Run::setup();
$testconf->write();
config_init($CONFIG_INIT_EXPLICIT_NAME, "TESTCONF");
my ($cfgerr_level, @cfgerr_errors) = config_errors();
if ($cfgerr_level >= $CFGERR_WARNINGS) {
config_print_errors();
BAIL_OUT("config errors");
}
my $hdr = Amanda::Header->new();
$hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
$hdr->{'name'} = "installcheck";
$hdr->{'disk'} = "/";
$hdr->{'datestamp'} = "20080102030405";
$hdr->{'program'} = "INSTALLCHECK";
# set up a device for the taper dest
my $chg = Amanda::Changer->new();
my $res = load_vtape_res($chg, $vtape_num++);
my $device = $res->{'device'};
die("Could not open VFS device: " . $device->error())
unless ($device->status() == $Amanda::Device::DEVICE_STATUS_VOLUME_UNLABELED);
$r = $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
$r = $device->property_set("LEOM", $params{'disable_leom'}? 0 : 1);
$device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
my $dest = $dest_sub->($device);
my ($successful, $eof, $partnum, $eom);
# and create the xfer
$xfer = Amanda::Xfer->new([ $src, $dest ]);
my $start_new_part_1;
my $start_new_part_2;
my $start_new_part = sub {
($successful, $eof, $partnum, $eom) = @_;
if (exists $params{'cancel_after_partnum'}
and $params{'cancel_after_partnum'} == $partnum) {
push @messages, "CANCEL";
$xfer->cancel();
return;
}
if (!$device || $eom) {
# set up a device and start writing a part to it
$device->finish() if $device;
if ($res) {
return $res->release(finished_cb => $start_new_part_1);
}
return $start_new_part_1->();
}
$start_new_part_2->();
};
$start_new_part_1 = sub {
$chg->load(slot => $vtape_num++,
res_cb => sub {
(my $err, $res) = @_;
my $device = $res->{'device'};
die("Could not open VFS device: " . $device->error())
unless ($device->status() == $Amanda::Device::DEVICE_STATUS_VOLUME_UNLABELED);
$dest->use_device($device);
$r = $device->property_set("LEOM", $params{'disable_leom'}? 0 : 1);
$r = $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
$device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
$start_new_part_2->();
});
};
$start_new_part_2 = sub {
# bail out if we shouldn't retry this part
if (!$successful and $params{'do_not_retry'}) {
push @messages, "NOT-RETRYING";
$xfer->cancel();
return;
}
if (!$eof) {
if ($successful) {
$dest->start_part(0, $hdr);
} else {
$dest->start_part(1, $hdr);
}
}
};
$xfer->start(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{'type'} == $XMSG_ERROR) {
die $msg->{'elt'} . " failed: " . $msg->{'message'};
} elsif ($msg->{'type'} == $XMSG_PART_DONE) {
push @messages, "PART-" . $msg->{'partnum'} . '-' . $msg->{'size'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
push @messages, "EOM" if $msg->{'eom'};
$start_new_part->($msg->{'successful'}, $msg->{'eof'}, $msg->{'partnum'}, $msg->{'eom'});
} elsif ($msg->{'type'} == $XMSG_DONE) {
push @messages, "DONE";
Amanda::MainLoop::quit();
} elsif ($msg->{'type'} == $XMSG_CANCEL) {
push @messages, "CANCELLED";
} elsif ($msg->{'type'} == $XMSG_CRC) {
push @crcs, "$msg->{'crc'}:$msg->{'size'}";
} else {
push @messages, "$msg->{'type'}";
}
});
if ($src->isa("Amanda::Xfer::Source::Holding")) {
$src->start_recovery();
}
Amanda::MainLoop::call_later(sub { $start_new_part->(1, 0, -1); });
Amanda::MainLoop::run();
$res->release(finished_cb => sub { Amanda::MainLoop::quit() });
Amanda::MainLoop::run();
$chg->quit();
$xfer = undef;
is_deeply([@messages],
$expected_messages,
"$msg_prefix: element produces the correct series of messages")
or diag(Dumper([@messages]));
if ($expected_crcs) {
# we sort because the crc can come in different order
@crcs = sort @crcs;
@$expected_crcs = sort @$expected_crcs;
is_deeply([@crcs],
$expected_crcs,
"$msg_prefix: element produces the correct crcs")
or diag(Dumper([@crcs]));
}
}
sub run_recovery_source {
my ($dest, $files, $expected_messages, $expected_crcs, $finished_cb) = @_;
my $device;
my @filenums;
my @crcs;
my @messages;
my $xfer;
my $dev;
my $src;
my $chg;
my $res;
my $steps = define_steps
finalize => sub { $xfer = undef; },
cb_ref => \$finished_cb;
step setup => sub {
# we need a device up front, so sneak a peek into @$files
$chg = Amanda::Changer->new();
$chg->load(slot => $files->[0],
res_cb => $steps->{'first_load'});
};
step first_load => sub {
(my $err, $res) = @_;
$dev = $res->{'device'};
$src = Amanda::Xfer::Source::Recovery->new($dev);
$xfer = Amanda::Xfer->new([ $src, $dest ]);
$xfer->start($steps->{'got_xmsg'});
# got_xmsg will call got_ready when the element is ready
};
step got_ready => sub {
return $res->release(finished_cb => $steps->{'load_slot'}) if $res;
$steps->{'load_slot'}->();
};
step load_slot => sub {
if (!@$files) {
return $src->start_part(undef);
# (will trigger an XMSG_DONE; see below)
}
my $slot = shift @$files;
@filenums = @{ shift @$files };
$chg->load(slot => $slot,
res_cb => $steps->{'loaded'});
};
step loaded => sub {
(my $err, $res) = @_;
$dev = $res->{'device'};
if ($dev->status != $Amanda::Device::DEVICE_STATUS_SUCCESS) {
die $dev->error_or_status();
}
$src->use_device($dev);
if (!$dev->start($Amanda::Device::ACCESS_READ, undef, undef)) {
die $dev->error_or_status();
}
$steps->{'seek_file'}->();
};
step seek_file => sub {
if (!@filenums) {
return $steps->{'got_ready'}->();
}
my $hdr = $dev->seek_file(shift @filenums);
if (!$hdr) {
die $dev->error_or_status();
}
push @messages, "PART";
$src->start_part($dev);
};
step got_xmsg => sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{'type'} == $XMSG_ERROR) {
die $msg->{'elt'} . " failed: " . $msg->{'message'};
} elsif ($msg->{'type'} == $XMSG_PART_DONE) {
push @messages, "BYTES-" . $msg->{'size'};
$steps->{'seek_file'}->();
} elsif ($msg->{'type'} == $XMSG_DONE) {
push @messages, "DONE";
$steps->{'quit'}->();
} elsif ($msg->{'type'} == $XMSG_READY) {
push @messages, "READY";
$steps->{'got_ready'}->();
} elsif ($msg->{'type'} == $XMSG_CANCEL) {
push @messages, "CANCELLED";
} elsif ($msg->{'type'} == $XMSG_CRC) {
push @crcs, "$msg->{'crc'}:$msg->{'size'}";
}
};
step quit => sub {
is_deeply([@messages],
$expected_messages,
"files read back and verified successfully with Amanda::Xfer::Recovery::Source")
or diag(Dumper([@messages]));
if ($expected_crcs) {
#@crcs = sort @crcs;
#@$expected_crcs = sort @$expected_crcs;
is_deeply([@crcs],
$expected_crcs,
"element produces the correct crcs")
or diag(Dumper([@crcs]));
}
$chg->quit();
$finished_cb->();
};
}
sub test_recovery_source {
run_recovery_source(@_, \&Amanda::MainLoop::quit);
Amanda::MainLoop::run();
}
my $holding_base = "$Installcheck::TMP/source-holding";
my $holding_file;
# create a sequence of holding chunks, each 2MB.
sub make_holding_files {
my ($nchunks) = @_;
my $block = 'a' x 32768;
rmtree($holding_base);
mkpath($holding_base);
for (my $i = 0; $i < $nchunks; $i++) {
my $filename = "$holding_base/file$i";
open(my $fh, ">", "$filename");
my $hdr = Amanda::Header->new();
$hdr->{'type'} = ($i == 0)?
$Amanda::Header::F_DUMPFILE : $Amanda::Header::F_CONT_DUMPFILE;
$hdr->{'datestamp'} = "20070102030405";
$hdr->{'dumplevel'} = 0;
$hdr->{'compressed'} = 1;
$hdr->{'name'} = "localhost";
$hdr->{'disk'} = "/home";
$hdr->{'program'} = "INSTALLCHECK";
if ($i != $nchunks-1) {
$hdr->{'cont_filename'} = "$holding_base/file" . ($i+1);
}
print $fh $hdr->to_string(32768,32768);
for (my $b = 0; $b < 64; $b++) {
print $fh $block;
}
close($fh);
}
return "$holding_base/file0";
}
# first, test the simpler Splitter class
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1951, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
520*1024, 0);
},
[ "PART-1-557056-OK", "PART-2-557056-OK", "PART-3-557056-OK",
"PART-4-326656-OK", "DONE" ],
[ 'e896f9b1:1997824', 'e896f9b1:1997824' ],
"Amanda::Xfer::Dest::Taper::Splitter - simple splitting");
test_recovery_source(
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
[ 1 => [ 1, 2, 3, 4 ], ],
[
'READY',
'PART',
'BYTES-557056',
'PART',
'BYTES-557056',
'PART',
'BYTES-557056',
'PART',
'BYTES-326656',
'DONE'
],
[ 'afce8b74:557056',
'fe33b4e9:1114112',
'869c563f:1671168',
'e896f9b1:1997824',
'e896f9b1:1997824' ]
);
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1024*3.1, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
1024*1024, 0);
},
[ "PART-1-1048576-OK", "PART-2-1048576-OK", "PART-3-294912-OK", "EOM",
"PART-4-858521-OK",
"DONE" ],
[ 'eda70336:3250585', 'eda70336:3250585' ],
"Amanda::Xfer::Dest::Taper::Splitter - splitting and spanning with LEOM");
test_recovery_source(
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
[ 1 => [ 1, 2, 3 ], 2 => [ 1, ], ],
[
'READY',
'PART',
'BYTES-1048576',
'PART',
'BYTES-1048576',
'PART',
'BYTES-294912',
'PART',
'BYTES-858521',
'DONE'
],
[ 'd4b66333:1048576',
'ab6d231b:2097152',
'0e0acec2:2392064',
'eda70336:3250585',
'eda70336:3250585' ],
);
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1024*1.5, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
0, 0);
},
[ "PART-1-1572864-OK",
"DONE" ],
[ '102b1445:1572864', '102b1445:1572864' ],
"Amanda::Xfer::Dest::Taper::Splitter - no splitting");
test_recovery_source(
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
[ 1 => [ 1, ], ],
[
'READY',
'PART',
'BYTES-1572864',
'DONE'
],
[ '102b1445:1572864',
'102b1445:1572864' ],
);
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1024*3.1, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
2368*1024, 0);
},
[ "PART-1-2424832-OK", "PART-2-0-OK", "EOM",
"PART-2-825753-OK",
"DONE" ],
[ 'eda70336:3250585', 'eda70336:3250585' ],
"Amanda::Xfer::Dest::Taper::Splitter - LEOM hits in file 2 header");
test_recovery_source(
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
[ 1 => [ 1, 2 ], 2 => [ 1, ], ],
[
'READY',
'PART',
'BYTES-2424832',
'PART',
'BYTES-0', # this wouldn't be in the catalog, but it's on the vtape
'PART',
'BYTES-825753',
'DONE'
],
[ '96ff2746:2424832',
'96ff2746:2424832',
'eda70336:3250585',
'eda70336:3250585' ],
);
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1024*3.1, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
2368*1024, 0);
},
[ "PART-1-2424832-OK", "PART-2-98304-FAILED", "EOM",
"NOT-RETRYING", "CANCELLED", "DONE" ],
[ '96ff2746:2424832', '626c1849:2621440' ],
"Amanda::Xfer::Dest::Taper::Splitter - LEOM fails, PEOM => failure",
disable_leom => 1, do_not_retry => 1);
# run A::X::Dest::Taper::Cacher test in each of a few different cache permutations
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
1024*1024, 1, undef),
},
[ "PART-1-1048576-OK", "PART-2-1048576-OK", "PART-3-393216-FAILED",
"EOM", "PART-3-1048576-OK", "PART-4-1048576-OK", "PART-5-104857-OK",
"DONE" ],
[ 'c201f5aa:4299161' ],
"Amanda::Xfer::Dest::Taper::Cacher - mem cache");
test_recovery_source(
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
[ 1 => [ 1, 2 ], 2 => [ 1, 2, 3 ], ],
[
'READY',
'PART',
'BYTES-1048576',
'PART',
'BYTES-1048576',
'PART',
'BYTES-1048576',
'PART',
'BYTES-1048576',
'PART',
'BYTES-104857',
'DONE'
],
[ 'd4b66333:1048576',
'ab6d231b:2097152',
'ca8b7765:3145728',
'8f6c1b34:4194304',
'c201f5aa:4299161',
'c201f5aa:4299161' ],
);
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
1024*1024, 0, $disk_cache_dir),
},
[ "PART-1-1048576-OK", "PART-2-1048576-OK", "PART-3-393216-FAILED",
"EOM", "PART-3-1048576-OK", "PART-4-1048576-OK", "PART-5-104857-OK",
"DONE" ],
[ 'c201f5aa:4299161' ],
"Amanda::Xfer::Dest::Taper::Cacher - disk cache");
test_recovery_source(
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
[ 1 => [ 1, 2 ], 2 => [ 1, 2, 3 ], ],
[
'READY',
'PART',
'BYTES-1048576',
'PART',
'BYTES-1048576',
'PART',
'BYTES-1048576',
'PART',
'BYTES-1048576',
'PART',
'BYTES-104857',
'DONE'
],
[ 'd4b66333:1048576',
'ab6d231b:2097152',
'ca8b7765:3145728',
'8f6c1b34:4194304',
'c201f5aa:4299161',
'c201f5aa:4299161' ]
);
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1024*2, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
1024*1024, 0, undef),
},
[ "PART-1-1048576-OK", "PART-2-1048576-OK", "PART-3-0-OK",
"DONE" ],
[ 'ab6d231b:2097152' ],
"Amanda::Xfer::Dest::Taper::Cacher - no cache (no failed parts; exact multiple of part size)");
test_recovery_source(
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
[ 1 => [ 1, 2, 3 ], ],
[
'READY',
'PART',
'BYTES-1048576',
'PART',
'BYTES-1048576',
'PART',
'BYTES-0',
'DONE'
],
[ 'd4b66333:1048576',
'ab6d231b:2097152',
'ab6d231b:2097152',
'ab6d231b:2097152' ]
);
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1024*2, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024, 0, 0, undef),
},
[ "PART-1-2097152-OK", "DONE" ],
[ 'ab6d231b:2097152' ],
"Amanda::Xfer::Dest::Taper::Cacher - no splitting (fits on volume)");
test_recovery_source(
Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
[ 1 => [ 1 ], ],
[
'READY',
'PART',
'BYTES-2097152',
'DONE'
],
[ 'ab6d231b:2097152',
'ab6d231b:2097152' ]
);
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024, 0, 0, undef),
},
[ "PART-1-2555904-FAILED", "EOM",
"NOT-RETRYING", "CANCELLED", "DONE" ],
[ '00000000:0' ],
"Amanda::Xfer::Dest::Taper::Cacher - no splitting (doesn't fit on volume -> fails)",
do_not_retry => 1);
test_taper_dest(
Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
1024*1024, 0, $disk_cache_dir),
},
[ "PART-1-1048576-OK", "PART-2-1048576-OK", "PART-3-393216-FAILED",
"EOM", "PART-3-1048576-OK", "PART-4-1048576-OK", "CANCEL",
"CANCELLED", "DONE" ],
[ '8f6c1b34:4194304' ],
"Amanda::Xfer::Dest::Taper::Cacher - cancellation after success",
cancel_after_partnum => 4);
# set up a few holding chunks and read from those
$holding_file = make_holding_files(3);
my $xfer_source_holding = Amanda::Xfer::Source::Holding->new($holding_file);
test_taper_dest(
$xfer_source_holding,
sub {
my ($first_dev) = @_;
Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
1024*1024, 1);
},
[ "PART-1-1048576-OK", "PART-2-1048576-OK", "PART-3-393216-FAILED", "EOM",
"PART-3-1048576-OK", "PART-4-1048576-OK", "PART-5-393216-FAILED", "EOM",
"PART-5-1048576-OK", "PART-6-1048576-OK", "PART-7-0-OK",
"DONE" ],
[ 'a27c85ce:6291456', 'a27c85ce:6291456' ],
"Amanda::Xfer::Dest::Taper::Splitter - Amanda::Xfer::Source::Holding "
. "acts as a source and supplies cache_inform",
disable_leom => 1);
##
# test the cache_inform method
sub test_taper_dest_splitter_cache_inform {
my %params = @_;
my $xfer;
my $device;
my $fh;
my $part_size = 1024*1024;
my $file_size = $part_size * 4 + 100 * 1024;
my $cache_file = "$Installcheck::TMP/cache_file";
my $vtape_num = 1;
my @crcs;
# set up our "cache", cleverly using an Amanda::Xfer::Dest::Fd
open($fh, ">", "$cache_file") or die("Could not open '$cache_file' for writing");
$xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Random->new($file_size, $RANDOM_SEED),
Amanda::Xfer::Dest::Fd->new(fileno($fh)),
]);
$xfer->start(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{'type'} == $XMSG_ERROR) {
die $msg->{'elt'} . " failed: " . $msg->{'message'};
} elsif ($msg->{'type'} == $XMSG_DONE) {
Amanda::MainLoop::quit();
}
});
Amanda::MainLoop::run();
$xfer = undef;
close($fh);
# create a list of holding chunks, some slab-aligned, some part-aligned,
# some not
my @holding_chunks;
my $offset = 0;
my $do_chunk = sub {
my ($break) = @_;
die unless $break > $offset;
push @holding_chunks, [ $cache_file, $offset, $break - $offset ];
$offset = $break;
};
$do_chunk->(277);
$do_chunk->($part_size);
$do_chunk->($part_size+128*1024);
$do_chunk->($part_size*3);
$do_chunk->($part_size*3+1024);
$do_chunk->($part_size*3+1024*2);
$do_chunk->($part_size*3+1024*3);
$do_chunk->($part_size*4);
$do_chunk->($part_size*4 + 77);
$do_chunk->($file_size - 1);
$do_chunk->($file_size);
# set up vtapes
my $testconf = Installcheck::Run::setup();
$testconf->write();
config_init($CONFIG_INIT_EXPLICIT_NAME, "TESTCONF");
my ($cfgerr_level, @cfgerr_errors) = config_errors();
if ($cfgerr_level >= $CFGERR_WARNINGS) {
config_print_errors();
BAIL_OUT("config errors");
}
my $hdr = Amanda::Header->new();
$hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
$hdr->{'name'} = "installcheck";
$hdr->{'disk'} = "/";
$hdr->{'datestamp'} = "20080102030405";
$hdr->{'program'} = "INSTALLCHECK";
# set up the cache file
open($fh, "<", "$cache_file") or die("Could not open '$cache_file' for reading");
# set up a device for writing
my $chg = Amanda::Changer->new();
my $res = load_vtape_res($chg, $vtape_num++);
$device = $res->{'device'};
die("Could not open VFS device: " . $device->error())
unless ($device->status() == $Amanda::Device::DEVICE_STATUS_VOLUME_UNLABELED);
$r = $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
$r = $device->property_set("LEOM", 0);
$device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
my $dest = Amanda::Xfer::Dest::Taper::Splitter->new($device, 128*1024,
1024*1024, 1);
$xfer = Amanda::Xfer->new([
Amanda::Xfer::Source::Fd->new(fileno($fh)),
$dest,
]);
my ($successful, $eof, $last_partnum);
my $start_new_part_1;
my $start_new_part_2;
my $start_new_part = sub {
($successful, $eof, $last_partnum) = @_;
if (!$device || !$successful) {
# set up a device and start writing a part to it
$device->finish() if $device;
if ($res) {
return $res->release(finished_cb => $start_new_part_1);
}
return $start_new_part_1->();
}
$start_new_part_2->();
};
$start_new_part_1 = sub {
$chg->load(slot => $vtape_num++,
res_cb => sub {
(my $err, $res) = @_;
$device = $res->{'device'};
die("Could not open VFS device: " . $device->error())
unless ($device->status() == $Amanda::Device::DEVICE_STATUS_VOLUME_UNLABELED);
$dest->use_device($device);
$r = $device->property_set("LEOM", 0);
$r = $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
$device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
$start_new_part_2->();
});
};
$start_new_part_2 = sub {
# feed enough chunks to cache_inform
my $upto = ($last_partnum+2) * $part_size;
while (@holding_chunks and $holding_chunks[0]->[1] < $upto) {
my ($filename, $offset, $length) = @{shift @holding_chunks};
$dest->cache_inform($filename, $offset, $length);
}
if (!$eof) {
if ($successful) {
$dest->start_part(0, $hdr);
} else {
$dest->start_part(1, $hdr);
}
}
};
my @messages;
$xfer->start(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{'type'} == $XMSG_ERROR) {
push @messages, "ERROR: $msg->{message}";
} elsif ($msg->{'type'} == $XMSG_PART_DONE) {
push @messages, "PART-" . $msg->{'partnum'} . '-' . $msg->{'size'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
$start_new_part->($msg->{'successful'}, $msg->{'eof'}, $msg->{'partnum'});
} elsif ($msg->{'type'} == $XMSG_DONE) {
push @messages, "DONE";
$res->release(finished_cb => sub {
$chg->quit();
Amanda::MainLoop::quit();
});
} elsif ($msg->{'type'} == $XMSG_CANCEL) {
push @messages, "CANCELLED";
} elsif ($msg->{'type'} == $XMSG_CRC) {
push @crcs, "$msg->{'crc'}:$msg->{'size'}";
} else {
push @messages, $msg->{'type'};
}
});
Amanda::MainLoop::call_later(sub { $start_new_part->(1, 0, -1); });
Amanda::MainLoop::run();
$xfer = undef;
unlink($cache_file);
return ( @messages, @crcs );
}
is_deeply([ test_taper_dest_splitter_cache_inform() ],
[ "PART-1-1048576-OK", "PART-2-1048576-OK", "PART-3-393216-FAILED",
"PART-3-1048576-OK", "PART-4-1048576-OK", "PART-5-102400-OK",
"DONE", '9548ff3c:4296704', '9548ff3c:4296704'],
"cache_inform: splitter element produces the correct series of messages");
rmtree($holding_base);
}
# test Amanda::Xfer::Dest::Taper::DirectTCP; do it twice, once with a cancellation
SKIP: {
skip "not built with ndmp and server", 3 unless
Amanda::Util::built_with_component("ndmp") and Amanda::Util::built_with_component("server");
my $RANDOM_SEED = 0xFACADE;
# make XDT output fairly verbose
$Amanda::Config::debug_taper = 2;
my $ndmp = Installcheck::Mock::NdmpServer->new();
my $ndmp_port = $ndmp->{'port'};
my $drive = $ndmp->{'drive'};
my $mkdevice = sub {
my $dev = Amanda::Device->new("ndmp:127.0.0.1:$ndmp_port\@$drive");
die "can't create device" unless $dev->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS;
$r = $dev->property_set("indirect", 0) and die "can't set INDIRECT: $r";
$r = $dev->property_set("verbose", 1) and die "can't set VERBOSE: $r";
$r = $dev->property_set("ndmp_username", "ndmp") and die "can't set username: $r";
$r = $dev->property_set("ndmp_password", "ndmp") and die "can't set password: $r";
return $dev;
};
my $hdr = Amanda::Header->new();
$hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
$hdr->{'name'} = "installcheck";
$hdr->{'disk'} = "/";
$hdr->{'datestamp'} = "20080102030405";
$hdr->{'program'} = "INSTALLCHECK";
for my $do_cancel (0, 'later', 'in_setup') {
my $dev;
my $xfer;
my @messages;
# make a starting device
$dev = $mkdevice->();
# and create the xfer
my $src = Amanda::Xfer::Source::Random->new(32768*34-7, $RANDOM_SEED);
# note we ask for slightly less than 15 blocks; the dest should round up
my $dest = Amanda::Xfer::Dest::Taper::DirectTCP->new($dev, 32768*16-99);
$xfer = Amanda::Xfer->new([ $src, $dest ]);
my $start_new_part; # forward declaration
my $xmsg_cb = sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{'type'} == $XMSG_ERROR) {
# if this is an expected error, don't die
if ($do_cancel eq 'in_setup' and $msg->{'message'} =~ /operation not supported/) {
push @messages, "ERROR";
} else {
die $msg->{'elt'} . " failed: " . $msg->{'message'};
}
} elsif ($msg->{'type'} == $XMSG_READY) {
push @messages, "READY";
# get ourselves a new (albeit identical) device, just to prove that the connections
# are a little bit portable
$dev->finish();
$dev = $mkdevice->();
$dest->use_device($dev);
$dev->start($Amanda::Device::ACCESS_WRITE, "TESTCONF02", "20080102030406");
$start_new_part->(1, 0); # start first part
} elsif ($msg->{'type'} == $XMSG_PART_DONE) {
push @messages, "PART-" . $msg->{'partnum'} . '-' . $msg->{'size'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
if ($do_cancel and $msg->{'partnum'} == 2) {
$xfer->cancel();
} else {
$start_new_part->($msg->{'successful'}, $msg->{'eof'});
}
} elsif ($msg->{'type'} == $XMSG_DONE) {
push @messages, "DONE";
Amanda::MainLoop::quit();
} elsif ($msg->{'type'} == $XMSG_CANCEL) {
push @messages, "CANCELLED";
} elsif ($msg->{'type'} == $XMSG_CRC) {
#push @messages, "CRC";
} else {
push @messages, "$msg->{'type'}";
}
};
# trigger an error in the xfer dest's setup method by putting the device
# in an error state. NDMP devices do not support append, so starting in
# append mode should trigger the failure.
if ($do_cancel eq 'in_setup') {
no warnings 'once';
if ($dev->start($Amanda::Device::ACCESS_APPEND, "MYLABEL", undef)) {
die "successfully started NDMP device in ACCESS_APPEND?!";
}
use warnings 'once';
}
$xfer->start($xmsg_cb);
$start_new_part = sub {
my ($successful, $eof) = @_;
die "this dest shouldn't have unsuccessful parts" unless $successful;
if (!$eof) {
$dest->start_part(0, $hdr);
}
};
Amanda::MainLoop::run();
$xfer = undef;
$dev->finish();
if (!$do_cancel) {
is_deeply([@messages],
[ 'READY', 'PART-1-524288-OK', 'PART-2-524288-OK',
'PART-3-65536-OK', 'DONE' ],
"Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages")
or diag(Dumper([@messages]));
} elsif ($do_cancel eq 'in_setup') {
is_deeply([@messages],
[ 'ERROR', 'CANCELLED', 'DONE' ],
"Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages when cancelled during setup")
or diag(Dumper([@messages]));
} else {
is_deeply([@messages],
[ 'READY', 'PART-1-524288-OK', 'PART-2-524288-OK', 'CANCELLED',
'DONE' ],
"Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages when cancelled in mid-xfer")
or diag(Dumper([@messages]));
}
}
# Amanda::Xfer::Source::Recovery's directtcp functionality is not
# tested here, as to do so would basically require re-implementing
# Amanda::Recovery::Clerk; the xfer source is adequately tested by
# the Amanda::Recovery::Clerk tests.
$ndmp->cleanup();
}
if (!Amanda::Util::built_with_component("server")) {
my $testconf = Installcheck::Run::setup();
$testconf->write();
config_init($CONFIG_INIT_EXPLICIT_NAME|$CONFIG_INIT_CLIENT, "TESTCONF");
my ($cfgerr_level, @cfgerr_errors) = config_errors();
if ($cfgerr_level >= $CFGERR_WARNINGS) {
config_print_errors();
BAIL_OUT("config errors");
}
}
# directtcp stuff
{
my $RANDOM_SEED = 0x13131313; # 13 is bad luck, right?
# we want this to look like:
# A: [ Random -> DirectTCPConnect ]
# --dtcp-->
# B: [ DirectTCPListen -> filter -> DirectTCPListen ]
# --dtcp-->
# C: [ DirectTCPConnect -> filter -> Null ]
#
# this tests both XFER_MECH_DIRECTTCP_CONNECT and
# XFER_MECH_DIRECTTCP_LISTEN, as well as some of the glue
# used to attach those to filters.
#
# that means we need to start transfer B, since it has all of the
# addresses, before creating A or C.
my $done = { };
my $handle_msg = sub {
my ($letter, $src, $msg, $xfer) = @_;
if ($msg->{type} == $XMSG_ERROR) {
die $msg->{elt} . " failed: " . $msg->{message};
} elsif ($msg->{'type'} == $XMSG_DONE) {
$done->{$letter} = 1;
}
if ($done->{'A'} and $done->{'B'} and $done->{'C'}) {
Amanda::MainLoop::quit();
}
};
my %cbs;
for my $letter ('A', 'B', 'C') {
$cbs{$letter} = sub { $handle_msg->($letter, @_); };
}
my $src_listen = Amanda::Xfer::Source::DirectTCPListen->new();
my $dst_listen = Amanda::Xfer::Dest::DirectTCPListen->new();
my $xferB = Amanda::Xfer->new([
$src_listen,
Amanda::Xfer::Filter::Xor->new(0x13),
$dst_listen
]);
$xferB->start($cbs{'B'});
my $xferA = Amanda::Xfer->new([
Amanda::Xfer::Source::Random->new(1024*1024*3, $RANDOM_SEED),
Amanda::Xfer::Dest::DirectTCPConnect->new($src_listen->get_addrs())
]);
$xferA->start($cbs{'A'});
my $xferC = Amanda::Xfer->new([
Amanda::Xfer::Source::DirectTCPConnect->new($dst_listen->get_addrs()),
Amanda::Xfer::Filter::Xor->new(0x13),
Amanda::Xfer::Dest::Null->new($RANDOM_SEED)
]);
$xferC->start($cbs{'C'});
# let the already-started transfers go out of scope before they
# complete, as a memory management test..
Amanda::MainLoop::run();
$xferA = undef;
$xferB = undef;
$xferC = undef;
pass("Three xfers interlinked via DirectTCP complete successfully");
}
# try cancelling a DirectTCP xfer while it's waiting in accept()
{
my $xfer_src = Amanda::Xfer::Source::DirectTCPListen->new();
my $xfer_dst = Amanda::Xfer::Dest::Null->new(0);
my $xfer = Amanda::Xfer->new([ $xfer_src, $xfer_dst ]);
# start up the transfer, which starts a thread which will accept
# soon after that.
$xfer->start(sub {
my ($src, $msg, $xfer) = @_;
if ($msg->{'type'} == $XMSG_DONE) {
Amanda::MainLoop::quit();
}
});
sleep(1);
# Now, ideally we'd wait until the accept() is running, maybe testing it
# with a SYN or something like that. This is not terribly critical,
# because the element glue does not check for cancellation before it begins
# accepting.
$xfer->cancel();
Amanda::MainLoop::run();
$xfer = undef;
pass("A DirectTCP accept operation can be cancelled");
}
# test element comparison
{
my $a = Amanda::Xfer::Filter::Xor->new(0);
my $b = Amanda::Xfer::Filter::Xor->new(1);
ok($a == $a, "elements compare equal to themselves");
ok(!($a == $b), ".. and not to other elements");
ok(!($a != $a), "elements do not compare != to themselves");
ok($a != $b, ".. but are != to other elements");
}