package IO::Multiplex;

use strict;
use warnings;

our $VERSION = '1.16';

=head1 NAME

IO::Multiplex - Manage IO on many file handles

=head1 SYNOPSIS

  use IO::Multiplex;

  my $mux = new IO::Multiplex;
  $mux->add($fh1);
  $mux->add(\*FH2);
  $mux->set_callback_object(...);
  $mux->listen($server_socket);
  $mux->loop;

  sub mux_input { ... }

C<IO::Multiplex> is designed to take the effort out of managing
multiple file handles. It is essentially a really fancy front end to
the C<select> system call. In addition to maintaining the C<select>
loop, it buffers all input and output to/from the file handles. It
can also accept incoming connections on one or more listen sockets.

=head1 DESCRIPTION

C<IO::Multiplex> is object-oriented in design, and notifies you of
significant events by calling methods on an object that you supply. If
you are not using objects, you can simply supply C<__PACKAGE__> instead
of an object reference.

You may have one callback object registered for each file handle, or
one global one. You may also have both, in which case the per-file-handle
callback object is used instead of the global one.

Each file handle may also have a timer associated with it. The
C<mux_timeout> callback method is called when the timer expires.

=head2 Handling input on descriptors

When input arrives on a file handle, the C<mux_input> method is called
on the appropriate callback object. This method is passed three
arguments (in addition to the object reference itself, of course):

=over 4

=item 1

a reference to the mux,

=item 2

a reference to the file handle, and

=item 3

a reference to the input buffer for the file handle.

=back

The method should remove the data that it has consumed from the
input buffer, via the reference supplied. It may leave unconsumed data
in the input buffer; that data is presented again, with any newly
received data appended, the next time C<mux_input> is called.

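As a minimal sketch of this contract, the following stand-alone script
drives a C<mux_input> handler by hand, without a mux or a real file
handle. The C<LineCollector> package and its C<lines> helper are
hypothetical names used only for illustration; the line-splitting loop
is the same idiom used in the game server example in the EXAMPLES section.

```perl

    use strict;
    use warnings;

    package LineCollector;    # hypothetical callback package

    my @lines;                # complete lines seen so far

    sub mux_input {
        my ($self, $mux, $fh, $input) = @_;

        # Consume every complete line. A trailing partial line stays
        # in the buffer for the next call.
        while ($$input =~ s/^(.*?)\n//) {
            push @lines, $1;
        }
    }

    sub lines { return @lines }

    package main;

    # Call the handler directly to show what happens to the buffer.
    my $buffer = "first\nsecond\npart";
    LineCollector->mux_input(undef, undef, \$buffer);
    # @lines now holds "first" and "second"; $buffer holds "part"

    # More data arrives and completes the partial line.
    $buffer .= "ial\n";
    LineCollector->mux_input(undef, undef, \$buffer);
    # @lines now also holds "partial"; $buffer is empty

```

The handler never has to reassemble lines itself: whatever it leaves
behind is still in the buffer when the rest of the line arrives.
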

=head2 Handling output to descriptors

If C<IO::Multiplex> did not handle output to the file handles as well
as input from them, then there is a chance that the program could
block while attempting to write. If you let the multiplexer buffer
the output, it will write the data only when the file handle is
capable of receiving it.

The basic method for handing output to the multiplexer is the C<write>
method, which simply takes a file handle and the data to be
written, like this:

    $mux->write($fh, "Some data");

For convenience, when the file handle is C<add>ed to the multiplexer, it
is tied to a special class which intercepts all attempts to write to the
file handle. Thus, you can use C<print> and C<printf> to send output to
the handle in a normal manner:

    printf $fh "%s%d%X", $foo, $bar, $baz;

Unfortunately, Perl support for tied file handles is incomplete, and
functions such as C<send> cannot be supported.

Also, file handle object methods such as the C<send> method of
C<IO::Socket> cannot be intercepted.

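The idea behind buffered output can be sketched without the module.
The script below is an illustration under simplifying assumptions, not
the module's implementation: a pipe stands in for a client connection,
the handle is left in blocking mode, and C<$outbuffer> is a hypothetical
stand-in for the per-handle output buffer. It waits until C<select>
reports the handle writable, writes what it can, and keeps the
remainder for the next pass.

```perl

    use strict;
    use warnings;

    # A pipe stands in for a client connection.
    pipe(my $reader, my $writer) or die "pipe: $!";

    my $outbuffer = "Some data\n" x 3;    # hypothetical pending output
    my $expected  = $outbuffer;

    while (length $outbuffer) {
        # Mark the handle we want to write to in a select bit vector.
        my $wready = '';
        vec($wready, fileno($writer), 1) = 1;

        # Wait until the handle can accept data (up to 5 seconds).
        my $nready = select(undef, $wready, undef, 5);
        die "select: $!" if $nready < 0;
        next unless $nready;

        # syswrite may accept only part of the buffer.
        my $written = syswrite($writer, $outbuffer);
        die "write: $!" unless defined $written;

        # Drop what was written; the rest waits for the next pass.
        substr($outbuffer, 0, $written) = '';
    }
    close $writer;

    # Read back everything that reached the other end of the pipe.
    my $received = do { local $/; <$reader> };

```
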

=head1 EXAMPLES

=head2 Simple Example

This is a simple telnet-like program, which demonstrates the concepts
covered so far. It does not really work too well against a telnet
server, but it does OK against the sample server presented further down.

    use IO::Socket;
    use IO::Multiplex;

    # Create a multiplex object
    my $mux = new IO::Multiplex;
    # Connect to the host/port specified on the command line,
    # or localhost:23
    my $sock = new IO::Socket::INET(Proto => 'tcp',
                                    PeerAddr => shift || 'localhost',
                                    PeerPort => shift || 23)
        or die "socket: $@";

    # add the relevant file handles to the mux
    $mux->add($sock);
    $mux->add(\*STDIN);
    # We want to buffer output to the terminal. This prevents the program
    # from blocking if the user hits CTRL-S for example.
    $mux->add(\*STDOUT);

    # We're not object oriented, so just request callbacks to the
    # current package
    $mux->set_callback_object(__PACKAGE__);

    # Enter the main mux loop.
    $mux->loop;

    # mux_input is called when input is available on one of
    # the descriptors.
    sub mux_input {
        my $package = shift;
        my $mux = shift;
        my $fh = shift;
        my $input = shift;

        # Figure out whence the input came, and send it on to the
        # other place.
        if ($fh == $sock) {
            print STDOUT $$input;
        } else {
            print $sock $$input;
        }
        # Remove the input from the input buffer.
        $$input = '';
    }

    # This gets called if the other end closes the connection.
    sub mux_close {
        print STDERR "Connection Closed\n";
        exit;
    }


=head2 A server example

Servers are just as simple to write. We just register a listen socket
with the multiplex object's C<listen> method. It will automatically
accept connections on it and add them to its list of active file handles.

This example is a simple chat server.

    use IO::Socket;
    use IO::Multiplex;

    my $mux = new IO::Multiplex;

    # Create a listening socket
    my $sock = new IO::Socket::INET(Proto => 'tcp',
                                    LocalPort => shift || 2300,
                                    Listen => 4)
        or die "socket: $@";

    # We use the listen method instead of the add method.
    $mux->listen($sock);

    $mux->set_callback_object(__PACKAGE__);
    $mux->loop;

    sub mux_input {
        my $package = shift;
        my $mux = shift;
        my $fh = shift;
        my $input = shift;

        # The handles method returns a list of references to handles which
        # we have registered, except for listen sockets.
        foreach my $c ($mux->handles) {
            print $c $$input;
        }
        $$input = '';
    }


=head2 A more complex server example

Let us take a look at the beginnings of a multi-user game server. We will
have a Player object for each player.

    # Paste the above example in here, up to but not including the
    # mux_input subroutine.

    # mux_connection is called when a new connection is accepted.
    sub mux_connection {
        my $package = shift;
        my $mux = shift;
        my $fh = shift;

        # Construct a new player object
        Player->new($mux, $fh);
    }

    package Player;

    my %players = ();

    sub new {
        my $package = shift;
        my $self = bless { mux => shift,
                           fh => shift } => $package;

        # Register the new player object as the callback specifically for
        # this file handle.

        $self->{mux}->set_callback_object($self, $self->{fh});

        # A handle stored in a hash element must be wrapped in a block
        # when it is used with print.
        print { $self->{fh} }
            "Greetings, Professor. Would you like to play a game?\n";

        # Register this player object in the main list of players
        $players{$self} = $self;
        $self->{mux}->set_timeout($self->{fh}, 1);
        return $self;
    }

    sub players { return values %players; }

    sub mux_input {
        my $self = shift;
        shift; shift; # These two args are boring
        my $input = shift; # Scalar reference to the input

        # Process each line in the input, leaving partial lines
        # in the input buffer
        while ($$input =~ s/^(.*?)\n//) {
            $self->process_command($1);
        }
    }

    sub mux_close {
        my $self = shift;

        # Player disconnected;
        # [Notify other players or something...]
        delete $players{$self};
    }

    # This gets called every second to update player info, etc...
    sub mux_timeout {
        my $self = shift;
        my $mux = shift;

        $self->heartbeat;
        $mux->set_timeout($self->{fh}, 1);
    }


=head1 METHODS

=cut

use POSIX qw(errno_h BUFSIZ);
use Socket;
use FileHandle qw(autoflush);
use IO::Handle;
use Fcntl;
use Carp qw(carp);
use constant IsWin => ($^O eq 'MSWin32');


BEGIN {
    eval {
        # Can optionally use Hi Res timers if available
        require Time::HiRes;
        Time::HiRes->import('time');
    };
}

# Ignore SIGPIPE so that writing to a connection the peer has closed
# fails with EPIPE instead of killing the process.
$SIG{PIPE} = 'IGNORE';

{   no warnings;
    if(IsWin) { *EWOULDBLOCK = sub() {10035} }
}


=head2 new

Construct a new C<IO::Multiplex> object.

    $mux = new IO::Multiplex;

=cut

sub new
{
    my $package = shift;
    my $self = bless { _readers => '',
                       _writers => '',
                       _fhs => {},
                       _handles => {},
                       _timerkeys => {},
                       _timers => [],
                       _listen => {} } => $package;
    return $self;
}

=head2 listen

Add a socket to be listened on. The socket should have had the
C<bind> and C<listen> system calls already applied to it. The C<IO::Socket>
module will do this for you.

    $socket = new IO::Socket::INET(Listen => ..., LocalAddr => ...);
    $mux->listen($socket);

Connections will be automatically accepted and C<add>ed to the multiplex
object. The C<mux_connection> callback method will also be called.

=cut

sub listen
{
    my $self = shift;
    my $fh = shift;

    $self->add($fh);
    $self->{_fhs}{"$fh"}{listen} = 1;
    $fh;
}


=head2 add

Add a file handle to the multiplexer.

    $mux->add($fh);

As a side effect, this sets non-blocking mode on the handle, and disables
STDIO buffering. It also ties the handle so that output written to it is
intercepted. Adding a handle that is already registered does nothing.

=cut

sub add
{
    my $self = shift;
    my $fh = shift;

    return if $self->{_fhs}{"$fh"};

    nonblock($fh);
    autoflush($fh, 1);
    fd_set($self->{_readers}, $fh, 1);

    my $sockopt = getsockopt $fh, SOL_SOCKET, SO_TYPE;
    $self->{_fhs}{"$fh"}{udp_true} = 1
        if defined $sockopt && SOCK_DGRAM == unpack "i", $sockopt;

    $self->{_fhs}{"$fh"}{inbuffer} = '';
    $self->{_fhs}{"$fh"}{outbuffer} = '';
    $self->{_fhs}{"$fh"}{fileno} = fileno($fh);
    $self->{_handles}{"$fh"} = $fh;
    tie *$fh, "IO::Multiplex::Handle", $self, $fh;
    return $fh;
}

=head2 remove

Removes a file handle from the multiplexer. This also unties the
handle. It does not currently turn STDIO buffering back on, or turn
off non-blocking mode.

    $mux->remove($fh);

=cut

sub remove
{
    my $self = shift;
    my $fh = shift;
    fd_set($self->{_writers}, $fh, 0);
    fd_set($self->{_readers}, $fh, 0);
    delete $self->{_fhs}{"$fh"};
    delete $self->{_handles}{"$fh"};
    $self->_removeTimer($fh);
    untie *$fh;
    return 1;
}


=head2 set_callback_object

Set the object on which callbacks are made. If you are not using objects,
you can specify the name of the package into which the method calls are
to be made.

If a file handle is supplied, the callback object is specific for that
handle:

    $mux->set_callback_object($object, $fh);

Otherwise, it is considered a default callback object, and is used when
events occur on a file handle that does not have its own callback object.

    $mux->set_callback_object(__PACKAGE__);

The previously registered object (if any) is returned.

See also the CALLBACK INTERFACE section.

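The lookup order can be illustrated with a small stand-alone sketch.
It is not the module's code: the C<Global> and C<PerHandle> packages,
the C<dispatch> helper, and the string keys are hypothetical and exist
only for this example. It shows a per-handle object taking precedence
over the default, and a callback being skipped when the chosen object
does not implement it.

```perl

    use strict;
    use warnings;

    package Global;       # hypothetical default callback package
    sub mux_input { return "global handler" }

    package PerHandle;    # hypothetical per-handle callback class
    sub new       { return bless {}, shift }
    sub mux_input { return "per-handle handler" }

    package main;

    # Simplified registry: one default plus per-handle overrides,
    # keyed here by a plain name rather than by a real file handle.
    my $default    = 'Global';
    my %per_handle = (client_a => PerHandle->new);

    # Prefer the per-handle object, fall back to the default, and
    # call the method only if the chosen object implements it.
    sub dispatch {
        my ($name, $method) = @_;
        my $obj = $per_handle{$name} || $default;
        return unless $obj && $obj->can($method);
        return $obj->$method();
    }

    my $first  = dispatch('client_a', 'mux_input');   # per-handle object
    my $second = dispatch('client_b', 'mux_input');   # default package
    my $third  = dispatch('client_b', 'mux_close');   # not implemented

```

Because a package name can be used as a method invocant, the same
dispatch works whether the registered value is an object or a string
such as C<__PACKAGE__>.
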

=cut

sub set_callback_object
{
    my $self = shift;
    my $obj = shift;
    my $fh = shift;
    return if $fh && !exists($self->{_fhs}{"$fh"});

    my $old = $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object};

    # The conditional operator yields an lvalue here, so this assigns
    # to whichever of the two slots was selected.
    ($fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object}) = $obj;
    return $old;
}

=head2 kill_output

Remove any pending output on a file handle.

    $mux->kill_output($fh);

=cut

sub kill_output
{
    my $self = shift;
    my $fh = shift;
    return unless $fh && exists($self->{_fhs}{"$fh"});

    $self->{_fhs}{"$fh"}{outbuffer} = '';
    fd_set($self->{_writers}, $fh, 0);
}

=head2 outbuffer

Return or set the output buffer for a file handle.

    $output = $mux->outbuffer($fh);
    $mux->outbuffer($fh, $output);

=cut

sub outbuffer
{
    my $self = shift;
    my $fh = shift;
    return unless $fh && exists($self->{_fhs}{"$fh"});

    if (@_) {
        $self->{_fhs}{"$fh"}{outbuffer} = $_[0];
        fd_set($self->{_writers}, $fh, 0) if !$_[0];
    }

    $self->{_fhs}{"$fh"}{outbuffer};
}


=head2 inbuffer

Return or set the input buffer for a file handle.

    $input = $mux->inbuffer($fh);
    $mux->inbuffer($fh, $input);

=cut

sub inbuffer
{
    my $self = shift;
    my $fh = shift;
    return unless $fh && exists($self->{_fhs}{"$fh"});

    if (@_) {
        $self->{_fhs}{"$fh"}{inbuffer} = $_[0];
    }

    return $self->{_fhs}{"$fh"}{inbuffer};
}

=head2 set_timeout

Set the timer for a file handle. The timeout value is a certain number of
seconds in the future, after which the C<mux_timeout> callback is called.

If the C<Time::HiRes> module is installed, the timers may be specified in
fractions of a second.

Timers are not reset automatically. For a repeating timer, call
C<set_timeout> again from within C<mux_timeout>.

    $mux->set_timeout($fh, 23.6);

Use C<$mux-E<gt>set_timeout($fh, undef)> to cancel a timer.

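The one-shot behaviour can be pictured as a list of pending timers kept
sorted by expiry time. The following stand-alone sketch assumes that
model; C<add_timer>, C<remove_timer> and C<due_timers> are hypothetical
helpers written for this example and are not part of the
C<IO::Multiplex> interface.

```perl

    use strict;
    use warnings;

    my @timers;    # list of [ name, expiry ] pairs, soonest first

    # Schedule a one-shot timer and keep the list sorted by expiry.
    sub add_timer {
        my ($name, $expiry) = @_;
        @timers = sort { $a->[1] <=> $b->[1] } (@timers, [ $name, $expiry ]);
    }

    # Cancel any pending timer with the given name.
    sub remove_timer {
        my ($name) = @_;
        @timers = grep { $_->[0] ne $name } @timers;
    }

    # Return the names whose timers are due at $now, removing them
    # from the list so that each one fires only once.
    sub due_timers {
        my ($now) = @_;
        my @due;
        while (@timers && $timers[0][1] <= $now) {
            push @due, shift(@timers)->[0];
        }
        return @due;
    }

    add_timer(slow => 30);
    add_timer(fast => 10);
    add_timer(mid  => 20);
    remove_timer('mid');

    my @first  = due_timers(15);    # only "fast" is due
    my @second = due_timers(15);    # nothing: "fast" already fired
    my @third  = due_timers(40);    # "slow" is due now

```

Under this model, a handler that wants a periodic tick has to schedule
the next expiry itself each time it fires.
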

=cut

sub set_timeout
{
    my $self = shift;
    my $fh = shift;
    my $timeout = shift;
    return unless $fh && exists($self->{_fhs}{"$fh"});

    if (defined $timeout) {
        $self->_addTimer($fh, $timeout + time);
    } else {
        $self->_removeTimer($fh);
    }
}

=head2 handles

Returns a list of handles that the C<IO::Multiplex> object knows about,
excluding listen sockets.

    @handles = $mux->handles;

=cut

sub handles
{
    my $self = shift;

    return grep(!$self->{_fhs}{"$_"}{listen}, values %{$self->{_handles}});
}

sub _addTimer {
    my $self = shift;
    my $fh = shift;
    my $time = shift;

    # Set a key so that we can quickly tell if a given $fh has
    # a timer set
    $self->{_timerkeys}{"$fh"} = 1;

    # Store the timeout in an array, and re-sort it by expiry time
    @{$self->{_timers}} = sort { $a->[1] <=> $b->[1] } (@{$self->{_timers}}, [ $fh, $time ] );
}

sub _removeTimer {
    my $self = shift;
    my $fh = shift;

    # Return quickly if no timer is set
    return unless exists $self->{_timerkeys}{"$fh"};

    # Remove the timeout from the sorted array
    @{$self->{_timers}} = grep { $_->[0] ne $fh } @{$self->{_timers}};

    # Get rid of the key
    delete $self->{_timerkeys}{"$fh"};
}


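
=head2 loop

Enter the main loop and start processing IO events. The loop keeps
running for as long as at least one file handle is registered with
the multiplexer.

    $mux->loop;

An optional code reference may be passed as an argument. If supplied, it
is called on every pass of the loop with the read-ready and write-ready
C<select> bit vectors.
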
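For readers unfamiliar with the four-argument C<select> that the loop is
built around, here is a minimal stand-alone sketch of the readiness
check. It uses a pipe in place of a socket and core Perl functions only;
the variable names are chosen for this example.

```perl

    use strict;
    use warnings;

    pipe(my $reader, my $writer) or die "pipe: $!";

    # One bit per file descriptor number marks the handles to watch.
    my $readers = '';
    vec($readers, fileno($reader), 1) = 1;

    # select() overwrites its arguments with the ready set, so each
    # call is given a fresh copy of the mask.
    my $ready;

    # Nothing has been written yet, so a short select times out.
    my $idle = select($ready = $readers, undef, undef, 0.1);

    # Write unbuffered so the data reaches the pipe immediately.
    syswrite($writer, "ping\n") or die "write: $!";

    # Now the read end is reported as ready.
    my $active = select($ready = $readers, undef, undef, 0.1);

    my $data = '';
    if ($active > 0 && vec($ready, fileno($reader), 1)) {
        sysread($reader, $data, 1024);
    }

```
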

=cut

sub loop
{
    my $self = shift;
    my $heartbeat = shift;
    $self->{_endloop} = 0;

    while (!$self->{_endloop} && keys %{$self->{_fhs}}) {
        my $rv;
        my $data;
        my $rdready = "";
        my $wrready = "";
        my $timeout = undef;

        # Handles whose class name contains "SSL" may report data that
        # is already pending; mark those as readable up front.
        foreach my $fh (values %{$self->{_handles}}) {
            fd_set($rdready, $fh, 1) if
                ref($fh) =~ /SSL/ &&
                $fh->can("pending") &&
                $fh->pending;
        }

        if (!length $rdready) {
            # Wait in select only until the earliest timer is due.
            if (@{$self->{_timers}}) {
                $timeout = $self->{_timers}[0][1] - time;
            }

            my $numready = select($rdready=$self->{_readers},
                                  $wrready=$self->{_writers},
                                  undef,
                                  $timeout);

            unless(defined($numready)) {
                if ($! == EINTR || $! == EAGAIN) {
                    next;
                } else {
                    last;
                }
            }
        }

        # Optional hook supplied by the caller of loop.
        &{ $heartbeat } ($rdready, $wrready) if $heartbeat;

        foreach my $k (keys %{$self->{_handles}}) {
            my $fh = $self->{_handles}->{$k} or next;

            # Avoid creating a permanent empty hash ref for "$fh"
            # by attempting to access its {object} element
            # if it has already been closed.
            next unless exists $self->{_fhs}{"$fh"};

            # It is not easy to replace $self->{_fhs}{"$fh"} with a
            # variable, because some mux_* routines may remove it as
            # side-effect.

            # Get the callback object.
            my $obj = $self->{_fhs}{"$fh"}{object} ||
                $self->{_object};

            # Is this descriptor ready for reading?
            if (fd_isset($rdready, $fh))
            {
                if ($self->{_fhs}{"$fh"}{listen}) {
                    # It's a server socket, so a new connection is
                    # waiting to be accepted
                    my $client = $fh->accept;
                    next unless ($client);
                    $self->add($client);
                    $obj->mux_connection($self, $client)
                        if $obj && $obj->can("mux_connection");
                } else {
                    if ($self->is_udp($fh)) {
                        $rv = recv($fh, $data, BUFSIZ, 0);
                        if (defined $rv) {
                            # Remember where the last UDP packet came from
                            $self->{_fhs}{"$fh"}{udp_peer} = $rv;
                        }
                    } else {
                        $rv = &POSIX::read(fileno($fh), $data, BUFSIZ);
                    }

                    if (defined($rv) && length($data)) {
                        # Append the data to the client's receive buffer,
                        # and call mux_input to see if anything needs to
                        # be done.
                        $self->{_fhs}{"$fh"}{inbuffer} .= $data;
                        $obj->mux_input($self, $fh,
                                        \$self->{_fhs}{"$fh"}{inbuffer})
                            if $obj && $obj->can("mux_input");
                    } else {
                        unless (defined $rv) {
                            next if
                                $! == EINTR ||
                                $! == EAGAIN ||
                                $! == EWOULDBLOCK;
                            warn "IO::Multiplex read error: $!"
                                if $! != ECONNRESET;
                        }
                        # There's an error, or we received EOF. If
                        # there's pending data to be written, we leave
                        # the connection open so it can be sent. If
                        # the other end is closed for writing, the
                        # send will error and we close down there.
                        # Either way, we remove it from _readers as
                        # we're no longer interested in reading from
                        # it.
                        fd_set($self->{_readers}, $fh, 0);
                        $obj->mux_eof($self, $fh,
                                      \$self->{_fhs}{"$fh"}{inbuffer})
                            if $obj && $obj->can("mux_eof");

                        if (exists $self->{_fhs}{"$fh"}) {
                            $self->{_fhs}{"$fh"}{inbuffer} = '';
                            # The mux_eof handler could have responded
                            # with a shutdown for writing.
                            $self->close($fh)
                                unless exists $self->{_fhs}{"$fh"}
                                    && length $self->{_fhs}{"$fh"}{outbuffer};
                        }
                        next;
                    }
                }
            } # end if readable
            next unless exists $self->{_fhs}{"$fh"};

|
Packit |
7ef13a |
if (fd_isset($wrready, $fh)) {
|
|
Packit |
7ef13a |
unless (length $self->{_fhs}{"$fh"}{outbuffer}) {
|
|
Packit |
7ef13a |
fd_set($self->{_writers}, $fh, 0);
|
|
Packit |
7ef13a |
$obj->mux_outbuffer_empty($self, $fh)
|
|
Packit |
7ef13a |
if ($obj && $obj->can("mux_outbuffer_empty"));
|
|
Packit |
7ef13a |
next;
|
|
Packit |
7ef13a |
}
|
|
Packit |
7ef13a |
$rv = &POSIX::write(fileno($fh),
|
|
Packit |
7ef13a |
$self->{_fhs}{"$fh"}{outbuffer},
|
|
Packit |
7ef13a |
length($self->{_fhs}{"$fh"}{outbuffer}));
|
|
Packit |
7ef13a |
unless (defined($rv)) {
|
|
Packit |
7ef13a |
# We got an error writing to it. If it's
|
|
Packit |
7ef13a |
# EWOULDBLOCK (shouldn't happen if select told us
|
|
Packit |
7ef13a |
# we can write) or EAGAIN, or EINTR we don't worry
|
|
Packit |
7ef13a |
# about it. otherwise, close it down.
|
|
Packit |
7ef13a |
unless ($! == EWOULDBLOCK ||
|
|
Packit |
7ef13a |
$! == EINTR ||
|
|
Packit |
7ef13a |
$! == EAGAIN) {
|
|
Packit |
7ef13a |
if ($! == EPIPE) {
|
|
Packit |
7ef13a |
$obj->mux_epipe($self, $fh)
|
|
Packit |
7ef13a |
if $obj && $obj->can("mux_epipe");
|
|
Packit |
7ef13a |
} else {
|
|
Packit |
7ef13a |
warn "IO::Multiplex: write error: $!\n";
|
|
Packit |
7ef13a |
}
|
|
Packit |
7ef13a |
$self->close($fh);
|
|
Packit |
7ef13a |
}
|
|
Packit |
7ef13a |
next;
|
|
Packit |
7ef13a |
}
                substr($self->{_fhs}{"$fh"}{outbuffer}, 0, $rv) = '';
                unless (length $self->{_fhs}{"$fh"}{outbuffer}) {
                    # Mark us as not writable if there's nothing more to
                    # write
                    fd_set($self->{_writers}, $fh, 0);
                    $obj->mux_outbuffer_empty($self, $fh)
                        if ($obj && $obj->can("mux_outbuffer_empty"));

                    if (   $self->{_fhs}{"$fh"}
                        && $self->{_fhs}{"$fh"}{shutdown}) {
                        # If we've been marked for shutdown after write
                        # do it.
                        shutdown($fh, 1);
                        $self->{_fhs}{"$fh"}{outbuffer} = '';
                        unless (length $self->{_fhs}{"$fh"}{inbuffer}) {
                            # We'd previously been shutdown for reading
                            # also, so close out completely
                            $self->close($fh);
                            next;
                        }
                    }
                }
            } # End if writeable

            next unless exists $self->{_fhs}{"$fh"};

        } # End foreach $fh (...)

        $self->_checkTimeouts() if @{$self->{_timers}};

    } # End while(loop)
}

sub _checkTimeouts {
    my $self = shift;

    # Get the current time
    my $time = time;

    # Copy all of the timers that should go off into
    # a temporary array. This allows us to modify the
    # real array as we process the timers, without
    # interfering with the loop.

    my @timers = ();
    foreach my $timer (@{$self->{_timers}}) {
        # If the timer is in the future, we can stop
        last if $timer->[1] > $time;
        push @timers, $timer;
    }

    foreach my $timer (@timers) {
        my $fh = $timer->[0];
        $self->_removeTimer($fh);

        next unless exists $self->{_fhs}{"$fh"};

        my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object};
        $obj->mux_timeout($self, $fh) if $obj && $obj->can("mux_timeout");
    }
}


=head2 endloop

Prematurely terminate the loop. Without this call, the loop terminates
on its own once there are no remaining descriptors to be watched.

    $mux->endloop;
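
A typical use is to call C<endloop> from inside a callback. The sketch
below is illustrative only: the C<MyHandler> package and the C<quit>
command are hypothetical names, not part of this module.

    package MyHandler;

    sub mux_input {
        my ($self, $mux, $fh, $data) = @_;

        # Consume complete lines; a trailing partial line stays buffered.
        while ($$data =~ s/^(.*?)\r?\n//) {
            if ($1 eq 'quit') {
                # The flag is only checked at the top of the select
                # loop, so the current pass still runs to completion.
                $mux->endloop;
                last;
            }
        }
    }

Because the flag is tested only between passes, other handles that were
ready in the same pass still get their callbacks before C<loop> returns.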

=cut

sub endloop
{
    my $self = shift;
    $self->{_endloop} = 1;
}

=head2 udp_peer

Get the peer endpoint from which the last UDP packet originated.

    $saddr = $mux->udp_peer($fh);
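
The value is a packed socket address, in the form C<write> hands to
C<send>. As a minimal sketch, assuming an IPv4 socket, it can be decoded
with the core C<Socket> module. The address and port below are made up
and stand in for a real return value of C<udp_peer>.

    use Socket qw(pack_sockaddr_in unpack_sockaddr_in inet_aton inet_ntoa);

    # Stand-in for: my $saddr = $mux->udp_peer($fh);
    my $saddr = pack_sockaddr_in(5353, inet_aton('127.0.0.1'));

    # Split the packed address into port and 4-byte IP, then format it.
    my ($port, $ip) = unpack_sockaddr_in($saddr);
    printf "last packet came from %s:%d\n", inet_ntoa($ip), $port;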

=cut

sub udp_peer {
    my $self = shift;
    my $fh = shift;
    return $self->{_fhs}{"$fh"}{udp_peer};
}

=head2 is_udp

Sometimes UDP packets require special attention.
This method will tell if a file handle is of type UDP.

    $is_udp = $mux->is_udp($fh);

=cut

sub is_udp {
    my $self = shift;
    my $fh = shift;
    return $self->{_fhs}{"$fh"}{udp_true};
}

=head2 write

Send output to a file handle.

    $mux->write($fh, "'ere I am, JH!\n");
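
As the code below shows, C<write> only appends to the handle's output
buffer (UDP handles are sent at once) and returns the number of bytes
queued. It returns C<undef> with C<$!> set to C<EPIPE> when a deferred
shutdown for writing is already pending. A cautious caller might check
for that; C<queue_line> here is a hypothetical helper, not part of this
module.

    use Errno qw(EPIPE);

    sub queue_line {
        my ($mux, $fh, $line) = @_;

        # Returns the number of bytes queued, or undef on failure.
        my $queued = $mux->write($fh, "$line\n");
        unless (defined $queued) {
            warn "cannot queue output: $!\n" if $! == EPIPE;
            return 0;
        }
        return $queued;
    }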

=cut

sub write
{
    my $self = shift;
    my $fh = shift;
    my $data = shift;
    return unless $fh && exists($self->{_fhs}{"$fh"});

    if ($self->{_fhs}{"$fh"}{shutdown}) {
        $! = EPIPE;
        return undef;
    }
    if ($self->is_udp($fh)) {
        if (my $udp_peer = $self->udp_peer($fh)) {
            # Send the packet back to the last peer that said something
            return send($fh, $data, 0, $udp_peer);
        } else {
            # No udp_peer yet?
            # This better be a connect()ed UDP socket
            # or else this will fail with ENOTCONN
            return send($fh, $data, 0);
        }
    }
    $self->{_fhs}{"$fh"}{outbuffer} .= $data;
    fd_set($self->{_writers}, $fh, 1);
    return length($data);
}

=head2 shutdown

Shut down a socket for reading or writing or both. See the C<shutdown>
Perl documentation for further details.

If the shutdown is for reading, it happens immediately. However,
shutdowns for writing are delayed until any pending output has been
successfully written to the socket.

    $mux->shutdown($socket, 1);
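
The second argument follows the built-in C<shutdown>: C<0> stops
reading, C<1> stops writing and C<2> does both. As a minimal sketch
(the C<say_goodbye> helper is hypothetical), a final message can be
queued and the write side closed in one step; the deferred shutdown
only takes effect once the main loop has flushed the message.

    sub say_goodbye {
        my ($mux, $socket) = @_;

        # Queued in the output buffer; nothing is sent yet.
        $mux->write($socket, "Goodbye\n");

        # Output is pending, so this only marks the handle; the actual
        # shutdown happens after the buffer has been written.
        $mux->shutdown($socket, 1);
    }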

=cut

sub shutdown
{
    my $self = shift;
    my $fh = shift;
    my $which = shift;
    return unless $fh && exists($self->{_fhs}{"$fh"});

    if ($which == 0 || $which == 2) {
        # Shutdown for reading. We can do this now.
        shutdown($fh, 0);
        # The mux_eof hook must be run from the main loop to consume
        # the rest of the inbuffer if there is anything left.
        # It will also remove $fh from _readers.
    }

    if ($which == 1 || $which == 2) {
        # Shutdown for writing. Only do this now if there is no pending
        # data.
        if(length $self->{_fhs}{"$fh"}{outbuffer}) {
            $self->{_fhs}{"$fh"}{shutdown} = 1;
        } else {
            shutdown($fh, 1);
            $self->{_fhs}{"$fh"}{outbuffer} = '';
        }
    }
    # Delete the descriptor if it's totally gone.
    unless (length $self->{_fhs}{"$fh"}{inbuffer} ||
            length $self->{_fhs}{"$fh"}{outbuffer}) {
        $self->close($fh);
    }
}

=head2 close

Close a handle. Always use this method to close a handle that is being
watched by the multiplexer.

    $mux->close($fh);

=cut

sub close
{
    my $self = shift;
    my $fh = shift;
    return unless exists $self->{_fhs}{"$fh"};

    my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object};
    warn "closing with read buffer" if length $self->{_fhs}{"$fh"}{inbuffer};
    warn "closing with write buffer" if length $self->{_fhs}{"$fh"}{outbuffer};

    fd_set($self->{_readers}, $fh, 0);
    fd_set($self->{_writers}, $fh, 0);

    delete $self->{_fhs}{"$fh"};
    delete $self->{_handles}{"$fh"};
    untie *$fh;
    close $fh;
    $obj->mux_close($self, $fh) if $obj && $obj->can("mux_close");
}

# We set non-blocking mode on all descriptors. If we don't, then send
# might block if the data is larger than the kernel can accept all at once,
# even though select told us we can write. With non-blocking mode, we
# get a partial write in those circumstances, which is what we want.

sub nonblock
{   my $fh = shift;

    if(IsWin)
    {   ioctl($fh, 0x8004667e, pack("L!", 1));
    }
    else
    {   my $flags = fcntl($fh, F_GETFL, 0)
            or die "fcntl F_GETFL: $!\n";
        fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
            or die "fcntl F_SETFL $!\n";
    }
}

sub fd_set
{
    vec($_[0], fileno($_[1]), 1) = $_[2];
}

sub fd_isset
{
    return vec($_[0], fileno($_[1]), 1);
}

# We tie handles into this package to handle write buffering.

package IO::Multiplex::Handle;

use strict;
use Tie::Handle;
use Carp;
use vars qw(@ISA);
@ISA = qw(Tie::Handle);

sub FILENO
{
    my $self = shift;
    return ($self->{_mux}->{_fhs}->{"$self->{_fh}"}->{fileno});
}


sub TIEHANDLE
{
    my $package = shift;
    my $mux = shift;
    my $fh = shift;

    my $self = bless { _mux => $mux,
                       _fh  => $fh } => $package;
    return $self;
}

sub WRITE
{
    my $self = shift;
    my ($msg, $len, $offset) = @_;
    $offset ||= 0;
    return $self->{_mux}->write($self->{_fh}, substr($msg, $offset, $len));
}

sub CLOSE
{
    my $self = shift;
    return $self->{_mux}->shutdown($self->{_fh}, 2);
}

sub READ
{
    carp "Do not read from a muxed file handle";
}

sub READLINE
{
    carp "Do not read from a muxed file handle";
}

sub FETCH
{
    return "Fnord";
}

sub UNTIE {}

1;

__END__

=head1 CALLBACK INTERFACE

Callback objects should support the following interface. You do not have
to provide all of these methods; just provide the ones you are interested in.

All methods receive a reference to the callback object (or package) as
their first argument, in the traditional object-oriented
way. References to the C<IO::Multiplex> object and the relevant file
handle are also provided. This will be assumed in the method
descriptions.

=head2 mux_input

Called when input is ready on a descriptor. It is passed a reference to
the input buffer. It should remove any input that it has consumed, and
leave any partially received data in the buffer.

    sub mux_input {
        my $self = shift;
        my $mux  = shift;
        my $fh   = shift;
        my $data = shift;

        # Process each line in the input, leaving partial lines
        # in the input buffer
        while ($$data =~ s/^(.*?\n)//) {
            $self->process_command($1);
        }
    }

=head2 mux_eof

This is called when an end-of-file condition is present on the descriptor.
This does not necessarily mean that the descriptor has been closed, as
the other end of a socket could have used C<shutdown> to close just half
of the socket, leaving us free to write data back down the still open
half. Like C<mux_input>, it is also passed a reference to the input buffer.
It should consume the entire buffer or else it will just be lost.

In this example, we send a final reply to the other end of the socket,
and then shut it down for writing. Since it is also shut down for reading
(implicitly by the EOF condition), it will be closed once the output has
been sent, after which the C<mux_close> callback will be called.

    sub mux_eof {
        my $self = shift;
        my $mux  = shift;
        my $fh   = shift;

        print $fh "Well, goodbye then!\n";
        $mux->shutdown($fh, 1);
    }
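
The example above ignores the buffer reference. Where a final,
unterminated line matters, a variant along the following lines could
consume it before it is discarded. This is a sketch only;
C<process_command> is the same user-supplied (hypothetical) method as in
the C<mux_input> example.

    sub mux_eof {
        my ($self, $mux, $fh, $data) = @_;

        # Whatever is left is an incomplete last line. Take it now,
        # because the buffer is emptied after this callback returns.
        if (length $$data) {
            $self->process_command($$data);
            $$data = '';
        }

        # Nothing more will arrive, so close our write side as well.
        $mux->shutdown($fh, 1);
    }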

=head2 mux_close

Called when a handle has been completely closed. At the time that
C<mux_close> is called, the handle will have been removed from the
multiplexer, and untied.

=head2 mux_outbuffer_empty

Called after all pending output has been written to the file descriptor.
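
One possible use is to feed a large file to a slow peer one chunk at a
time instead of queueing it all at once. The sketch below is
illustrative: the C<FileSender> package, its constructor and the 8192
byte chunk size are assumptions, and C<$src> is expected to be an
ordinary readable file handle that is not itself watched by the
multiplexer.

    package FileSender;

    sub new {
        my ($class, $src) = @_;
        return bless { src => $src }, $class;
    }

    sub mux_outbuffer_empty {
        my ($self, $mux, $fh) = @_;

        # Queue the next chunk, or close the handle once the source
        # is exhausted (read returns 0) or fails (read returns undef).
        my $n = read($self->{src}, my $chunk, 8192);
        if ($n) {
            $mux->write($fh, $chunk);
        } else {
            $mux->close($fh);
        }
    }

The first chunk still has to be queued with C<write> from elsewhere,
for instance when the connection is accepted, since the callback only
runs once a handle's buffer has drained.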

=head2 mux_connection

Called upon a new connection being accepted on a listen socket.
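
A minimal sketch of such a callback, assuming (as in the main loop of
this module) that the file handle passed in is the newly accepted
client socket rather than the listen socket. The C<Greeter> package and
its client counter are hypothetical.

    package Greeter;

    sub new { return bless { clients => 0 }, shift }

    sub mux_connection {
        my ($self, $mux, $fh) = @_;

        # Count the client and queue a greeting on its socket.
        $self->{clients}++;
        $mux->write($fh, "Welcome, client number $self->{clients}\n");
    }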

=head2 mux_timeout

Called when a timer expires.
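
Timers are one-shot: C<_checkTimeouts> removes a timer before calling
C<mux_timeout>, so a periodic action has to re-arm it. A minimal
sketch, assuming the C<set_timeout> method documented elsewhere in this
module; the C<KeepAlive> package, the C<PING> message and the 30 second
interval are hypothetical.

    package KeepAlive;

    sub mux_timeout {
        my ($self, $mux, $fh) = @_;

        # Queue a keepalive message for the peer.
        $mux->write($fh, "PING\n");

        # Re-arm the timer, since it was removed before this call.
        $mux->set_timeout($fh, 30);
    }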

=head1 AUTHOR

Copyright 1999 Bruce J Keeler <bruce@gridpoint.com>

Copyright 2001-2008 Rob Brown <bbb@cpan.org>

Released under the same terms as Perl itself.

$Id: Multiplex.pm,v 1.45 2015/04/09 21:27:54 rob Exp $

=cut