Blob Blame History Raw
# Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
# Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
#
# Contact information: Carbonite Inc., 756 N Pastoria Ave
# Sunnyvale, CA 94086, USA, or: http://www.zmanda.com

use Test::More tests => 25;
use strict;
use warnings;
use POSIX qw(WIFEXITED WEXITSTATUS EINTR );
use IO::Pipe;

use lib '@amperldir@';
use Amanda::MainLoop qw( :GIOCondition make_cb define_steps step );
use Amanda::Util;
use Installcheck;
use Amanda::Debug;

Amanda::Debug::dbopen("installcheck");
Installcheck::log_test_output();

{
    my $global = 0;

    my $to = Amanda::MainLoop::timeout_source(200);
    $to->set_callback(sub { 
	# ignore $src argument
	if (++$global >= 3) {
	    $to->remove();
	    Amanda::MainLoop::quit();
	}
    });

    Amanda::MainLoop::run();
    is($global, 3, "Timeout source works, calls back repeatedly (using a closure)");
}

{
    my $global = 0;

    my $to = Amanda::MainLoop::timeout_source(200);
    $to->set_callback(sub { 
	my ($src) = @_;
	if (++$global >= 3) {
	    $src->remove();
	    Amanda::MainLoop::quit();
	}
    });
    $to = undef; # remove the lexical reference to the source

    Amanda::MainLoop::run();
    is($global, 3, "Timeout source works, calls back repeatedly (no external reference to the source)");
}

{
    my $global = 0;

    my $id = Amanda::MainLoop::idle_source(5);
    $id->set_callback(sub { 
	my ($src) = @_;
	if (++$global >= 30) {
	    $src->remove();
	    Amanda::MainLoop::quit();
	}
    });

    Amanda::MainLoop::run();
    is($global, 30, "Idle source works, calls back repeatedly");
    $id->remove();
}

{
    my $global = 0;

    # to1 is removed before it runs, so it should never
    # execute its callback
    my $to1 = Amanda::MainLoop::timeout_source(10);
    $to1->set_callback(sub { ++$global; });
    $to1->remove();

    my $to2 = Amanda::MainLoop::timeout_source(300);
    $to2->set_callback(sub { Amanda::MainLoop::quit(); });

    Amanda::MainLoop::run();
    is($global, 0, "A remove()d source doesn't call back");

    $to2->remove();
}

{
    my $global = 0;

    my $pid = fork();
    if ($pid == 0) {
	## child
	sleep(1);
	exit(9);
    }

    ## parent

    my $cw = Amanda::MainLoop::child_watch_source($pid);
    $cw->set_callback(sub {
	my ($src, $got_pid, $got_status) = @_;
	$src->remove();

	if ($got_pid != $pid) {
	    diag("Got pid $got_pid, but expected $pid");
	    return;
	}
	if (!WIFEXITED($got_status)) {
	    diag("Didn't get an 'exited' status");
	    return;
	}
	if (WEXITSTATUS($got_status) != 9) {
	    diag("Didn't get exit status 9");
	    return;
	}
	$global = 1;
	Amanda::MainLoop::quit();
    });

    my $to = Amanda::MainLoop::timeout_source(20000);
    $to->set_callback(sub {
	my ($src) = @_;
	$global = 7;

	$src->remove();
	Amanda::MainLoop::quit();
    });

    Amanda::MainLoop::run();
    is($global, 1, "Child watch detects a dead child");

    $cw->remove();
    $to->remove();
}

{
    my $global = 0;

    my $pid = fork();
    if ($pid == 0) {
	## child
	exit(11);
    }

    ## parent

    sleep(1);
    my $cw = Amanda::MainLoop::child_watch_source($pid);
    $cw->set_callback(sub {
	my ($src, $got_pid, $got_status) = @_;
	$src->remove();

	if ($got_pid != $pid) {
	    diag("Got pid $got_pid, but expected $pid");
	    return;
	}
	if (!WIFEXITED($got_status)) {
	    diag("Didn't get an 'exited' status");
	    return;
	}
	if (WEXITSTATUS($got_status) != 11) {
	    diag("Didn't get exit status 11");
	    return;
	}
	$global = "ok";
	Amanda::MainLoop::quit();
    });

    my $to = Amanda::MainLoop::timeout_source(20000);
    $to->set_callback(sub { $global = "timeout"; Amanda::MainLoop::quit(); });

    Amanda::MainLoop::run();
    is($global, "ok", "Child watch detects a dead child that dies before the callback is set");

    $cw->remove();
    $to->remove();
}

{
    my $global = 0;
    my ($readinfd, $writeinfd) = POSIX::pipe();
    my ($readoutfd, $writeoutfd) = POSIX::pipe();

    my $pid = fork();
    if ($pid == 0) {
	## child

	my $data;

	POSIX::close($readinfd);
	POSIX::close($writeoutfd);

	# the read()s here are to synchronize with our parent; the
	# results are ignored.
	POSIX::read($readoutfd, $data, 1024);
	POSIX::write($writeinfd, "HELLO\n", 6);
	POSIX::read($readoutfd, $data, 1024);
	POSIX::write($writeinfd, "WORLD\n", 6);
	POSIX::read($readoutfd, $data, 1024);
	exit(33);
    }

    ## parent

    POSIX::close($writeinfd);
    POSIX::close($readoutfd);

    my @events;

    my $to = Amanda::MainLoop::timeout_source(200);
    my $times = 3;
    $to->set_callback(sub {
	push @events, "time";
	POSIX::write($writeoutfd, "A", 1); # wake up the child
	if (--$times == 0) {
	    $to->remove();
	}
    });

    my $cw = Amanda::MainLoop::child_watch_source($pid);
    $cw->set_callback(sub {
	my ($src, $got_pid, $got_status) = @_;
	$cw->remove();
	push @events, "died";

	Amanda::MainLoop::quit();
    });

    my $fd = Amanda::MainLoop::fd_source($readinfd, $G_IO_IN | $G_IO_HUP);
    $fd->set_callback(sub {
	my $str;
	if (POSIX::read($readinfd, $str, 1024) == 0) {
	    # EOF
	    POSIX::close($readinfd);
	    POSIX::close($writeoutfd);
	    $fd->remove();
	    return;
	}
	chomp $str;
	push @events, "read $str";
    });

    Amanda::MainLoop::run();
    $to->remove();
    $cw->remove();
    $fd->remove();

    is_deeply([ @events ],
	[ "time", "read HELLO", "time", "read WORLD", "time", "died" ],
	"fd source works for reading from a file descriptor");
}

# see if a "looping" callback with some closure values works.  This test teased
# out some memory corruption bugs once upon a time.

{
    my $completed = 0;
    sub loop {
	my ($finished_cb) = @_;
	my $time = 700;
	my $to;

	my $cb;
	$cb = sub {
	    $time -= 300;
	    $to->remove();
	    if ($time <= 0) {
		$finished_cb->();
	    } else {
		$to = Amanda::MainLoop::timeout_source($time);
		$to->set_callback($cb);
	    }
	};
	$to = Amanda::MainLoop::timeout_source($time);
	$to->set_callback($cb);
    };
    loop(sub {
	$completed = 1;
	Amanda::MainLoop::quit();
    });
    Amanda::MainLoop::run();
    is($completed, 1, "looping construct terminates with a callback");
}

# Make sure that a die() in a callback correctly kills the process.  Such
# a die() skips the usual Perl handling, so an eval { } won't do -- we have
# to fork a child.
{
    my $global = 0;
    my ($readfd, $writefd) = POSIX::pipe();

    my $pid = fork();
    if ($pid == 0) {
	## child

	my $data;

	# fix up the file descriptors to hook fd 2 (stderr) to
	# the pipe
	POSIX::close($readfd);
	POSIX::dup2($writefd, 2);
	POSIX::close($writefd);

	# and now die in a callback, using an eval {} in case the
	# exception propagates out of the MainLoop run()
	my $src = Amanda::MainLoop::timeout_source(10);
	$src->set_callback(sub { die("Oh, the humanity"); });
	eval { Amanda::MainLoop::run(); };
	exit(33);
    }

    ## parent

    POSIX::close($writefd);

    # read from the child and wait for it to die.  There's no
    # need to use MainLoop here.
    my $str;
    while (!defined(POSIX::read($readfd, $str, 1024))) {
	# we may be interrupted by a SIGCHLD; keep going
	next if ($! == EINTR);
	die ("POSIX::read failed: $!");
    }
    POSIX::close($readfd);
    waitpid($pid, 0);

    ok($? != 33 && $? != 0, "die() in a callback exits with an error condition");
    like($str, qr/Oh, the humanity/, "..and displays die message on stderr");
}

# test misc. management of sources.  Ideally it won't crash :)

my $src = Amanda::MainLoop::idle_source(1);
$src->set_callback(sub { 1; });
$src->set_callback(sub { 1; });
$src->set_callback(sub { 1; });
pass("Can call set_callback a few times on the same source");

$src->remove();
$src->remove();
pass("Calling remove twice is ok");

# call_later

{
    my ($cb1, $cb2);
    my $gothere = 0;

    $cb1 = sub {
	my ($a, $b) = @_;
	ok(Amanda::MainLoop::is_running(),
	    "call_later waits until mainloop runs");
	is($a+$b, 10,
	    "call_later passes arguments correctly");
	Amanda::MainLoop::call_later($cb2);
	Amanda::MainLoop::quit();
    };

    $cb2 = sub {
	$gothere = 1;
    };

    ok(!Amanda::MainLoop::is_running(), "main loop is correctly recognized as not running");
    Amanda::MainLoop::call_later($cb1, 7, 3);
    Amanda::MainLoop::run();
    ok($gothere, "call_later while already running calls immediately");

    my @actions = ();

    $cb1 = sub {
        push @actions, "cb1 start";
	Amanda::MainLoop::call_later($cb2, "hello");
        push @actions, "cb1 end";
    };

    $cb2 = sub {
	my ($greeting) = @_;

        push @actions, "cb2 start $greeting";
        push @actions, "cb2 end";
	Amanda::MainLoop::quit();
    };

    Amanda::MainLoop::call_later($cb1);
    Amanda::MainLoop::run();
    is_deeply([ @actions ],
              [ "cb1 start", "cb1 end", "cb2 start hello", "cb2 end" ],
              "call_later doesn't call its argument immediately");

    my @calls;
    Amanda::MainLoop::call_later(sub { push @calls, "call1"; });
    Amanda::MainLoop::call_later(sub { push @calls, "call2"; });
    Amanda::MainLoop::call_later(sub { Amanda::MainLoop::quit(); });
    Amanda::MainLoop::run();
    is_deeply([ @calls ],
	      [ "call1", "call2" ],
	      "call_later preserves the order of its invocations");
}

# call_after

{
    # note: gettimeofday is in usec, but call_after is in msec

    my ($start, $end) = (Amanda::Util::gettimeofday(), undef );
    Amanda::MainLoop::call_after(100, sub {
	my ($a, $b) = @_;
	is($a+$b, 10, "call_after passes arguments correctly");
	$end = Amanda::Util::gettimeofday();
	Amanda::MainLoop::quit();
    }, 2, 8);
    Amanda::MainLoop::run();

    ok(($end - $start)/1000 > 75,
	"call_after makes callbacks in the correct order")
	or diag("only " . (($end - $start)/1000) . "msec elapsed");
}

# async_read

{
    my $global = 0;
    my $inpipe = IO::Pipe->new();
    my $outpipe = IO::Pipe->new();
    my @events;

    my $pid = fork();
    if ($pid == 0) {
	## child

	my $data;

	$inpipe->writer();
	$inpipe->autoflush(1);
	$outpipe->reader();

	$inpipe->write("HELLO");
	$outpipe->read($data, 1);
	$inpipe->write("WORLD");
	exit(33);
    }

    ## parent

    $inpipe->reader();
    $inpipe->blocking(0);
    $outpipe->writer();
    $outpipe->blocking(0);
    $outpipe->autoflush(1);

    sub test_async_read {
	my ($finished_cb) = @_;

	my $steps = define_steps
	    cb_ref => \$finished_cb;

	step start => sub {
	    Amanda::MainLoop::async_read(
		fd => $inpipe->fileno(),
		size => 0,
		async_read_cb => $steps->{'read_hello'});
	};

	step read_hello => sub {
	    my ($err, $data) = @_;
	    die $err if $err;
	    push @events, "read1 '$data'";

	    $outpipe->write("A"); # wake up the child
	    Amanda::MainLoop::async_read(
		fd => $inpipe->fileno(),
		size => 5,
		async_read_cb => $steps->{'read_world'});
	};

	step read_world => sub {
	    my ($err, $data) = @_;
	    die $err if $err;
	    push @events, "read2 '$data'";

	    Amanda::MainLoop::async_read(
		fd => $inpipe->fileno(),
		size => 5,
		async_read_cb => $steps->{'read_eof'});
	};

	step read_eof => sub {
	    my ($err, $data) = @_;
	    die $err if $err;
	    push @events, "read3 '$data'";

	    Amanda::MainLoop::quit();
	};
    }

    test_async_read(sub { Amanda::MainLoop::quit(); });
    Amanda::MainLoop::run();
    waitpid($pid, 0);

    is_deeply([ @events ],
	[ "read1 'HELLO'", "read2 'WORLD'", "read3 ''" ],
	"async_read works for reading from a file descriptor");
}

{
    my $inpipe;
    my $outpipe;
    my $pid;
    my $thunk;
    my @events;

    sub test_async_read_harder {
	my ($finished_cb) = @_;

	my $steps = define_steps
	    cb_ref => \$finished_cb;

	step start => sub {
	    if (defined $pid) {
		waitpid($pid, 0);
		$pid = undef;
	    }

	    $inpipe = IO::Pipe->new();
	    $outpipe = IO::Pipe->new();

	    $pid = fork();
	    if ($pid == 0) {
		my $data;

		$inpipe->writer();
		$inpipe->autoflush(1);
		$outpipe->reader();

		while (1) {
		    $outpipe->read($data, 1);
		    last if ($data eq 'X');
		    if ($data eq 'W') {
			$inpipe->write("a" x 4096);
		    } else {
			$inpipe->write("GOT=$data");
		    }
		}

		exit(0);
	    }

	    # parent

	    $inpipe->reader();
	    $inpipe->blocking(0);
	    $outpipe->writer();
	    $outpipe->blocking(0);
	    $outpipe->autoflush(1);

	    # trigger two replies
	    $outpipe->write('A');
	    $outpipe->write('B');

	    # give the child time to write GOT=AGOT=B
	    Amanda::MainLoop::call_after(100, $steps->{'do_read_1'});
	};

	step do_read_1 => sub {
	    Amanda::MainLoop::async_read(
		fd => $inpipe->fileno(),
		size => 0, # 0 => all avail
		async_read_cb => $steps->{'done_read_1'},
		args => [ "x", "y" ]);
	};

	step done_read_1 => sub {
	    my ($err, $data, $x, $y) = @_;
	    die $err if $err;
	    push @events, $data;

	    # test the @args
	    is_deeply([$x, $y], ["x", "y"], "async_read's args key handled correctly");

	    $outpipe->write('C'); # should trigger a 'GOT=C' for done_read_2

	    $steps->{'do_read_2'}->();
	};

	step do_read_2 => sub {
	    Amanda::MainLoop::async_read(
		fd => $inpipe->fileno(),
		size => 5,
		async_read_cb => $steps->{'done_read_2'});
	};

	step done_read_2 => sub {
	    my ($err, $data) = @_;
	    die $err if $err;
	    push @events, $data;

	    # request a 4k write and then an EOF
	    $outpipe->write('W');
	    $outpipe->write('X');

	    $steps->{'do_read_block'}->();
	};

	step do_read_block => sub {
	    Amanda::MainLoop::async_read(
		fd => $inpipe->fileno(),
		size => 1000,
		async_read_cb => $steps->{'got_block'});
	};

	step got_block => sub {
	    my ($err, $data) = @_;
	    die $err if $err;
	    push @events, "block" . length($data);
	    if ($data ne '') {
		$steps->{'do_read_block'}->();
	    } else {
		$steps->{'done_reading_blocks'}->();
	    }
	};

	step done_reading_blocks => sub {
	    # one more read that should make an EOF
	    Amanda::MainLoop::async_read(
		fd => $inpipe->fileno(),
		# omit size this time -> default of 0
		async_read_cb => $steps->{'got_eof'});
	};

	step got_eof => sub {
	    my ($err, $data) = @_;
	    die $err if $err;
	    if ($data eq '') {
		push @events, "EOF";
	    }

	    $finished_cb->();
	};

	# note: not all operating systems (hi, Solaris) will generate
	# an error other than EOF on reading from a file descriptor
    }

    test_async_read_harder(sub { Amanda::MainLoop::quit(); });
    Amanda::MainLoop::run();
    waitpid($pid, 0) if defined($pid);

    is_deeply([ @events ],
	[ "GOT=AGOT=B", "GOT=C",
	  "block1000", "block1000", "block1000", "block1000", "block96", "block0",
	  "EOF", # got_eof
	], "more complex async_read");
}

# async_write

{
    my $inpipe = IO::Pipe->new();
    my $outpipe = IO::Pipe->new();
    my @events;
    my $pid;

    sub test_async_write {
	my ($finished_cb) = @_;

	my $steps = define_steps
	    cb_ref => \$finished_cb;

	step start => sub {
	    $pid = fork();
	    if ($pid == 0) {
		## child

		my $data;

		$inpipe->writer();
		$inpipe->autoflush(1);
		$outpipe->reader();

		while (1) {
		    $outpipe->sysread($data, 1024);
		    last if ($data eq "X");
		    $inpipe->write("$data");
		}
		exit(0);
	    }

	    ## parent

	    $inpipe->reader();
	    $inpipe->blocking(1);   # do blocking reads below, for simplicity
	    $outpipe->writer();
	    $outpipe->blocking(0);
	    $outpipe->autoflush(1);

	    Amanda::MainLoop::async_write(
		fd => $outpipe->fileno(),
		data => 'FUDGE',
		async_write_cb => $steps->{'wrote_fudge'});
	};

	step wrote_fudge => sub {
	    my ($err, $bytes) = @_;
	    die $err if $err;
	    push @events, "wrote $bytes";

	    my $buf;
	    $inpipe->read($buf, $bytes);
	    push @events, "read $buf";

	    $steps->{'double_write'}->();
	};

	step double_write => sub {
	    Amanda::MainLoop::async_write(
		fd => $outpipe->fileno(),
		data => 'ICECREAM',
		async_write_cb => $steps->{'wrote_icecream'});
	    Amanda::MainLoop::async_write(
		fd => $outpipe->fileno(),
		data => 'BROWNIES',
		async_write_cb => $steps->{'wrote_brownies'});
	};

	step wrote_icecream => sub {
	    my ($err, $bytes) = @_;
	    die $err if $err;
	    push @events, "wrote $bytes";

	    my $buf;
	    $inpipe->read($buf, $bytes);
	    push @events, "read $buf";
	};

	step wrote_brownies => sub {
	    my ($err, $bytes) = @_;
	    die $err if $err;
	    push @events, "wrote $bytes";

	    my $buf;
	    $inpipe->read($buf, $bytes);
	    push @events, "read $buf";

	    $steps->{'send_x'}->();
	};

	step send_x => sub {
	    Amanda::MainLoop::async_write(
		fd => $outpipe->fileno(),
		data => 'X',
		async_write_cb => $finished_cb);
	};
    }

    test_async_write(sub { Amanda::MainLoop::quit(); });
    Amanda::MainLoop::run();
    waitpid($pid, 0);

    is_deeply([ @events ],
	[ 'wrote 5', 'read FUDGE',
	  'wrote 8', 'read ICECREAM',
	  'wrote 8', 'read BROWNIES' ],
	"async_write works");
}

# test synchronized
{
    my $lock = [];
    my @messages;

    sub syncd1 {
	my ($msg, $cb) = @_;
	return Amanda::MainLoop::synchronized($lock, $cb, sub {
	    my ($ser_cb) = @_;
	    push @messages, "BEG-$msg";
	    Amanda::MainLoop::call_after(10, sub {
		push @messages, "END-$msg";
		$ser_cb->($msg);
	    });
	});
    };

    # add a second syncd function to demonstrate that several functions
    # can serialize on the same lock
    sub syncd2 {
	my ($msg, $fin_cb) = @_;
	return Amanda::MainLoop::synchronized($lock, $fin_cb, sub {
	    my ($ser_cb) = @_;
	    push @messages, "BEG2-$msg";
	    Amanda::MainLoop::call_after(10, sub {
		push @messages, "END2-$msg";
		$ser_cb->($msg);
	    });
	});
    };

    my $num_running = 3;
    my $fin_cb = sub {
	push @messages, "FIN-$_[0]";
	if (--$num_running == 0) {
	    Amanda::MainLoop::quit();
	}
    };

    syncd1("A", $fin_cb);
    syncd2("B", $fin_cb);
    syncd1("C", $fin_cb);

    Amanda::MainLoop::run();

    is_deeply([ @messages ],
	[
	    "BEG-A", "END-A", "FIN-A",
	    "BEG2-B", "END2-B", "FIN-B",
	    "BEG-C", "END-C", "FIN-C",
	], "synchronized works");
}