diff --git a/Changes b/Changes new file mode 100644 index 0000000..8f2372e --- /dev/null +++ b/Changes @@ -0,0 +1,126 @@ +Revision history for Perl extension IO::Multiplex. + +1.16 Thu Apr 9 17:00:00 CET 2015 + - Fix descriptor memory leak: + Make $mux->close actually untie *$fh + +1.15 Tue Mar 11 14:00:00 CET 2015 + - Move untie patch to the right place. + +1.14 Mon Mar 30 11:00:00 CET 2015 + - Fix 110_ntest to avoid reading from muxed handle. + - Apply patch to prevent untie warnings. + rt.cpan.org#67846 [dmcbridepath@cpan.org Darin McBride] + +1.13 Fri Apr 15 08:42:43 CEST 2011 + - fix handling of outbuf which contains '0'. + rt.cpan.org#67458 [Gordon Russell] + +1.12 Wed Feb 23 22:35:27 CET 2011 + - fix MSWin32 complaints + rt.cpan.org#66096 [Mark Dootson] + +1.11 Wed Feb 2 17:05:08 CET 2011 + - Avoid warning while adding pipe + rt.cpan.org#60068 [Khedin] and #16259 + - Add EWOULDBLOCK and non-blocking mode for windows, + rt.cpan.org#23982 [junk@penilecolada.com] + - Typo "closeing", rt.cpan.org#21085 [D Steinbrunner] + - avoid shutdown after close, + rt.cpan.org#5715 [Lars Jensen] and #5885 + - use length of outbuf, not exists to see if it is + empty. [Ersun Warncke] + - turn "use warnings" on + +1.10 Mon Sep 15 2008 + - Update Copyright and License. + - Buttwag around minor Windows deficiencies. + +1.09 Sat Mar 03 2007 + - Detect readability for special IO::Socket::SSL + handles more accurately. + - Avoid "freed value in iteration" crashing. + +1.08 Fri Nov 11 2003 + - Solaris/FreeBSD compatibility fixes. + - Add mux_epipe EPIPE hook, patch by + leo@strike.wu-wien.ac.at (Alexander Bergolth) + +1.07 Wed Jul 30 01:00:00 MDT 2003 + - Handle certain eof conditions better. + Spot by win@edx.de (Winfried Koenig) + - Fixed contrib/portfw to handle partly + shutdown sockets more accurately. + - Add udp support to contrib/portfw. + +1.06 Mon Jul 28 14:45:00 MDT 2003 + - Added contrib/portfw demo script. + +1.05 Web Apr 23 01:30:00 MST 2003 + - Handle rare ECONNRESET condition that can occur + when reading from the client on a socket that + has already been torn down. + - Added support for UDP (experimental). + Thanks klimkin@mcst.ru (Alexey Klimkin) + - Add writeable detection hook for sockets + even when the outbuffer is empty. + Patch by klimkin@mcst.ru (Alexey Klimkin) + +1.04 Sat Nov 23 12:00:00 MST 2002 + - Compatibility generalizations + perl 5.004 / 5.005 / 5.6.0 / 5.6.1 / 5.8.0 + Thanks muir@idiom.com (David Muir Sharnoff) + for bug reports and testing help. + - Rid old MVModule::MVmux::Handle package name. + - New Timeout Structure + Thanks dwebb@ovid.com (Douglas Webb) + - Use POSIX::read/write instead to avoid TIEHANDLE. + +1.03 Mon Jun 03 15:00:00 MDT 2002 + - Fix NAME for ABSTRACT_FROM setting. + - Added spec file for RPM + - Added TODO doc + - Avoid using Tie::RefHash to improve performance. + by dwebb@ovid.com (Douglas Webb) + +1.02 Tue Feb 05 13:05:00 MDT 2002 + - Allow the rest of input buffer to be consumed + when doing shutdown. + - Fix {_fhs} keys dereference race condition. + +1.01 Wed Oct 17 13:00:00 MDT 2001 + - The following by Rob Brown + - Ported to work with Perl 5.6.0 + - Added optional code ref for loop + - Added tie FILENO method + - Fixed tie CLOSE and shutdown to flush both + input {inbuffer} and output {outbuffer} correctly + +1.00 Wed Feb 23 12:07:07 PST 2000 + - Stable enough now. Call it a release. + +0.08 Thu Oct 28 16:06:33 MDT 1999 + - More autovivification ailments + +0.07 Mon Oct 25 15:01:46 PDT 1999 + - Fix documentation + +0.06 Sun Oct 10 13:25:03 MDT 1999 + - Fix autovivification evils + +0.05 Fri Oct 8 17:23:08 MDT 1999 + - Yet more EOF/close fixes + +0.04 Wed Sep 29 12:50:44 MDT 1999 + - Fix yesterday's fix + +0.03 Tue Sep 28 12:04:53 MDT 1999 + - Remove some debugging stuff + - Deal properly with the situation where a fd is shutdown in + response to EOF + +0.02 Wed Sep 22 15:44:28 PDT 1999 + - Check for errors on accept + +0.01 Fri Jul 2 10:39:23 1999 + - original version; created by h2xs 1.19 diff --git a/MANIFEST b/MANIFEST new file mode 100644 index 0000000..99a5436 --- /dev/null +++ b/MANIFEST @@ -0,0 +1,12 @@ +Changes +MANIFEST +META.yml +Makefile.PL +README +TODO +contrib/portfw +lib/IO/Multiplex.pm +t/100_load.t +t/110_ntest.t +t/110_test.t +t/200_udp.t diff --git a/META.yml b/META.yml new file mode 100644 index 0000000..2bf47ab --- /dev/null +++ b/META.yml @@ -0,0 +1,21 @@ +--- #YAML:1.0 +name: IO-Multiplex +version: 1.16 +abstract: Manage IO on many file handles +author: [] +license: unknown +distribution_type: module +configure_requires: + ExtUtils::MakeMaker: 0 +build_requires: + ExtUtils::MakeMaker: 0 +requires: + IO::Socket: 0 +no_index: + directory: + - t + - inc +generated_by: ExtUtils::MakeMaker version 6.55_02 +meta-spec: + url: http://module-build.sourceforge.net/META-spec-v1.4.html + version: 1.4 diff --git a/Makefile.PL b/Makefile.PL new file mode 100644 index 0000000..8e257a1 --- /dev/null +++ b/Makefile.PL @@ -0,0 +1,25 @@ +use ExtUtils::MakeMaker; +# See lib/ExtUtils/MakeMaker.pm for details of how to influence +# the contents of the Makefile that is written. +WriteMakefile + NAME => 'IO::Multiplex', + ABSTRACT_FROM => 'lib/IO/Multiplex.pm', + VERSION_FROM => 'lib/IO/Multiplex.pm', # finds $VERSION + PREREQ_PM => { # e.g., Module::Name => 1.1 + 'IO::Socket' => 0, + }, + dist => { + COMPRESS => 'gzip -vf', + }, + ; + +package MY; + +sub processPL { + my $self = shift; + my $block = $self->SUPER::processPL(@_); + # "Version:" in spec needs to match + # "$VERSION" from VERSION_FROM + $block =~ s%(spec.PL\s*)$%$1 \$\(VERSION_FROM\)%m; + $block; +} diff --git a/README b/README new file mode 100644 index 0000000..a25779e --- /dev/null +++ b/README @@ -0,0 +1,98 @@ +README for IO::Multiplex + +IO::Multiplex is designed to take the effort out of managing +multiple file handles. It is essentially a really fancy front end to +the C +loop, it buffers all input and output to/from the file handles. It +can also accept incoming connections on one or more listen sockets. + +It is object oriented in design, and will notify you of significant events +by calling methods on an object that you supply. If you are not using +objects, you can simply supply __PACKAGE__ instead of an object reference. + +You may have one callback object registered for each file handle, or +one global one. Possibly both -- the per-file handle callback object +will be used instead of the global one. + +Each file handle may also have a timer associated with it. A callback +function is called when the timer expires. + +Here's an example which implements the beginnings of a multiuser game: + + use IO::Socket; + use IO::Multiplex; + use Tie::RefHash; + + my $mux = new IO::Multiplex; + + # Create a listening socket + my $sock = new IO::Socket::INET(Proto => 'tcp', + LocalPort => shift || 2300, + Listen => 4) + or die "socket: $@"; + + # We use the listen method instead of the add method. + $mux->listen($sock); + + $mux->set_callback_object(__PACKAGE__); + $mux->loop; + + # mux_connection is called when a new connection is accepted. + sub mux_connection { + my $package = shift; + my $mux = shift; + my $fh = shift; + + # Construct a new player object + Player->new($mux, $fh); + } + + package Player; + + my %players = (); + + sub new { + my $package = shift; + my $self = bless { mux => shift, + fh => shift } => $package; + + # Register the new player object as the callback specifically for + # this file handle. + $mux->set_callback_object($self, $self->{fh}); + print $self->{fh} + "Greetings, Professor. Would you like to play a game?\n"; + + # Register this player object in the main list of players + $players{$self} = $self; + $mux->set_timeout($self->{fh}, 1); + } + + sub players { return values %players; } + + sub mux_input { + my $self = shift; + shift; shift; # These two args are boring + my $input = shift; # Scalar reference to the input + + # Process each line in the input, leaving partial lines + # in the input buffer + while ($$input =~ s/^(.*?\n)//) { + $self->process_command($1); + } + } + + sub mux_close { + my $self = shift; + + # Player disconnected; + # [Notify other players or something...] + delete $players{$self}; + } + # This gets called every second to update player info, etc... + sub mux_timeout { + my $self = shift; + my $mux = shift; + + $self->heartbeat; + $mux->set_timeout($self->{fh}, 1); + } diff --git a/TODO b/TODO new file mode 100644 index 0000000..265a2f8 --- /dev/null +++ b/TODO @@ -0,0 +1,13 @@ +Things still TODO +================= + +Fix a few broken semantics (shutdown semantics are wrong, for one) +Limits on the size of the buffers +Maybe rewrite to use Event module at the core +Make it work with SSL sockets (difficult) +Benchmarking stats. +More optimizations. +More examples. +Feature to let output buffer to switch to a file + after certain limits in order to save memory + and extend the buffer size. diff --git a/contrib/portfw b/contrib/portfw new file mode 100755 index 0000000..e2c7c8b --- /dev/null +++ b/contrib/portfw @@ -0,0 +1,193 @@ +#!/usr/bin/perl -w + +=pod + +=head1 NAME + +portfw - Port forwarder + +=head1 SYNOPSYS + +portfw [-p pidfile] [local_ip:]local_port[/proto] remote_ip[:remote_port] + +=head1 DESCRIPTION + +Forwards all incoming request from local_port to remote_port. If +local_ip is not specified, all addresses on all interfaces are used. +If no remote_port is specified, then the same local_port is assumed +as the default. If no /proto is specified, tcp is assumed. + +=head1 AUTHOR + +Rob Brown - bbb@cpan.org + +$Id: portfw,v 1.7 2003/07/30 06:50:26 rob Exp $ + +=cut + +use strict; +use Getopt::Long; +use IO::Multiplex; +use IO::Socket; + +my $pidfile; +GetOptions + "pidfile=s" => \$pidfile, + ; + +my ($local_addr,$remote_addr)=@ARGV; +die "Missing local port\n" if !$local_addr; +die "Missing remote ip\n" if !$remote_addr; + +my ($local_ip, $local_port, $proto, + $remote_ip,$remote_port); +if ($local_addr =~ s%/(\w+)$%%) { + $proto = $1; +} else { + $proto = "tcp"; +} +if ($local_addr =~ s%^([\d\.]+):%%) { + $local_ip = $1; +} else { + $local_ip = "0.0.0.0"; +} +if ($local_addr =~ m%^(\d+)$%) { + $local_port = $1; +} else { + die "Invalid local port [$local_addr]\n"; +} +if ($remote_addr =~ s%:(\d+)$%%) { + $remote_port = $1; +} else { + $remote_port = $local_port; +} +if ($remote_addr =~ m%^([\d\.]+)$%) { + $remote_ip = $1; +} else { + die "Invalid remote ip [$remote_addr]\n"; +} + +print STDERR "Forwarding $proto packets from $local_ip:$local_port to $remote_ip:$remote_port\n"; + +# Get ready to receive an incoming connection +my $listen = new IO::Socket::INET + LocalAddr => $local_ip, + LocalPort => $local_port, + Proto => $proto, + ReuseAddr => 1, + $proto eq "tcp"?(Listen => 10):(), + or die "Could not bind local port $local_port/$proto: $!"; + +# Just test the remote connection once. +my $remote_connect = new IO::Socket::INET + PeerAddr => $remote_ip, + PeerPort => $remote_port, + Proto => $proto, + or die "Could not connect to remote $remote_ip:$remote_port/$proto: $!"; + +if ($proto eq "tcp") { + # Close the test tcp socket + $remote_connect->close; +} elsif ($proto eq "udp") { + # Keep this around for udp replies +} else { + die "Unimplemented protocol $proto\n"; +} + +if ($pidfile) { + if (my $pid = fork) { + open (PID, ">$pidfile") or die "WARNING: Cannot create $pidfile: $!\n"; + print PID "$pid\n"; + close PID; + exit; + } elsif (!defined $pid) { + die "fork: $!\n"; + } + $SIG{TERM} = sub { + unlink $pidfile; + exit; + }; +} else { + exit if fork; +} +open STDIN, "/dev/null"; +open STDERR, ">/dev/null"; + +my $mux = new IO::Multiplex; +$mux->set_callback_object("My::Portfw"); +if ($proto eq "tcp") { + $mux->listen($listen); +} elsif ($proto eq "udp") { + $My::Portfw::complement{"$listen"} = $remote_connect; + $My::Portfw::complement{"$remote_connect"} = $listen; + $mux->add($listen); + $mux->add($remote_connect); +} else { + die "Unimplemented proto [$proto]"; +} +$mux->loop; +# Never reaches here +exit 1; + +package My::Portfw; +use vars qw(%complement); + +sub mux_connection { + my $self = shift; + my $mux = shift; + my $fh = shift; + my $remote_client = new IO::Socket::INET + PeerAddr => $remote_ip, + PeerPort => $remote_port, + Proto => $proto; + if (!$remote_client) { + warn "FAILED!\n"; + # Remote connection failed + $fh->write("Server Down! $!\n"); + $fh->close; + return; + } + $mux->add($remote_client); + $complement{"$fh"} = $remote_client; + $complement{"$remote_client"} = $fh; + return 1; +} + +sub mux_input { + my $self = shift; + my $mux = shift; + my $fh = shift; + my $data = shift; + if (my $proxy = $complement{"$fh"}) { + # Consume the packet by sending to its complement socket. + $proxy->write($$data); + $$data = ""; + } else { + # Not sure what to do, close it. + $$data = ""; + $fh->close; + } +} + +sub mux_eof { + my $self = shift; + my $mux = shift; + my $fh = shift; + my $data = shift; + if (my $proxy = $complement{"$fh"}) { + # Consume the packet by sending to its complement socket. + $proxy->write($$data); + $$data = ""; + # If this has been closed for writing, + # then close the complement for writing too. + $mux->shutdown($proxy, 1); + } +} + +sub mux_close { + my $self = shift; + my $mux = shift; + my $fh = shift; + delete $complement{"$fh"} if exists $complement{"$fh"}; +} diff --git a/lib/IO/Multiplex.pm b/lib/IO/Multiplex.pm new file mode 100644 index 0000000..f1d4ab0 --- /dev/null +++ b/lib/IO/Multiplex.pm @@ -0,0 +1,1113 @@ +package IO::Multiplex; + +use strict; +use warnings; + +our $VERSION = '1.16'; + +=head1 NAME + +IO::Multiplex - Manage IO on many file handles + +=head1 SYNOPSIS + + use IO::Multiplex; + + my $mux = new IO::Multiplex; + $mux->add($fh1); + $mux->add(\*FH2); + $mux->set_callback_object(...); + $mux->listen($server_socket); + $mux->loop; + + sub mux_input { ... } + +C is designed to take the effort out of managing +multiple file handles. It is essentially a really fancy front end to +the C +loop, it buffers all input and output to/from the file handles. It +can also accept incoming connections on one or more listen sockets. + +=head1 DESCRIPTION + +It is object oriented in design, and will notify you of significant events +by calling methods on an object that you supply. If you are not using +objects, you can simply supply C<__PACKAGE__> instead of an object reference. + +You may have one callback object registered for each file handle, or +one global one. Possibly both -- the per-file handle callback object +will be used instead of the global one. + +Each file handle may also have a timer associated with it. A callback +function is called when the timer expires. + +=head2 Handling input on descriptors + +When input arrives on a file handle, the C method is called +on the appropriate callback object. This method is passed three +arguments (in addition to the object reference itself of course): + +=over 4 + +=item 1 + +a reference to the mux, + +=item 2 + +A reference to the file handle, and + +=item 3 + +a reference to the input buffer for the file handle. + +=back + +The method should remove the data that it has consumed from the +reference supplied. It may leave unconsumed data in the input buffer. + +=head2 Handling output to descriptors + +If C did not handle output to the file handles as well +as input from them, then there is a chance that the program could +block while attempting to write. If you let the multiplexer buffer +the output, it will write the data only when the file handle is +capable of receiveing it. + +The basic method for handing output to the multiplexer is the C +method, which simply takes a file descriptor and the data to be +written, like this: + + $mux->write($fh, "Some data"); + +For convenience, when the file handle is Ced to the multiplexer, it +is tied to a special class which intercepts all attempts to write to the +file handle. Thus, you can use print and printf to send output to the +handle in a normal manner: + + printf $fh "%s%d%X", $foo, $bar, $baz + +Unfortunately, Perl support for tied file handles is incomplete, and +functions such as C cannot be supported. + +Also, file handle object methods such as the C method of +C cannot be intercepted. + +=head1 EXAMPLES + +=head2 Simple Example + +This is a simple telnet-like program, which demonstrates the concepts +covered so far. It does not really work too well against a telnet +server, but it does OK against the sample server presented further down. + + use IO::Socket; + use IO::Multiplex; + + # Create a multiplex object + my $mux = new IO::Multiplex; + # Connect to the host/port specified on the command line, + # or localhost:23 + my $sock = new IO::Socket::INET(Proto => 'tcp', + PeerAddr => shift || 'localhost', + PeerPort => shift || 23) + or die "socket: $@"; + + # add the relevant file handles to the mux + $mux->add($sock); + $mux->add(\*STDIN); + # We want to buffer output to the terminal. This prevents the program + # from blocking if the user hits CTRL-S for example. + $mux->add(\*STDOUT); + + # We're not object oriented, so just request callbacks to the + # current package + $mux->set_callback_object(__PACKAGE__); + + # Enter the main mux loop. + $mux->loop; + + # mux_input is called when input is available on one of + # the descriptors. + sub mux_input { + my $package = shift; + my $mux = shift; + my $fh = shift; + my $input = shift; + + # Figure out whence the input came, and send it on to the + # other place. + if ($fh == $sock) { + print STDOUT $$input; + } else { + print $sock $$input; + } + # Remove the input from the input buffer. + $$input = ''; + } + + # This gets called if the other end closes the connection. + sub mux_close { + print STDERR "Connection Closed\n"; + exit; + } + +=head2 A server example + +Servers are just as simple to write. We just register a listen socket +with the multiplex object C method. It will automatically +accept connections on it and add them to its list of active file handles. + +This example is a simple chat server. + + use IO::Socket; + use IO::Multiplex; + + my $mux = new IO::Multiplex; + + # Create a listening socket + my $sock = new IO::Socket::INET(Proto => 'tcp', + LocalPort => shift || 2300, + Listen => 4) + or die "socket: $@"; + + # We use the listen method instead of the add method. + $mux->listen($sock); + + $mux->set_callback_object(__PACKAGE__); + $mux->loop; + + sub mux_input { + my $package = shift; + my $mux = shift; + my $fh = shift; + my $input = shift; + + # The handles method returns a list of references to handles which + # we have registered, except for listen sockets. + foreach $c ($mux->handles) { + print $c $$input; + } + $$input = ''; + } + +=head2 A more complex server example + +Let us take a look at the beginnings of a multi-user game server. We will +have a Player object for each player. + + # Paste the above example in here, up to but not including the + # mux_input subroutine. + + # mux_connection is called when a new connection is accepted. + sub mux_connection { + my $package = shift; + my $mux = shift; + my $fh = shift; + + # Construct a new player object + Player->new($mux, $fh); + } + + package Player; + + my %players = (); + + sub new { + my $package = shift; + my $self = bless { mux => shift, + fh => shift } => $package; + + # Register the new player object as the callback specifically for + # this file handle. + + $self->{mux}->set_callback_object($self, $self->{fh}); + print $self->{fh} + "Greetings, Professor. Would you like to play a game?\n"; + + # Register this player object in the main list of players + $players{$self} = $self; + $mux->set_timeout($self->{fh}, 1); + } + + sub players { return values %players; } + + sub mux_input { + my $self = shift; + shift; shift; # These two args are boring + my $input = shift; # Scalar reference to the input + + # Process each line in the input, leaving partial lines + # in the input buffer + while ($$input =~ s/^(.*?)\n//) { + $self->process_command($1); + } + } + + sub mux_close { + my $self = shift; + + # Player disconnected; + # [Notify other players or something...] + delete $players{$self}; + } + # This gets called every second to update player info, etc... + sub mux_timeout { + my $self = shift; + my $mux = shift; + + $self->heartbeat; + $mux->set_timeout($self->{fh}, 1); + } + +=head1 METHODS + +=cut + +use POSIX qw(errno_h BUFSIZ); +use Socket; +use FileHandle qw(autoflush); +use IO::Handle; +use Fcntl; +use Carp qw(carp); +use constant IsWin => ($^O eq 'MSWin32'); + + +BEGIN { + eval { + # Can optionally use Hi Res timers if available + require Time::HiRes; + Time::HiRes->import('time'); + }; +} + +# This is what you want. Trust me. +$SIG{PIPE} = 'IGNORE'; + +{ no warnings; + if(IsWin) { *EWOULDBLOCK = sub() {10035} } +} + +=head2 new + +Construct a new C object. + + $mux = new IO::Multiplex; + +=cut + +sub new +{ + my $package = shift; + my $self = bless { _readers => '', + _writers => '', + _fhs => {}, + _handles => {}, + _timerkeys => {}, + _timers => [], + _listen => {} } => $package; + return $self; +} + +=head2 listen + +Add a socket to be listened on. The socket should have had the +C and C system calls already applied to it. The C +module will do this for you. + + $socket = new IO::Socket::INET(Listen => ..., LocalAddr => ...); + $mux->listen($socket); + +Connections will be automatically accepted and Ced to the multiplex +object. C callback method will also be called. + +=cut + +sub listen +{ + my $self = shift; + my $fh = shift; + + $self->add($fh); + $self->{_fhs}{"$fh"}{listen} = 1; + $fh; +} + +=head2 add + +Add a file handle to the multiplexer. + + $mux->add($fh); + +As a side effect, this sets non-blocking mode on the handle, and disables +STDIO buffering. It also ties it to intercept output to the handle. + +=cut + +sub add +{ + my $self = shift; + my $fh = shift; + + return if $self->{_fhs}{"$fh"}; + + nonblock($fh); + autoflush($fh, 1); + fd_set($self->{_readers}, $fh, 1); + + my $sockopt = getsockopt $fh, SOL_SOCKET, SO_TYPE; + $self->{_fhs}{"$fh"}{udp_true} = 1 + if defined $sockopt && SOCK_DGRAM == unpack "i", $sockopt; + + $self->{_fhs}{"$fh"}{inbuffer} = ''; + $self->{_fhs}{"$fh"}{outbuffer} = ''; + $self->{_fhs}{"$fh"}{fileno} = fileno($fh); + $self->{_handles}{"$fh"} = $fh; + tie *$fh, "IO::Multiplex::Handle", $self, $fh; + return $fh; +} + +=head2 remove + +Removes a file handle from the multiplexer. This also unties the +handle. It does not currently turn STDIO buffering back on, or turn +off non-blocking mode. + + $mux->remove($fh); + +=cut + +sub remove +{ + my $self = shift; + my $fh = shift; + fd_set($self->{_writers}, $fh, 0); + fd_set($self->{_readers}, $fh, 0); + delete $self->{_fhs}{"$fh"}; + delete $self->{_handles}{"$fh"}; + $self->_removeTimer($fh); + untie *$fh; + return 1; +} + +=head2 set_callback_object + +Set the object on which callbacks are made. If you are not using objects, +you can specify the name of the package into which the method calls are +to be made. + +If a file handle is supplied, the callback object is specific for that +handle: + + $mux->set_callback_object($object, $fh); + +Otherwise, it is considered a default callback object, and is used when +events occur on a file handle that does not have its own callback object. + + $mux->set_callback_object(__PACKAGE__); + +The previously registered object (if any) is returned. + +See also the CALLBACK INTERFACE section. + +=cut + +sub set_callback_object +{ + my $self = shift; + my $obj = shift; + my $fh = shift; + return if $fh && !exists($self->{_fhs}{"$fh"}); + + my $old = $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object}; + + $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object} = $obj; + return $old; +} + +=head2 kill_output + +Remove any pending output on a file descriptor. + + $mux->kill_output($fh); + +=cut + +sub kill_output +{ + my $self = shift; + my $fh = shift; + return unless $fh && exists($self->{_fhs}{"$fh"}); + + $self->{_fhs}{"$fh"}{outbuffer} = ''; + fd_set($self->{_writers}, $fh, 0); +} + +=head2 outbuffer + +Return or set the output buffer for a descriptor + + $output = $mux->outbuffer($fh); + $mux->outbuffer($fh, $output); + +=cut + +sub outbuffer +{ + my $self = shift; + my $fh = shift; + return unless $fh && exists($self->{_fhs}{"$fh"}); + + if (@_) { + $self->{_fhs}{"$fh"}{outbuffer} = $_[0] if @_; + fd_set($self->{_writers}, $fh, 0) if !$_[0]; + } + + $self->{_fhs}{"$fh"}{outbuffer}; +} + +=head2 inbuffer + +Return or set the input buffer for a descriptor + + $input = $mux->inbuffer($fh); + $mux->inbuffer($fh, $input); + +=cut + +sub inbuffer +{ + my $self = shift; + my $fh = shift; + return unless $fh && exists($self->{_fhs}{"$fh"}); + + if (@_) { + $self->{_fhs}{"$fh"}{inbuffer} = $_[0] if @_; + } + + return $self->{_fhs}{"$fh"}{inbuffer}; +} + +=head2 set_timeout + +Set the timer for a file handle. The timeout value is a certain number of +seconds in the future, after which the C callback is called. + +If the C module is installed, the timers may be specified in +fractions of a second. + +Timers are not reset automatically. + + $mux->set_timeout($fh, 23.6); + +Use C<$mux-Eset_timeout($fh, undef)> to cancel a timer. + +=cut + +sub set_timeout +{ + my $self = shift; + my $fh = shift; + my $timeout = shift; + return unless $fh && exists($self->{_fhs}{"$fh"}); + + if (defined $timeout) { + $self->_addTimer($fh, $timeout + time); + } else { + $self->_removeTimer($fh); + } +} + +=head2 handles + +Returns a list of handles that the C object knows about, +excluding listen sockets. + + @handles = $mux->handles; + +=cut + +sub handles +{ + my $self = shift; + + return grep(!$self->{_fhs}{"$_"}{listen}, values %{$self->{_handles}}); +} + +sub _addTimer { + my $self = shift; + my $fh = shift; + my $time = shift; + + # Set a key so that we can quickly tell if a given $fh has + # a timer set + $self->{_timerkeys}{"$fh"} = 1; + + # Store the timeout in an array, and resort it + @{$self->{_timers}} = sort { $a->[1] <=> $b->[1] } (@{$self->{_timers}}, [ $fh, $time ] ); +} + +sub _removeTimer { + my $self = shift; + my $fh = shift; + + # Return quickly if no timer is set + return unless exists $self->{_timerkeys}{"$fh"}; + + # Remove the timeout from the sorted array + @{$self->{_timers}} = grep { $_->[0] ne $fh } @{$self->{_timers}}; + + # Get rid of the key + delete $self->{_timerkeys}{"$fh"}; +} + + +=head2 loop + +Enter the main loop and start processing IO events. + + $mux->loop; + +=cut + +sub loop +{ + my $self = shift; + my $heartbeat = shift; + $self->{_endloop} = 0; + + while (!$self->{_endloop} && keys %{$self->{_fhs}}) { + my $rv; + my $data; + my $rdready = ""; + my $wrready = ""; + my $timeout = undef; + + foreach my $fh (values %{$self->{_handles}}) { + fd_set($rdready, $fh, 1) if + ref($fh) =~ /SSL/ && + $fh->can("pending") && + $fh->pending; + } + + if (!length $rdready) { + if (@{$self->{_timers}}) { + $timeout = $self->{_timers}[0][1] - time; + } + + my $numready = select($rdready=$self->{_readers}, + $wrready=$self->{_writers}, + undef, + $timeout); + + unless(defined($numready)) { + if ($! == EINTR || $! == EAGAIN) { + next; + } else { + last; + } + } + } + + &{ $heartbeat } ($rdready, $wrready) if $heartbeat; + + foreach my $k (keys %{$self->{_handles}}) { + my $fh = $self->{_handles}->{$k} or next; + + # Avoid creating a permanent empty hash ref for "$fh" + # by attempting to access its {object} element + # if it has already been closed. + next unless exists $self->{_fhs}{"$fh"}; + + # It is not easy to replace $self->{_fhs}{"$fh"} with a + # variable, because some mux_* routines may remove it as + # side-effect. + + # Get the callback object. + my $obj = $self->{_fhs}{"$fh"}{object} || + $self->{_object}; + + # Is this descriptor ready for reading? + if (fd_isset($rdready, $fh)) + { + if ($self->{_fhs}{"$fh"}{listen}) { + # It's a server socket, so a new connection is + # waiting to be accepted + my $client = $fh->accept; + next unless ($client); + $self->add($client); + $obj->mux_connection($self, $client) + if $obj && $obj->can("mux_connection"); + } else { + if ($self->is_udp($fh)) { + $rv = recv($fh, $data, BUFSIZ, 0); + if (defined $rv) { + # Remember where the last UDP packet came from + $self->{_fhs}{"$fh"}{udp_peer} = $rv; + } + } else { + $rv = &POSIX::read(fileno($fh), $data, BUFSIZ); + } + + if (defined($rv) && length($data)) { + # Append the data to the client's receive buffer, + # and call process_input to see if anything needs to + # be done. + $self->{_fhs}{"$fh"}{inbuffer} .= $data; + $obj->mux_input($self, $fh, + \$self->{_fhs}{"$fh"}{inbuffer}) + if $obj && $obj->can("mux_input"); + } else { + unless (defined $rv) { + next if + $! == EINTR || + $! == EAGAIN || + $! == EWOULDBLOCK; + warn "IO::Multiplex read error: $!" + if $! != ECONNRESET; + } + # There's an error, or we received EOF. If + # there's pending data to be written, we leave + # the connection open so it can be sent. If + # the other end is closed for writing, the + # send will error and we close down there. + # Either way, we remove it from _readers as + # we're no longer interested in reading from + # it. + fd_set($self->{_readers}, $fh, 0); + $obj->mux_eof($self, $fh, + \$self->{_fhs}{"$fh"}{inbuffer}) + if $obj && $obj->can("mux_eof"); + + if (exists $self->{_fhs}{"$fh"}) { + $self->{_fhs}{"$fh"}{inbuffer} = ''; + # The mux_eof handler could have responded + # with a shutdown for writing. + $self->close($fh) + unless exists $self->{_fhs}{"$fh"} + && length $self->{_fhs}{"$fh"}{outbuffer}; + } + next; + } + } + } # end if readable + next unless exists $self->{_fhs}{"$fh"}; + + if (fd_isset($wrready, $fh)) { + unless (length $self->{_fhs}{"$fh"}{outbuffer}) { + fd_set($self->{_writers}, $fh, 0); + $obj->mux_outbuffer_empty($self, $fh) + if ($obj && $obj->can("mux_outbuffer_empty")); + next; + } + $rv = &POSIX::write(fileno($fh), + $self->{_fhs}{"$fh"}{outbuffer}, + length($self->{_fhs}{"$fh"}{outbuffer})); + unless (defined($rv)) { + # We got an error writing to it. If it's + # EWOULDBLOCK (shouldn't happen if select told us + # we can write) or EAGAIN, or EINTR we don't worry + # about it. otherwise, close it down. + unless ($! == EWOULDBLOCK || + $! == EINTR || + $! == EAGAIN) { + if ($! == EPIPE) { + $obj->mux_epipe($self, $fh) + if $obj && $obj->can("mux_epipe"); + } else { + warn "IO::Multiplex: write error: $!\n"; + } + $self->close($fh); + } + next; + } + substr($self->{_fhs}{"$fh"}{outbuffer}, 0, $rv) = ''; + unless (length $self->{_fhs}{"$fh"}{outbuffer}) { + # Mark us as not writable if there's nothing more to + # write + fd_set($self->{_writers}, $fh, 0); + $obj->mux_outbuffer_empty($self, $fh) + if ($obj && $obj->can("mux_outbuffer_empty")); + + if ( $self->{_fhs}{"$fh"} + && $self->{_fhs}{"$fh"}{shutdown}) { + # If we've been marked for shutdown after write + # do it. + shutdown($fh, 1); + $self->{_fhs}{"$fh"}{outbuffer} = ''; + unless (length $self->{_fhs}{"$fh"}{inbuffer}) { + # We'd previously been shutdown for reading + # also, so close out completely + $self->close($fh); + next; + } + } + } + } # End if writeable + + next unless exists $self->{_fhs}{"$fh"}; + + } # End foreach $fh (...) + + $self->_checkTimeouts() if @{$self->{_timers}}; + + } # End while(loop) +} + +sub _checkTimeouts { + my $self = shift; + + # Get the current time + my $time = time; + + # Copy all of the timers that should go off into + # a temporary array. This allows us to modify the + # real array as we process the timers, without + # interfering with the loop. + + my @timers = (); + foreach my $timer (@{$self->{_timers}}) { + # If the timer is in the future, we can stop + last if $timer->[1] > $time; + push @timers, $timer; + } + + foreach my $timer (@timers) { + my $fh = $timer->[0]; + $self->_removeTimer($fh); + + next unless exists $self->{_fhs}{"$fh"}; + + my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object}; + $obj->mux_timeout($self, $fh) if $obj && $obj->can("mux_timeout"); + } +} + + +=head2 endloop + +Prematurly terminate the loop. The loop will automatically terminate +when there are no remaining descriptors to be watched. + + $mux->endloop; + +=cut + +sub endloop +{ + my $self = shift; + $self->{_endloop} = 1; +} + +=head2 udp_peer + +Get peer endpoint of where the last udp packet originated. + + $saddr = $mux->udp_peer($fh); + +=cut + +sub udp_peer { + my $self = shift; + my $fh = shift; + return $self->{_fhs}{"$fh"}{udp_peer}; +} + +=head2 is_udp + +Sometimes UDP packets require special attention. +This method will tell if a file handle is of type UDP. + + $is_udp = $mux->is_udp($fh); + +=cut + +sub is_udp { + my $self = shift; + my $fh = shift; + return $self->{_fhs}{"$fh"}{udp_true}; +} + +=head2 write + +Send output to a file handle. + + $mux->write($fh, "'ere I am, JH!\n"); + +=cut + +sub write +{ + my $self = shift; + my $fh = shift; + my $data = shift; + return unless $fh && exists($self->{_fhs}{"$fh"}); + + if ($self->{_fhs}{"$fh"}{shutdown}) { + $! = EPIPE; + return undef; + } + if ($self->is_udp($fh)) { + if (my $udp_peer = $self->udp_peer($fh)) { + # Send the packet back to the last peer that said something + return send($fh, $data, 0, $udp_peer); + } else { + # No udp_peer yet? + # This better be a connect()ed UDP socket + # or else this will fail with ENOTCONN + return send($fh, $data, 0); + } + } + $self->{_fhs}{"$fh"}{outbuffer} .= $data; + fd_set($self->{_writers}, $fh, 1); + return length($data); +} + +=head2 shutdown + +Shut down a socket for reading or writing or both. See the C +Perl documentation for further details. + +If the shutdown is for reading, it happens immediately. However, +shutdowns for writing are delayed until any pending output has been +successfully written to the socket. + + $mux->shutdown($socket, 1); + +=cut + +sub shutdown +{ + my $self = shift; + my $fh = shift; + my $which = shift; + return unless $fh && exists($self->{_fhs}{"$fh"}); + + if ($which == 0 || $which == 2) { + # Shutdown for reading. We can do this now. + shutdown($fh, 0); + # The mux_eof hook must be run from the main loop to consume + # the rest of the inbuffer if there is anything left. + # It will also remove $fh from _readers. + } + + if ($which == 1 || $which == 2) { + # Shutdown for writing. Only do this now if there is no pending + # data. + if(length $self->{_fhs}{"$fh"}{outbuffer}) { + $self->{_fhs}{"$fh"}{shutdown} = 1; + } else { + shutdown($fh, 1); + $self->{_fhs}{"$fh"}{outbuffer} = ''; + } + } + # Delete the descriptor if it's totally gone. + unless (length $self->{_fhs}{"$fh"}{inbuffer} || + length $self->{_fhs}{"$fh"}{outbuffer}) { + $self->close($fh); + } +} + +=head2 close + +Close a handle. Always use this method to close a handle that is being +watched by the multiplexer. + + $mux->close($fh); + +=cut + +sub close +{ + my $self = shift; + my $fh = shift; + return unless exists $self->{_fhs}{"$fh"}; + + my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object}; + warn "closing with read buffer" if length $self->{_fhs}{"$fh"}{inbuffer}; + warn "closing with write buffer" if length $self->{_fhs}{"$fh"}{outbuffer}; + + fd_set($self->{_readers}, $fh, 0); + fd_set($self->{_writers}, $fh, 0); + + delete $self->{_fhs}{"$fh"}; + delete $self->{_handles}{"$fh"}; + untie *$fh; + close $fh; + $obj->mux_close($self, $fh) if $obj && $obj->can("mux_close"); +} + +# We set non-blocking mode on all descriptors. If we don't, then send +# might block if the data is larger than the kernel can accept all at once, +# even though select told us we can write. With non-blocking mode, we +# get a partial write in those circumstances, which is what we want. + +sub nonblock +{ my $fh = shift; + + if(IsWin) + { ioctl($fh, 0x8004667e, pack("L!", 1)); + } + else + { my $flags = fcntl($fh, F_GETFL, 0) + or die "fcntl F_GETFL: $!\n"; + fcntl($fh, F_SETFL, $flags | O_NONBLOCK) + or die "fcntl F_SETFL $!\n"; + } +} + +sub fd_set +{ + vec($_[0], fileno($_[1]), 1) = $_[2]; +} + +sub fd_isset +{ + return vec($_[0], fileno($_[1]), 1); +} + +# We tie handles into this package to handle write buffering. + +package IO::Multiplex::Handle; + +use strict; +use Tie::Handle; +use Carp; +use vars qw(@ISA); +@ISA = qw(Tie::Handle); + +sub FILENO +{ + my $self = shift; + return ($self->{_mux}->{_fhs}->{"$self->{_fh}"}->{fileno}); +} + + +sub TIEHANDLE +{ + my $package = shift; + my $mux = shift; + my $fh = shift; + + my $self = bless { _mux => $mux, + _fh => $fh } => $package; + return $self; +} + +sub WRITE +{ + my $self = shift; + my ($msg, $len, $offset) = @_; + $offset ||= 0; + return $self->{_mux}->write($self->{_fh}, substr($msg, $offset, $len)); +} + +sub CLOSE +{ + my $self = shift; + return $self->{_mux}->shutdown($self->{_fh}, 2); +} + +sub READ +{ + carp "Do not read from a muxed file handle"; +} + +sub READLINE +{ + carp "Do not read from a muxed file handle"; +} + +sub FETCH +{ + return "Fnord"; +} + +sub UNTIE {} + +1; + +__END__ + +=head1 CALLBACK INTERFACE + +Callback objects should support the following interface. You do not have +to provide all of these methods, just provide the ones you are interested in. + +All methods receive a reference to the callback object (or package) as +their first argument, in the traditional object oriented +way. References to the C object and the relevant file +handle are also provided. This will be assumed in the method +descriptions. + +=head2 mux_input + +Called when input is ready on a descriptor. It is passed a reference to +the input buffer. It should remove any input that it has consumed, and +leave any partially received data in the buffer. + + sub mux_input { + my $self = shift; + my $mux = shift; + my $fh = shift; + my $data = shift; + + # Process each line in the input, leaving partial lines + # in the input buffer + while ($$data =~ s/^(.*?\n)//) { + $self->process_command($1); + } + } + +=head2 mux_eof + +This is called when an end-of-file condition is present on the descriptor. +This is does not nessecarily mean that the descriptor has been closed, as +the other end of a socket could have used C to close just half +of the socket, leaving us free to write data back down the still open +half. Like mux_input, it is also passed a reference to the input buffer. +It should consume the entire buffer or else it will just be lost. + +In this example, we send a final reply to the other end of the socket, +and then shut it down for writing. Since it is also shut down for reading +(implicly by the EOF condition), it will be closed once the output has +been sent, after which the mux_close callback will be called. + + sub mux_eof { + my $self = shift; + my $mux = shift; + my $fh = shift; + + print $fh "Well, goodbye then!\n"; + $mux->shutdown($fh, 1); + } + +=head2 mux_close + +Called when a handle has been completely closed. At the time that +C is called, the handle will have been removed from the +multiplexer, and untied. + +=head2 mux_outbuffer_empty + +Called after all pending output has been written to the file descriptor. + +=head2 mux_connection + +Called upon a new connection being accepted on a listen socket. + +=head2 mux_timeout + +Called when a timer expires. + +=head1 AUTHOR + +Copyright 1999 Bruce J Keeler + +Copyright 2001-2008 Rob Brown + +Released under the same terms as Perl itself. + +$Id: Multiplex.pm,v 1.45 2015/04/09 21:27:54 rob Exp $ + +=cut diff --git a/t/100_load.t b/t/100_load.t new file mode 100644 index 0000000..dfd7636 --- /dev/null +++ b/t/100_load.t @@ -0,0 +1,17 @@ +# Before `make install' is performed this script should be runnable with +# `make test'. After `make install' it should work as `perl test.t' + +######################### We start with some black magic to print on failure. + +BEGIN { $| = 1; $loaded = 0; print "1..1\n"; } +END { print "not ok 1\n" unless $loaded; } + +use IO::Multiplex; +$loaded = 1; +print "ok 1\n"; + +######################### End of black magic. + +# Insert your test code below (better if it prints "ok 13" +# (correspondingly "not ok 13") depending on the success of chunk 13 +# of the test code): diff --git a/t/110_ntest.t b/t/110_ntest.t new file mode 100644 index 0000000..2414175 --- /dev/null +++ b/t/110_ntest.t @@ -0,0 +1,89 @@ +# Before `make install' is performed this script should be runnable with +# `make test'. After `make install' it should work as `perl test.t' + +######################### We start with some black magic to print on failure. + +# Change 1..1 below to 1..last_test_to_print. +# Testing syswrite() to a MUX handle + +use strict; +BEGIN { $| = 1; print "1..8\n";} +my $loaded; +END {print "not ok 1\n" unless $loaded;} +use IO::Socket; +use IO::Multiplex; +use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); +$loaded = 1; + +my $test_msg = "Hello\n"; + +print "ok 1\n"; + +######################### End of black magic. + +# Insert your test code below (better if it prints "ok 13" +# (correspondingly "not ok 13") depending on the success of chunk 13 +# of the test code): + +my $mux = new IO::Multiplex; + +print $mux ? "ok 2\n" : "not ok 2\n"; + +my $client_socket; +my $listen_socket = IO::Socket::INET->new(Proto => 'tcp', + Listen => 4); + +print $listen_socket ? "ok 3\n" : "not ok 3\n"; + +my $port = $listen_socket->sockport; + +my $test_no = 4; + +$SIG{ALRM} = sub { print "not ok $test_no\n"; exit }; + +alarm(20); + +$mux->listen($listen_socket); +$mux->set_callback_object(__PACKAGE__); +$mux->set_timeout($listen_socket, 5); +#print STDERR "DEBUG: Doing loop...\n"; +$mux->loop; + +sub mux_timeout +{ + #print STDERR "DEBUG: mux_timeout reached!\n"; + print "ok 4\n"; + + $test_no = 5; + $client_socket = IO::Socket::INET->new(PeerAddr => "127.0.0.1", + PeerPort => $port, + Proto => 'tcp'); + + print $client_socket ? "ok 5\n" : "not ok 5\n"; + $test_no = 6; +} + +sub mux_connection +{ + my $package = shift; + my $mux = shift; + my $fh = shift; + + print "ok 6\n"; + $test_no++; + + syswrite($client_socket, $test_msg, length $test_msg); +} + +sub mux_input +{ + print "ok 7\n"; + shift; shift; shift; + my $input = shift; + + return unless $$input =~ /\n/; + + print $$input eq $test_msg ? "ok 8\n" : "not ok 8\n"; + + exit; +} diff --git a/t/110_test.t b/t/110_test.t new file mode 100644 index 0000000..1dd6ae0 --- /dev/null +++ b/t/110_test.t @@ -0,0 +1,82 @@ +# Before `make install' is performed this script should be runnable with +# `make test'. After `make install' it should work as `perl test.t' + +######################### We start with some black magic to print on failure. + +# Change 1..1 below to 1..last_test_to_print . +# (It may become useful if the test is moved to ./t subdirectory.) + +BEGIN { $| = 1; print "1..8\n"; } +END {print "not ok 1\n" unless $loaded;} +use IO::Socket; +use IO::Multiplex; +$loaded = 1; +print "ok 1\n"; + +######################### End of black magic. + +# Insert your test code below (better if it prints "ok 13" +# (correspondingly "not ok 13") depending on the success of chunk 13 +# of the test code): + +my $mux = new IO::Multiplex; + +print $mux ? "ok 2\n" : "not ok 2\n"; + +my $listen_socket = IO::Socket::INET->new(Proto => 'tcp', + Listen => 4); + +print $listen_socket ? "ok 3\n" : "not ok 3\n"; + +$port = $listen_socket->sockport; + +$test_no = 4; + +$SIG{ALRM} = sub { print "not ok $test_no\n"; exit }; + +alarm(20); + +$mux->listen($listen_socket); +$mux->set_callback_object(__PACKAGE__); +$mux->set_timeout($listen_socket, 5); +$mux->loop; + +my $client_socket; + +sub mux_timeout +{ + print "ok 4\n"; + + $test_no = 5; + $client_socket = IO::Socket::INET->new(PeerAddr => "127.0.0.1", + PeerPort => $port, + Proto => 'tcp'); + + print $client_socket ? "ok 5\n" : "not ok 5\n"; + $test_no = 6; +} + +sub mux_connection +{ + my $package = shift; + my $mux = shift; + my $fh = shift; + + print "ok 6\n"; + $test_no++; + + print $client_socket "Hello\n"; +} + +sub mux_input +{ + print "ok 7\n"; + shift; shift; shift; + my $input = shift; + + return unless $$input =~ /\n/; + + print $$input eq "Hello\n" ? "ok 8\n" : "not ok 8\n"; + + exit; +} diff --git a/t/200_udp.t b/t/200_udp.t new file mode 100644 index 0000000..b7b8fda --- /dev/null +++ b/t/200_udp.t @@ -0,0 +1,173 @@ +# Test using UDP with two IO::Multiplex +# servers communicating with each other. +# Assume no UDP packet loss on loopback. + +# This script tests the following: +# 1) Sending packets using a connected UDP socket. +# (connect() and send() syscalls) +# 2) Sending packets using unconnected UDP socket. +# (sendto() syscall) +# 3) Receiving UDP packets. +# (bind() and recv() syscalls) +# 4) The tied handle interface to send UDP data. +# print $fh $UDP_data; +# 5) The mux_input interface for incoming UDP data. +# (simple $$data scalar consumption) + +use strict; +use Test; +use IO::Socket; +use IO::Multiplex; +use POSIX qw(ENOTCONN EDESTADDRREQ); + +if($^O eq 'MSWin32') +{ no warnings; + *ENOTCONN = sub() {10057}; +} + +$| = 1; +plan tests => 15; + +# Create a recv()ing socket. +ok my $sock1 = new IO::Socket::INET + LocalAddr => "127.0.0.1", + Proto => "udp", + or die $!; + +my $magic_port = $sock1->sockport; + +# Create connect()ed socket for send()ing. +ok my $sock2 = new IO::Socket::INET + PeerAddr => "127.0.0.1", + PeerPort => $magic_port, + Proto => "udp", + or die $!; + +# Create a generic unconnected socket for sendto()ing. +ok my $sock3 = new IO::Socket::INET + Proto => "udp" + or die $!; + +my $msg1 = "uno"; +my $msg2 = "dos"; +my $msg3 = "tres"; +my $msg4 = "cuatro"; +my $msg5 = "cinco"; +my $msg6 = "seis"; + +my $pid = fork(); +# Catch runaway processes just in case... +alarm(10); +$SIG{ALRM} = sub { + die "[$$] Got bored"; +}; + +if (!defined $pid) { + ok 0; + die "fork: $!"; +} + +if ($pid) { + # Parent process + # This will be the Pitcher IO::Multiplex server. + my $plexer = new IO::Multiplex; + + $plexer->add($sock2); + $plexer->add($sock3); + $plexer->set_callback_object("Pitcher"); + # Set timer to do mux_timeout in 2 seconds + $plexer->set_timeout($sock2, 2); + $plexer->loop; + ok 1; + exit; +} else { + # Child process + # This will be the Catcher IO::Multiplex server. + # (No talking allowed.) + my $plexer = new IO::Multiplex; + + $plexer->add($sock1); + $plexer->set_callback_object("Catcher"); + + $plexer->loop; + exit; +} + +sub Pitcher::mux_timeout { + my $self = shift; + my $mux = shift; + my $fh = shift; + if (fileno $fh == fileno $sock2) { + ok 1; + # Connected UDP socket should know where to send it + print $fh $msg1; + ok !$!; + } elsif (fileno $fh == fileno $sock3) { + ok 1; + # Unconnected UDP socket should fail + # when trying to send() a packet. + $! = 0; + print $fh $msg2; + ok ($! == ENOTCONN || $! == EDESTADDRREQ) + or warn "DEBUG: bang = [$!](".($!+0).")"; + + # Grab the real peer destination. + ok my $saddr = $mux->{_fhs}{$sock2}{udp_peer}; + + # Unconnected UDP socket will sendto() just fine + # but only with an explicit destination. + ok send($fh, $msg3, 0, $saddr); + ok !$!; + } else { + die "$$: Not my fh?"; + } +} + +sub Pitcher::mux_input { + my $package = shift; + my $mux = shift; + my $fh = shift; + my $data = shift; + if (fileno $fh == fileno $sock2) { + ok ($$data eq $msg2); + $mux->set_timeout($sock3, 3); + } elsif (fileno $fh == fileno $sock3) { + if ($$data eq $msg4) { + ok 1; + # Even though this was the unconnected socket, + # it should remember where the last packer came from. + print $fh $msg5; + ok !$!; + } elsif ($$data eq $msg6) { + # Yippy, caught the final packet + ok 1; + # All done + $mux->endloop; + } else { + die "sock3 caught weird [$$data]"; + } + } else { + die "$$: Pitcher found something weird [$$data]"; + } + $$data = ""; +} + +# Just bounce it back with one up +sub Catcher::mux_input { + my $package = shift; + my $mux = shift; + my $fh = shift; + my $data = shift; + if ($$data eq $msg1) { + print $fh $msg2; + } elsif ($$data eq $msg3) { + print $fh $msg4; + } elsif ($$data eq $msg5) { + print $fh $msg6; + # I'm done. + $mux->endloop; + } else { + die "$$: Caught something weird [$$data]"; + } + $$data = ""; +}