|
Packit |
cbc316 |
package Parallel::ForkManager;
|
|
Packit |
cbc316 |
our $AUTHORITY = 'cpan:DLUX';
|
|
Packit |
cbc316 |
# ABSTRACT: A simple parallel processing fork manager
|
|
Packit |
cbc316 |
$Parallel::ForkManager::VERSION = '1.19';
|
|
Packit |
cbc316 |
use POSIX ":sys_wait_h";
|
|
Packit |
cbc316 |
use Storable qw(store retrieve);
|
|
Packit |
cbc316 |
use File::Spec;
|
|
Packit |
cbc316 |
use File::Temp ();
|
|
Packit |
cbc316 |
use File::Path ();
|
|
Packit |
cbc316 |
use Carp;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
use strict;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# new( $processes [, $tempdir] )
#
# Constructor. $processes is stored as the maximum number of children
# (max_proc). $tempdir is the directory used to exchange Storable files
# between children and the parent.
#
# When no directory is supplied (undef or empty string) a private one is
# created with File::Temp::tempdir and flagged for removal by DESTROY
# (auto_cleanup = 1). A caller-supplied directory is never flagged for
# removal (auto_cleanup = 0).
#
# Dies if the resulting directory does not exist or is not a directory.
sub new {
    my ($c, $processes, $tempdir) = @_;

    # Use the same "was a directory supplied?" test for both the cleanup
    # flag and the fallback below. A plain truthiness test would treat a
    # directory literally named "0" as "not supplied" and later delete it.
    my $user_supplied_tempdir = (defined($tempdir) && length($tempdir)) ? 1 : 0;

    my $h = {
        max_proc               => $processes,
        processes              => {},
        in_child               => 0,
        parent_pid             => $$,
        auto_cleanup           => ($user_supplied_tempdir ? 0 : 1),
        waitpid_blocking_sleep => 1,
    };

    # determine temporary directory for storing data structures
    # add it to Parallel::ForkManager object so children can use it
    # We don't let it clean up so it won't do it in the child process
    # but we have our own DESTROY to do that.
    if (!$user_supplied_tempdir) {
        $tempdir = File::Temp::tempdir(CLEANUP => 0);
    }

    # ensure temp dir exists and is indeed a directory
    die qq|Temporary directory "$tempdir" doesn't exist or is not a directory.|
        unless (-e $tempdir && -d _);
    $h->{tempdir} = $tempdir;

    return bless($h, ref($c) || $c);
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# start( [$identification] )
#
# Forks a child once a process slot is free. Returns the child's pid in
# the parent and 0 in the child. When max_proc is false no fork happens:
# the current process is registered under its own pid and 0 is returned,
# simulating the child. Dies when called from a child or when fork fails.
sub start {
    my ($s, $identification) = @_;

    die "Cannot start another process while you are in the child process"
        if $s->{in_child};

    # Block (running the on_wait callback each round) until a slot is free.
    while ($s->{max_proc} && (keys %{ $s->{processes} }) >= $s->{max_proc}) {
        $s->on_wait;
        $s->wait_one_child(defined $s->{on_wait_period} ? WNOHANG() : undef);
    }

    # Reap whatever else has already finished.
    $s->wait_children;

    unless ($s->{max_proc}) {
        # No-fork mode: behave as if we were the child.
        $s->{processes}->{$$} = $identification;
        $s->on_start($$, $identification);
        return 0; # Simulating the child which returns 0
    }

    my $pid = fork();
    die "Cannot fork: $!" if !defined $pid;

    if ($pid) {
        # Parent: remember the child and fire the start callback.
        $s->{processes}->{$pid} = $identification;
        $s->on_start($pid, $identification);
    }
    else {
        # Child: flag the object so nested start() calls are refused.
        $s->{in_child} = 1;
    }

    return $pid;
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# start_child( [$identification,] $sub )
#
# Convenience wrapper around start/finish. The last argument is the code
# reference to run as the child; the optional first argument is the
# process identification handed to start(). In the parent the value
# returned by start() is returned. Otherwise the callback is run and its
# result is handed to finish() with exit code 0.
sub start_child {
    my ($self, @args) = @_;

    my $sub            = pop @args;
    my $identification = shift @args;

    my $pid = $self->start($identification);
    return $pid if $pid;    # in the parent

    # ... or the child
    return $self->finish(0, $sub->());
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# finish( [$exit_code [, $data_ref]] )
#
# In a child: optionally serializes $data_ref with Storable into
# "<tempdir>/Parallel-ForkManager-<parent pid>-<child pid>.txt", then exits
# with $exit_code (0 when false). A storage failure only produces a warning.
#
# Outside a child: when max_proc is 0 the on_finish callback is invoked
# directly for the current pid and its process entry is removed.
# Returns 0 whenever it returns at all.
sub finish {
    my ($s, $x, $r) = @_;

    if ($s->{in_child}) {
        if (defined $r) {    # store the child's data structure
            my $file_name = sprintf 'Parallel-ForkManager-%s-%s.txt',
                $s->{parent_pid}, $$;
            my $path = File::Spec->catfile($s->{tempdir}, $file_name);

            my $stored = eval { store($r, $path) };

            # handle Storables errors, IE logcarp or carp returning undef,
            # or die (via logcroak or croak)
            if ($@ or not $stored) {
                warn(qq|The storable module was unable to store the child's data structure to the temp file "$path": | . $@);
            }
        }
        CORE::exit($x || 0);
    }

    if ($s->{max_proc} == 0) {    # max_proc == 0
        my $ident = $s->{processes}->{$$};
        $s->on_finish($$, $x, $ident, 0, 0, $r);
        delete $s->{processes}->{$$};
    }

    return 0;
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# wait_children()
#
# Non-blocking reap: keeps calling wait_one_child(WNOHANG) for as long as
# it reports a reaped child. Does nothing when no process is registered.
sub wait_children {
    my ($s) = @_;

    return unless keys %{ $s->{processes} };

    my $reaped;
    do {
        $reaped = $s->wait_one_child(WNOHANG());
        # AS 5.6/Win32 returns negative PIDs
    } while (defined($reaped) && ($reaped > 0 || $reaped < -1));
}

*wait_childs=*wait_children; # compatibility
*reap_finished_children=*wait_children; # behavioral synonym for clarity
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# wait_one_child( [$flags] )
#
# Waits (via _waitpid) for one child. Pids that are not in the process
# table are skipped. For a known child, its entry is removed, any data it
# left in "<tempdir>/Parallel-ForkManager-<parent pid>-<pid>.txt" is
# loaded with Storable and the file deleted, and on_finish is called with:
#   pid, exit code, identification, signal number, core-dump flag, data
#
# Returns whatever the final _waitpid call returned: the reaped pid, or
# undef / 0 / -1 when nothing was reaped.
sub wait_one_child {
    my ($s, $par) = @_;

    my $kid;
    while (1) {
        $kid = $s->_waitpid(-1, $par ||= 0);

        last unless defined $kid;
        last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs

        # Snapshot the wait status now. The steps below (Storable thaw
        # hooks, __WARN__ handlers, ...) can run arbitrary code that
        # resets $? before it is reported to the callback.
        my $status = $?;

        next if !exists $s->{processes}->{$kid};
        my $id = delete $s->{processes}->{$kid};

        # retrieve child data structure, if any
        my $retrieved = undef;
        my $storable_tempfile = File::Spec->catfile(
            $s->{tempdir},
            'Parallel-ForkManager-' . $s->{parent_pid} . '-' . $kid . '.txt',
        );

        # child has option of not storing anything, so we need to see if it did or not
        if (-e $storable_tempfile) {
            $retrieved = eval { retrieve($storable_tempfile) };

            # handle Storables errors
            if (not $retrieved or $@) {
                warn(qq|The storable module was unable to retrieve the child's data structure from the temporary file "$storable_tempfile": | . join(', ', $@));
            }

            # clean up after ourselves
            unlink $storable_tempfile;
        }

        $s->on_finish(
            $kid,
            $status >> 8,               # exit code
            $id,
            $status & 0x7f,             # terminating signal
            ($status & 0x80) ? 1 : 0,   # core dump flag
            $retrieved,
        );
        last;
    }

    return $kid;
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# wait_all_children()
#
# Loops until the process table is empty. Each round runs the on_wait
# callback and then waits for one child. The wait is non-blocking when an
# on_wait period is configured.
sub wait_all_children {
    my ($s) = @_;

    while (scalar keys %{ $s->{processes} }) {
        $s->on_wait;
        my $flags = defined $s->{on_wait_period} ? WNOHANG() : undef;
        $s->wait_one_child($flags);
    }
}

*wait_all_childs=*wait_all_children; # compatibility;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# Returns the configured maximum number of processes (max_proc).
sub max_procs {
    my ($self) = @_;
    return $self->{max_proc};
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# Returns the in_child flag: true once start() has marked this object as
# living in a forked child.
sub is_child {
    my ($self) = @_;
    return $self->{in_child};
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# Returns the logical negation of the in_child flag.
sub is_parent {
    my ($self) = @_;
    return !$self->{in_child};
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# running_procs()
#
# Returns the pids currently held in the process table. In scalar
# context this yields the number of such pids.
sub running_procs {
    my ($self) = @_;
    return keys %{ $self->{processes} };
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# wait_for_available_procs( [$nbr] )
#
# Waits for children, one at a time, until at least $nbr process slots
# (max_procs minus the number of running processes) are free. $nbr
# defaults to 1. Croaks when $nbr exceeds max_procs.
sub wait_for_available_procs {
    my ($self, $nbr) = @_;
    $nbr = 1 unless $nbr;

    if ($nbr > $self->max_procs) {
        my $max = $self->max_procs;
        croak "nbr processes '$nbr' higher than the max nbr of processes ($max)";
    }

    while (1) {
        # running_procs is evaluated in scalar context: a count of pids
        my $available = $self->max_procs - $self->running_procs;
        last if $available >= $nbr;
        $self->wait_one_child;
    }
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# run_on_finish( $code [, $pid] )
#
# Registers $code as the finish callback for $pid, or under key 0 (the
# default slot consulted by on_finish) when $pid is false.
sub run_on_finish {
    my ($s, $code, $pid) = @_;

    my $slot = $pid || 0;
    return $s->{on_finish}->{$slot} = $code;
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# on_finish( $pid, @par )
#
# Invokes the finish callback registered for $pid, falling back to the
# one stored under key 0. The callback receives ($pid, @par) and its
# result is returned. Returns 0 when no callback applies.
sub on_finish {
    my ($s, $pid, @par) = @_;

    my $code = $s->{on_finish}->{$pid};
    $code = $s->{on_finish}->{0} unless $code;
    return 0 unless $code;

    return $code->($pid, @par);
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# run_on_wait( $code [, $period] )
#
# Stores the wait callback and the optional period (on_wait_period) that
# on_wait sleeps for after calling it.
sub run_on_wait {
    my ($s, $code, $period) = @_;

    $s->{on_wait} = $code;
    return $s->{on_wait_period} = $period;
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# on_wait()
#
# Runs the wait callback if one (a CODE ref) is set. When on_wait_period
# is defined it then sleeps for that long using four-argument select.
sub on_wait {
    my ($s) = @_;

    my $callback = $s->{on_wait};
    if (ref($callback) eq 'CODE') {
        $callback->();

        my $period = $s->{on_wait_period};
        if (defined $period) {
            # Install a no-op CHLD handler for the duration of the sleep
            # when none is set. Must stay a statement modifier so that
            # `local` is scoped to this block and covers the select.
            local $SIG{CHLD} = sub { } unless defined $SIG{CHLD};
            select(undef, undef, undef, $period);
        }
    }
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# run_on_start( $code )
#
# Stores $code as the callback that on_start invokes.
sub run_on_start {
    my ($s, $code) = @_;

    return $s->{on_start} = $code;
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# on_start( @par )
#
# Calls the start callback with @par when one (a CODE ref) is set.
sub on_start {
    my ($s, @par) = @_;

    my $callback = $s->{on_start};
    return $callback->(@par) if ref($callback) eq 'CODE';
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# set_max_procs( $mp )
#
# Replaces the maximum number of processes (max_proc) with $mp.
sub set_max_procs {
    my ($s, $mp) = @_;

    return $s->{max_proc} = $mp;
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# set_waitpid_blocking_sleep( $period )
#
# Stores the sleep period used by _waitpid_blocking between polls. A
# false value makes _waitpid_blocking call waitpid(-1, 0) instead.
sub set_waitpid_blocking_sleep {
    my ($self, $period) = @_;

    return $self->{waitpid_blocking_sleep} = $period;
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# Returns the stored sleep period of the pseudo-blocking wait loop.
sub waitpid_blocking_sleep {
    my ($self) = @_;
    return $self->{waitpid_blocking_sleep};
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# _waitpid( $pid, $flag )
#
# Call waitpid() in the standard Unix fashion. The pid argument is
# ignored. A true $flag selects the non-blocking implementation, a false
# one the blocking implementation.
sub _waitpid {
    my $self = $_[0];
    my $flag = $_[2];

    return $self->_waitpid_non_blocking if $flag;
    return $self->_waitpid_blocking;
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# _waitpid_non_blocking()
#
# Polls every registered pid with waitpid(..., WNOHANG) and returns the
# first pid that was reaped. A pid for which waitpid reports -1 is
# dropped from the process table with a warning. Returns nothing when no
# child has finished.
sub _waitpid_non_blocking {
    my ($self) = @_;

    for my $pid ($self->running_procs) {
        my $result = waitpid($pid, WNOHANG());
        next unless $result;    # 0: this child is still running

        return $pid if $result != -1;

        warn "child process '$pid' disappeared. A call to `waitpid` outside of Parallel::ForkManager might have reaped it.\n";
        # it's gone. let's clean the process entry
        delete $self->{processes}{$pid};
    }

    return;
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# _waitpid_blocking()
#
# With a true waitpid_blocking_sleep: pseudo-blocking loop that polls the
# registered children, returning the first reaped pid, or nothing once no
# process is registered any more, sleeping for the period between polls.
# With a false period: a real blocking waitpid(-1, 0).
sub _waitpid_blocking {
    my ($self) = @_;

    my $sleep_period = $self->{waitpid_blocking_sleep};

    # true blocking call when the pseudo-blocking loop is disabled
    return waitpid(-1, 0) unless $sleep_period;

    # pseudo-blocking
    while (1) {
        my $pid = $self->_waitpid_non_blocking;

        return $pid if defined $pid;

        return unless $self->running_procs;

        select(undef, undef, undef, $sleep_period);
    }
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# DESTROY
#
# Removes the temporary directory, but only when all of these hold:
# auto_cleanup is set, the current process is the one recorded as
# parent_pid, and the directory still exists.
sub DESTROY {
    my ($self) = @_;

    return unless $self->{auto_cleanup};
    return unless $self->{parent_pid} == $$;
    return unless -d $self->{tempdir};

    File::Path::remove_tree($self->{tempdir});
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
1;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
__END__
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=pod
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=encoding UTF-8
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 NAME
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Parallel::ForkManager - A simple parallel processing fork manager
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 VERSION
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
version 1.19
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 SYNOPSIS
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
use Parallel::ForkManager;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my $pm = Parallel::ForkManager->new($MAX_PROCESSES);
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
DATA_LOOP:
|
|
Packit |
cbc316 |
foreach my $data (@all_data) {
|
|
Packit |
cbc316 |
# Forks and returns the pid for the child:
|
|
Packit |
cbc316 |
my $pid = $pm->start and next DATA_LOOP;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
... do some work with $data in the child process ...
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
$pm->finish; # Terminates the child process
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 DESCRIPTION
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
This module is intended for use in operations that can be done in parallel
|
|
Packit |
cbc316 |
where the number of processes to be forked off should be limited. Typical
|
|
Packit |
cbc316 |
use is a downloader which will be retrieving hundreds/thousands of files.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The code for a downloader would look something like this:
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
use LWP::Simple;
|
|
Packit |
cbc316 |
use Parallel::ForkManager;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
...
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my @links=(
|
|
Packit |
cbc316 |
["http://www.foo.bar/rulez.data","rulez_data.txt"],
|
|
Packit |
cbc316 |
["http://new.host/more_data.doc","more_data.doc"],
|
|
Packit |
cbc316 |
...
|
|
Packit |
cbc316 |
);
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
...
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# Max 30 processes for parallel download
|
|
Packit |
cbc316 |
my $pm = Parallel::ForkManager->new(30);
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
LINKS:
|
|
Packit |
cbc316 |
foreach my $linkarray (@links) {
|
|
Packit |
cbc316 |
$pm->start and next LINKS; # do the fork
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my ($link, $fn) = @$linkarray;
|
|
Packit |
cbc316 |
warn "Cannot get $fn from $link"
|
|
Packit |
cbc316 |
if getstore($link, $fn) != RC_OK;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
$pm->finish; # do the exit in the child process
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
$pm->wait_all_children;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
First you need to instantiate the ForkManager with the "new" constructor.
|
|
Packit |
cbc316 |
You must specify the maximum number of processes to be created. If you
|
|
Packit |
cbc316 |
specify 0, then NO fork will be done; this is good for debugging purposes.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Next, use $pm->start to do the fork. $pm->start returns 0 for the child process,
|
|
Packit |
cbc316 |
and child pid for the parent process (see also L<perlfunc(1p)/fork()>).
|
|
Packit |
cbc316 |
The "and next" skips the internal loop in the parent process. NOTE:
|
|
Packit |
cbc316 |
$pm->start dies if the fork fails.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
$pm->finish terminates the child process (assuming a fork was done in the
|
|
Packit |
cbc316 |
"start").
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
NOTE: You cannot use $pm->start if you are already in the child process.
|
|
Packit |
cbc316 |
If you want to manage another set of subprocesses in the child process,
|
|
Packit |
cbc316 |
you must instantiate another Parallel::ForkManager object!
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 METHODS
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The comment letter indicates where the method should be run. P for parent,
|
|
Packit |
cbc316 |
C for child.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=over 5
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item new $processes
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Instantiate a new Parallel::ForkManager object. You must specify the maximum
|
|
Packit |
cbc316 |
number of children to fork off. If you specify 0 (zero), then no children
|
|
Packit |
cbc316 |
will be forked. This is intended for debugging purposes.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The optional second parameter, $tempdir, is only used if you want the
|
|
Packit |
cbc316 |
children to send back a reference to some data (see RETRIEVING DATASTRUCTURES
|
|
Packit |
cbc316 |
below). If not provided, it is set via a call to L<File::Temp>::tempdir().
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The new method will die if the temporary directory does not exist or it is not
|
|
Packit |
cbc316 |
a directory.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item start [ $process_identifier ]
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
This method does the fork. It returns the pid of the child process for
|
|
Packit |
cbc316 |
the parent, and 0 for the child process. If the $processes parameter
|
|
Packit |
cbc316 |
for the constructor is 0 then, assuming you're in the child process,
|
|
Packit |
cbc316 |
$pm->start simply returns 0.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
An optional $process_identifier can be provided to this method... It is used by
|
|
Packit |
cbc316 |
the "run_on_finish" callback (see CALLBACKS) for identifying the finished
|
|
Packit |
cbc316 |
process.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item start_child [ $process_identifier, ] \&callback
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Like C<start>, but will run the C<&callback> as the child. If the callback returns anything,
|
|
Packit |
cbc316 |
it'll be passed as the data to transmit back to the parent process via C<finish()>.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item finish [ $exit_code [, $data_structure_reference] ]
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Closes the child process by exiting and accepts an optional exit code
|
|
Packit |
cbc316 |
(default exit code is 0) which can be retrieved in the parent via callback.
|
|
Packit |
cbc316 |
If the second optional parameter is provided, the child attempts to send
|
|
Packit |
cbc316 |
its contents back to the parent. If you use the program in debug mode
|
|
Packit |
cbc316 |
($processes == 0), this method just calls the callback.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
If the $data_structure_reference is provided, then it is serialized and
|
|
Packit |
cbc316 |
passed to the parent process. See RETRIEVING DATASTRUCTURES for more info.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item set_max_procs $processes
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Allows you to set a new maximum number of children to maintain.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item wait_all_children
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
You can call this method to wait for all the processes which have been
|
|
Packit |
cbc316 |
forked. This is a blocking wait.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item reap_finished_children
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
This is a non-blocking call to reap children and execute callbacks independent
|
|
Packit |
cbc316 |
of calls to "start" or "wait_all_children". Use this in scenarios where "start"
|
|
Packit |
cbc316 |
is called infrequently but you would like the callbacks executed quickly.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item is_parent
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Returns C<true> if within the parent or C<false> if within the child.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item is_child
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Returns C<true> if within the child or C<false> if within the parent.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item max_procs
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Returns the maximal number of processes the object will fork.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item running_procs
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Returns the pids of the forked processes currently monitored by the
|
|
Packit |
cbc316 |
C<Parallel::ForkManager>. Note that children are still reported as running
|
|
Packit |
cbc316 |
until the fork manager harvests them, via the next call to
|
|
Packit |
cbc316 |
C<start> or C<wait_all_children>.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my @pids = $pm->running_procs;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my $nbr_children = $pm->running_procs;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item wait_for_available_procs( $n )
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Wait until C<$n> available process slots are available.
|
|
Packit |
cbc316 |
If C<$n> is not given, defaults to I<1>.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item waitpid_blocking_sleep
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Returns the sleep period, in seconds, of the pseudo-blocking calls. The sleep
|
|
Packit |
cbc316 |
period can be a fraction of a second.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Returns C<0> if disabled.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Defaults to 1 second.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
See I<BLOCKING CALLS> for more details.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item set_waitpid_blocking_sleep $seconds
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Sets the sleep period, in seconds, of the pseudo-blocking calls.
|
|
Packit |
cbc316 |
Set to C<0> to disable.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
See I<BLOCKING CALLS> for more details.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=back
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 CALLBACKS
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
You can define callbacks in the code, which are called on events like starting
|
|
Packit |
cbc316 |
a process or upon finish. Declare these before the first call to start().
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The callbacks can be defined with the following methods:
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=over 4
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item run_on_finish $code [, $pid ]
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
You can define a subroutine which is called when a child is terminated. It is
|
|
Packit |
cbc316 |
called in the parent process.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The parameters of the $code are the following:
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
- pid of the process, which is terminated
|
|
Packit |
cbc316 |
- exit code of the program
|
|
Packit |
cbc316 |
- identification of the process (if provided in the "start" method)
|
|
Packit |
cbc316 |
- exit signal (0-127: signal name)
|
|
Packit |
cbc316 |
- core dump (1 if there was core dump at exit)
|
|
Packit |
cbc316 |
- datastructure reference or undef (see RETRIEVING DATASTRUCTURES)
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item run_on_start $code
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
You can define a subroutine which is called when a child is started. It is called
|
|
Packit |
cbc316 |
after the successful startup of a child in the parent process.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The parameters of the $code are the following:
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
- pid of the process which has been started
|
|
Packit |
cbc316 |
- identification of the process (if provided in the "start" method)
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item run_on_wait $code, [$period]
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
You can define a subroutine which is called when the child process needs to wait
|
|
Packit |
cbc316 |
for the startup. If $period is not defined, then one call is done per
|
|
Packit |
cbc316 |
child. If $period is defined, then $code is called periodically and the
|
|
Packit |
cbc316 |
module waits for $period seconds between the two calls. Note, $period can be
|
|
Packit |
cbc316 |
fractional number also. The exact "$period seconds" is not guaranteed,
|
|
Packit |
cbc316 |
signals can shorten and the process scheduler can make it longer (on busy
|
|
Packit |
cbc316 |
systems).
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The $code is also called in the "start" and the "wait_all_children" methods.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
No parameters are passed to the $code on the call.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=back
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 BLOCKING CALLS
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
When it comes to waiting for child processes to terminate, C<Parallel::ForkManager> is between
|
|
Packit |
cbc316 |
a fork and a hard place (if you excuse the terrible pun). The underlying Perl C<waitpid> function
|
|
Packit |
cbc316 |
that the module relies on can block until either one specific or any child process
|
|
Packit |
cbc316 |
terminates, but not for a process that is part of a given group.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
This means that the module can do one of two things when it waits for
|
|
Packit |
cbc316 |
one of its child processes to terminate:
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=over
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item Only wait for its own child processes
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
This is done via a loop using a C<waitpid> non-blocking call and a sleep statement.
|
|
Packit |
cbc316 |
The code does something along the lines of
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
while(1) {
|
|
Packit |
cbc316 |
if ( any of the P::FM child process terminated ) {
|
|
Packit |
cbc316 |
return its pid
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
sleep $sleep_period
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
This is the default behavior that the module will use.
|
|
Packit |
cbc316 |
This is not the most efficient way to wait for child processes, but it's
|
|
Packit |
cbc316 |
the safest way to ensure that C<Parallel::ForkManager> won't interfere with
|
|
Packit |
cbc316 |
any other part of the codebase.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The sleep period is set via the method C<set_waitpid_blocking_sleep>.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item Block until any process terminates
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Alternatively, C<Parallel::ForkManager> can call C<waitpid> such that it will
|
|
Packit |
cbc316 |
block until any child process terminates. If the child process was not one of
|
|
Packit |
cbc316 |
the monitored subprocesses, the wait will resume. This is more efficient, but means
|
|
Packit |
cbc316 |
that C<P::FM> can capture (and discard) the termination notification that a different
|
|
Packit |
cbc316 |
part of the code might be waiting for.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
If this is a race condition
|
|
Packit |
cbc316 |
that doesn't apply to your codebase, you can set the
|
|
Packit |
cbc316 |
I<waitpid_blocking_sleep> period to C<0>, which will enable C<waitpid> call blocking.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my $pm = Parallel::ForkManager->new( 4 );
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
$pm->set_waitpid_blocking_sleep(0); # true blocking calls enabled
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
for ( 1..100 ) {
|
|
Packit |
cbc316 |
$pm->start and next;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
...; # do work
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
$pm->finish;
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=back
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 RETRIEVING DATASTRUCTURES from child processes
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The ability for the parent to retrieve data structures is new as of version
|
|
Packit |
cbc316 |
0.7.6.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Each child process may optionally send 1 data structure back to the parent.
|
|
Packit |
cbc316 |
By data structure, we mean a reference to a string, hash or array. The
|
|
Packit |
cbc316 |
contents of the data structure are written out to temporary files on disc
|
|
Packit |
cbc316 |
using the L<Storable> modules' store() method. The reference is then
|
|
Packit |
cbc316 |
retrieved from within the code you send to the run_on_finish callback.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
The data structure can be any scalar perl data structure which makes sense:
|
|
Packit |
cbc316 |
string, numeric value or a reference to an array, hash or object.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
There are 2 steps involved in retrieving data structures:
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
1) A reference to the data structure the child wishes to send back to the
|
|
Packit |
cbc316 |
parent is provided as the second argument to the finish() call. It is up
|
|
Packit |
cbc316 |
to the child to decide whether or not to send anything back to the parent.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
2) The data structure reference is retrieved using the callback provided in
|
|
Packit |
cbc316 |
the run_on_finish() method.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Keep in mind that data structure retrieval is not the same as returning a
|
|
Packit |
cbc316 |
data structure from a method call. That is not what actually occurs. The
|
|
Packit |
cbc316 |
data structure referenced in a given child process is serialized and
|
|
Packit |
cbc316 |
written out to a file by L<Storable>. The file is subsequently read back
|
|
Packit |
cbc316 |
into memory and a new data structure belonging to the parent process is
|
|
Packit |
cbc316 |
created. Please consider the performance penalty it can imply, so try to
|
|
Packit |
cbc316 |
keep the returned structure small.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 EXAMPLES
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head2 Parallel get
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
This small example can be used to get URLs in parallel.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
use Parallel::ForkManager;
|
|
Packit |
cbc316 |
use LWP::Simple;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my $pm = Parallel::ForkManager->new(10);
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
LINKS:
|
|
Packit |
cbc316 |
for my $link (@ARGV) {
|
|
Packit |
cbc316 |
$pm->start and next LINKS;
|
|
Packit |
cbc316 |
my ($fn) = $link =~ /^.*\/(.*?)$/;
|
|
Packit |
cbc316 |
if (!$fn) {
|
|
Packit |
cbc316 |
warn "Cannot determine filename from $link\n";
|
|
Packit |
cbc316 |
} else {
|
|
Packit |
cbc316 |
$0 .= " " . $fn;
|
|
Packit |
cbc316 |
print "Getting $fn from $link\n";
|
|
Packit |
cbc316 |
my $rc = getstore($link, $fn);
|
|
Packit |
cbc316 |
print "$link downloaded. response code: $rc\n";
|
|
Packit |
cbc316 |
};
|
|
Packit |
cbc316 |
$pm->finish;
|
|
Packit |
cbc316 |
};
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head2 Callbacks
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Example of a program using callbacks to get child exit codes:
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
use strict;
|
|
Packit |
cbc316 |
use Parallel::ForkManager;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my $max_procs = 5;
|
|
Packit |
cbc316 |
my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara );
|
|
Packit |
cbc316 |
# hash to resolve PID's back to child specific information
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my $pm = Parallel::ForkManager->new($max_procs);
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# Setup a callback for when a child finishes up so we can
|
|
Packit |
cbc316 |
# get its exit code
|
|
Packit |
cbc316 |
$pm->run_on_finish( sub {
|
|
Packit |
cbc316 |
my ($pid, $exit_code, $ident) = @_;
|
|
Packit |
cbc316 |
print "** $ident just got out of the pool ".
|
|
Packit |
cbc316 |
"with PID $pid and exit code: $exit_code\n";
|
|
Packit |
cbc316 |
});
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
$pm->run_on_start( sub {
|
|
Packit |
cbc316 |
my ($pid, $ident)=@_;
|
|
Packit |
cbc316 |
print "** $ident started, pid: $pid\n";
|
|
Packit |
cbc316 |
});
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
$pm->run_on_wait( sub {
|
|
Packit |
cbc316 |
print "** Have to wait for one child ...\n"
|
|
Packit |
cbc316 |
},
|
|
Packit |
cbc316 |
0.5
|
|
Packit |
cbc316 |
);
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
NAMES:
|
|
Packit |
cbc316 |
foreach my $child ( 0 .. $#names ) {
|
|
Packit |
cbc316 |
my $pid = $pm->start($names[$child]) and next NAMES;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# This code is the child process
|
|
Packit |
cbc316 |
print "This is $names[$child], Child number $child\n";
|
|
Packit |
cbc316 |
sleep ( 2 * $child );
|
|
Packit |
cbc316 |
print "$names[$child], Child $child is about to get out...\n";
|
|
Packit |
cbc316 |
sleep 1;
|
|
Packit |
cbc316 |
$pm->finish($child); # pass an exit code to finish
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
print "Waiting for Children...\n";
|
|
Packit |
cbc316 |
$pm->wait_all_children;
|
|
Packit |
cbc316 |
print "Everybody is out of the pool!\n";
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head2 Data structure retrieval
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
In this simple example, each child sends back a string reference.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
use Parallel::ForkManager 0.7.6;
|
|
Packit |
cbc316 |
use strict;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my $pm = Parallel::ForkManager->new(2, '/server/path/to/temp/dir/');
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# data structure retrieval and handling
|
|
Packit |
cbc316 |
$pm -> run_on_finish ( # called BEFORE the first call to start()
|
|
Packit |
cbc316 |
sub {
|
|
Packit |
cbc316 |
my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# retrieve data structure from child
|
|
Packit |
cbc316 |
if (defined($data_structure_reference)) { # children are not forced to send anything
|
|
Packit |
cbc316 |
my $string = ${$data_structure_reference}; # child passed a string reference
|
|
Packit |
cbc316 |
print "$string\n";
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
else { # problems occurring during storage or retrieval will throw a warning
|
|
Packit |
cbc316 |
print qq|No message received from child process $pid!\n|;
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
);
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# prep random statement components
|
|
Packit |
cbc316 |
my @foods = ('chocolate', 'ice cream', 'peanut butter', 'pickles', 'pizza', 'bacon', 'pancakes', 'spaghetti', 'cookies');
|
|
Packit |
cbc316 |
my @preferences = ('loves', q|can't stand|, 'always wants more', 'will walk 100 miles for', 'only eats', 'would starve rather than eat');
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# run the parallel processes
|
|
Packit |
cbc316 |
PERSONS:
|
|
Packit |
cbc316 |
foreach my $person (qw(Fred Wilma Ernie Bert Lucy Ethel Curly Moe Larry)) {
|
|
Packit |
cbc316 |
$pm->start() and next PERSONS;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# generate a random statement about food preferences
|
|
Packit |
cbc316 |
my $statement = $person . ' ' . $preferences[int(rand @preferences)] . ' ' . $foods[int(rand @foods)];
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# send it back to the parent process
|
|
Packit |
cbc316 |
$pm->finish(0, \$statement); # note that it's a scalar REFERENCE, not the scalar itself
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
$pm->wait_all_children;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
A second data structure retrieval example demonstrates how children decide
|
|
Packit |
cbc316 |
whether or not to send anything back, what to send and how the parent should
|
|
Packit |
cbc316 |
process whatever is retrieved.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=for example begin
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
use Parallel::ForkManager 0.7.6;
|
|
Packit |
cbc316 |
use Data::Dumper; # to display the data structures retrieved.
|
|
Packit |
cbc316 |
use strict;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
my $pm = Parallel::ForkManager->new(20); # using a temp dir created by File::Temp::tempdir()
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# data structure retrieval and handling
|
|
Packit |
cbc316 |
my %retrieved_responses = (); # for collecting responses
|
|
Packit |
cbc316 |
$pm -> run_on_finish (
|
|
Packit |
cbc316 |
sub {
|
|
Packit |
cbc316 |
my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# see what the child sent us, if anything
|
|
Packit |
cbc316 |
if (defined($data_structure_reference)) { # test rather than assume child sent anything
|
|
Packit |
cbc316 |
my $reftype = ref($data_structure_reference);
|
|
Packit |
cbc316 |
print qq|ident "$ident" returned a "$reftype" reference.\n\n|;
|
|
Packit |
cbc316 |
if (1) { # simple on/off switch to display the contents
|
|
Packit |
cbc316 |
print &Dumper($data_structure_reference) . qq|end of "$ident" sent structure\n\n|;
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# we can also collect retrieved data structures for processing after all children have exited
|
|
Packit |
cbc316 |
$retrieved_responses{$ident} = $data_structure_reference;
|
|
Packit |
cbc316 |
} else {
|
|
Packit |
cbc316 |
print qq|ident "$ident" did not send anything.\n\n|;
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
);
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# generate a list of instructions
|
|
Packit |
cbc316 |
my @instructions = ( # a unique identifier and what the child process should send
|
|
Packit |
cbc316 |
{'name' => '%ENV keys as a string', 'send' => 'keys'},
|
|
Packit |
cbc316 |
{'name' => 'Send Nothing'}, # not instructing the child to send anything back to the parent
|
|
Packit |
cbc316 |
{'name' => 'Childs %ENV', 'send' => 'all'},
|
|
Packit |
cbc316 |
{'name' => 'Child chooses randomly', 'send' => 'random'},
|
|
Packit |
cbc316 |
{'name' => 'Invalid send instructions', 'send' => 'Na Na Nana Na'},
|
|
Packit |
cbc316 |
{'name' => 'ENV values in an array', 'send' => 'values'},
|
|
Packit |
cbc316 |
);
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
INSTRUCTS:
|
|
Packit |
cbc316 |
foreach my $instruction (@instructions) {
|
|
Packit |
cbc316 |
$pm->start($instruction->{'name'}) and next INSTRUCTS; # this time we are using an explicit, unique child process identifier
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# last step in child processing
|
|
Packit |
cbc316 |
$pm->finish(0) unless $instruction->{'send'}; # no data structure is sent unless this child is told what to send.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
if ($instruction->{'send'} eq 'keys') {
|
|
Packit |
cbc316 |
$pm->finish(0, \join(', ', keys %ENV));
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
} elsif ($instruction->{'send'} eq 'values') {
|
|
Packit |
cbc316 |
$pm->finish(0, [values %ENV]); # kinda useless without knowing which keys they belong to...
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
} elsif ($instruction->{'send'} eq 'all') {
|
|
Packit |
cbc316 |
$pm->finish(0, \%ENV); # remember, we are not "returning" anything, just copying the hash to disc
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# demonstrate clearly that the child determines what type of reference to send
|
|
Packit |
cbc316 |
} elsif ($instruction->{'send'} eq 'random') {
|
|
Packit |
cbc316 |
my $string = q|I'm just a string.|;
|
|
Packit |
cbc316 |
my @array = qw(I am an array);
|
|
Packit |
cbc316 |
my %hash = (type => 'associative array', synonym => 'hash', cool => 'very :)');
|
|
Packit |
cbc316 |
my $return_choice = ('string', 'array', 'hash')[int(rand 3)]; # randomly choose return data type
|
|
Packit |
cbc316 |
$pm->finish(0, \$string) if ($return_choice eq 'string');
|
|
Packit |
cbc316 |
$pm->finish(0, \@array) if ($return_choice eq 'array');
|
|
Packit |
cbc316 |
$pm->finish(0, \%hash) if ($return_choice eq 'hash');
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# as a responsible child, inform parent that their instruction was invalid
|
|
Packit |
cbc316 |
} else {
|
|
Packit |
cbc316 |
$pm->finish(0, \qq|Invalid instructions: "$instruction->{'send'}".|); # ordinarily I wouldn't include invalid input in a response...
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
$pm->wait_all_children; # blocks until all forked processes have exited
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
# post fork processing of returned data structures
|
|
Packit |
cbc316 |
for (sort keys %retrieved_responses) {
|
|
Packit |
cbc316 |
print qq|Post processing "$_"...\n|;
|
|
Packit |
cbc316 |
}
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=for example end
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 SECURITY
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Parallel::ForkManager uses temporary files when
|
|
Packit |
cbc316 |
a child process returns information to its parent process. The filenames are
|
|
Packit |
cbc316 |
based on the process IDs of the parent and child processes, so they are
|
|
Packit |
cbc316 |
fairly easy to guess. So if security is a concern in your environment, make sure
|
|
Packit |
cbc316 |
the directory used by Parallel::ForkManager is restricted to the current user
|
|
Packit |
cbc316 |
only (the default behavior is to create a directory,
|
|
Packit |
cbc316 |
via L<File::Temp>'s C<tempdir>, which does that).
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 TROUBLESHOOTING
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head2 PerlIO::gzip and Parallel::ForkManager do not play nice together
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
If you are using L<PerlIO::gzip> in your child processes, you may end up with
|
|
Packit |
cbc316 |
garbled files. This is not really P::FM's fault, but rather a problem between
|
|
Packit |
cbc316 |
L<PerlIO::gzip> and C<fork()> (see L<https://rt.cpan.org/Public/Bug/Display.html?id=114557>).
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Fortunately, it seems there is an easy way to fix the problem by
|
|
Packit |
cbc316 |
adding the "unix" layer, i.e.,
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
open(IN, '<:unix:gzip', ...
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 BUGS AND LIMITATIONS
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Do not use Parallel::ForkManager in an environment where other child
|
|
Packit |
cbc316 |
processes can affect the run of the main program, so using this module
|
|
Packit |
cbc316 |
is not recommended in an environment where fork() / wait() is already used.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
If you want to use more than one copy of Parallel::ForkManager, then
|
|
Packit |
cbc316 |
you have to make sure that all child processes are terminated before you
|
|
Packit |
cbc316 |
use the second object in the main program.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
You are free to use a new copy of Parallel::ForkManager in the child
|
|
Packit |
cbc316 |
processes, although I don't think it makes sense.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 CREDITS
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Michael Gang (bug report)
|
|
Packit |
cbc316 |
Noah Robin <sitz@onastick.net> (documentation tweaks)
|
|
Packit |
cbc316 |
Chuck Hirstius <chirstius@megapathdsl.net> (callback exit status, example)
|
|
Packit |
cbc316 |
Grant Hopwood <hopwoodg@valero.com> (win32 port)
|
|
Packit |
cbc316 |
Mark Southern <mark_southern@merck.com> (bugfix)
|
|
Packit |
cbc316 |
Ken Clarke <www.perlprogrammer.net> (datastructure retrieval)
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 AUTHORS
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=over 4
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item *
|
|
Packit |
cbc316 |
|
|
Packit Service |
6fff66 |
dLux (Szabó, Balázs) <dlux@dlux.hu>
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item *
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Yanick Champoux <yanick@cpan.org>
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=item *
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
Gabor Szabo <gabor@szabgab.com>
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=back
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=head1 COPYRIGHT AND LICENSE
|
|
Packit |
cbc316 |
|
|
Packit Service |
6fff66 |
This software is copyright (c) 2000 by Balázs Szabó.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
This is free software; you can redistribute it and/or modify it under
|
|
Packit |
cbc316 |
the same terms as the Perl 5 programming language system itself.
|
|
Packit |
cbc316 |
|
|
Packit |
cbc316 |
=cut
|