# Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
# Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Contact information: Carbonite Inc., 756 N Pastoria Ave
# Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
use Test::More tests => 6;
use strict;
use warnings;
use lib '@amperldir@';
use Installcheck;
use Amanda::IPC::LineProtocol;
use IO::Handle;
use Amanda::MainLoop;
use Amanda::Debug;
use Data::Dumper;
use Carp;
##
# Define a test protocol
package TestProtocol;
use base "Amanda::IPC::LineProtocol";
use Amanda::IPC::LineProtocol;
use constant SIMPLE => message("SIMPLE",
format => [ qw( ) ],
);
use constant FOO => message("FOO",
format => [ qw( name? nicknames* ) ],
);
use constant FO => message("FO", # prefix of "FOO"
format => [ qw( ) ],
);
use constant ASSYM => message("ASSYM",
format => {
in => [ qw( a b ) ],
out => [ qw( x ) ],
},
);
use constant BAR => message("BAR",
match => qr/^BA[Rh]$/i, # more elaborate regex
format => [ qw( mandatory optional? ) ],
);
use constant QUIT => message("QUIT",
match => qr/^QUIT$/i,
on_eof => 1,
format => [ qw( reason? ) ],
);
package main;
# set up debugging so debug output doesn't interfere with test results
Amanda::Debug::dbopen("installcheck");
Installcheck::log_test_output();
# and disable Debug's die() and warn() overrides
Amanda::Debug::disable_die_override();
# run $code in a separate process, with read and write handles hooked up, and returns
# read and write handles.
sub in_fork {
my ($code) = @_;
my ($parent_read, $child_write) = POSIX::pipe();
my ($child_read, $parent_write) = POSIX::pipe();
my $pid = fork();
if (!defined($pid) or $pid < 0) {
die("Can't fork: $!");
}
if (!$pid) {
## child
# get our file-handle house in order
POSIX::close($parent_read);
POSIX::close($parent_write);
$code->(IO::Handle->new_from_fd($child_read, "r"),
IO::Handle->new_from_fd($child_write, "w"));
POSIX::exit(0);
}
## parent
POSIX::close($child_read);
POSIX::close($child_write);
return (IO::Handle->new_from_fd($parent_read, "r"),
IO::Handle->new_from_fd($parent_write, "w"),
$pid);
}
# generic "die" message_cb
my $message_cb = make_cb(message_cb => sub {
my ($msgtype, %params) = @_;
if (defined $msgtype) {
diag(Dumper(\%params));
die("unhandled message: $msgtype");
} else {
die("IPC error: $params{'error'}");
}
});
##
# Run some tests
my $proto;
my @events;
my ($rx_fh, $tx_fh, $pid);
# on QUIT, stop the protocol and quit the mainloop
my $quit_cb = make_cb(quit_cb => sub {
push @events, [ @_ ];
$proto->stop(finished_cb => sub {
Amanda::MainLoop::quit();
});
});
#
# test a simple "QUIT"
@events = ();
($rx_fh, $tx_fh, $pid) = in_fork(sub {
my ($rdh, $wrh) = @_;
$wrh->autoflush(1);
$rdh->getline(); # get 'start\n'
$wrh->write("QUIT \"just because\"");
});
$proto = TestProtocol->new(
rx_fh => $rx_fh, tx_fh => $tx_fh,
message_cb => $message_cb);
$proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
Amanda::MainLoop::call_later(sub {
$tx_fh->autoflush(1);
$tx_fh->write("start\n");
});
Amanda::MainLoop::run();
waitpid($pid, 0);
is_deeply([ @events ],
[
[ "QUIT", reason => "just because" ],
[ "QUIT" ],
],
"correct events for a simple 'QUIT \"just because\"")
or diag(Dumper(\@events));
##
# test a bogus message
@events = ();
($rx_fh, $tx_fh, $pid) = in_fork(sub {
my ($rdh, $wrh) = @_;
$wrh->autoflush(1);
$rdh->getline(); # get 'start\n'
$wrh->write("SNARSBLAT, yo");
});
$proto = TestProtocol->new(
rx_fh => $rx_fh, tx_fh => $tx_fh,
message_cb => sub { push @events, [ @_ ]; });
$proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
Amanda::MainLoop::call_later(sub {
$tx_fh->autoflush(1);
$tx_fh->write("start\n");
});
Amanda::MainLoop::run();
waitpid($pid, 0);
is_deeply([ @events ],
[
[ undef, 'error' => 'unknown command' ],
[ "QUIT" ], # from EOF
],
"bogus message handled correctly")
or diag(Dumper(\@events));
##
# a more complex conversation
@events = ();
($rx_fh, $tx_fh, $pid) = in_fork(sub {
my ($rdh, $wrh) = @_;
$wrh->autoflush(1);
$wrh->write("FOO\n");
$rdh->getline() =~ /SIMPLE/ or die("bad response");
$wrh->write("FOO one\n");
$rdh->getline() =~ /SIMPLE/ or die("bad response");
$wrh->write("FOO one \"t w o\"\n");
$rdh->getline() =~ /SIMPLE/ or die("bad response");
$wrh->write("FOO one \"t w o\" three\n");
$rdh->getline() =~ /SIMPLE/ or die("bad response");
});
$proto = TestProtocol->new(
rx_fh => $rx_fh, tx_fh => $tx_fh,
message_cb => $message_cb);
$proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
$proto->set_message_cb(TestProtocol::FOO, sub {
push @events, [ shift @_, { @_ } ];
$proto->send(TestProtocol::SIMPLE);
});
Amanda::MainLoop::run();
waitpid($pid, 0);
is_deeply([ @events ],
[
[ "FOO", { nicknames => [] } ],
[ "FOO", { nicknames => [], name => "one" } ],
[ "FOO", { nicknames => [ "t w o" ], name => "one" } ],
[ "FOO", { nicknames => [ "t w o", "three" ], name => "one" } ],
[ "QUIT" ],
],
"correct events for a few conversation steps, parsing")
or diag(Dumper(\@events));
##
# Asymmetrical formats
@events = ();
($rx_fh, $tx_fh, $pid) = in_fork(sub {
my ($rdh, $wrh) = @_;
$wrh->autoflush(1);
$wrh->write("ASSYM 1 2\n");
$rdh->getline() =~ /ASSYM a/ or die("bad response");
});
$proto = TestProtocol->new(
rx_fh => $rx_fh, tx_fh => $tx_fh,
message_cb => $message_cb);
$proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
$proto->set_message_cb(TestProtocol::ASSYM, sub {
push @events, [ shift @_, { @_ } ];
$proto->send(TestProtocol::ASSYM, x => "a");
});
Amanda::MainLoop::run();
waitpid($pid, 0);
is_deeply([ @events ],
[
[ "ASSYM", { a => "1", b => "2" } ],
[ "QUIT" ],
],
"correct events for asymmetric message format")
or diag(Dumper(\@events));
##
# test queueing up of messages on writing.
# The idea here is to write more than a pipe buffer can hold, while the child
# process does not read that data, and then to signal the child process,
# causing it to read all of that data, write a reply, and exit. Recent linuxes
# have a pipe buffer of 64k, so we exceed that threshold. We use an 'alarm' to
# fail in the case that this blocks.
my $NMSGS = 10000;
@events = ();
($rx_fh, $tx_fh, $pid) = in_fork(sub {
my ($rdh, $wrh) = @_;
$wrh->autoflush(1);
# on USR1, read lots of inputs
$SIG{'USR1'} = sub {
for (my $i = 0; $i < $NMSGS; $i++) {
$rdh->getline();
}
# send a message that the parent can hope to get
$wrh->write("BAR \"got your inputs\"\n");
# and bail out
POSIX::exit(0);
};
$wrh->write("SIMPLE\n");
# and sleep forever, or until killed.
while (1) { sleep(100); }
});
$proto = TestProtocol->new(
rx_fh => $rx_fh, tx_fh => $tx_fh,
message_cb => $message_cb);
$proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
$proto->set_message_cb(TestProtocol::SIMPLE, sub {
push @events, [ shift @_ ];
# send $NMSGS messages to the child, which isn't listening yet!
for (my $i = 0; $i < $NMSGS; $i++) {
$proto->send(TestProtocol::SIMPLE);
}
# and then send it SIGUSR1, so it reads those
kill USR1 => $pid;
});
$proto->set_message_cb(TestProtocol::BAR, sub {
push @events, [ shift @_, { @_ } ];
});
# die after 10 minutes
alarm 600;
Amanda::MainLoop::run();
waitpid($pid, 0);
alarm 0; # cancel the alarm
is_deeply([ @events ],
[
[ "SIMPLE" ],
[ "BAR", { mandatory => "got your inputs" } ],
[ "QUIT" ],
],
"write buffering handled correctly")
or diag(Dumper(\@events));
##
# test the message_obj functionality
package main::MessageObj;
sub msg_FOO {
my $self = shift;
push @{$self}, [ shift @_, { @_ } ];
$proto->send(TestProtocol::SIMPLE);
}
sub msg_BAR {
my $self = shift;
push @{$self}, [ shift @_, { @_ } ];
$proto->send(TestProtocol::SIMPLE);
}
package main;
@events = ();
($rx_fh, $tx_fh, $pid) = in_fork(sub {
my ($rdh, $wrh) = @_;
$wrh->autoflush(1);
$wrh->write("FOO\n");
$rdh->getline() =~ /SIMPLE/ or die("bad response");
$wrh->write("BAR one\n");
$rdh->getline() =~ /SIMPLE/ or die("bad response");
$wrh->write("BAH one \"t w o\"\n"); # note alternate spelling "BAH"
$rdh->getline() =~ /SIMPLE/ or die("bad response");
$wrh->write("FOO one \"t w o\" three\n");
$rdh->getline() =~ /SIMPLE/ or die("bad response");
});
$proto = TestProtocol->new(
rx_fh => $rx_fh, tx_fh => $tx_fh,
message_obj => bless(\@events, "main::MessageObj"));
$proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
Amanda::MainLoop::run();
waitpid($pid, 0);
is_deeply([ @events ],
[ [ 'FOO', { 'nicknames' => [] } ],
[ 'BAR', { 'mandatory' => 'one' } ],
[ 'BAR', { 'mandatory' => 'one', 'optional' => 't w o' } ],
[ 'FOO', { 'name' => 'one', 'nicknames' => [ 't w o', 'three' ] } ],
[ 'QUIT' ],
],
"message_obj works")
or diag(Dumper(\@events));