The other day I was looking at a script that ran a bunch of more or less independent jobs in batches of four.
I’ve reproduced the core of the script as best as I can remember it.
Job
It has a class to represent the jobs themselves.
```perl
package Job;
use Moose;

# Short name used to label the job and its output file
has identifier => (
    is       => 'ro',
    required => 1,
);

# Shell command the job runs
has cmd => (
    is       => 'ro',
    required => 1,
);

no Moose;
__PACKAGE__->meta->make_immutable;
```
Job Manager
It also has a class that tries to keep four jobs running in parallel wherever possible.
```perl
package JobManager;
use Moose;
use POSIX 'strftime';
use feature 'say';    # say() is used below

# Timestamp shared by every job this manager creates
has identifier => (
    is      => 'ro',
    default => sub { strftime('%H%M%S', localtime(time())); },
);

has max_processes => (
    is      => 'ro',
    default => 4,
);

# Counter used to number jobs; cannot be set from the constructor
has _job_id => (
    is       => 'ro',
    writer   => '_set_job_id',
    init_arg => undef,
    default  => 1,
);

# Jobs waiting for a free slot (first in, first out)
has queued_jobs => (
    is      => 'ro',
    traits  => ['Array'],
    isa     => 'ArrayRef[Job]',
    default => sub { [] },
    handles => {
        enqueue_job       => 'push',
        dequeue_job       => 'shift',
        exist_queued_jobs => 'count',
    },
);

# Jobs currently running, keyed by child pid
has running_jobs => (
    is      => 'ro',
    traits  => ['Hash'],
    isa     => 'HashRef[Job]',
    default => sub { {} },
    handles => {
        add_running_job    => 'set',
        delete_running_job => 'delete',
        num_jobs           => 'count',
    },
);

sub next_job_id {
    my $self   = shift;
    my $job_id = $self->_job_id();
    $self->_set_job_id($job_id + 1);
    return sprintf "%02d", $job_id;
}

sub run_job {
    my ($self, $job) = @_;
    my ($identifier, $cmd) = ($job->identifier(), $job->cmd());

    my $pid = fork();
    if (! defined($pid)) {
        say "Failed to run job $identifier";
    }
    elsif ($pid) {
        # Parent: remember the child so main_loop can reap it
        say "Running job $identifier ($pid)";
        $self->add_running_job($pid, $job);
    }
    else {
        # Child: run the command, capturing all of its output, then exit
        system("$cmd > /tmp/$identifier.output 2>&1");
        exit;
    }
}

sub add_job {
    my ($self, $name, $cmd) = @_;
    my $job = Job->new(
        identifier => (sprintf "%s_${name}_%s",
                       $self->next_job_id(), $self->identifier()),
        cmd        => $cmd,
    );

    # Queue the job if every slot is taken, otherwise start it now
    if ($self->num_jobs() >= $self->max_processes()) {
        $self->enqueue_job($job);
    }
    else {
        $self->run_job($job);
    }
}

sub main_loop {
    my $self = shift;
    while (1) {
        my $pid = wait();
        last if ($pid < 0);    # -1: no children left to wait for
        say "Child $pid has exited";
        $self->delete_running_job($pid);

        # Refill the free slots from the queue
        while ($self->num_jobs() < $self->max_processes()) {
            last unless $self->exist_queued_jobs();
            $self->run_job($self->dequeue_job());
        }
    }
}

no Moose;
__PACKAGE__->meta->make_immutable;
```
Test Code
My test code to check if I got the code more or less correct.
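```perl
package main;
# Assumes the Job and JobManager packages above are in the same file.

my $manager = JobManager->new();

# One long-running job followed by nine short ones
$manager->add_job('echo', 'sleep 10 ; echo hello');
for (1..9) {
    $manager->add_job('echo', 'sleep 2 ; echo hello');
}
$manager->main_loop();
```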
```
jared@localhost $ ls -ltr /tmp/*echo*
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/05_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/04_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/03_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/02_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/08_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/07_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/06_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/10_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/09_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/01_echo_193228.output
```
Conclusion
I took two lessons away.
Parallel::Queue would have greatly simplified the core of this script. How many other CPAN modules could my code benefit from just as much, if only I knew about them?
fork() is nice and easy to deal with. The code to manage the processes isn’t hugely complicated and seems pretty robust (though I may not have faithfully reproduced that robustness here).
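To illustrate how little is needed, here is a minimal sketch of the same bounded-parallelism idea using only core Perl (fork and waitpid) and no Moose. The helper name `run_bounded` and the convention that each job is a code ref whose return value becomes the child's exit status are assumptions made for this example; this is not the original script's code.

```perl
use strict;
use warnings;

# Hypothetical helper: run each code ref in @jobs in its own child process,
# with at most $max children alive at any time.
# Returns a hash ref of pid => exit status for every child it reaped.
sub run_bounded {
    my ($max, @jobs) = @_;
    my %running;    # pids of children that have not been reaped yet
    my %status;     # pid => exit status of each reaped child

    while (@jobs or %running) {
        # Fill free slots from the front of the queue.
        while (@jobs and keys(%running) < $max) {
            my $job = shift @jobs;
            my $pid = fork();
            die "fork failed: $!" unless defined $pid;
            if ($pid == 0) {
                # Child: run the job and use its return value as exit status.
                exit($job->() || 0);
            }
            $running{$pid} = 1;
        }

        # Block until some child exits, then free its slot.
        my $pid = waitpid(-1, 0);
        last if $pid < 0;    # -1 means there are no children left
        delete $running{$pid};
        $status{$pid} = $? >> 8;
    }
    return \%status;
}
```

For example, `run_bounded(4, map { my $cmd = $_; sub { system($cmd) >> 8 } } @commands)` would run a list of shell commands four at a time. Unlike the Moose version, it starts and reaps children in a single loop, so there is no separate add_job/main_loop split.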
I have often thought that CPAN’s current discoverability is rather poor. It’s long been on my mind to create a wiki where CPAN stuff could be easily categorized – like the category pages that you now see on search.cpan.org, but maintained. You could then go to “System, Networking, IPC” > “Process Management”, and find Parallel::Queue and related modules there.
http://www.modernperlbooks.com/mt/2011/06/four-new-perl-books-underway.html
The Kensho book is coming, and it will benefit you.
http://p3rl.org/wiki collects information about modules for common tasks and contrasts them.
To add to Peter’s post: many modules are tagged, and authors can also add tags to their distribution’s meta file. That is a taxonomy scheme in addition to categories, and it is not fully exploited yet.
See also: IPC::ConcurrencyLimit, which was extracted from our code at booking.com. We use the algorithm quite heavily, and the module itself is used by some of our newer code, too, for running on up to 24 cores.
If you use it, take care when forking: the underlying flocks are cloned too, so you either need to release them in one of the processes or spawn processes another way. I usually opt for an executable that can run in different modes, one of which is “spawn”, which runs all the other modes using the fork-and-exec pattern of system().
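The general idea of limiting concurrency with file locks can be sketched in a few lines of core Perl: one lock file per slot, each claimed with a non-blocking exclusive flock. This is only an illustration under that assumption; `claim_slot` and the `slot.N.lock` naming are hypothetical, and it is not IPC::ConcurrencyLimit's API or implementation. It also shows why forking needs care: the child shares the locked handle with the parent.

```perl
use strict;
use warnings;
use Fcntl qw(:flock);
use File::Spec;

# Hypothetical helper: try to claim one of $max concurrency slots by taking a
# non-blocking, exclusive flock on one of $max lock files in $dir.
# Returns the open handle (keep it open to hold the slot), or undef if
# every slot is already held.
sub claim_slot {
    my ($dir, $max) = @_;
    for my $i (1 .. $max) {
        my $path = File::Spec->catfile($dir, "slot.$i.lock");
        open(my $fh, '>>', $path) or die "Cannot open $path: $!";
        return $fh if flock($fh, LOCK_EX | LOCK_NB);
        close $fh;    # slot $i is held elsewhere; try the next one
    }
    return;    # all slots taken
}

# Caveat when forking while holding a slot: the child shares the parent's
# open file description, so the lock is only released once both processes
# have closed the handle. A child that should not hold the slot should
# close() its copy; flock($fh, LOCK_UN) would unlock it for the parent too.
```

A script might start with something like `my $slot = claim_slot($lock_dir, 4) or exit;` (with `$lock_dir` a directory of your choosing) and simply keep `$slot` open for as long as it runs.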
@Peter – indexing “stuff” well seems to be a hard problem. There would definitely be value in a list of a subset of modules that are worth looking at and importantly why they are worth looking at. Is that what you intended for your wiki or was it to be more comprehensive?
@anonymous – I’m looking forward to all four books mentioned by chromatic, although possibly the Kensho book the least if it only covers the Task::Kensho modules. I am moderately familiar with all of the modules on the Task::Kensho list apart from DBIC.
A much more comprehensive list than Task::Kensho (that included stuff like Parallel::Queue) along with a very brief summary of each module would be great.
@Steffen – cheers for the pointer to IPC::ConcurrencyLimit. I can definitely see some places where I’d use it in our codebase (although, of course, it would not fit the script I was talking about in the post).