package IO::Multiplex;

use strict;
use warnings;

our $VERSION = '1.16';

=head1 NAME

IO::Multiplex - Manage IO on many file handles

=head1 SYNOPSIS

  use IO::Multiplex;

  my $mux = new IO::Multiplex;
  $mux->add($fh1);
  $mux->add(\*FH2);
  $mux->set_callback_object(...);
  $mux->listen($server_socket);
  $mux->loop;

  sub mux_input { ... }

C<IO::Multiplex> is designed to take the effort out of managing
multiple file handles. It is essentially a really fancy front end to
the C<select> system call. In addition to maintaining the C<select>
loop, it buffers all input and output to/from the file handles.  It
can also accept incoming connections on one or more listen sockets.

=head1 DESCRIPTION

C<IO::Multiplex> is object oriented in design, and will notify you of
significant events by calling methods on an object that you supply.  If you
are not using objects, you can simply supply C<__PACKAGE__> instead of an
object reference.

You may have one callback object registered for each file handle, or
one global one.  You may also have both, in which case the per-file-handle
callback object is used instead of the global one.

Each file handle may also have a timer associated with it.  The
C<mux_timeout> callback method is called when the timer expires.

=head2 Handling input on descriptors

When input arrives on a file handle, the C<mux_input> method is called
on the appropriate callback object.  This method is passed three
arguments (in addition to the object reference itself):

=over 4

=item 1

a reference to the mux,

=item 2

a reference to the file handle, and

=item 3

a reference to the input buffer for the file handle.

=back

The method should remove the data that it has consumed from the
referenced input buffer.  It may leave unconsumed data in the buffer; new
input is appended to whatever remains before C<mux_input> is called again.
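
The buffer handling described above can be tried in isolation.  The
following is a minimal sketch, not part of the module: C<take_lines> is a
hypothetical helper that removes complete lines from a referenced buffer
and leaves a trailing partial line behind, which is the behaviour a
C<mux_input> method is expected to have.

```perl
use strict;
use warnings;

# Hypothetical helper: pull every complete line out of the buffer that
# $input refers to, and leave any trailing partial line in place.
sub take_lines {
    my ($input) = @_;              # scalar reference, as passed to mux_input
    my @lines;
    while ($$input =~ s/^(.*?)\n//) {
        push @lines, $1;
    }
    return @lines;
}

my $buffer = "look\nnorth\nsay hel";
my @lines  = take_lines(\$buffer);

print "lines: @lines\n";           # lines: look north
print "left:  $buffer\n";          # left:  say hel
```

Because the substitution edits the buffer through the reference, the
caller sees only the unconsumed remainder afterwards.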

=head2 Handling output to descriptors

If C<IO::Multiplex> did not handle output to the file handles as well
as input from them, then there is a chance that the program could
block while attempting to write.  If you let the multiplexer buffer
the output, it will write the data only when the file handle is
capable of receiving it.

The basic method for handing output to the multiplexer is the C<write>
method, which simply takes a file handle and the data to be
written, like this:

    $mux->write($fh, "Some data");

For convenience, when the file handle is C<add>ed to the multiplexer, it
is tied to a special class which intercepts all attempts to write to the
file handle.  Thus, you can use C<print> and C<printf> to send output to the
handle in a normal manner:

    printf $fh "%s%d%X", $foo, $bar, $baz;

Unfortunately, Perl support for tied file handles is incomplete, and
functions such as C<send> cannot be supported.

Also, file handle object methods such as the C<send> method of
C<IO::Socket> cannot be intercepted.
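
The interception of C<print> and C<printf> relies on Perl's C<tie>
mechanism for file handles.  The sketch below shows the general idea only;
C<BufferedHandle> is a hypothetical class invented for this illustration,
and the module's own C<IO::Multiplex::Handle> class may differ in detail.

```perl
use strict;
use warnings;

# Hypothetical class for illustration only.
package BufferedHandle;

sub TIEHANDLE {
    my ($class, $buffer_ref) = @_;
    return bless { out => $buffer_ref }, $class;
}

# Called for: print $fh LIST
sub PRINT {
    my $self = shift;
    ${ $self->{out} } .= join(defined $, ? $, : '', @_);
    return 1;
}

# Called for: printf $fh FORMAT, LIST
sub PRINTF {
    my $self   = shift;
    my $format = shift;
    ${ $self->{out} } .= sprintf($format, @_);
    return 1;
}

package main;

my $outbuffer = '';
tie *OUT, 'BufferedHandle', \$outbuffer;

# Both calls append to $outbuffer instead of writing anywhere.
print  OUT "Some data\n";
printf OUT "%s%d%X\n", "n=", 42, 255;

untie *OUT;
print STDOUT $outbuffer;
```

A multiplexer can then drain such a buffer whenever C<select> reports the
real handle as writable.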

=head1 EXAMPLES

=head2 Simple Example

This is a simple telnet-like program, which demonstrates the concepts
covered so far.  It does not really work too well against a telnet
server, but it does OK against the sample server presented further down.

    use IO::Socket;
    use IO::Multiplex;

    # Create a multiplex object
    my $mux  = new IO::Multiplex;
    # Connect to the host/port specified on the command line,
    # or localhost:23
    my $sock = new IO::Socket::INET(Proto    => 'tcp',
                                    PeerAddr => shift || 'localhost',
                                    PeerPort => shift || 23)
        or die "socket: $@";

    # add the relevant file handles to the mux
    $mux->add($sock);
    $mux->add(\*STDIN);
    # We want to buffer output to the terminal.  This prevents the program
    # from blocking if the user hits CTRL-S for example.
    $mux->add(\*STDOUT);

    # We're not object oriented, so just request callbacks to the
    # current package
    $mux->set_callback_object(__PACKAGE__);

    # Enter the main mux loop.
    $mux->loop;

    # mux_input is called when input is available on one of
    # the descriptors.
    sub mux_input {
        my $package = shift;
        my $mux     = shift;
        my $fh      = shift;
        my $input   = shift;

        # Figure out whence the input came, and send it on to the
        # other place.
        if ($fh == $sock) {
            print STDOUT $$input;
        } else {
            print $sock $$input;
        }
        # Remove the input from the input buffer.
        $$input = '';
    }

    # This gets called if the other end closes the connection.
    sub mux_close {
        print STDERR "Connection Closed\n";
        exit;
    }

=head2 A server example

Servers are just as simple to write.  We just register a listen socket
with the multiplex object's C<listen> method.  It will automatically
accept connections on it and add them to its list of active file handles.

This example is a simple chat server.

    use IO::Socket;
    use IO::Multiplex;

    my $mux  = new IO::Multiplex;

    # Create a listening socket
    my $sock = new IO::Socket::INET(Proto     => 'tcp',
                                    LocalPort => shift || 2300,
                                    Listen    => 4)
        or die "socket: $@";

    # We use the listen method instead of the add method.
    $mux->listen($sock);

    $mux->set_callback_object(__PACKAGE__);
    $mux->loop;

    sub mux_input {
        my $package = shift;
        my $mux     = shift;
        my $fh      = shift;
        my $input   = shift;

        # The handles method returns a list of references to handles which
        # we have registered, except for listen sockets.
        foreach my $c ($mux->handles) {
            print $c $$input;
        }
        $$input = '';
    }

=head2 A more complex server example

Let us take a look at the beginnings of a multi-user game server.  We will
have a Player object for each player.

    # Paste the above example in here, up to but not including the
    # mux_input subroutine.

    # mux_connection is called when a new connection is accepted.
    sub mux_connection {
        my $package = shift;
        my $mux     = shift;
        my $fh      = shift;

        # Construct a new player object
        Player->new($mux, $fh);
    }

    package Player;

    my %players = ();

    sub new {
        my $package = shift;
        my $self    = bless { mux  => shift,
                              fh   => shift } => $package;

        # Register the new player object as the callback specifically for
        # this file handle.

        $self->{mux}->set_callback_object($self, $self->{fh});
        print $self->{fh}
            "Greetings, Professor.  Would you like to play a game?\n";

        # Register this player object in the main list of players
        $players{$self} = $self;
        # Start the one-second heartbeat timer for this player
        $self->{mux}->set_timeout($self->{fh}, 1);
        return $self;
    }

    sub players { return values %players; }

    sub mux_input {
        my $self = shift;
        shift; shift;         # These two args are boring
        my $input = shift;    # Scalar reference to the input

        # Process each line in the input, leaving partial lines
        # in the input buffer
        while ($$input =~ s/^(.*?)\n//) {
            $self->process_command($1);
        }
    }

    sub mux_close {
        my $self = shift;

        # Player disconnected;
        # [Notify other players or something...]
        delete $players{$self};
    }
    # This gets called every second to update player info, etc...
    sub mux_timeout {
        my $self = shift;
        my $mux  = shift;

        $self->heartbeat;
        $mux->set_timeout($self->{fh}, 1);
    }

=head1 METHODS

=cut

use POSIX qw(errno_h BUFSIZ);
use Socket;
use FileHandle qw(autoflush);
use IO::Handle;
use Fcntl;
use Carp qw(carp);
use constant IsWin => ($^O eq 'MSWin32');


BEGIN {
    eval {
        # Can optionally use Hi Res timers if available
        require Time::HiRes;
        Time::HiRes->import('time');
    };
}

# This is what you want.  Trust me.
$SIG{PIPE} = 'IGNORE';

{   no warnings;
    if(IsWin) { *EWOULDBLOCK = sub() {10035} }
}

=head2 new

Construct a new C<IO::Multiplex> object.

    $mux = new IO::Multiplex;

=cut

sub new
{
    my $package = shift;
    my $self = bless { _readers     => '',
                       _writers     => '',
                       _fhs         => {},
                       _handles     => {},
                       _timerkeys   => {},
                       _timers      => [],
                       _listen      => {}  } => $package;
    return $self;
}

=head2 listen

Add a socket to be listened on.  The socket should have had the
C<bind> and C<listen> system calls already applied to it.  The C<IO::Socket>
module will do this for you.

    $socket = new IO::Socket::INET(Listen => ..., LocalAddr => ...);
    $mux->listen($socket);

Connections will be automatically accepted and C<add>ed to the multiplex
object.  The C<mux_connection> callback method will also be called.

=cut

sub listen
{
    my $self = shift;
    my $fh   = shift;

    $self->add($fh);
    $self->{_fhs}{"$fh"}{listen} = 1;
    $fh;
}

=head2 add

Add a file handle to the multiplexer.

    $mux->add($fh);

As a side effect, this sets non-blocking mode on the handle, and disables
STDIO buffering.  It also ties the handle so that output to it can be
intercepted.
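
To make these side effects more concrete, here is a minimal sketch using
only core Perl on a POSIX-style system.  It is an illustration, not the
module's implementation: C<set_nonblocking> is a hypothetical helper, and
a pipe stands in for a real socket.  It puts a handle into non-blocking
mode and marks it in a C<select> bit vector with C<vec>.

```perl
use strict;
use warnings;
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);

# Hypothetical helper: switch a handle to non-blocking mode.
sub set_nonblocking {
    my ($fh) = @_;
    my $flags = fcntl($fh, F_GETFL, 0) or die "fcntl(F_GETFL): $!";
    fcntl($fh, F_SETFL, $flags | O_NONBLOCK) or die "fcntl(F_SETFL): $!";
    return;
}

pipe(my $reader, my $writer) or die "pipe: $!";
set_nonblocking($reader);

# Mark the handle in a select() bit vector, one bit per file descriptor.
my $readers = '';
vec($readers, fileno($reader), 1) = 1;

# Nothing has been written yet, so a zero-timeout select reports 0 handles.
my $idle = select(my $ready = $readers, undef, undef, 0);

# syswrite bypasses STDIO buffering, so the data is visible immediately.
syswrite($writer, "ping\n");
my $busy = select($ready = $readers, undef, undef, 1);

print "ready before write: $idle, after write: $busy\n";
```

C<select> overwrites the vector it is given, which is why a copy of
C<$readers> is passed on each call.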

=cut

sub add
{
    my $self = shift;
    my $fh   = shift;

    return if $self->{_fhs}{"$fh"};

    nonblock($fh);
    autoflush($fh, 1);
    fd_set($self->{_readers}, $fh, 1);

    my $sockopt = getsockopt $fh, SOL_SOCKET, SO_TYPE;
    $self->{_fhs}{"$fh"}{udp_true} = 1
        if defined $sockopt && SOCK_DGRAM == unpack "i", $sockopt;

    $self->{_fhs}{"$fh"}{inbuffer} = '';
    $self->{_fhs}{"$fh"}{outbuffer} = '';
    $self->{_fhs}{"$fh"}{fileno} = fileno($fh);
    $self->{_handles}{"$fh"} = $fh;
    tie *$fh, "IO::Multiplex::Handle", $self, $fh;
    return $fh;
}

=head2 remove

Removes a file handle from the multiplexer.  This also unties the
handle.  It does not currently turn STDIO buffering back on, or turn
off non-blocking mode.

    $mux->remove($fh);

=cut

sub remove
{
    my $self = shift;
    my $fh   = shift;
    fd_set($self->{_writers}, $fh, 0);
    fd_set($self->{_readers}, $fh, 0);
    delete $self->{_fhs}{"$fh"};
    delete $self->{_handles}{"$fh"};
    $self->_removeTimer($fh);
    untie *$fh;
    return 1;
}

=head2 set_callback_object

Set the object on which callbacks are made.  If you are not using objects,
you can specify the name of the package into which the method calls are
to be made.

If a file handle is supplied, the callback object is specific for that
handle:

    $mux->set_callback_object($object, $fh);

Otherwise, it is considered a default callback object, and is used when
events occur on a file handle that does not have its own callback object.

    $mux->set_callback_object(__PACKAGE__);

The previously registered object (if any) is returned.

See also the CALLBACK INTERFACE section.
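
The lookup order described here (a per-handle object first, then the
default) and the use of a plain package name as a callback target can be
sketched as follows.  The C<dispatch> helper, the C<Logger> class and the
handle keys are all hypothetical names chosen for this illustration.

```perl
use strict;
use warnings;

package Logger;
sub new       { return bless {}, shift }
sub mux_input { return "Logger object handled input" }

package main;
sub mux_input { return "package main handled input" }

# Hypothetical helper: prefer the per-handle object, fall back to the
# default, and only call methods that the chosen target implements.
sub dispatch {
    my ($per_handle, $default, $key, $method) = @_;
    my $obj = $per_handle->{$key} || $default;
    return unless $obj && $obj->can($method);
    return $obj->$method();
}

my %per_handle = (sock_a => Logger->new);

# sock_a has its own callback object; sock_b falls back to the package.
print dispatch(\%per_handle, __PACKAGE__, 'sock_a', 'mux_input'), "\n";
print dispatch(\%per_handle, __PACKAGE__, 'sock_b', 'mux_input'), "\n";
```

A package name works as a callback target because Perl resolves
C<< 'main'->mux_input >> as a class method call.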

=cut

sub set_callback_object
{
    my $self = shift;
    my $obj  = shift;
    my $fh   = shift;
    return if $fh && !exists($self->{_fhs}{"$fh"});

    my $old  = $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object};

    $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object} = $obj;
    return $old;
}

=head2 kill_output

Remove any pending output on a file descriptor.

    $mux->kill_output($fh);

=cut

sub kill_output
{
    my $self = shift;
    my $fh   = shift;
    return unless $fh && exists($self->{_fhs}{"$fh"});

    $self->{_fhs}{"$fh"}{outbuffer} = '';
    fd_set($self->{_writers}, $fh, 0);
}

=head2 outbuffer

Return or set the output buffer for a descriptor.

    $output = $mux->outbuffer($fh);
    $mux->outbuffer($fh, $output);

=cut

sub outbuffer
{
    my $self = shift;
    my $fh   = shift;
    return unless $fh && exists($self->{_fhs}{"$fh"});

    if (@_) {
        $self->{_fhs}{"$fh"}{outbuffer} = $_[0] if @_;
        fd_set($self->{_writers}, $fh, 0) if !$_[0];
    }

    $self->{_fhs}{"$fh"}{outbuffer};
}

=head2 inbuffer

Return or set the input buffer for a descriptor.

    $input = $mux->inbuffer($fh);
    $mux->inbuffer($fh, $input);

=cut

sub inbuffer
{
    my $self = shift;
    my $fh   = shift;
    return unless $fh && exists($self->{_fhs}{"$fh"});

    if (@_) {
        $self->{_fhs}{"$fh"}{inbuffer} = $_[0] if @_;
    }

    return $self->{_fhs}{"$fh"}{inbuffer};
}

=head2 set_timeout

Set the timer for a file handle.  The timeout value is a number of
seconds in the future, after which the C<mux_timeout> callback is called.

If the C<Time::HiRes> module is installed, the timers may be specified in
fractions of a second.

Timers are not reset automatically.  For a repeating timer, call
C<set_timeout> again from within C<mux_timeout>.

    $mux->set_timeout($fh, 23.6);

Use C<$mux-E<gt>set_timeout($fh, undef)> to cancel a timer.
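
One way to picture the timer bookkeeping is a list of deadlines kept
sorted so that the earliest is always first.  The sketch below is a
simplified, standalone illustration with hypothetical names
(C<add_timer>, C<expire_timers>) and a fixed clock value in place of
C<time>; it is not the module's code.

```perl
use strict;
use warnings;

my @timers;    # [ $name, $deadline ] pairs, earliest deadline first

# Hypothetical helper: record a deadline and keep the list sorted.
sub add_timer {
    my ($name, $deadline) = @_;
    @timers = sort { $a->[1] <=> $b->[1] } (@timers, [ $name, $deadline ]);
    return;
}

# Hypothetical helper: remove and return the names of all timers that
# are due at or before $now.  Expired timers are not re-armed.
sub expire_timers {
    my ($now) = @_;
    my @due;
    while (@timers && $timers[0][1] <= $now) {
        push @due, shift(@timers)->[0];
    }
    return @due;
}

my $now = 100;                          # fixed clock for the example
add_timer(slow => $now + 23.6);
add_timer(fast => $now + 1);

# The earliest deadline bounds how long a select() call may sleep.
my $select_timeout = $timers[0][1] - $now;
my @due            = expire_timers($now + 2);

print "sleep at most: $select_timeout\n";   # sleep at most: 1
print "due: @due\n";                        # due: fast
```

Since an expired entry is simply dropped, a handler that wants a periodic
tick has to register a new deadline each time it fires.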

=cut

sub set_timeout
{
    my $self     = shift;
    my $fh       = shift;
    my $timeout  = shift;
    return unless $fh && exists($self->{_fhs}{"$fh"});

    if (defined $timeout) {
        $self->_addTimer($fh, $timeout + time);
    } else {
        $self->_removeTimer($fh);
    }
}

=head2 handles

Returns a list of handles that the C<IO::Multiplex> object knows about,
excluding listen sockets.

    @handles = $mux->handles;

=cut

sub handles
{
    my $self = shift;

    return grep(!$self->{_fhs}{"$_"}{listen}, values %{$self->{_handles}});
}

sub _addTimer {
    my $self = shift;
    my $fh   = shift;
    my $time = shift;

    # Set a key so that we can quickly tell if a given $fh has
    # a timer set
    $self->{_timerkeys}{"$fh"} = 1;

    # Store the timeout in an array, and re-sort it
    @{$self->{_timers}} = sort { $a->[1] <=> $b->[1] } (@{$self->{_timers}}, [ $fh, $time ] );
}

sub _removeTimer {
    my $self = shift;
    my $fh   = shift;

    # Return quickly if no timer is set
    return unless exists $self->{_timerkeys}{"$fh"};

    # Remove the timeout from the sorted array
    @{$self->{_timers}} = grep { $_->[0] ne $fh } @{$self->{_timers}};

    # Get rid of the key
    delete $self->{_timerkeys}{"$fh"};
}


=head2 loop

Enter the main loop and start processing IO events.  An optional code
reference may be passed; it is called on each pass through the loop,
before the ready handles are serviced, with the read-ready and write-ready
C<select> bit vectors as arguments.

    $mux->loop;
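
The overall shape of such a loop can be sketched with core Perl alone.
This is a deliberately simplified illustration, not the module's
implementation: it watches a single handle for reading, uses a pipe in
place of a socket, treats read errors the same as end of file, and does no
output buffering or timers.

```perl
use strict;
use warnings;

# A pipe stands in for a socket so the sketch runs on its own.
pipe(my $reader, my $writer) or die "pipe: $!";
syswrite($writer, "one\ntwo\n");
close $writer;                        # the reader sees EOF after the data

my $fd       = fileno($reader);
my $readers  = '';                    # select() bit vector of watched handles
my $inbuffer = '';
vec($readers, $fd, 1) = 1;

while ($readers =~ /[^\0]/) {         # loop while any handle is still watched
    my $nready = select(my $rdready = $readers, undef, undef, 5);
    last unless defined $nready && $nready > 0;   # timeout or error

    next unless vec($rdready, $fd, 1);
    my $n = sysread($reader, my $data, 4096);
    if ($n) {
        $inbuffer .= $data;           # a real mux would call mux_input here
    } else {
        # EOF (or, in this simplified sketch, any read error): stop watching.
        vec($readers, $fd, 1) = 0;
        close $reader;
    }
}

print $inbuffer;
```

The loop ends on its own once no handles remain in the watch vector,
mirroring the way C<loop> returns when there is nothing left to watch.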

=cut

sub loop
{
    my $self = shift;
    my $heartbeat = shift;
    $self->{_endloop} = 0;

    while (!$self->{_endloop} && keys %{$self->{_fhs}}) {
        my $rv;
        my $data;
        my $rdready = "";
        my $wrready = "";
        my $timeout = undef;

        foreach my $fh (values %{$self->{_handles}}) {
            fd_set($rdready, $fh, 1) if
                ref($fh) =~ /SSL/ &&
                $fh->can("pending") &&
                $fh->pending;
        }

        if (!length $rdready) {
            if (@{$self->{_timers}}) {
                $timeout = $self->{_timers}[0][1] - time;
            }

            my $numready = select($rdready=$self->{_readers},
                                  $wrready=$self->{_writers},
                                  undef,
                                  $timeout);

            unless(defined($numready)) {
                if ($! == EINTR || $! == EAGAIN) {
                    next;
                } else {
                    last;
                }
            }
        }

        &{ $heartbeat } ($rdready, $wrready) if $heartbeat;

        foreach my $k (keys %{$self->{_handles}}) {
            my $fh = $self->{_handles}->{$k} or next;

            # Avoid creating a permanent empty hash ref for "$fh"
            # by attempting to access its {object} element
            # if it has already been closed.
            next unless exists $self->{_fhs}{"$fh"};

            # It is not easy to replace $self->{_fhs}{"$fh"} with a
            # variable, because some mux_* routines may remove it as
            # side-effect.

            # Get the callback object.
            my $obj = $self->{_fhs}{"$fh"}{object} ||
                $self->{_object};

            # Is this descriptor ready for reading?
            if (fd_isset($rdready, $fh))
            {
                if ($self->{_fhs}{"$fh"}{listen}) {
                    # It's a server socket, so a new connection is
                    # waiting to be accepted
                    my $client = $fh->accept;
                    next unless ($client);
                    $self->add($client);
                    $obj->mux_connection($self, $client)
                        if $obj && $obj->can("mux_connection");
                } else {
                    if ($self->is_udp($fh)) {
                        $rv = recv($fh, $data, BUFSIZ, 0);
                        if (defined $rv) {
                            # Remember where the last UDP packet came from
                            $self->{_fhs}{"$fh"}{udp_peer} = $rv;
                        }
                    } else {
                        $rv = &POSIX::read(fileno($fh), $data, BUFSIZ);
                    }

                    if (defined($rv) && length($data)) {
                        # Append the data to the client's receive buffer,
                        # and call mux_input to see if anything needs to
                        # be done.
                        $self->{_fhs}{"$fh"}{inbuffer} .= $data;
                        $obj->mux_input($self, $fh,
                                        \$self->{_fhs}{"$fh"}{inbuffer})
                            if $obj && $obj->can("mux_input");
                    } else {
                        unless (defined $rv) {
                            next if
                                $! == EINTR ||
                                $! == EAGAIN ||
                                $! == EWOULDBLOCK;
                            warn "IO::Multiplex read error: $!"
                                if $! != ECONNRESET;
                        }
                        # There's an error, or we received EOF.  If
                        # there's pending data to be written, we leave
                        # the connection open so it can be sent.  If
                        # the other end is closed for writing, the
                        # send will error and we close down there.
                        # Either way, we remove it from _readers as
                        # we're no longer interested in reading from
                        # it.
                        fd_set($self->{_readers}, $fh, 0);
                        $obj->mux_eof($self, $fh,
                                      \$self->{_fhs}{"$fh"}{inbuffer})
                            if $obj && $obj->can("mux_eof");

                        if (exists $self->{_fhs}{"$fh"}) {
                            $self->{_fhs}{"$fh"}{inbuffer} = '';
                            # The mux_eof handler could have responded
                            # with a shutdown for writing.
                            $self->close($fh)
                                unless exists $self->{_fhs}{"$fh"}
                                    && length $self->{_fhs}{"$fh"}{outbuffer};
                        }
                        next;
                    }
                }
            }  # end if readable
            next unless exists $self->{_fhs}{"$fh"};

            if (fd_isset($wrready, $fh)) {
                unless (length $self->{_fhs}{"$fh"}{outbuffer}) {
                    fd_set($self->{_writers}, $fh, 0);
                    $obj->mux_outbuffer_empty($self, $fh)
                        if ($obj && $obj->can("mux_outbuffer_empty"));
                    next;
                }
                $rv = &POSIX::write(fileno($fh),
                                    $self->{_fhs}{"$fh"}{outbuffer},
                                    length($self->{_fhs}{"$fh"}{outbuffer}));
                unless (defined($rv)) {
                    # We got an error writing to it.  If it's
                    # EWOULDBLOCK (shouldn't happen if select told us
                    # we can write) or EAGAIN, or EINTR we don't worry
                    # about it.  otherwise, close it down.
                    unless ($! == EWOULDBLOCK ||
                            $! == EINTR ||
                            $! == EAGAIN) {
                        if ($! == EPIPE) {
                            $obj->mux_epipe($self, $fh)
                                if $obj && $obj->can("mux_epipe");
                        } else {
                            warn "IO::Multiplex: write error: $!\n";
                        }
                        $self->close($fh);
                    }
                    next;
                }
                substr($self->{_fhs}{"$fh"}{outbuffer}, 0, $rv) = '';
                unless (length $self->{_fhs}{"$fh"}{outbuffer}) {
                    # Mark us as not writable if there's nothing more to
                    # write
                    fd_set($self->{_writers}, $fh, 0);
                    $obj->mux_outbuffer_empty($self, $fh)
                        if ($obj && $obj->can("mux_outbuffer_empty"));

                    if (   $self->{_fhs}{"$fh"}
                        && $self->{_fhs}{"$fh"}{shutdown}) {
                        # If we've been marked for shutdown after write
                        # do it.
                        shutdown($fh, 1);
                        $self->{_fhs}{"$fh"}{outbuffer} = '';
                        unless (length $self->{_fhs}{"$fh"}{inbuffer}) {
                            # We'd previously been shutdown for reading
                            # also, so close out completely
                            $self->close($fh);
                            next;
                        }
                    }
                }
            }  # End if writeable

            next unless exists $self->{_fhs}{"$fh"};

        }  # End foreach $fh (...)

        $self->_checkTimeouts() if @{$self->{_timers}};

    } # End while(loop)
}
Packit 7ef13a
Packit 7ef13a
sub _checkTimeouts {
Packit 7ef13a
    my $self = shift;
Packit 7ef13a
Packit 7ef13a
    # Get the current time
Packit 7ef13a
    my $time = time;
Packit 7ef13a
Packit 7ef13a
    # Copy all of the timers that should go off into
Packit 7ef13a
    # a temporary array. This allows us to modify the
Packit 7ef13a
    # real array as we process the timers, without
Packit 7ef13a
    # interfering with the loop.
Packit 7ef13a
Packit 7ef13a
    my @timers = ();
Packit 7ef13a
    foreach my $timer (@{$self->{_timers}}) {
Packit 7ef13a
        # If the timer is in the future, we can stop
Packit 7ef13a
        last if $timer->[1] > $time;
Packit 7ef13a
        push @timers, $timer;
Packit 7ef13a
    }
Packit 7ef13a
Packit 7ef13a
    foreach my $timer (@timers) {
Packit 7ef13a
        my $fh = $timer->[0];
Packit 7ef13a
        $self->_removeTimer($fh);
Packit 7ef13a
Packit 7ef13a
        next unless exists $self->{_fhs}{"$fh"};
Packit 7ef13a
Packit 7ef13a
        my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object};
Packit 7ef13a
        $obj->mux_timeout($self, $fh) if $obj && $obj->can("mux_timeout");
Packit 7ef13a
    }
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
Packit 7ef13a
=head2 endloop
Packit 7ef13a
Packit 7ef13a
Prematurely terminate the loop.  The loop will automatically terminate
Packit 7ef13a
when there are no remaining descriptors to be watched.
Packit 7ef13a
Packit 7ef13a
    $mux->endloop;
Packit 7ef13a
Packit 7ef13a
=cut
Packit 7ef13a
Packit 7ef13a
sub endloop
Packit 7ef13a
{
Packit 7ef13a
    my $self = shift;
Packit 7ef13a
    $self->{_endloop} = 1;
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
=head2 udp_peer
Packit 7ef13a
Packit 7ef13a
Get the peer endpoint from which the last UDP packet originated.
Packit 7ef13a
Packit 7ef13a
    $saddr = $mux->udp_peer($fh);
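
The value returned is the packed socket address recorded for the last
datagram, which is also what C<write> passes to C<send>.  As a minimal
sketch, assuming an IPv4 (C<AF_INET>) socket, it can be decoded inside a
C<mux_input> callback with the core C<Socket> module:

    use Socket qw(sockaddr_in inet_ntoa);

    sub mux_input {
        my ($self, $mux, $fh, $data) = @_;

        if ($mux->is_udp($fh)) {
            my $saddr = $mux->udp_peer($fh);
            if ($saddr) {
                # Assumption: the peer is an IPv4 endpoint
                my ($port, $ip) = sockaddr_in($saddr);
                warn "datagram from ", inet_ntoa($ip), ":$port\n";
            }
        }
        $$data = '';    # consume the datagram
    }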
Packit 7ef13a
Packit 7ef13a
=cut
Packit 7ef13a
Packit 7ef13a
sub udp_peer {
Packit 7ef13a
  my $self = shift;
Packit 7ef13a
  my $fh = shift;
Packit 7ef13a
  return $self->{_fhs}{"$fh"}{udp_peer};
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
=head2 is_udp
Packit 7ef13a
Packit 7ef13a
Sometimes UDP packets require special attention.
Packit 7ef13a
This method tells whether a file handle is a UDP socket.
Packit 7ef13a
Packit 7ef13a
    $is_udp = $mux->is_udp($fh);
Packit 7ef13a
Packit 7ef13a
=cut
Packit 7ef13a
Packit 7ef13a
sub is_udp {
Packit 7ef13a
  my $self = shift;
Packit 7ef13a
  my $fh = shift;
Packit 7ef13a
  return $self->{_fhs}{"$fh"}{udp_true};
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
=head2 write
Packit 7ef13a
Packit 7ef13a
Send output to a file handle.
Packit 7ef13a
Packit 7ef13a
    $mux->write($fh, "'ere I am, JH!\n");
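
For stream handles the data is only queued; the return value is the
number of bytes added to the output buffer.  An undefined value is
returned if the handle is not managed by the multiplexer, or if it has
already been marked for shutdown after pending output, in which case
C<$!> is set to C<EPIPE>.  A minimal sketch of checking the result
(C<$reply> is a hypothetical variable holding the data to send):

    defined $mux->write($fh, $reply)
        or warn "could not queue data: $!\n";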
Packit 7ef13a
Packit 7ef13a
=cut
Packit 7ef13a
Packit 7ef13a
sub write
Packit 7ef13a
{
Packit 7ef13a
    my $self = shift;
Packit 7ef13a
    my $fh   = shift;
Packit 7ef13a
    my $data = shift;
Packit 7ef13a
    return unless $fh && exists($self->{_fhs}{"$fh"});
Packit 7ef13a
Packit 7ef13a
    if ($self->{_fhs}{"$fh"}{shutdown}) {
Packit 7ef13a
        $! = EPIPE;
Packit 7ef13a
        return undef;
Packit 7ef13a
    }
Packit 7ef13a
    if ($self->is_udp($fh)) {
Packit 7ef13a
        if (my $udp_peer = $self->udp_peer($fh)) {
Packit 7ef13a
            # Send the packet back to the last peer that said something
Packit 7ef13a
            return send($fh, $data, 0, $udp_peer);
Packit 7ef13a
        } else {
Packit 7ef13a
            # No udp_peer yet?
Packit 7ef13a
            # This better be a connect()ed UDP socket
Packit 7ef13a
            # or else this will fail with ENOTCONN
Packit 7ef13a
            return send($fh, $data, 0);
Packit 7ef13a
        }
Packit 7ef13a
    }
Packit 7ef13a
    $self->{_fhs}{"$fh"}{outbuffer} .= $data;
Packit 7ef13a
    fd_set($self->{_writers}, $fh, 1);
Packit 7ef13a
    return length($data);
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
=head2 shutdown
Packit 7ef13a
Packit 7ef13a
Shut down a socket for reading or writing or both.  See the C<shutdown>
Packit 7ef13a
Perl documentation for further details.
Packit 7ef13a
Packit 7ef13a
If the shutdown is for reading, it happens immediately.  However,
Packit 7ef13a
shutdowns for writing are delayed until any pending output has been
Packit 7ef13a
successfully written to the socket.
Packit 7ef13a
Packit 7ef13a
    $mux->shutdown($socket, 1);
Packit 7ef13a
Packit 7ef13a
=cut
Packit 7ef13a
Packit 7ef13a
sub shutdown
Packit 7ef13a
{
Packit 7ef13a
    my $self = shift;
Packit 7ef13a
    my $fh = shift;
Packit 7ef13a
    my $which = shift;
Packit 7ef13a
    return unless $fh && exists($self->{_fhs}{"$fh"});
Packit 7ef13a
Packit 7ef13a
    if ($which == 0 || $which == 2) {
Packit 7ef13a
        # Shutdown for reading.  We can do this now.
Packit 7ef13a
        shutdown($fh, 0);
Packit 7ef13a
        # The mux_eof hook must be run from the main loop to consume
Packit 7ef13a
        # the rest of the inbuffer if there is anything left.
Packit 7ef13a
        # It will also remove $fh from _readers.
Packit 7ef13a
    }
Packit 7ef13a
Packit 7ef13a
    if ($which == 1 || $which == 2) {
Packit 7ef13a
        # Shutdown for writing.  Only do this now if there is no pending
Packit 7ef13a
        # data.
Packit 7ef13a
        if(length $self->{_fhs}{"$fh"}{outbuffer}) {
Packit 7ef13a
            $self->{_fhs}{"$fh"}{shutdown} = 1;
Packit 7ef13a
        } else {
Packit 7ef13a
            shutdown($fh, 1);
Packit 7ef13a
            $self->{_fhs}{"$fh"}{outbuffer} = '';
Packit 7ef13a
        }
Packit 7ef13a
    }
Packit 7ef13a
    # Delete the descriptor if it's totally gone.
Packit 7ef13a
    unless (length $self->{_fhs}{"$fh"}{inbuffer} ||
Packit 7ef13a
            length $self->{_fhs}{"$fh"}{outbuffer}) {
Packit 7ef13a
        $self->close($fh);
Packit 7ef13a
    }
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
=head2 close
Packit 7ef13a
Packit 7ef13a
Close a handle.  Always use this method to close a handle that is being
Packit 7ef13a
watched by the multiplexer.
Packit 7ef13a
Packit 7ef13a
    $mux->close($fh);
Packit 7ef13a
Packit 7ef13a
=cut
Packit 7ef13a
Packit 7ef13a
sub close
Packit 7ef13a
{
Packit 7ef13a
    my $self = shift;
Packit 7ef13a
    my $fh = shift;
Packit 7ef13a
    return unless exists $self->{_fhs}{"$fh"};
Packit 7ef13a
Packit 7ef13a
    my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object};
Packit 7ef13a
    warn "closing with read buffer"  if length $self->{_fhs}{"$fh"}{inbuffer};
Packit 7ef13a
    warn "closing with write buffer" if length $self->{_fhs}{"$fh"}{outbuffer};
Packit 7ef13a
Packit 7ef13a
    fd_set($self->{_readers}, $fh, 0);
Packit 7ef13a
    fd_set($self->{_writers}, $fh, 0);
Packit 7ef13a
Packit 7ef13a
    delete $self->{_fhs}{"$fh"};
Packit 7ef13a
    delete $self->{_handles}{"$fh"};
Packit 7ef13a
    untie *$fh;
Packit 7ef13a
    close $fh;
Packit 7ef13a
    $obj->mux_close($self, $fh) if $obj && $obj->can("mux_close");
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
# We set non-blocking mode on all descriptors.  If we don't, then send
Packit 7ef13a
# might block if the data is larger than the kernel can accept all at once,
Packit 7ef13a
# even though select told us we can write.  With non-blocking mode, we
Packit 7ef13a
# get a partial write in those circumstances, which is what we want.
Packit 7ef13a
Packit 7ef13a
sub nonblock
Packit 7ef13a
{   my $fh = shift;
Packit 7ef13a
Packit 7ef13a
    if(IsWin)
Packit 7ef13a
    {   ioctl($fh, 0x8004667e, pack("L!", 1));
Packit 7ef13a
    }
Packit 7ef13a
    else
Packit 7ef13a
    {   my $flags = fcntl($fh, F_GETFL, 0)
Packit 7ef13a
            or die "fcntl F_GETFL: $!\n";
Packit 7ef13a
        fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
Packit 7ef13a
            or die "fcntl F_SETFL: $!\n";
Packit 7ef13a
    }
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
sub fd_set
Packit 7ef13a
{
Packit 7ef13a
     vec($_[0], fileno($_[1]), 1) = $_[2];
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
sub fd_isset
Packit 7ef13a
{
Packit 7ef13a
    return vec($_[0], fileno($_[1]), 1);
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
# We tie handles into this package to handle write buffering.
Packit 7ef13a
Packit 7ef13a
package IO::Multiplex::Handle;
Packit 7ef13a
Packit 7ef13a
use strict;
Packit 7ef13a
use Tie::Handle;
Packit 7ef13a
use Carp;
Packit 7ef13a
use vars qw(@ISA);
Packit 7ef13a
@ISA = qw(Tie::Handle);
Packit 7ef13a
Packit 7ef13a
sub FILENO
Packit 7ef13a
{
Packit 7ef13a
    my $self = shift;
Packit 7ef13a
    return ($self->{_mux}->{_fhs}->{"$self->{_fh}"}->{fileno});
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
Packit 7ef13a
sub TIEHANDLE
Packit 7ef13a
{
Packit 7ef13a
    my $package = shift;
Packit 7ef13a
    my $mux = shift;
Packit 7ef13a
    my $fh  = shift;
Packit 7ef13a
Packit 7ef13a
    my $self = bless { _mux   => $mux,
Packit 7ef13a
                       _fh    => $fh } => $package;
Packit 7ef13a
    return $self;
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
sub WRITE
Packit 7ef13a
{
Packit 7ef13a
    my $self = shift;
Packit 7ef13a
    my ($msg, $len, $offset) = @_;
Packit 7ef13a
    $offset ||= 0;
Packit 7ef13a
    return $self->{_mux}->write($self->{_fh}, substr($msg, $offset, $len));
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
sub CLOSE
Packit 7ef13a
{
Packit 7ef13a
    my $self = shift;
Packit 7ef13a
    return $self->{_mux}->shutdown($self->{_fh}, 2);
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
sub READ
Packit 7ef13a
{
Packit 7ef13a
    carp "Do not read from a muxed file handle";
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
sub READLINE
Packit 7ef13a
{
Packit 7ef13a
    carp "Do not read from a muxed file handle";
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
sub FETCH
Packit 7ef13a
{
Packit 7ef13a
    return "Fnord";
Packit 7ef13a
}
Packit 7ef13a
Packit 7ef13a
sub UNTIE {}
Packit 7ef13a
Packit 7ef13a
1;
Packit 7ef13a
Packit 7ef13a
__END__
Packit 7ef13a
Packit 7ef13a
=head1 CALLBACK INTERFACE
Packit 7ef13a
Packit 7ef13a
Callback objects should support the following interface.  You do not have
Packit 7ef13a
to provide all of these methods, just provide the ones you are interested in.
Packit 7ef13a
Packit 7ef13a
All methods receive a reference to the callback object (or package) as
Packit 7ef13a
their first argument, in the traditional object oriented
Packit 7ef13a
way. References to the C<IO::Multiplex> object and the relevant file
Packit 7ef13a
handle are also provided.  This will be assumed in the method
Packit 7ef13a
descriptions.
Packit 7ef13a
Packit 7ef13a
=head2 mux_input
Packit 7ef13a
Packit 7ef13a
Called when input is ready on a descriptor.  It is passed a reference to
Packit 7ef13a
the input buffer.  It should remove any input that it has consumed, and
Packit 7ef13a
leave any partially received data in the buffer.
Packit 7ef13a
Packit 7ef13a
    sub mux_input {
Packit 7ef13a
        my $self = shift;
Packit 7ef13a
        my $mux  = shift;
Packit 7ef13a
        my $fh   = shift;
Packit 7ef13a
        my $data = shift;
Packit 7ef13a
Packit 7ef13a
        # Process each line in the input, leaving partial lines
Packit 7ef13a
        # in the input buffer
Packit 7ef13a
        while ($$data =~ s/^(.*?\n)//) {
Packit 7ef13a
            $self->process_command($1);
Packit 7ef13a
        }
Packit 7ef13a
    }
Packit 7ef13a
Packit 7ef13a
=head2 mux_eof
Packit 7ef13a
Packit 7ef13a
This is called when an end-of-file condition is present on the descriptor.
Packit 7ef13a
This does not necessarily mean that the descriptor has been closed, as
Packit 7ef13a
the other end of a socket could have used C<shutdown> to close just half
Packit 7ef13a
of the socket, leaving us free to write data back down the still open
Packit 7ef13a
half.  Like mux_input, it is also passed a reference to the input buffer.
Packit 7ef13a
It should consume the entire buffer or else it will just be lost.
Packit 7ef13a
Packit 7ef13a
In this example, we send a final reply to the other end of the socket,
Packit 7ef13a
and then shut it down for writing.  Since it is also shut down for reading
Packit 7ef13a
(implicitly by the EOF condition), it will be closed once the output has
Packit 7ef13a
been sent, after which the mux_close callback will be called.
Packit 7ef13a
Packit 7ef13a
    sub mux_eof {
Packit 7ef13a
        my $self = shift;
Packit 7ef13a
        my $mux  = shift;
Packit 7ef13a
        my $fh   = shift;
Packit 7ef13a
Packit 7ef13a
        print $fh "Well, goodbye then!\n";
Packit 7ef13a
        $mux->shutdown($fh, 1);
Packit 7ef13a
    }
Packit 7ef13a
Packit 7ef13a
=head2 mux_close
Packit 7ef13a
Packit 7ef13a
Called when a handle has been completely closed.  At the time that
Packit 7ef13a
C<mux_close> is called, the handle will have been removed from the
Packit 7ef13a
multiplexer, and untied.
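
Because the handle is already closed at this point, the callback should
not try to write to it; it is mainly useful for releasing per-connection
state.  A minimal sketch, where C<%session> is a hypothetical hash kept
by the application and keyed by the stringified handle:

    my %session;    # hypothetical per-handle state, keyed by "$fh"

    sub mux_close {
        my ($self, $mux, $fh) = @_;

        # Only use $fh as a lookup key here; it is no longer open
        delete $session{"$fh"};
    }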
Packit 7ef13a
Packit 7ef13a
=head2 mux_outbuffer_empty
Packit 7ef13a
Packit 7ef13a
Called after all pending output has been written to the file descriptor.
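
One possible use is to stream a large file without queueing it all at
once.  The sketch below assumes a hypothetical C<%source> hash that maps
each stringified handle to an open file to read from; the chunk size of
8192 bytes is arbitrary.

    my %source;    # hypothetical: "$fh" => file handle being streamed

    sub mux_outbuffer_empty {
        my ($self, $mux, $fh) = @_;
        my $in = $source{"$fh"} or return;

        if (read($in, my $chunk, 8192)) {
            # Queue the next chunk; we are called again once it is sent
            $mux->write($fh, $chunk);
        } else {
            # End of file (or read error): nothing more to send
            delete $source{"$fh"};
            $mux->shutdown($fh, 1);
        }
    }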
Packit 7ef13a
Packit 7ef13a
=head2 mux_connection
Packit 7ef13a
Packit 7ef13a
Called upon a new connection being accepted on a listen socket.
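
A minimal sketch, assuming that the file handle passed to the callback
is the newly accepted connection and that it has already been added to
the multiplexer:

    sub mux_connection {
        my ($self, $mux, $fh) = @_;

        # Greet the new client; the data is buffered by the multiplexer
        $mux->write($fh, "Hello, who goes there?\n");
    }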
Packit 7ef13a
Packit 7ef13a
=head2 mux_timeout
Packit 7ef13a
Packit 7ef13a
Called when a timer expires.
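
The timer is removed before the callback is invoked, so it fires only
once unless it is set again.  A minimal sketch that treats the timer as
an idle timeout and disconnects the peer:

    sub mux_timeout {
        my ($self, $mux, $fh) = @_;

        $mux->write($fh, "Idle for too long, goodbye.\n");
        # The shutdown for writing is delayed until the message is sent
        $mux->shutdown($fh, 1);
    }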
Packit 7ef13a
Packit 7ef13a
=head1 AUTHOR
Packit 7ef13a
Packit 7ef13a
Copyright 1999 Bruce J Keeler <bruce@gridpoint.com>
Packit 7ef13a
Packit 7ef13a
Copyright 2001-2008 Rob Brown <bbb@cpan.org>
Packit 7ef13a
Packit 7ef13a
Released under the same terms as Perl itself.
Packit 7ef13a
Packit 7ef13a
$Id: Multiplex.pm,v 1.45 2015/04/09 21:27:54 rob Exp $
Packit 7ef13a
Packit 7ef13a
=cut