Blob Blame History Raw
# Copyright (c) 2009-2012 Zmanda Inc.  All Rights Reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
#
# Contact information: Carbonite Inc., 756 N Pastoria Ave
# Sunnyvale, CA 94086, USA, or: http://www.zmanda.com

=head1 NAME

Amanda::Taper::Worker

=head1 DESCRIPTION

This package is a component of the Amanda taper, and is not intended for use by
other scripts or applications.

This package interface between L<Amanda::Taper::Controller> and L<Amanda::Taper::Scribe>.

The worker use an L<Amanda::Taper::Scribe> object to execute the request
received from the L<Amanda::Taper::Controller>.

=cut

use strict;
use warnings;

package Amanda::Taper::Worker;

use Carp;
use POSIX qw( :errno_h );
use Amanda::Changer;
use Amanda::Config qw( :getconf config_dir_relative );
use Amanda::Debug qw( :logging );
use Amanda::Device qw( :constants );
use Amanda::Header;
use Amanda::Holding;
use Amanda::MainLoop qw( :GIOCondition );
use Amanda::MainLoop;
use Amanda::Taper::Protocol;
use Amanda::Taper::Scan;
use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
use Amanda::Logfile qw( :logtype_t log_add make_stats );
use Amanda::Xfer qw( :constants );
use Amanda::Util qw( quote_string );
use Amanda::Tapelist;
use Amanda::Recovery::Planner;
use Amanda::Recovery::Scan;
use Amanda::Recovery::Clerk;
use File::Temp;

use parent -norequire, qw( Amanda::Taper::Scribe::Feedback Amanda::Recovery::Clerk::Feedback);

our $tape_num = 0;

sub new {
    my $class           = shift;
    my $taper_name      = shift;
    my $worker_name     = shift;
    my $controller      = shift;
    my $write_timestamp = shift;

    my $self = bless {
	state       => "init",
	taper_name  => $taper_name,
	worker_name => $worker_name,
	controller  => $controller,
	scribe      => undef,
	timestamp   => $write_timestamp,

	# filled in when a write starts:
	xfer => undef,
	xfer_source => undef,
	xfer_dest => undef,
	handle => undef,
	hostname => undef,
	diskname => undef,
	datestamp => undef,
	level => undef,
	header => undef,
	doing_port_write => undef,
	doing_shm_write => undef,
	input_errors => [],

	# periodic status updates
	timer => undef,
	status_filename => undef,
	status_fh => undef,

	# filled in after the header is available
	header => undef,

	# filled in when a new tape is started:
	label => undef
    }, $class;

    my $scribe = Amanda::Taper::Scribe->new(
	worker => $self,
	taperscan => $controller->{'taperscan'},
	feedback => $self,
	debug => $Amanda::Config::debug_taper);

    $self->{'scribe'} = $scribe;
    $self->{'scribe'}->start(write_timestamp => $write_timestamp,
	finished_cb => sub { $self->_scribe_started_cb(@_); });

    return $self;
}

# called when the scribe is fully started up and ready to go
sub _scribe_started_cb {
    my $self = shift;
    my $err  = shift;
    my $allow_take_scribe_from = shift;

    if ($err) {
	$self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPE_ERROR,
		worker_name  => $self->{'worker_name'},
		message => "$err");
	$self->{'state'} = "error";

	# log the error (note that the message is intentionally not quoted)
	log_add($L_ERROR, "no-tape error [$err]");

    } else {
	$self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPER_OK,
		worker_name => $self->{'worker_name'},
		allow_take_scribe_from => $allow_take_scribe_from?"ALLOW-TAKE-SCRIBE-FROM":"NO-TAKE-SCRIBE-FROM");
	$self->{'state'} = "idle";
    }
}


sub FILE_WRITE {
    my $self = shift;
    my ($msgtype, %params) = @_;
    $self->_assert_in_state("idle") or return;

    $self->{'doing_port_write'} = 0;
    $self->{'doing_shm_write'} = 0;
    $self->{'doing_vault'} = 0;

    $self->setup_and_start_dump($msgtype,
	dump_cb => sub { $self->dump_cb(@_); },
	%params);
}

sub PORT_WRITE {
    my $self = shift;
    my ($msgtype, %params) = @_;

    my $read_cb;

    $self->_assert_in_state("idle") or return;

    $self->{'doing_port_write'} = 1;
    $self->{'doing_shm_write'} = 0;
    $self->{'doing_vault'} = 0;

    $self->setup_and_start_dump($msgtype,
	dump_cb => sub { $self->dump_cb(@_); },
	%params);
}

sub SHM_WRITE {
    my $self = shift;
    my ($msgtype, %params) = @_;

    my $read_cb;

    $self->_assert_in_state("idle") or return;

    $self->{'doing_port_write'} = 0;
    $self->{'doing_shm_write'} = 1;
    $self->{'doing_vault'} = 0;

    $self->setup_and_start_dump($msgtype,
	dump_cb => sub { $self->dump_cb(@_); },
	%params);
}

sub VAULT_WRITE {
    my $self = shift;
    my ($msgtype, %params) = @_;

    my $read_cb;

    $self->_assert_in_state("idle") or return;

    $self->{'doing_port_write'} = 0;
    $self->{'doing_shm_write'} = 0;
    $self->{'doing_vault'} = 1;

    $self->setup_and_start_dump($msgtype,
	dump_cb => sub { $self->dump_cb(@_); },
	%params);
}

sub START_SCAN {
    my $self = shift;
    my ($msgtype, %params) = @_;

    $self->{'scribe'}->start_scan(undef);
}

sub NEW_TAPE {
    my $self = shift;
    my ($msgtype, %params) = @_;

    $self->_assert_in_state("writing") or return;

    $self->{'perm_cb'}->(allow => 1);
}

sub NO_NEW_TAPE {
    my $self = shift;
    my ($msgtype, %params) = @_;

    $self->_assert_in_state("writing") or return;

    # log the error (note that the message is intentionally not quoted)
    log_add($L_ERROR, "no-tape config [$params{reason}]");

    $self->{'perm_cb'}->(cause => "config", message => $params{'reason'});
}

sub TAKE_SCRIBE_FROM {
    my $self = shift;
    my ($worker1, $msgtype, %params) = @_;

    $self->_assert_in_state("writing") or return;
    $worker1->_assert_in_state("idle") or return;

    my $scribe = $self->{'scribe'};
    my $scribe1 = $worker1->{'scribe'};
    $self->{'scribe'} = $scribe1;
    $scribe1->{'worker'} = $self;
    $worker1->{'scribe'} = $scribe;
    $scribe->{'worker'} = $worker1;

    $self->{'label'} = $worker1->{'label'};
    $self->{'perm_cb'}->(scribe => $scribe1);
    delete $worker1->{'scribe'};
    $worker1->{'state'} = 'error';
    $scribe->quit(finished_cb => sub {});
    $scribe1->{'cancelled'} = 0;
 }

sub DONE {
    my $self = shift;
    my ($msgtype, %params) = @_;

    if (!defined($self->{'handle'}) or $params{'handle'} ne $self->{'handle'}) {
	# ignore message for previous handle
	return;
    }

    if (defined $self->{'dumper_status'}) {
	# ignore duplicate message
	return
    }

    $self->{'dumper_status'} = "DONE";
    $self->{'orig_kb'} = $params{'orig_kb'};
    $self->{'native_crc'} = $params{'native_crc'};
    $self->{'client_crc'} = $params{'client_crc'};
    if (defined $self->{'result'}) {
	$self->result_cb(undef);
    }
}

sub FAILED {
    my $self = shift;
    my ($msgtype, %params) = @_;

    if (!defined($self->{'handle'}) or $params{'handle'} ne $self->{'handle'}) {
	# ignore message for previous handle
	return;
    }

    if (defined $self->{'dumper_status'}) {
	# ignore duplicate message
	return
    }

    $self->{'dumper_status'} = "FAILED";
    if (defined $self->{'header_xfer'}) {
	$self->{'header_xfer'}->cancel();
    } elsif (defined $self->{'result'}) {
	$self->result_cb(undef);
    } elsif (!defined $self->{'scribe'}->{'xdt'}) {
	# ignore, the dump is already cancelled or not yet started.
    } elsif (!defined $self->{'scribe'}->{'xfer'}) {
	# ignore, the dump is already cancelled or not yet started.
    } else { # Abort the dump
	push @{$self->{'input_errors'}}, "dumper failed";
	$self->{'scribe'}->cancel_dump(
		xfer => $self->{'scribe'}->{'xfer'},
		dump_cb => $self->{'dump_cb'});
    }
}

sub CLOSE_VOLUME {
    my $self = shift;
    my ($msgtype, %params) = @_;

    $self->_assert_in_state("idle") or return;

    $self->{'scribe'}->close_volume(close_volume_cb => sub {
	    my %msg_params = (
		worker_name => $self->{'worker_name'}
	    );
	    $msgtype = Amanda::Taper::Protocol::CLOSED_VOLUME;
	    $self->{'controller'}->{'proto'}->send($msgtype, %msg_params);
    });
}

sub CLOSE_SOURCE_VOLUME {
    my $self = shift;
    my ($msgtype, %params) = @_;

    $self->_assert_in_state("idle") or return;

    $self->{'src'}->{'clerk'}->close_volume(close_volume_cb => sub {

	    my %msg_params = (
		worker_name => $self->{'worker_name'}
	    );
	    $msgtype = Amanda::Taper::Protocol::CLOSED_SOURCE_VOLUME;
	    $self->{'controller'}->{'proto'}->send($msgtype, %msg_params);
    }) if defined $self->{'src'}->{'clerk'};
}

sub result_cb {
    my $self = shift;
    my %params = %{$self->{'dump_params'}};
    my $msgtype;
    my $logtype;

    if (!defined $self->{'source_server_crc'}) {
	$self->{'source_server_crc'} = '00000000:0';
    }
    if (!defined $self->{'dest_server_crc'}) {
	$self->{'dest_server_crc'} = '00000000:0';
    }
    if (!defined $self->{'server_crc'}) {
	$self->{'server_crc'} = '00000000:0';
    }
    if ($self->{'server_crc'} ne '00000000:0' and
	$self->{'source_server_crc'} ne '00000000:0' and
	$self->{'server_crc'} ne $self->{'source_server_crc'}) {
	if ($params{'result'} eq 'DONE') {
	    $params{'result'} = 'PARTIAL';
	    push @{$self->{'input_errors'}}, "source server crc ($self->{'source_server_crc'}) and input server crc ($self->{'server_crc'}) differ)";
	}
    }

    if ($self->{'source_server_crc'} ne '00000000:0' and
	$self->{'dest_server_crc'} ne '00000000:0' and
	$self->{'source_server_crc'} ne $self->{'dest_server_crc'}) {
	if ($params{'result'} eq 'DONE') {
	    $params{'result'} = 'PARTIAL';
	    push @{$self->{'input_errors'}}, "source server crc ($self->{'source_server_crc'}) and dest server crc ($self->{'dest_server_crc'}) differ)";
	}
    }

    if ($self->{'server_crc'} ne '00000000:0' and
	$self->{'source_server_crc'} ne '00000000:0' and
	$self->{'server_crc'} ne $self->{'source_server_crc'}) {
	if ($params{'result'} eq 'DONE') {
	    $params{'result'} = 'PARTIAL';
	    push @{$self->{'input_errors'}}, "server crc ($self->{'server_crc'}) and source server crc ($self->{'source_server_crc'}) differ)";
	}
    }
    if ($self->{'server_crc'} eq '00000000:0') {
	$self->{'server_crc'} = $self->{'source_server_crc'};
    }

    if ($params{'result'} eq 'DONE') {
	if ($self->{'dumper_status'} eq "DONE") {
	    $msgtype = Amanda::Taper::Protocol::DONE;
	    $logtype = $L_DONE;
	} else {
	    $msgtype = Amanda::Taper::Protocol::DONE;
	    $logtype = $L_PARTIAL;
	}
    } elsif ($params{'result'} eq 'PARTIAL') {
	$msgtype = Amanda::Taper::Protocol::PARTIAL;
	$logtype = $L_PARTIAL;
    } elsif ($params{'result'} eq 'FAILED') {
	$msgtype = Amanda::Taper::Protocol::FAILED;
	$logtype = $L_FAIL;
    }

    if (!defined $params{'device_errors'}) {
	$params{'device_errors'} = [];
    }
    # note that we use total_duration here, which is the total time between
    # start_dump and dump_cb, so the kps generated here is much less than the
    # actual tape write speed.  Think of this as the *taper* speed, rather than
    # the *tape* speed.
    my $stats = make_stats($params{'size'}, $params{'total_duration'}, $self->{'orig_kb'});

    # consider this a config-derived failure only if there were no errors
    my $failure_from = (defined $params{'config_denial_message'})?  'config' : 'error';

    my @all_messages = (@{$params{'device_errors'}}, @{$self->{'input_errors'}});
    push @all_messages, $params{'config_denial_message'} if $params{'config_denial_message'};
    my $msg = quote_string(join("; ", @all_messages));

    # write a DONE/PARTIAL/FAIL log line
    if ($logtype == $L_FAIL) {
	my $format = "%s %s %s %s %s %s %s %s";
	$format = "VAULT %s %s %s %s %s %s %s %s" if $self->{'doing_vault'};
	log_add($L_FAIL, sprintf($format,
	    quote_string("ST:" . $self->{'controller'}->{'storage'}->{'storage_name'}),
	    quote_string("POOL:" . $self->{'controller'}->{'storage'}->{'tapepool'}),
	    quote_string($self->{'hostname'}.""), # " is required for SWIG..
	    quote_string($self->{'diskname'}.""),
	    $self->{'datestamp'},
	    $self->{'level'},
	    $failure_from,
	    $msg));
    } else {
	log_add($logtype, sprintf("%s %s %s %s %s %s %s %s %s %s %s%s",
	    quote_string("ST:" . $self->{'controller'}->{'storage'}->{'storage_name'}),
	    quote_string("POOL:" . $self->{'controller'}->{'storage'}->{'tapepool'}),
	    quote_string($self->{'hostname'}.""), # " is required for SWIG..
	    quote_string($self->{'diskname'}.""),
	    $self->{'datestamp'},
	    $params{'nparts'},
	    $self->{'level'},
	    $self->{'native_crc'},
	    $self->{'client_crc'},
	    $self->{'server_crc'},
	    $stats,
	    ($logtype == $L_PARTIAL and @all_messages)? " $msg" : ""));
    }

    # and send a message back to the driver
    my %msg_params = (
	handle => $self->{'handle'},
    );

    $msg_params{'server_crc'} = $self->{'dest_server_crc'};
    # reflect errors in our own elements in INPUT-ERROR or INPUT-GOOD
    if (@{$self->{'input_errors'}}) {
	$msg_params{'input'} = 'INPUT-ERROR';
	$msg_params{'inputerr'} = join("; ", @{$self->{'input_errors'}});
    } else {
	$msg_params{'input'} = 'INPUT-GOOD';
	$msg_params{'inputerr'} = '';
    }

    # and errors from the scribe in TAPE-ERROR or TAPE-GOOD
    if (@{$params{'device_errors'}}) {
	$msg_params{'taper'} = 'TAPE-ERROR';
	$msg_params{'tapererr'} = join("; ", @{$params{'device_errors'}});
    } elsif ($params{'config_denial_message'}) {
	$msg_params{'taper'} = 'TAPE-CONFIG';
	$msg_params{'tapererr'} = $params{'config_denial_message'};
    } else {
	$msg_params{'taper'} = 'TAPE-GOOD';
	$msg_params{'tapererr'} = '';
    }

    if ($msgtype ne Amanda::Taper::Protocol::FAILED) {
	$msg_params{'stats'} = $stats;
    }
    $msg_params{'worker_name'} = $self->{'worker_name'};

    # reset things to 'idle' before sending the message
    $self->{'xfer'} = undef;
    $self->{'xfer_source'} = undef;
    $self->{'xfer_dest'} = undef;
    $self->{'handle'} = undef;
    $self->{'hostname'} = undef;
    $self->{'diskname'} = undef;
    $self->{'datestamp'} = undef;
    $self->{'level'} = undef;
    $self->{'header'} = undef;
    $self->{'state'} = 'idle';
    delete $self->{'result'};
    delete $self->{'dumper_status'};
    delete $self->{'dump_params'};

    $self->{'controller'}->{'proto'}->send($msgtype, %msg_params);
    if ($self->{timer}) {
	$self->{timer}->remove();
	undef $self->{timer};
	$self->{status_fh}->close();
	undef $self->{status_fh};
	unlink($self->{status_filename});
	undef $self->{status_filename};
    }
}


##
# Scribe feedback

sub request_volume_permission {
    my $self = shift;
    my %params = @_;

    $self->{'perm_cb'} = $params{'perm_cb'};
    # and send the request to the driver
    $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::REQUEST_NEW_TAPE,
	worker_name => $self->{'worker_name'},
	handle => $self->{'handle'});
}

sub scribe_notif_new_tape {
    my $self = shift;
    my %params = @_;

    # TODO: if $params{error} is set, report it back to the driver
    # (this will be a change to the protocol)
    log_add($L_INFO, "$params{'error'}") if defined $params{'error'};

    if ($params{'volume_label'} && !$params{'error'}) {
	$self->{'label'} = $params{'volume_label'};

	# add to the trace log
	log_add($L_START, sprintf("datestamp %s %s %s label %s tape %s",
		$self->{'timestamp'},
		quote_string("ST:" . $self->{'controller'}->{'storage'}->{'storage_name'}),
		quote_string("POOL:" . $self->{'controller'}->{'storage'}->{'tapepool'}),
		quote_string($self->{'label'}),
		++$tape_num));

	# and the amdump log
	print STDERR "taper: wrote label '$self->{label}'\n";

	# and inform the driver
	$self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NEW_TAPE,
	    worker_name => $self->{'worker_name'},
	    handle => $self->{'handle'},
	    label => $params{'volume_label'});
    } else {
	$self->{'label'} = undef;

	$self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NO_NEW_TAPE,
	    worker_name => $self->{'worker_name'},
	    handle => $self->{'handle'});
    }
}

sub scribe_ready {
    my $self = shift;
    my %params = @_;

    $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::READY,
	worker_name => $self->{'worker_name'},
	handle => $self->{'handle'});
}

sub scribe_notif_part_done {
    my $self = shift;
    my %params = @_;

    $self->_assert_in_state("writing") or return;

    my $stats = make_stats($params{'size'}, $params{'duration'}, $self->{'orig_kb'});

    # log the part, using PART or PARTPARTIAL
    my $logbase = sprintf("%s %s %s %s %s %s %s %s/%s %s %s",
	quote_string("ST:" . $self->{'controller'}->{'storage'}->{'storage_name'}),
	quote_string("POOL:" . $self->{'controller'}->{'storage'}->{'tapepool'}),
	quote_string($self->{'label'}),
	$params{'fileno'},
	quote_string($self->{'header'}->{'name'}.""), # " is required for SWIG..
	quote_string($self->{'header'}->{'disk'}.""),
	$self->{'datestamp'},
	$params{'partnum'}, -1, # totalparts is always -1
	$self->{'level'},
	$stats);
    if ($params{'successful'}) {
	log_add($L_PART, $logbase);
    } else {
	log_add($L_PARTPARTIAL, "$logbase \"No space left on device\"");
    }

    # only send a PARTDONE if it was successful
    if ($params{'successful'}) {
	$self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PARTDONE,
	    worker_name => $self->{'worker_name'},
	    handle => $self->{'handle'},
	    label => $self->{'label'},
	    fileno => $params{'fileno'},
	    stats => $stats,
	    kb => $params{'size'} / 1024);
    }
}

sub scribe_notif_log_info {
    my $self = shift;
    my %params = @_;

    debug("$params{'message'}");
    log_add($L_INFO, "$params{'message'}");
}

##
# recovery feedback

sub recovery_clerk_notif_open_volume {
    my $self = shift;
    my %params = @_;

    $self->{'controller'}->{'proto'}->send(
	Amanda::Taper::Protocol::OPENED_SOURCE_VOLUME,
	worker_name => $self->{'worker_name'},
	handle => $self->{'handle'},
	label => $params{'label'}
    );
}

sub recovery_clerk_notif_close_volume {
    my $self = shift;
    my %params = @_;

    $self->{'controller'}->{'proto'}->send(
	Amanda::Taper::Protocol::CLOSED_SOURCE_VOLUME,
	worker_name => $self->{'worker_name'},
	handle => $self->{'handle'},
	label => $params{'label'}
    );
}

##
# Utilities

sub _assert_in_state {
    my $self = shift;
    my ($state) = @_;
    if ($self->{'state'} eq $state) {
	return 1;
    } else {
	$self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::BAD_COMMAND,
	    message => "command not appropriate in state '$self->{state}' : '$state'");
	return 0;
    }
}

sub create_status_file {
    my $self = shift;

    # create temporary file
    ($self->{status_fh}, $self->{status_filename}) =
	File::Temp::tempfile("taper_status_file_XXXXXX",
				DIR => $Amanda::Paths::AMANDA_TMPDIR,
				UNLINK => 1);

    # tell amstatus about it by writing it to the dump log
    my $qdisk = Amanda::Util::quote_string($self->{'diskname'});
    my $qhost = Amanda::Util::quote_string($self->{'hostname'});
    print STDERR "taper: status file $self->{'taper_name'} $self->{'worker_name'} $qhost $qdisk:" .
		    "$self->{status_filename}\n";
    print {$self->{status_fh}} "0";

    # create timer callback, firing every 5s (=5000msec)
    $self->{timer} = Amanda::MainLoop::timeout_source(5000);
    $self->{timer}->set_callback(sub {
	my $size = $self->{scribe}->get_bytes_written();
	seek $self->{status_fh}, 0, 0;
	print {$self->{status_fh}} $size;
	truncate $self->{status_fh}, length($size);
	$self->{status_fh}->flush();
    });
}

sub send_port_and_get_header {
    my $self = shift;
    my ($finished_cb) = @_;

    my ($xsrc, $xdst);
    my $errmsg;

    my $steps = define_steps
	cb_ref => \$finished_cb;

    step start => sub {
	if ($self->{'doing_port_write'}) {
            $steps->{'send_port'}->();
        } else {
            $steps->{'send_shm_name'}->();
        }
    };

    step send_shm_name => sub {
	# get the shm_name
	my $shm_name = $self->{'xfer_source'}->get_shm_name();

	if (!$shm_name) {
	    if (@{$self->{'scribe'}->{'device_errors'}}) {
		$self->{'scribe'}->abort_setup(dump_cb => $self->{'dump_cb'});
		return;
	    }
	    #An XMSG_ERROR will call the dump_cb
	    $self->{'scribe'}->set_dump_cb(dump_cb => $self->{'dump_cb'});
	    return;
	}

	# and set up an xfer for the header, too, using DirectTCP as an easy
	# way to implement a listen/accept/read process.  Note that this does
	# not enforce a maximum size, so this portion of Amanda at least can
	# handle any size header
	($xsrc, $xdst) = (
	    Amanda::Xfer::Source::DirectTCPListen->new(),
	    Amanda::Xfer::Dest::Buffer->new(0));
	$self->{'header_xfer'} = Amanda::Xfer->new([$xsrc, $xdst]);
	$self->{'header_xfer'}->start($steps->{'header_xfer_xmsg_cb'});

	my $header_addrs = $xsrc->get_addrs();
	my $header_port = $header_addrs->[0][1];

	# and tell the driver which ports we're listening on
	$self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::SHM_NAME,
	    worker_name => $self->{'worker_name'},
	    handle => $self->{'handle'},
	    port => $header_port,
	    shm_name => $shm_name);
    };

    step send_port => sub {
	# get the ip:port pairs for the data connection from the data xfer source,
	# which should be an Amanda::Xfer::Source::DirectTCPListen
	my $data_addrs = $self->{'xfer_source'}->get_addrs();

	if (!$data_addrs || @$data_addrs == 0) {
	    if (@{$self->{'scribe'}->{'device_errors'}}) {
		$self->{'scribe'}->abort_setup(dump_cb => $self->{'dump_cb'});
		return;
	    }
	    #An XMSG_ERROR will call the dump_cb
	    $self->{'scribe'}->set_dump_cb(dump_cb => $self->{'dump_cb'});
	    return;
	}

	$data_addrs = join ";", map { $_->[0] . ':' . $_->[1] } @$data_addrs;

	# and set up an xfer for the header, too, using DirectTCP as an easy
	# way to implement a listen/accept/read process.  Note that this does
	# not enforce a maximum size, so this portion of Amanda at least can
	# handle any size header
	($xsrc, $xdst) = (
	    Amanda::Xfer::Source::DirectTCPListen->new(),
	    Amanda::Xfer::Dest::Buffer->new(0));
	$self->{'header_xfer'} = Amanda::Xfer->new([$xsrc, $xdst]);
	$self->{'header_xfer'}->start($steps->{'header_xfer_xmsg_cb'});

	my $header_addrs = $xsrc->get_addrs();
	my $header_port = $header_addrs->[0][1];

	# and tell the driver which ports we're listening on
	$self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PORT,
	    worker_name => $self->{'worker_name'},
	    handle => $self->{'handle'},
	    port => $header_port,
	    ipports => $data_addrs);
    };

    step header_xfer_xmsg_cb => sub {
	my ($src, $xmsg, $xfer) = @_;
	if ($xmsg->{'type'} == $XMSG_INFO) {
	    info($xmsg->{'message'});
	} elsif ($xmsg->{'type'} == $XMSG_ERROR) {
	    $errmsg = $xmsg->{'message'};
	} elsif ($xmsg->{'type'} == $XMSG_DONE) {
	    if ($errmsg) {
		$finished_cb->($errmsg);
	    } else {
		$steps->{'got_header'}->();
	    }
	}
    };

    step got_header => sub {
	my $hdr_buf = $xdst->get();

	# close stuff up
	$self->{'header_xfer'} = $xsrc = $xdst = undef;

	if (!defined $hdr_buf) {
	    return $finished_cb->("Got empty header");
	}

	# parse the header, finally!
	$self->{'header'} = Amanda::Header->from_string($hdr_buf);
	$self->{'native_crc'} = $self->{'header'}->{'native_crc'};
	$self->{'client_crc'} = $self->{'header'}->{'client_crc'};
	$self->{'server_crc'} = $self->{'header'}->{'server_crc'};

	Amanda::Debug::debug("header native_crc: $self->{'native_crc'}");
	Amanda::Debug::debug("header client_crc: $self->{'client_crc'}");
	Amanda::Debug::debug("header server_crc: $self->{'server_crc'}");

	$finished_cb->(undef);
    };
}

# do the work of starting a new xfer; this contains the code common to
# msg_PORT_WRITE and msg_FILE_WRITE.
sub setup_and_start_dump {
    my $self = shift;
    my ($msgtype, %params) = @_;
    my %splitting_args;
    my %get_xfer_dest_args;
    my @dumper_cb_result;
    my $wait_for_recovery_cb;

    $self->{'dump_cb'} = $params{'dump_cb'};

    # setting up the dump is a bit complex, due to the requirements of
    # a directtcp port_write.  This function:
    # 1. creates and starts a transfer (make_xfer)
    # 2. gets the header
    # 3. calls the scribe's start_dump method with the new header

    my $steps = define_steps
	cb_ref => \$params{'dump_cb'};

    step setup => sub {
	$self->{'handle'} = $params{'handle'};
	$self->{'src_storage'} = $params{'src_storage'};
	$self->{'src_pool'} = $params{'src_pool'};
	$self->{'src_label'} = $params{'src_label'};
	$self->{'hostname'} = $params{'hostname'};
	$self->{'diskname'} = $params{'diskname'};
	$self->{'datestamp'} = $params{'datestamp'};
	$self->{'level'} = $params{'level'};
	$self->{'header'} = undef; # no header yet
	$self->{'orig_kb'} = $params{'orig_kb'};
	$self->{'native_crc'} = undef;
	$self->{'client_crc'} = undef;
	$self->{'server_crc'} = undef;
	$self->{'source_server_crc'} = undef;
	$self->{'dest_server_crc'} = undef;
	$self->{'input_errors'} = [];

	$steps->{'process_args'}->();
    };

    step process_args => sub {
	# extract the splitting-related parameters, stripping out empty strings
	%splitting_args = map {
	    (defined $params{$_} && $params{$_} ne '')? ($_, $params{$_}) : ()
	} qw(
	    dle_tape_splitsize dle_split_diskbuffer dle_fallback_splitsize dle_allow_split
	    part_size part_cache_type part_cache_dir part_cache_max_size data_path
	);

	# convert numeric values to BigInts
	for (qw(dle_tape_splitsize dle_fallback_splitsize part_size part_cache_max_size)) {
	    $splitting_args{$_} = Math::BigInt->new($splitting_args{$_})
		if (exists $splitting_args{$_});
	}

	if ($msgtype eq Amanda::Taper::Protocol::VAULT_WRITE) {

	    my $src = $self->{'src'};

	    if (defined $src and defined $src->{'storage'} and
		$src->{'storage'}->{'storage_name'} ne $self->{'src_storage'}) {
		return $src->{'clerk'}->quit(finished_cb => $steps->{'quit_clerk_finished'});
	    } else {
		return $steps->{'get_src_clerk'}->();
	    }
	}
	$self->{'scribe'}->wait_device(finished_cb => $steps->{'got_device'});
    };

    step quit_clerk_finished => sub {
	Amanda::Debug::debug("quit_clerk_finished");
        $self->{'src'}->{'storage'}->quit();
        delete $self->{'src'}->{'clerk'};
        delete $self->{'src'}->{'scan'};
        delete $self->{'src'}->{'storage'};
        delete $self->{'src'}->{'chg'};
	$steps->{'get_src_clerk'}->();
    };

    step get_src_clerk => sub {

	my $src;
	if (defined $self->{'src'}) {
	    $src = $self->{'src'};
	} else {
	    $src = $self->{'src'} = {};
	}
	my $chg;
	if (!defined $src->{'clerk'}) {
	    my $tlf = Amanda::Config::config_dir_relative(getconf($CNF_TAPELIST));
	    my ($tl, $message) = Amanda::Tapelist->new($tlf);
	    if (defined $message) {
		return $self->failure($message);
	    }

	    my $storage = $self->{'storages'}->{$self->{'src_storage'}};
	    if (!$storage) {
		$storage = Amanda::Storage->new(
				storage_name => $self->{'src_storage'},
				tapelist => $tl);
		return $self->failure($storage)
		    if $storage->isa("Amanda::Changer::Error");
		$self->{'storages'}->{$self->{'src_storage'}} = $storage;
	    }

	    $chg = $storage->{'chg'};
	    if ($chg->isa('Amanda::Changer::Error')) {
		$storage->quit();
		return $self->failure($chg);
	    }

	    $src->{'storage'} = $storage;
	    $src->{'chg'} = $chg;

	    $src->{'scan'} = Amanda::Recovery::Scan->new(chg => $chg);

	    $src->{'clerk'} = Amanda::Recovery::Clerk->new(
		changer => $chg,
		feedback => $self,
		scan    => $src->{'scan'});
	}

	my @dumpspecs;
	push @dumpspecs => Amanda::Cmdline::dumpspec_t->new(
					$self->{'hostname'},
					$self->{'diskname'},
					$self->{'datestamp'},
					$self->{'level'},
					undef);
	my @storage_list = ( $self->{'src_storage'} );
	Amanda::Recovery::Planner::make_plan(
			hostname => $self->{'hostname'},
			diskname => $self->{'diskname'},
			dump_timestamp => $self->{'datestamp'},
			level => $self->{'level'},
			changer => $chg,
			storage_list => \@storage_list,
			only_in_storage => 1,
			status => 'OK',
			plan_cb => sub { $steps->{'plan_cb'}->(@_) });
	return;
    };

    step plan_cb => sub {
	my ($err, $plan) = @_;

	if ($err) {
	    return $params{'dump_cb'}->(
		result => "FAILED",
		device_errors => [ 'error', "$err" ],
		size => 0,
		duration => 0.0,
		total_duration => 0);
	}

	return $params{'dump_cb'}->(
                result => "FAILED",
                device_errors => [ 'error', "Dump not found" ],
                size => 0,
                duration => 0.0,
                total_duration => 0) if (!@{$plan->{'dumps'}});

	$self->{'src'}->{'plan'} = $plan;

	# we've started the xfer now, but the destination won't actually write
	# any data until we call start_dump.  And we'll need a device for that.
	$self->{'scribe'}->wait_device(finished_cb => $steps->{'got_device'});
    };

    step got_device => sub {
	my ($err) = @_;

	if ($err) {
	    return $params{'dump_cb'}->(
		result => "FAILED",
		device_errors => [ 'error', "got_device failed: $err" ],
		size => 0,
		duration => 0.0,
		total_duration => 0);
	}

	my $device = $self->{'scribe'}->get_device();
	if (!defined $device) {
	    confess "no device is available to create an xfer_dest";
	}

	if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE &&
	    (my $err = $self->{'scribe'}->check_data_path($params{'data_path'}))) {
	    return $params{'dump_cb'}->(
		result => "FAILED",
		device_errors => [ 'error', "$err" ],
		size => 0,
		duration => 0.0,
		total_duration => 0);
	}

	$splitting_args{'leom_supported'} = $device->property_get("leom");
	# and convert those to get_xfer_dest args
        %get_xfer_dest_args = get_splitting_args_from_config(
		%splitting_args);
	$get_xfer_dest_args{'max_memory'} = $self->{'controller'}->{'storage'}->{'device_output_buffer_size'};
	if (!$self->{'controller'}->{'storage'}->{'seen_device_output_buffer_size'}) {
	    my $block_size4 = $device->block_size * 4;
	    if ($block_size4 > $get_xfer_dest_args{'max_memory'}) {
		$get_xfer_dest_args{'max_memory'} = $block_size4;
	    }
	}
	$get_xfer_dest_args{'can_cache_inform'} = ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE and $get_xfer_dest_args{'allow_split'});

	# if we're unable to fulfill the user's splitting needs, we can still give
	# the dump a shot - but we'll warn them about the problem
	if ($get_xfer_dest_args{'warning'}) {
	    log_add($L_WARNING, sprintf("%s:%s: %s",
		    $params{'hostname'}, $params{'diskname'},
		    $get_xfer_dest_args{'warning'}));
	    delete $get_xfer_dest_args{'warning'};
	}

	$steps->{'make_xfer'}->();
    };

    step make_xfer => sub {
        $self->_assert_in_state("idle") or return;
        $self->{'state'} = 'making_xfer';

	if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE) {
	    $self->{'xfer_source'} = Amanda::Xfer::Source::DirectTCPListen->new();
	} elsif ($msgtype eq Amanda::Taper::Protocol::SHM_WRITE) {
	    $self->{'xfer_source'} = Amanda::Xfer::Source::ShmRing->new();
	} elsif ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE) {
	    $self->{'xfer_source'} = Amanda::Xfer::Source::Holding->new($params{'filename'});
	} elsif ($msgtype eq Amanda::Taper::Protocol::VAULT_WRITE) {
	    my $dump = $self->{'src'}->{'plan'}->shift_dump();
	    return $self->{'src'}->{'clerk'}->get_xfer_src(
		dump => $dump,
		xfer_src_cb => $steps->{'got_xfer_src'});
	} else {
	    die("Bad msgtype: $msgtype");
	}

	if ($msgtype eq Amanda::Taper::Protocol::VAULT_WRITE) {
	}
	$steps->{'make_xfer_2'}->();
    };

    step got_xfer_src => sub {
	my ($errors, $header, $xfer_src_, $directtcp_supported) = @_;

	if ($errors) {
	    push @{$self->{'input_errors'}}, @$errors;
	    return $params{'dump_cb'}->(
		result => "FAILED",
		size => 0,
		duration => 0.0,
		total_duration => 0);
	}

	$self->{'xfer_source'} = $xfer_src_;
	$self->{'header'} = $header;
	    $self->{'native_crc'} = $self->{'header'}->{'native_crc'};
	    $self->{'client_crc'} = $self->{'header'}->{'client_crc'};
	    $self->{'server_crc'} = $self->{'header'}->{'server_crc'};
	    Amanda::Debug::debug("header native_crc: $self->{'native_crc'}");
	    Amanda::Debug::debug("header client_crc: $self->{'client_crc'}");
	    Amanda::Debug::debug("header server_crc: $self->{'server_crc'}");

	    if ($self->{'header'}->{'is_partial'}) {
		$self->{'dumper_status'} = "FAILED";
	    } else {
		$self->{'dumper_status'} = "DONE";
	    }
	$steps->{'make_xfer_2'}->();
    };

    step make_xfer_2 => sub {
        $self->{'xfer_dest'} = $self->{'scribe'}->get_xfer_dest(%get_xfer_dest_args);

        $self->{'xfer'} = Amanda::Xfer->new([$self->{'xfer_source'}, $self->{'xfer_dest'}]);
        $self->{'xfer'}->start(sub {
	    my ($src, $msg, $xfer) = @_;

	    if ($msg->{'type'} == $XMSG_CRC) {
		if ($msg->{'elt'} == $self->{'xfer_source'}) {
		    $self->{'source_server_crc'} = $msg->{'crc'}.":".$msg->{'size'};
		} elsif ($msg->{'elt'} == $self->{'xfer_dest'}) {
		    $self->{'dest_server_crc'} = $msg->{'crc'}.":".$msg->{'size'};
		} else {
		}
	    }
	    if ($msgtype eq Amanda::Taper::Protocol::VAULT_WRITE) {
		$self->{'src'}->{'clerk'}->handle_xmsg($src, $msg, $xfer);
	    }
	    $self->{'scribe'}->handle_xmsg($src, $msg, $xfer);

	    # if this is an error message that's not from the scribe's element, then
	    # we'll need to keep track of it ourselves
	    if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
		push @{$self->{'input_errors'}}, $msg->{'message'};
	    }
        });

	# we've found a device, but the destination won't actually write
	# any data until we call start_dump.  And we'll need a header for that.

	$steps->{'get_header'}->();
    };

    step get_header => sub {
        $self->_assert_in_state("making_xfer") or return;
        $self->{'state'} = 'getting_header';

	if ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE) {
	    # getting the header is easy for FILE-WRITE..
	    my $hdr = $self->{'header'} = Amanda::Holding::get_header($params{'filename'});

	    if (!defined $hdr || $hdr->{'type'} != $Amanda::Header::F_DUMPFILE) {
		confess("Could not read header from '$params{filename}'");
	    }

	    # strip out header fields we don't need
	    $hdr->{'cont_filename'} = '';

	    $self->{'native_crc'} = $self->{'header'}->{'native_crc'};
	    $self->{'client_crc'} = $self->{'header'}->{'client_crc'};
	    $self->{'server_crc'} = $self->{'header'}->{'server_crc'};
	    Amanda::Debug::debug("header native_crc: $self->{'native_crc'}");
	    Amanda::Debug::debug("header client_crc: $self->{'client_crc'}");
	    Amanda::Debug::debug("header server_crc: $self->{'server_crc'}");

	    if ($self->{'header'}->{'is_partial'}) {
		$self->{'dumper_status'} = "FAILED";
	    } else {
		$self->{'dumper_status'} = "DONE";
	    }

	    $self->{'xfer_source'}->start_recovery();
	    $steps->{'start_dump'}->(undef);
	} elsif ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE ||
		 $msgtype eq Amanda::Taper::Protocol::SHM_WRITE) {
	    # ..but quite a bit harder for PORT-WRITE; this method will send the
	    # proper PORT command, then read the header from the dumper and parse
	    # it, placing the result in $self->{'header'}
	    $self->send_port_and_get_header($steps->{'start_dump'});
	} else { # VAULT_WRITE
	    # already done
	    $steps->{'start_dump'}->(undef);
	}
    };

    step start_dump => sub {
	my ($err) = @_;

        $self->_assert_in_state("getting_header") or return;
        $self->{'state'} = 'writing';

	# abort if we already got a device_errors
	if (@{$self->{'scribe'}->{'device_errors'}}) {
	    $self->{'scribe'}->abort_setup(dump_cb => $params{'dump_cb'});
	    return;
	}
	# abort if we already got a input_errors
	if (@{$self->{'input_errors'}}) {
	    $self->{'scribe'}->abort_setup(dump_cb => $params{'dump_cb'});
	    return;
	}
        # if $err is set, cancel the dump, treating it as a input error
        if ($err) {
	    push @{$self->{'input_errors'}}, $err;
	    return $self->{'scribe'}->cancel_dump(
		xfer => $self->{'xfer'},
		dump_cb => $params{'dump_cb'});
        }

        # sanity check the header..
        my $hdr = $self->{'header'};
        if ($hdr->{'dumplevel'} != $params{'level'}
            or $hdr->{'name'} ne $params{'hostname'}
            or $hdr->{'disk'} ne $params{'diskname'}
	    or $hdr->{'datestamp'} ne $params{'datestamp'}) {
            confess("Header of dumpfile does not match command from driver $hdr->{'dumplevel'} $hdr->{'name'} $hdr->{'disk'} $hdr->{'datestamp'} ------ $params{'level'} $params{'hostname'} $params{'diskname'} $params{'datestamp'}");
        }

	# start producing status
	$self->create_status_file();

	# and fix it up before writing it
        $hdr->{'totalparts'} = -1;
        $hdr->{'type'} = $Amanda::Header::F_SPLIT_DUMPFILE;

	$wait_for_recovery_cb = 0;
	if ($msgtype eq Amanda::Taper::Protocol::VAULT_WRITE) {
	    $wait_for_recovery_cb = 1;
	    $self->{'src'}->{'clerk'}->start_recovery(
		xfer => $self->{'xfer'},
		recovery_cb => $steps->{'recovery_cb'});
	}
        $self->{'scribe'}->start_dump(
	    xfer => $self->{'xfer'},
            dump_header => $hdr,
	    dump_cb => $steps->{'dump_cb'});
            #dump_cb => $params{'dump_cb'});
    };

    step recovery_cb => sub {
	$wait_for_recovery_cb = 0;
	return if @dumper_cb_result == 0;
	Amanda::Debug::debug("quit_clerk_finished aa");
	$params{'dump_cb'}->(@dumper_cb_result);
    };

    step dump_cb => sub {
	my @paramsX = @_;
	@dumper_cb_result = @paramsX;
	if (!$wait_for_recovery_cb) {
	    $params{'dump_cb'}->(@dumper_cb_result);
	}
    };
}

sub dump_cb {
    my $self = shift;
    my %params = @_;

    $self->{'dump_params'} = \%params;
    $self->{'result'} = $params{'result'};

    # if we need to the dumper status (to differentiate a dropped network
    # connection from a normal EOF) and have not done so yet, then send a
    # DUMPER_STATUS message and re-call this method (dump_cb) with the result.
    if ($params{'result'} eq "DONE"
	    and ($self->{'doing_port_write'} || $self->{'doing_shm_write'})
	    and !exists $self->{'dumper_status'}) {
	my $controller = $self->{'controller'};
	my $proto = $controller->{'proto'};
	my $handle = $self->{'handle'};
	$proto->send(Amanda::Taper::Protocol::DUMPER_STATUS,
		worker_name => $self->{'worker_name'},
		handle => "$handle");
    } else {
	$self->result_cb();
    }
}

1;