Archive for July, 2011

Parallel::Iterator

Looking back at the Job Manager script from last week, I realise I omitted the part where each job in the batch returns its result to the manager.

The job serialises a hash containing the results to disk using Storable. When the jobs have all finished, the manager retrieves the data using the identifier.
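For illustration, the job side of that omitted section might look something like this (a sketch only: the contents of %results and the .result filename layout are assumptions on my part, and $identifier stands for the job's identifier):

use Storable 'store';

# Hypothetical job-side code: serialise the results hash to a file whose
# name embeds the job identifier, e.g. /tmp/01_echo_193228.result
my %results = ( exit_code => 0, output => 'hello' );
store(\%results, "/tmp/$identifier.result");

The manager side then collects the per-job results: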

use feature 'say';
use Storable 'retrieve';
use Data::Dumper;

my $results = {};
my $id = $manager->identifier();
foreach (</tmp/*_$id.result>) {
    if (! m{^/tmp/(\d+)_}) {
        say "Error: unable to retrieve id from $_";
        next;
    }
    $results->{$1} = retrieve($_);
}

print Dumper($results);

Now it turns out there is yet another handy CPAN module, Parallel::Iterator, which can return the output of each job in an output list. (Under the covers, it sets up pipes between the processes and serialises the data between them using Storable.)
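A minimal sketch of how that could replace the hand-rolled result gathering (the worker body and the job list here are made up for illustration):

use Parallel::Iterator qw( iterate_as_array );

my @cmds = ('sleep 2 ; echo hello') x 10;

# Run up to 4 workers; each worker receives (index, value) and whatever it
# returns ends up at the same index in the output list.
my @output = iterate_as_array(
    { workers => 4 },
    sub {
        my ($index, $cmd) = @_;
        my $out = qx($cmd);
        return { exit_code => $? >> 8, output => $out };
    },
    \@cmds,
);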

And I was going to say, it would be nice if folks on Ironman talked about useful modules they came across from time to time.

Except they do already. dagolden has already written about Parallel::Iterator here.

Wouldn’t it be handy if you could tag your Ironman posts with a hashtag like #cpanmodules, so that clicking on the hashtag returned all the matching posts?

Ironman: #cpanmodules #fork


Wanted: A guide to CPAN

The other day I was looking at a script that ran a bunch of more or less independent jobs in batches of four.

I’ve reproduced the core of the script as best I can remember it.

Job

It has a class to represent the jobs themselves.

package Job;

use Moose;

has identifier => (
    is => 'ro',
    required => 1,
);

has cmd => (
    is => 'ro',
    required => 1,
);

no Moose;
__PACKAGE__->meta->make_immutable;

Job Manager

And a class that tries to ensure that four jobs are running in parallel wherever possible.

package JobManager;

use Moose;

use feature 'say';
use POSIX 'strftime';

has identifier => (
    is => 'ro',
    default => sub { strftime('%H%M%S', localtime(time())); },
);

has max_processes => (
    is => 'ro',
    default => 4,
);

has _job_id => (
    is => 'ro',
    writer => '_set_job_id',
    init_arg => undef,
    default => 1,
);

has queued_jobs => (
    is => 'ro',
    traits => ['Array'],
    isa => 'ArrayRef[Job]',
    default => sub { [] },
    handles => {
        enqueue_job => 'push',
        dequeue_job => 'shift',
        exist_queued_jobs => 'count',
    },
);

has running_jobs => (
    is => 'ro',
    traits => ['Hash'],
    isa => 'HashRef[Job]',
    default => sub { {} },
    handles => {
        add_running_job => 'set',
        delete_running_job => 'delete',
        num_jobs => 'count',
    },
);

sub next_job_id
{
    my $self = shift;
    my $job_id = $self->_job_id();
    $self->_set_job_id($job_id + 1);
    return sprintf "%02d", $job_id;
}

sub run_job
{
    my ($self, $job) = @_;

    my ($identifier, $cmd) = ($job->identifier(), $job->cmd());
    my $pid = fork();
    if (! defined($pid)) {
        say "Failed to run job $identifier";
    } elsif ($pid) {
        say "Running job $identifier ($pid)";
        $self->add_running_job($pid, $job);
    } else {
        system("$cmd > /tmp/$identifier.output 2>&1");
        exit;
    }
}

sub add_job
{
    my ($self, $name, $cmd) = @_;

    my $job = Job->new(
        identifier => (sprintf "%s_${name}_%s",
                               $self->next_job_id(), $self->identifier()),
        cmd => $cmd);

    # Queue the job if the maximum number of processes is already running.
    if ($self->num_jobs() >= $self->max_processes()) {
        $self->enqueue_job($job);
    } else {
        $self->run_job($job);
    }
}

sub main_loop
{
    my $self = shift;

    while (1) {
        my $pid = wait();
        last if ($pid < 0);
        say "Child $pid has exited";

        $self->delete_running_job($pid);
        while ($self->num_jobs() < $self->max_processes()) {
            last unless $self->exist_queued_jobs();
            $self->run_job($self->dequeue_job());
        }
    }
}

no Moose;
__PACKAGE__->meta->make_immutable;

Test Code

My test code, to check that I remembered the code more or less correctly:

my $manager = JobManager->new();

$manager->add_job('echo', 'sleep 10 ; echo hello');
for (1..9) {
    $manager->add_job('echo', 'sleep 2 ; echo hello');
}

$manager->main_loop();

jared@localhost $ ls -ltr /tmp/*echo*
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/05_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/04_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/03_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/02_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/08_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/07_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/06_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/10_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/09_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/01_echo_193228.output

Conclusion

I took two lessons away.

1. Parallel::Queue would have greatly simplified the core of this script (sketched below). How many other CPAN modules would benefit my code just as much, if only I knew about them?

2. fork() is nice and easy to deal with. The code to manage the processes isn’t hugely complicated and seems pretty robust (careful, I may not have duplicated the robustness here).
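
For what it’s worth, here is roughly how Parallel::Queue could take over the process management (a sketch only: I’m assuming its runqueue interface, and reusing the commands from the test code above):

use Parallel::Queue;

# Build a queue of closures, one per command, and run at most 4 at a time.
my @commands = ('sleep 10 ; echo hello', ('sleep 2 ; echo hello') x 9);
my @queue = map {
    my $cmd = $_;
    sub { system($cmd) };
} @commands;

my @remaining = runqueue(4, @queue);
warn "Some jobs did not run\n" if @remaining;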
