Blame lib/Parallel/ForkManager.pm

Packit cbc316
package Parallel::ForkManager;
Packit cbc316
our $AUTHORITY = 'cpan:DLUX';
Packit cbc316
# ABSTRACT:  A simple parallel processing fork manager
Packit cbc316
$Parallel::ForkManager::VERSION = '1.19';
Packit cbc316
use POSIX ":sys_wait_h";
Packit cbc316
use Storable qw(store retrieve);
Packit cbc316
use File::Spec;
Packit cbc316
use File::Temp ();
Packit cbc316
use File::Path ();
Packit cbc316
use Carp;
Packit cbc316
Packit cbc316
use strict;
Packit cbc316
Packit cbc316
# Constructor.
#
#   $c         - class name, or an existing object whose class is reused
#   $processes - maximum number of simultaneous children (stored as max_proc)
#   $tempdir   - optional directory used to pass data structures from the
#                children back to the parent
#
# Dies if the temporary directory does not exist or is not a directory.
# Returns the new blessed object.
sub new {
  my ($c, $processes, $tempdir) = @_;

  # Decide exactly once whether the directory is ours. The same flag drives
  # both the creation of the directory and auto_cleanup, so a caller-supplied
  # directory whose name is false in boolean context (e.g. "0") is never
  # mistaken for one we created and later removed by DESTROY.
  my $own_tempdir = (not defined($tempdir) or not length($tempdir)) ? 1 : 0;

  # determine temporary directory for storing data structures
  # add it to Parallel::ForkManager object so children can use it
  # We don't let it clean up so it won't do it in the child process
  # but we have our own DESTROY to do that.
  if ($own_tempdir) {
    $tempdir = File::Temp::tempdir(CLEANUP => 0);
  }

  # ensure temp dir exists and is indeed a directory
  die qq|Temporary directory "$tempdir" doesn't exist or is not a directory.| unless (-e $tempdir && -d _);

  my $h = {
    max_proc               => $processes,
    processes              => {},          # pid => identification of running children
    in_child               => 0,
    parent_pid             => $$,
    auto_cleanup           => $own_tempdir,
    waitpid_blocking_sleep => 1,
    tempdir                => $tempdir,
  };

  return bless($h, ref($c) || $c);
}
Packit cbc316
Packit cbc316
# Start a new child, waiting first until fewer than max_proc children are
# being tracked.
#
#   $identification - optional value remembered for the child and handed to
#                     the on_start / on_finish callbacks
#
# Returns the child's pid in the parent and 0 in the child. When max_proc is
# false no fork is done: the current process is registered under its own pid
# and 0 is returned. Dies when called from a child or when fork() fails.
sub start {
  my ($s, $identification) = @_;

  if ($s->{in_child}) {
    die "Cannot start another process while you are in the child process";
  }

  # Throttle until a slot is free.
  while ($s->{max_proc} && scalar(keys %{ $s->{processes} }) >= $s->{max_proc}) {
    $s->on_wait;
    my $flags = defined $s->{on_wait_period} ? WNOHANG() : undef;
    $s->wait_one_child($flags);
  }

  # Reap whatever has already finished before adding a new child.
  $s->wait_children;

  unless ($s->{max_proc}) {
    $s->{processes}->{$$} = $identification;
    $s->on_start($$, $identification);
    return 0; # Simulating the child which returns 0
  }

  my $pid = fork();
  die "Cannot fork: $!" if !defined $pid;

  if (!$pid) {
    # child side
    $s->{in_child} = 1;
    return $pid;
  }

  # parent side
  $s->{processes}->{$pid} = $identification;
  $s->on_start($pid, $identification);
  return $pid;
}
Packit cbc316
Packit cbc316
# Convenience wrapper around start()/finish().
#
# The last argument is the code reference to run as the child; an optional
# identification may precede it. When start() returns a true value (parent
# side) that value is returned. Otherwise the code reference is called and
# whatever it returns is passed to finish() after an exit code of 0.
sub start_child {
    my ($self, @args) = @_;

    my $sub            = pop @args;
    my $identification = shift @args;

    my $pid = $self->start($identification);
    return $pid if $pid;    # in the parent

    # ... or the child
    return $self->finish(0, $sub->());
}
Packit cbc316
Packit cbc316
Packit cbc316
# End the current unit of work.
#
#   $x - exit code (0 when false)
#   $r - optional data to hand back to the parent
#
# In a child: if $r is defined it is written with Storable's store() to a
# file in the temp dir named after the parent pid and this pid, a warning is
# emitted if that fails, and the process exits with $x.
# Outside a child with max_proc == 0: the on_finish callback is invoked
# directly for this pid and its entry is dropped from the process table.
# Returns 0 whenever it returns at all.
sub finish {
  my ($s, $x, $r) = @_;

  if ( $s->{in_child} ) {
    if (defined $r) {  # store the child's data structure
      my $storable_tempfile = File::Spec->catfile(
        $s->{tempdir},
        'Parallel-ForkManager-' . $s->{parent_pid} . '-' . $$ . '.txt',
      );
      my $stored = eval { store($r, $storable_tempfile) };

      # handle Storables errors, IE logcarp or carp returning undef, or die (via logcroak or croak)
      if ($@ or not $stored) {
        warn(qq|The storable module was unable to store the child's data structure to the temp file "$storable_tempfile":  | . join(', ', $@));
      }
    }
    CORE::exit($x || 0);
  }

  # Debug mode: nothing was forked, so report completion right away.
  if ($s->{max_proc} == 0) {
    $s->on_finish($$, $x, $s->{processes}->{$$}, 0, 0, $r);
    delete $s->{processes}->{$$};
  }

  return 0;
}
Packit cbc316
Packit cbc316
# Non-blocking reap: collect every tracked child that has already exited.
# Does nothing when no children are tracked. Keeps calling wait_one_child
# with WNOHANG for as long as it hands back a real pid.
sub wait_children {
  my ($s) = @_;

  return unless keys %{ $s->{processes} };

  my $reaped;
  do {
    $reaped = $s->wait_one_child(WNOHANG());
  } until !defined $reaped or ( $reaped <= 0 and $reaped >= -1 ); # AS 5.6/Win32 returns negative PIDs
}

*wait_childs=*wait_children; # compatibility
*reap_finished_children=*wait_children; # behavioral synonym for clarity
Packit cbc316
Packit cbc316
# Wait for one tracked child and run its on_finish callback.
#
#   $par - flag forwarded to _waitpid (false => blocking, WNOHANG => poll)
#
# Returns whatever _waitpid returned last: the pid of the reaped child, or
# undef / 0 / -1 when nothing was reaped. Pids that are not in the process
# table are skipped and the wait is repeated.
#
# For a reaped child, the data file it may have written through finish() is
# read with Storable's retrieve() and deleted; a warning is emitted when the
# file exists but cannot be read. The callback receives the pid, exit code,
# identification, signal number, core-dump flag and the retrieved data.
sub wait_one_child {
  my ($s, $par) = @_;

  my $kid;
  while (1) {
    $kid = $s->_waitpid(-1, $par ||= 0);

    # Snapshot the wait status immediately. Everything below (Storable thaw
    # hooks, a __WARN__ handler, ...) may run code that overwrites $? before
    # the callback arguments are built.
    my $status = $?;

    last unless defined $kid;

    last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
    next if !exists $s->{processes}->{$kid};   # not one of ours, wait again
    my $id = delete $s->{processes}->{$kid};

    # retrieve child data structure, if any
    my $retrieved = undef;
    my $storable_tempfile = File::Spec->catfile($s->{tempdir}, 'Parallel-ForkManager-' . $s->{parent_pid} . '-' . $kid . '.txt');
    if (-e $storable_tempfile) {  # child has option of not storing anything, so we need to see if it did or not
      $retrieved = eval { retrieve($storable_tempfile) };

      # handle Storables errors
      if (not $retrieved or $@) {
        warn(qq|The storable module was unable to retrieve the child's data structure from the temporary file "$storable_tempfile":  | . join(', ', $@));
      }

      # clean up after ourselves
      unlink $storable_tempfile;
    }

    $s->on_finish( $kid, $status >> 8, $id, $status & 0x7f, $status & 0x80 ? 1 : 0, $retrieved);
    last;
  }

  return $kid;
}
Packit cbc316
Packit cbc316
# Wait until the process table is empty. Each round runs the on_wait hook
# and then waits for one child, polling (WNOHANG) when an on_wait period is
# configured and blocking otherwise.
sub wait_all_children {
  my ($s) = @_;

  while (scalar keys %{ $s->{processes} }) {
    $s->on_wait;
    my $flags = defined $s->{on_wait_period} ? WNOHANG() : undef;
    $s->wait_one_child($flags);
  }
}

*wait_all_childs=*wait_all_children; # compatibility;
Packit cbc316
Packit cbc316
# Accessor: the configured maximum number of processes (max_proc).
sub max_procs {
  my ($self) = @_;
  return $self->{max_proc};
}
Packit cbc316
Packit cbc316
# Accessor: the in_child flag (set to 1 by start() on the child side).
sub is_child {
  my ($self) = @_;
  return $self->{in_child};
}
Packit cbc316
Packit cbc316
# Logical negation of the in_child flag.
sub is_parent {
  my ($self) = @_;
  return !$self->{in_child};
}
Packit cbc316
Packit cbc316
# Pids currently held in the process table. In list context the pids are
# returned; in scalar context their count.
sub running_procs {
    my ($self) = @_;

    return keys %{ $self->{processes} };
}
Packit cbc316
Packit cbc316
# Wait, one child at a time, until at least $nbr slots are free, i.e. until
# max_procs minus the number of running processes is >= $nbr.
#
#   $nbr - number of free slots wanted; a false value means 1
#
# Croaks when $nbr exceeds max_procs.
sub wait_for_available_procs {
    my ($self, $nbr) = @_;
    $nbr = 1 unless $nbr;

    if ($nbr > $self->max_procs) {
        croak "nbr processes '$nbr' higher than the max nbr of processes (@{[ $self->max_procs ]})";
    }

    until ($self->max_procs - $self->running_procs >= $nbr) {
        $self->wait_one_child;
    }
}
Packit cbc316
Packit cbc316
# Register $code as a finish callback. It is stored under $pid when one is
# given, and under key 0 otherwise (the key on_finish() falls back to).
sub run_on_finish {
  my ($s, $code, $pid) = @_;

  my $slot = $pid || 0;
  return $s->{on_finish}->{$slot} = $code;
}
Packit cbc316
Packit cbc316
# Invoke the finish callback for $pid: the one registered for that pid, or
# else the one under key 0. Returns 0 when neither exists, otherwise the
# callback's own return value. The callback receives ($pid, @par).
sub on_finish {
  my ($s, $pid, @par) = @_;

  my $callback = $s->{on_finish}->{$pid} || $s->{on_finish}->{0};
  return 0 unless $callback;

  return $callback->($pid, @par);
}
Packit cbc316
Packit cbc316
# Register the wait hook and its optional period. Both values are stored
# as given; on_wait() and the wait loops consult them later.
sub run_on_wait {
  my ($s, $code, $period) = @_;

  $s->{on_wait} = $code;
  return $s->{on_wait_period} = $period;
}
Packit cbc316
Packit cbc316
# Run the wait hook, if one is registered as a code reference, and then
# sleep for on_wait_period seconds when a period is defined.
sub on_wait {
  my ($s) = @_;

  my $hook = $s->{on_wait};
  if (ref($hook) eq 'CODE') {
    $hook->();

    my $period = $s->{on_wait_period};
    if (defined $period) {
      # NOTE(review): presumably installed so that a child's exit interrupts
      # the sleep below when no CHLD handler is set -- confirm.
      local $SIG{CHLD} = sub { } if ! defined $SIG{CHLD};
      select undef, undef, undef, $period;
    }
  }
}
Packit cbc316
Packit cbc316
# Register the start hook; on_start() calls it when it is a code reference.
sub run_on_start {
  my ($s, $code) = @_;

  return $s->{on_start} = $code;
}
Packit cbc316
Packit cbc316
# Call the start hook with @par when the stored hook is a code reference;
# anything else is ignored.
sub on_start {
  my ($s, @par) = @_;

  my $hook = $s->{on_start};
  return ref($hook) eq 'CODE' && $hook->(@par);
}
Packit cbc316
Packit cbc316
# Replace the maximum number of processes (max_proc) with $mp.
sub set_max_procs {
  my ($s, $mp) = @_;

  return $s->{max_proc} = $mp;
}
Packit cbc316
Packit cbc316
# Set the sleep period used by the pseudo-blocking wait loop. A false value
# makes _waitpid_blocking fall through to a real blocking waitpid.
sub set_waitpid_blocking_sleep {
    my ($self, $period) = @_;

    return $self->{waitpid_blocking_sleep} = $period;
}
Packit cbc316
Packit cbc316
# Accessor: the sleep period of the pseudo-blocking wait loop.
sub waitpid_blocking_sleep {
    my ($self) = @_;

    return $self->{waitpid_blocking_sleep};
}
Packit cbc316
Packit cbc316
# Dispatch a wait request. The second argument (the pid in a standard
# waitpid call) is ignored; a true $flag selects the non-blocking
# implementation, anything else the blocking one.
sub _waitpid {
    my ($self, undef, $flag) = @_;

    if ($flag) {
        return $self->_waitpid_non_blocking;
    }

    return $self->_waitpid_blocking;
}
Packit cbc316
Packit cbc316
# Poll every tracked pid once with waitpid(WNOHANG).
#
# Returns the first pid found to have terminated. A pid for which waitpid
# reports -1 is warned about and dropped from the process table. Returns
# nothing when no tracked child has terminated.
sub _waitpid_non_blocking {
    my ($self) = @_;

    foreach my $pid ( $self->running_procs ) {
        my $p = waitpid($pid, WNOHANG());

        next unless $p;               # still running
        return $pid if $p != -1;      # reaped

        warn "child process '$pid' disappeared. A call to `waitpid` outside of Parallel::ForkManager might have reaped it.\n";
        # it's gone. let's clean the process entry
        delete $self->{processes}{$pid};
    }

    return;
}
Packit cbc316
Packit cbc316
# Blocking wait.
#
# With a true waitpid_blocking_sleep the wait is emulated: tracked children
# are polled, and between polls the process sleeps for that period. The loop
# ends with the pid of a terminated child, or with nothing once no process
# is tracked any more. With a false period a plain waitpid(-1, 0) is issued.
sub _waitpid_blocking {
    my ($self) = @_;

    my $sleep_period = $self->{waitpid_blocking_sleep};
    return waitpid(-1, 0) unless $sleep_period;

    # pseudo-blocking
    while (1) {
        my $pid = $self->_waitpid_non_blocking;
        return $pid if defined $pid;

        return unless $self->running_procs;

        select undef, undef, undef, $sleep_period;
    }
}
Packit cbc316
Packit cbc316
# Destructor: remove the temporary directory, but only when the object is
# flagged for auto cleanup, we are running in the process that created it
# (so children never delete it), and the directory still exists.
sub DESTROY {
  my ($self) = @_;

  # A destructor can run at any moment, including while an exception is
  # propagating or while the process is exiting. Keep the caller's global
  # status variables intact no matter what the file-system calls below do.
  local ($., $@, $!, $^E, $?);

  return unless $self->{auto_cleanup};
  return unless defined $self->{parent_pid} && $self->{parent_pid} == $$;
  return unless defined $self->{tempdir} && -d $self->{tempdir};

  File::Path::remove_tree($self->{tempdir});
  return;
}
Packit cbc316
Packit cbc316
1;
Packit cbc316
Packit cbc316
__END__
Packit cbc316
Packit cbc316
=pod
Packit cbc316
Packit cbc316
=encoding UTF-8
Packit cbc316
Packit cbc316
=head1 NAME
Packit cbc316
Packit cbc316
Parallel::ForkManager - A simple parallel processing fork manager
Packit cbc316
Packit cbc316
=head1 VERSION
Packit cbc316
Packit cbc316
version 1.19
Packit cbc316
Packit cbc316
=head1 SYNOPSIS
Packit cbc316
Packit cbc316
  use Parallel::ForkManager;
Packit cbc316
Packit cbc316
  my $pm = Parallel::ForkManager->new($MAX_PROCESSES);
Packit cbc316
Packit cbc316
  DATA_LOOP:
Packit cbc316
  foreach my $data (@all_data) {
Packit cbc316
    # Forks and returns the pid for the child:
Packit cbc316
    my $pid = $pm->start and next DATA_LOOP;
Packit cbc316
Packit cbc316
    ... do some work with $data in the child process ...
Packit cbc316
Packit cbc316
    $pm->finish; # Terminates the child process
Packit cbc316
  }
Packit cbc316
Packit cbc316
=head1 DESCRIPTION
Packit cbc316
Packit cbc316
This module is intended for use in operations that can be done in parallel
Packit cbc316
where the number of processes to be forked off should be limited. Typical
Packit cbc316
use is a downloader which will be retrieving hundreds/thousands of files.
Packit cbc316
Packit cbc316
The code for a downloader would look something like this:
Packit cbc316
Packit cbc316
  use LWP::Simple;
Packit cbc316
  use Parallel::ForkManager;
Packit cbc316
Packit cbc316
  ...
Packit cbc316
Packit cbc316
  my @links=(
Packit cbc316
    ["http://www.foo.bar/rulez.data","rulez_data.txt"],
Packit cbc316
    ["http://new.host/more_data.doc","more_data.doc"],
Packit cbc316
    ...
Packit cbc316
  );
Packit cbc316
Packit cbc316
  ...
Packit cbc316
Packit cbc316
  # Max 30 processes for parallel download
Packit cbc316
  my $pm = Parallel::ForkManager->new(30);
Packit cbc316
Packit cbc316
  LINKS:
Packit cbc316
  foreach my $linkarray (@links) {
Packit cbc316
    $pm->start and next LINKS; # do the fork
Packit cbc316
Packit cbc316
    my ($link, $fn) = @$linkarray;
Packit cbc316
    warn "Cannot get $fn from $link"
Packit cbc316
      if getstore($link, $fn) != RC_OK;
Packit cbc316
Packit cbc316
    $pm->finish; # do the exit in the child process
Packit cbc316
  }
Packit cbc316
  $pm->wait_all_children;
Packit cbc316
Packit cbc316
First you need to instantiate the ForkManager with the "new" constructor.
Packit cbc316
You must specify the maximum number of processes to be created. If you
Packit cbc316
specify 0, then NO fork will be done; this is good for debugging purposes.
Packit cbc316
Packit cbc316
Next, use $pm->start to do the fork. $pm->start returns 0 for the child process,
Packit cbc316
and child pid for the parent process (see also L<perlfunc(1p)/fork()>).
Packit cbc316
The "and next" skips the internal loop in the parent process. NOTE:
Packit cbc316
$pm->start dies if the fork fails.
Packit cbc316
Packit cbc316
$pm->finish terminates the child process (assuming a fork was done in the
Packit cbc316
"start").
Packit cbc316
Packit cbc316
NOTE: You cannot use $pm->start if you are already in the child process.
Packit cbc316
If you want to manage another set of subprocesses in the child process,
Packit cbc316
you must instantiate another Parallel::ForkManager object!
Packit cbc316
Packit cbc316
=head1 METHODS
Packit cbc316
Packit cbc316
The comment letter indicates where the method should be run. P for parent,
Packit cbc316
C for child.
Packit cbc316
Packit cbc316
=over 5
Packit cbc316
Packit cbc316
=item new $processes
Packit cbc316
Packit cbc316
Instantiate a new Parallel::ForkManager object. You must specify the maximum
Packit cbc316
number of children to fork off. If you specify 0 (zero), then no children
Packit cbc316
will be forked. This is intended for debugging purposes.
Packit cbc316
Packit cbc316
The optional second parameter, $tempdir, is only used if you want the
Packit cbc316
children to send back a reference to some data (see RETRIEVING DATASTRUCTURES
Packit cbc316
below). If not provided, it is set via a call to L<File::Temp>::tempdir().
Packit cbc316
Packit cbc316
The new method will die if the temporary directory does not exist or it is not
Packit cbc316
a directory.
Packit cbc316
Packit cbc316
=item start [ $process_identifier ]
Packit cbc316
Packit cbc316
This method does the fork. It returns the pid of the child process for
Packit cbc316
the parent, and 0 for the child process. If the $processes parameter
Packit cbc316
for the constructor is 0 then, assuming you're in the child process,
Packit cbc316
$pm->start simply returns 0.
Packit cbc316
Packit cbc316
An optional $process_identifier can be provided to this method... It is used by
Packit cbc316
the "run_on_finish" callback (see CALLBACKS) for identifying the finished
Packit cbc316
process.
Packit cbc316
Packit cbc316
=item start_child [ $process_identifier, ] \&callback
Packit cbc316
Packit cbc316
Like C<start>, but will run the C<&callback> as the child. If the callback returns anything,
Packit cbc316
it'll be passed as the data to transmit back to the parent process via C<finish()>.
Packit cbc316
Packit cbc316
=item finish [ $exit_code [, $data_structure_reference] ]
Packit cbc316
Packit cbc316
Closes the child process by exiting and accepts an optional exit code
Packit cbc316
(default exit code is 0) which can be retrieved in the parent via callback.
Packit cbc316
If the second optional parameter is provided, the child attempts to send
Packit cbc316
its contents back to the parent. If you use the program in debug mode
Packit cbc316
($processes == 0), this method just calls the callback.
Packit cbc316
Packit cbc316
If the $data_structure_reference is provided, then it is serialized and
Packit cbc316
passed to the parent process. See RETRIEVING DATASTRUCTURES for more info.
Packit cbc316
Packit cbc316
=item set_max_procs $processes
Packit cbc316
Packit cbc316
Allows you to set a new maximum number of children to maintain.
Packit cbc316
Packit cbc316
=item wait_all_children
Packit cbc316
Packit cbc316
You can call this method to wait for all the processes which have been
Packit cbc316
forked. This is a blocking wait.
Packit cbc316
Packit cbc316
=item reap_finished_children
Packit cbc316
Packit cbc316
This is a non-blocking call to reap children and execute callbacks independent
Packit cbc316
of calls to "start" or "wait_all_children". Use this in scenarios where "start"
Packit cbc316
is called infrequently but you would like the callbacks executed quickly.
Packit cbc316
Packit cbc316
=item is_parent
Packit cbc316
Packit cbc316
Returns C<true> if within the parent or C<false> if within the child.
Packit cbc316
Packit cbc316
=item is_child
Packit cbc316
Packit cbc316
Returns C<true> if within the child or C<false> if within the parent.
Packit cbc316
Packit cbc316
=item max_procs 
Packit cbc316
Packit cbc316
Returns the maximal number of processes the object will fork.
Packit cbc316
Packit cbc316
=item running_procs
Packit cbc316
Packit cbc316
Returns the pids of the forked processes currently monitored by the
Packit cbc316
C<Parallel::ForkManager>. Note that children are still reported as running
Packit cbc316
until the fork manager harvests them, via the next call to
Packit cbc316
C<start> or C<wait_all_children>.
Packit cbc316
Packit cbc316
    my @pids = $pm->running_procs;
Packit cbc316
Packit cbc316
    my $nbr_children = $pm->running_procs;
Packit cbc316
Packit cbc316
=item wait_for_available_procs( $n )
Packit cbc316
Packit cbc316
Wait until C<$n> available process slots are available.
Packit cbc316
If C<$n> is not given, defaults to I<1>.
Packit cbc316
Packit cbc316
=item waitpid_blocking_sleep 
Packit cbc316
Packit cbc316
Returns the sleep period, in seconds, of the pseudo-blocking calls. The sleep
Packit cbc316
period can be a fraction of second. 
Packit cbc316
Packit cbc316
Returns C<0> if disabled. 
Packit cbc316
Packit cbc316
Defaults to 1 second.
Packit cbc316
Packit cbc316
See I<BLOCKING CALLS> for more details.
Packit cbc316
Packit cbc316
=item set_waitpid_blocking_sleep $seconds
Packit cbc316
Packit cbc316
Sets the sleep period, in seconds, of the pseudo-blocking calls.
Packit cbc316
Set to C<0> to disable.
Packit cbc316
Packit cbc316
See I<BLOCKING CALLS> for more details.
Packit cbc316
Packit cbc316
=back
Packit cbc316
Packit cbc316
=head1 CALLBACKS
Packit cbc316
Packit cbc316
You can define callbacks in the code, which are called on events like starting
Packit cbc316
a process or upon finish. Declare these before the first call to start().
Packit cbc316
Packit cbc316
The callbacks can be defined with the following methods:
Packit cbc316
Packit cbc316
=over 4
Packit cbc316
Packit cbc316
=item run_on_finish $code [, $pid ]
Packit cbc316
Packit cbc316
You can define a subroutine which is called when a child is terminated. It is
Packit cbc316
called in the parent process.
Packit cbc316
Packit cbc316
The parameters of the $code are the following:
Packit cbc316
Packit cbc316
  - pid of the process, which is terminated
Packit cbc316
  - exit code of the program
Packit cbc316
  - identification of the process (if provided in the "start" method)
Packit cbc316
  - exit signal (0-127: signal number)
Packit cbc316
  - core dump (1 if there was core dump at exit)
Packit cbc316
  - datastructure reference or undef (see RETRIEVING DATASTRUCTURES)
Packit cbc316
Packit cbc316
=item run_on_start $code
Packit cbc316
Packit cbc316
You can define a subroutine which is called when a child is started. It is called
Packit cbc316
after the successful startup of a child in the parent process.
Packit cbc316
Packit cbc316
The parameters of the $code are the following:
Packit cbc316
Packit cbc316
  - pid of the process which has been started
Packit cbc316
  - identification of the process (if provided in the "start" method)
Packit cbc316
Packit cbc316
=item run_on_wait $code, [$period]
Packit cbc316
Packit cbc316
You can define a subroutine which is called when the child process needs to wait
Packit cbc316
for the startup. If $period is not defined, then one call is done per
Packit cbc316
child. If $period is defined, then $code is called periodically and the
Packit cbc316
module waits for $period seconds between the two calls. Note, $period can be
Packit cbc316
fractional number also. The exact "$period seconds" is not guaranteed,
Packit cbc316
signals can shorten and the process scheduler can make it longer (on busy
Packit cbc316
systems).
Packit cbc316
Packit cbc316
The $code is also called in the "start" and the "wait_all_children" methods.
Packit cbc316
Packit cbc316
No parameters are passed to the $code on the call.
Packit cbc316
Packit cbc316
=back
Packit cbc316
Packit cbc316
=head1 BLOCKING CALLS
Packit cbc316
Packit cbc316
When it comes to waiting for child processes to terminate, C<Parallel::ForkManager> is between 
Packit cbc316
a fork and a hard place (if you excuse the terrible pun). The underlying Perl C<waitpid> function
Packit cbc316
that the module relies on can block until either one specific or any child process 
Packit cbc316
terminates, but not until one of a given group of processes terminates.
Packit cbc316
Packit cbc316
This means that the module can do one of two things when it waits for 
Packit cbc316
one of its child processes to terminate:
Packit cbc316
Packit cbc316
=over
Packit cbc316
Packit cbc316
=item Only wait for its own child processes
Packit cbc316
Packit cbc316
This is done via a loop using a C<waitpid> non-blocking call and a sleep statement.
Packit cbc316
The code does something along the lines of
Packit cbc316
Packit cbc316
    while(1) {
Packit cbc316
        if ( any of the P::FM child process terminated ) {
Packit cbc316
            return its pid
Packit cbc316
        }
Packit cbc316
Packit cbc316
        sleep $sleep_period
Packit cbc316
    }
Packit cbc316
Packit cbc316
This is the default behavior that the module will use.
Packit cbc316
This is not the most efficient way to wait for child processes, but it's
Packit cbc316
the safest way to ensure that C<Parallel::ForkManager> won't interfere with 
Packit cbc316
any other part of the codebase. 
Packit cbc316
Packit cbc316
The sleep period is set via the method C<set_waitpid_blocking_sleep>.
Packit cbc316
Packit cbc316
=item Block until any process terminates
Packit cbc316
Packit cbc316
Alternatively, C<Parallel::ForkManager> can call C<waitpid> such that it will
Packit cbc316
block until any child process terminates. If the child process was not one of
Packit cbc316
the monitored subprocesses, the wait will resume. This is more efficient, but means
Packit cbc316
that C<P::FM> can capture (and discard) the termination notification that a different
Packit cbc316
part of the code might be waiting for. 
Packit cbc316
Packit cbc316
If this is a race condition
Packit cbc316
that doesn't apply to your codebase, you can set the 
Packit cbc316
I<waitpid_blocking_sleep> period to C<0>, which will enable C<waitpid> call blocking.
Packit cbc316
Packit cbc316
    my $pm = Parallel::ForkManager->new( 4 );
Packit cbc316
Packit cbc316
    $pm->set_waitpid_blocking_sleep(0);  # true blocking calls enabled
Packit cbc316
Packit cbc316
    for ( 1..100 ) {
Packit cbc316
        $pm->start and next;
Packit cbc316
Packit cbc316
        ...; # do work
Packit cbc316
Packit cbc316
        $pm->finish;
Packit cbc316
    }
Packit cbc316
Packit cbc316
=back
Packit cbc316
Packit cbc316
=head1 RETRIEVING DATASTRUCTURES from child processes
Packit cbc316
Packit cbc316
The ability for the parent to retrieve data structures is new as of version
Packit cbc316
0.7.6.
Packit cbc316
Packit cbc316
Each child process may optionally send 1 data structure back to the parent.
Packit cbc316
By data structure, we mean a reference to a string, hash or array. The
Packit cbc316
contents of the data structure are written out to temporary files on disc
Packit cbc316
using the L<Storable> module's store() method. The reference is then
Packit cbc316
retrieved from within the code you send to the run_on_finish callback.
Packit cbc316
Packit cbc316
The data structure can be any scalar perl data structure which makes sense:
Packit cbc316
string, numeric value or a reference to an array, hash or object.
Packit cbc316
Packit cbc316
There are 2 steps involved in retrieving data structures:
Packit cbc316
Packit cbc316
1) A reference to the data structure the child wishes to send back to the
Packit cbc316
parent is provided as the second argument to the finish() call. It is up
Packit cbc316
to the child to decide whether or not to send anything back to the parent.
Packit cbc316
Packit cbc316
2) The data structure reference is retrieved using the callback provided in
Packit cbc316
the run_on_finish() method.
Packit cbc316
Packit cbc316
Keep in mind that data structure retrieval is not the same as returning a
Packit cbc316
data structure from a method call. That is not what actually occurs. The
Packit cbc316
data structure referenced in a given child process is serialized and
Packit cbc316
written out to a file by L<Storable>. The file is subsequently read back
Packit cbc316
into memory and a new data structure belonging to the parent process is
Packit cbc316
created. Please consider the performance penalty it can imply, so try to
Packit cbc316
keep the returned structure small.
Packit cbc316
Packit cbc316
=head1 EXAMPLES
Packit cbc316
Packit cbc316
=head2 Parallel get
Packit cbc316
Packit cbc316
This small example can be used to get URLs in parallel.
Packit cbc316
Packit cbc316
  use Parallel::ForkManager;
Packit cbc316
  use LWP::Simple;
Packit cbc316
Packit cbc316
  my $pm = Parallel::ForkManager->new(10);
Packit cbc316
Packit cbc316
  LINKS:
Packit cbc316
  for my $link (@ARGV) {
Packit cbc316
    $pm->start and next LINKS;
Packit cbc316
    my ($fn) = $link =~ /^.*\/(.*?)$/;
Packit cbc316
    if (!$fn) {
Packit cbc316
      warn "Cannot determine filename from $link\n";
Packit cbc316
    } else {
Packit cbc316
      $0 .= " " . $fn;
Packit cbc316
      print "Getting $fn from $link\n";
Packit cbc316
      my $rc = getstore($link, $fn);
Packit cbc316
      print "$link downloaded. response code: $rc\n";
Packit cbc316
    };
Packit cbc316
    $pm->finish;
Packit cbc316
  };
Packit cbc316
Packit cbc316
=head2 Callbacks
Packit cbc316
Packit cbc316
Example of a program using callbacks to get child exit codes:
Packit cbc316
Packit cbc316
  use strict;
Packit cbc316
  use Parallel::ForkManager;
Packit cbc316
Packit cbc316
  my $max_procs = 5;
Packit cbc316
  my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara );
Packit cbc316
  # hash to resolve PID's back to child specific information
Packit cbc316
Packit cbc316
  my $pm = Parallel::ForkManager->new($max_procs);
Packit cbc316
Packit cbc316
  # Setup a callback for when a child finishes up so we can
Packit cbc316
  # get its exit code
Packit cbc316
  $pm->run_on_finish( sub {
Packit cbc316
      my ($pid, $exit_code, $ident) = @_;
Packit cbc316
      print "** $ident just got out of the pool ".
Packit cbc316
        "with PID $pid and exit code: $exit_code\n";
Packit cbc316
  });
Packit cbc316
Packit cbc316
  $pm->run_on_start( sub {
Packit cbc316
      my ($pid, $ident)=@_;
Packit cbc316
      print "** $ident started, pid: $pid\n";
Packit cbc316
  });
Packit cbc316
Packit cbc316
  $pm->run_on_wait( sub {
Packit cbc316
      print "** Have to wait for one children ...\n"
Packit cbc316
    },
Packit cbc316
    0.5
Packit cbc316
  );
Packit cbc316
Packit cbc316
  NAMES:
Packit cbc316
  foreach my $child ( 0 .. $#names ) {
Packit cbc316
    my $pid = $pm->start($names[$child]) and next NAMES;
Packit cbc316
Packit cbc316
    # This code is the child process
Packit cbc316
    print "This is $names[$child], Child number $child\n";
Packit cbc316
    sleep ( 2 * $child );
Packit cbc316
    print "$names[$child], Child $child is about to get out...\n";
Packit cbc316
    sleep 1;
Packit cbc316
    $pm->finish($child); # pass an exit code to finish
Packit cbc316
  }
Packit cbc316
Packit cbc316
  print "Waiting for Children...\n";
Packit cbc316
  $pm->wait_all_children;
Packit cbc316
  print "Everybody is out of the pool!\n";
Packit cbc316
Packit cbc316
=head2 Data structure retrieval
Packit cbc316
Packit cbc316
In this simple example, each child sends back a string reference.
Packit cbc316
Packit cbc316
  use Parallel::ForkManager 0.7.6;
Packit cbc316
  use strict;
Packit cbc316
Packit cbc316
  my $pm = Parallel::ForkManager->new(2, '/server/path/to/temp/dir/');
Packit cbc316
Packit cbc316
  # data structure retrieval and handling
Packit cbc316
  $pm -> run_on_finish ( # called BEFORE the first call to start()
Packit cbc316
    sub {
Packit cbc316
      my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
Packit cbc316
Packit cbc316
      # retrieve data structure from child
Packit cbc316
      if (defined($data_structure_reference)) {  # children are not forced to send anything
Packit cbc316
        my $string = ${$data_structure_reference};  # child passed a string reference
Packit cbc316
        print "$string\n";
Packit cbc316
      }
Packit cbc316
      else {  # problems occurring during storage or retrieval will throw a warning
Packit cbc316
        print qq|No message received from child process $pid!\n|;
Packit cbc316
      }
Packit cbc316
    }
Packit cbc316
  );
Packit cbc316
Packit cbc316
  # prep random statement components
Packit cbc316
  my @foods = ('chocolate', 'ice cream', 'peanut butter', 'pickles', 'pizza', 'bacon', 'pancakes', 'spaghetti', 'cookies');
Packit cbc316
  my @preferences = ('loves', q|can't stand|, 'always wants more', 'will walk 100 miles for', 'only eats', 'would starve rather than eat');
Packit cbc316
Packit cbc316
  # run the parallel processes
Packit cbc316
  PERSONS:
Packit cbc316
  foreach my $person (qw(Fred Wilma Ernie Bert Lucy Ethel Curly Moe Larry)) {
Packit cbc316
    $pm->start() and next PERSONS;
Packit cbc316
Packit cbc316
    # generate a random statement about food preferences
Packit cbc316
    my $statement = $person . ' ' . $preferences[int(rand @preferences)] . ' ' . $foods[int(rand @foods)];
Packit cbc316
Packit cbc316
    # send it back to the parent process
Packit cbc316
    $pm->finish(0, \$statement);  # note that it's a scalar REFERENCE, not the scalar itself
Packit cbc316
  }
Packit cbc316
  $pm->wait_all_children;
Packit cbc316
Packit cbc316
A second datastructure retrieval example demonstrates how children decide
Packit cbc316
whether or not to send anything back, what to send and how the parent should
Packit cbc316
process whatever is retrieved.
Packit cbc316
Packit cbc316
=for example begin
Packit cbc316
Packit cbc316
  use Parallel::ForkManager 0.7.6;
Packit cbc316
  use Data::Dumper;  # to display the data structures retrieved.
Packit cbc316
  use strict;
Packit cbc316
Packit cbc316
  my $pm = Parallel::ForkManager->new(20);  # using the system temp dir, via File::Temp::tempdir()
Packit cbc316
Packit cbc316
  # data structure retrieval and handling
Packit cbc316
  my %retrieved_responses = ();  # for collecting responses
Packit cbc316
  $pm -> run_on_finish (
Packit cbc316
    sub {
Packit cbc316
      my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
Packit cbc316
Packit cbc316
      # see what the child sent us, if anything
Packit cbc316
      if (defined($data_structure_reference)) {  # test rather than assume child sent anything
Packit cbc316
        my $reftype = ref($data_structure_reference);
Packit cbc316
        print qq|ident "$ident" returned a "$reftype" reference.\n\n|;
Packit cbc316
        if (1) {  # simple on/off switch to display the contents
Packit cbc316
          print &Dumper($data_structure_reference) . qq|end of "$ident" sent structure\n\n|;
Packit cbc316
        }
Packit cbc316
Packit cbc316
        # we can also collect retrieved data structures for processing after all children have exited
Packit cbc316
        $retrieved_responses{$ident} = $data_structure_reference;
Packit cbc316
      } else {
Packit cbc316
        print qq|ident "$ident" did not send anything.\n\n|;
Packit cbc316
      }
Packit cbc316
    }
Packit cbc316
  );
Packit cbc316
Packit cbc316
  # generate a list of instructions
Packit cbc316
  my @instructions = (  # a unique identifier and what the child process should send
Packit cbc316
    {'name' => '%ENV keys as a string', 'send' => 'keys'},
Packit cbc316
    {'name' => 'Send Nothing'},  # not instructing the child to send anything back to the parent
Packit cbc316
    {'name' => 'Childs %ENV', 'send' => 'all'},
Packit cbc316
    {'name' => 'Child chooses randomly', 'send' => 'random'},
Packit cbc316
    {'name' => 'Invalid send instructions', 'send' => 'Na Na Nana Na'},
Packit cbc316
    {'name' => 'ENV values in an array', 'send' => 'values'},
Packit cbc316
  );
Packit cbc316
Packit cbc316
  INSTRUCTS:
Packit cbc316
  foreach my $instruction (@instructions) {
Packit cbc316
    $pm->start($instruction->{'name'}) and next INSTRUCTS;  # this time we are using an explicit, unique child process identifier
Packit cbc316
Packit cbc316
    # last step in child processing
Packit cbc316
    $pm->finish(0) unless $instruction->{'send'};  # no data structure is sent unless this child is told what to send.
Packit cbc316
Packit cbc316
    if ($instruction->{'send'} eq 'keys') {
Packit cbc316
      $pm->finish(0, \join(', ', keys %ENV));
Packit cbc316
Packit cbc316
    } elsif ($instruction->{'send'} eq 'values') {
Packit cbc316
      $pm->finish(0, [values %ENV]);  # kinda useless without knowing which keys they belong to...
Packit cbc316
Packit cbc316
    } elsif ($instruction->{'send'} eq 'all') {
Packit cbc316
      $pm->finish(0, \%ENV);  # remember, we are not "returning" anything, just copying the hash to disc
Packit cbc316
Packit cbc316
    # demonstrate clearly that the child determines what type of reference to send
Packit cbc316
    } elsif ($instruction->{'send'} eq 'random') {
Packit cbc316
      my $string = q|I'm just a string.|;
Packit cbc316
      my @array = qw(I am an array);
Packit cbc316
      my %hash = (type => 'associative array', synonym => 'hash', cool => 'very :)');
Packit cbc316
      my $return_choice = ('string', 'array', 'hash')[int(rand 3)];  # randomly choose return data type
Packit cbc316
      $pm->finish(0, \$string) if ($return_choice eq 'string');
Packit cbc316
      $pm->finish(0, \@array) if ($return_choice eq 'array');
Packit cbc316
      $pm->finish(0, \%hash) if ($return_choice eq 'hash');
Packit cbc316
Packit cbc316
    # as a responsible child, inform parent that their instruction was invalid
Packit cbc316
    } else {
Packit cbc316
      $pm->finish(0, \qq|Invalid instructions: "$instruction->{'send'}".|);  # ordinarily I wouldn't include invalid input in a response...
Packit cbc316
    }
Packit cbc316
  }
Packit cbc316
  $pm->wait_all_children;  # blocks until all forked processes have exited
Packit cbc316
Packit cbc316
  # post fork processing of returned data structures
Packit cbc316
  for (sort keys %retrieved_responses) {
Packit cbc316
    print qq|Post processing "$_"...\n|;
Packit cbc316
  }
Packit cbc316
Packit cbc316
=for example end
Packit cbc316
Packit cbc316
=head1 SECURITY
Packit cbc316
Packit cbc316
Parallel::ForkManager uses temporary files when 
Packit cbc316
a child process returns information to its parent process. The filenames are
Packit cbc316
based on the process IDs of the parent and child processes, so they are
Packit cbc316
fairly easy to guess. So if security is a concern in your environment, make sure
Packit cbc316
the directory used by Parallel::ForkManager is restricted to the current user
Packit cbc316
only (the default behavior is to create a directory,
Packit cbc316
via L<File::Temp>'s C<tempdir>, which does that).
Packit cbc316
Packit cbc316
=head1 TROUBLESHOOTING
Packit cbc316
Packit cbc316
=head2 PerlIO::gzip and Parallel::ForkManager do not play nice together
Packit cbc316
Packit cbc316
If you are using L<PerlIO::gzip> in your child processes, you may end up with 
Packit cbc316
garbled files. This is not really P::FM's fault, but rather a problem between
Packit cbc316
L<PerlIO::gzip> and C<fork()> (see L<https://rt.cpan.org/Public/Bug/Display.html?id=114557>).
Packit cbc316
Packit cbc316
Fortunately, it seems there is an easy way to fix the problem by
Packit cbc316
adding the "unix" layer, i.e.,
Packit cbc316
Packit cbc316
    open(IN, '<:unix:gzip', ...
Packit cbc316
Packit cbc316
=head1 BUGS AND LIMITATIONS
Packit cbc316
Packit cbc316
Do not use Parallel::ForkManager in an environment where other child
Packit cbc316
processes can affect the run of the main program; in particular, this module
Packit cbc316
is not recommended in an environment where fork() / wait() is already used.
Packit cbc316
Packit cbc316
If you want to use more than one copy of Parallel::ForkManager, then
Packit cbc316
you have to make sure that all child processes are terminated before you
Packit cbc316
use the second object in the main program.
Packit cbc316
Packit cbc316
You are free to use a new copy of Parallel::ForkManager in the child
Packit cbc316
processes, although I don't think it makes sense.
Packit cbc316
Packit cbc316
=head1 CREDITS
Packit cbc316
Packit cbc316
  Michael Gang (bug report)
Packit cbc316
  Noah Robin <sitz@onastick.net> (documentation tweaks)
Packit cbc316
  Chuck Hirstius <chirstius@megapathdsl.net> (callback exit status, example)
Packit cbc316
  Grant Hopwood <hopwoodg@valero.com> (win32 port)
Packit cbc316
  Mark Southern <mark_southern@merck.com> (bugfix)
Packit cbc316
  Ken Clarke <www.perlprogrammer.net>  (datastructure retrieval)
Packit cbc316
Packit cbc316
=head1 AUTHORS
Packit cbc316
Packit cbc316
=over 4
Packit cbc316
Packit cbc316
=item *
Packit cbc316
Packit Service 6fff66
dLux (Szabó, Balázs) <dlux@dlux.hu>
Packit cbc316
Packit cbc316
=item *
Packit cbc316
Packit cbc316
Yanick Champoux <yanick@cpan.org>
Packit cbc316
Packit cbc316
=item *
Packit cbc316
Packit cbc316
Gabor Szabo <gabor@szabgab.com>
Packit cbc316
Packit cbc316
=back
Packit cbc316
Packit cbc316
=head1 COPYRIGHT AND LICENSE
Packit cbc316
Packit Service 6fff66
This software is copyright (c) 2000 by Balázs Szabó.
Packit cbc316
Packit cbc316
This is free software; you can redistribute it and/or modify it under
Packit cbc316
the same terms as the Perl 5 programming language system itself.
Packit cbc316
Packit cbc316
=cut