
Posts Tagged ‘ipc’

IPC::ConcurrencyLimit

(This is just a note for myself.)

IPC::ConcurrencyLimit is a handy module for implementing a number of concurrency patterns.

Steffen Mueller mentioned it in a comment on my blog back in 2011. Since then, there have been a couple of articles about it on the Booking.com dev blog.


In my recent Connecting Software Systems post, I included a teaser:

There is a great trick to connect two processes together using the shell (hint: using pipes) that I’m going to talk about next time

If you need to connect a single producer to a single consumer, you can join them with a Unix pipe without changing much of the underlying code. The trick is for the producer to write a line to STDOUT whenever work is available, and for the consumer to read those work requests from STDIN.
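Before the real scripts, here is a minimal one-file sketch of the same idea. It swaps the shell pipe for Perl's list-form pipe open, so the producer runs as a child process and the consumer reads its STDOUT directly. The helper name consume_from_producer and the stand-in one-liner producer are made up for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: start a producer command as a child process and
# return every line the consumer side reads from its STDOUT.
sub consume_from_producer
{
    my @producer_cmd = @_;

    # List-form pipe open: the child's STDOUT becomes our read handle
    open(my $from_producer, '-|', @producer_cmd)
        or die "Unable to start producer: $!";

    my @work;
    while (defined(my $line = <$from_producer>)) {
        chomp $line;
        push @work, $line;
    }

    # close() on a pipe also waits for the child and reports its status
    close($from_producer) or die "Producer failed: $! (status $?)";

    return @work;
}

# Stand-in producer: a one-liner that autoflushes and prints two work items
my @work = consume_from_producer(
    $^X, '-e', '$| = 1; print "PRODUCED $_\n" for qw(a.txt b.txt);'
);
print "consumer saw: $_\n" for @work;
```

Nothing in the producer knows it is writing to a pipe, which is what makes the approach cheap to retrofit.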

Script Preamble

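The preamble is mostly common to both the producer and the consumer. I set STDOUT to autoflush because Perl block-buffers STDOUT when it is connected to a pipe. Without autoflush, the consumer would not see a line until the producer's buffer filled up or the producer exited.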

use 5.010;

use strict;
use warnings;

use File::Path 'make_path';
use IO::Handle;
use POSIX 'strftime';

use File::Slurp;

STDOUT->autoflush(1);

sub hms
{
    return strftime('%H:%M:%S', localtime(time()));
}

sub my_log
{
    print '[ ', hms(), ' ] : ', @_, "\n";
}

The Producer

The work files are created in /var/tmp/pro-co. Hopefully create_file would be doing something a bit more useful in a real application!

sub create_file
{
    my $top = shift;

    my $filename = $top . '/' . int(rand(1_000_000)) . '.txt';
    my $content = rand(); # or something more useful...
    # simulate taking some time for processing
    sleep rand(5);
    write_file($filename, $content);
    return $filename;
}

# --

my $top = '/var/tmp/pro-co';

make_path($top);

for (1..5) {
    my_log "Iteration $_";
    my $filename = create_file($top);
    my_log "PRODUCED $filename";
}

The Consumer

The consumer removes the timestamp from the producer output. Then, any line that indicates a unit of work (marked in the example by PRODUCED) is passed to process_file().

sub process_file
{
    my $file = shift;
    my_log "Processing file [$file]";
    # File processing logic here ...
}

# --

while (defined(my $line = <STDIN>)) {
    # Strip the timestamp
    $line =~ s{\[\s[0-9:]+\s\]\s:\s}{};
    chomp $line;

    if ($line =~ /^PRODUCED\s+(.+)/) {
        process_file($1);
    } else {
        my_log "FROM PRODUCER [$line]";
    }
}
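The timestamp stripping and PRODUCED matching above can also be pulled out into a small function that is easy to check without a pipe. This is only a sketch: classify_line is a hypothetical name, and the timestamp format is assumed to be exactly what my_log prints.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of the original consumer: classify one
# line of producer output without touching STDIN.
sub classify_line
{
    my $line = shift;
    chomp $line;

    # Strip the leading "[ HH:MM:SS ] : " timestamp added by my_log
    $line =~ s{^\[\s[0-9:]+\s\]\s:\s}{};

    # A PRODUCED line carries a unit of work; anything else is just log output
    if ($line =~ /^PRODUCED\s+(.+)/) {
        return (work => $1);
    }
    return (log => $line);
}

my ($kind, $payload) =
    classify_line("[ 09:22:20 ] : PRODUCED /var/tmp/pro-co/944319.txt\n");
print "$kind: $payload\n";
```

The consumer loop then only has to dispatch on the first return value.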

The Example Run

As you can see from the output below, the consumer was able to process the work as it became available.

$ ./producer.pl | tee producer.log | ./consumer.pl
[ 09:22:20 ] : FROM PRODUCER [Iteration 1]
[ 09:22:20 ] : Processing file [/var/tmp/pro-co/944319.txt]
[ 09:22:20 ] : FROM PRODUCER [Iteration 2]
[ 09:22:23 ] : Processing file [/var/tmp/pro-co/141765.txt]
[ 09:22:23 ] : FROM PRODUCER [Iteration 3]
[ 09:22:27 ] : Processing file [/var/tmp/pro-co/463599.txt]
[ 09:22:27 ] : FROM PRODUCER [Iteration 4]
[ 09:22:28 ] : Processing file [/var/tmp/pro-co/423055.txt]
[ 09:22:28 ] : FROM PRODUCER [Iteration 5]
[ 09:22:28 ] : Processing file [/var/tmp/pro-co/233909.txt]
$ cat producer.log
[ 09:22:20 ] : Iteration 1
[ 09:22:20 ] : PRODUCED /var/tmp/pro-co/944319.txt
[ 09:22:20 ] : Iteration 2
[ 09:22:23 ] : PRODUCED /var/tmp/pro-co/141765.txt
[ 09:22:23 ] : Iteration 3
[ 09:22:27 ] : PRODUCED /var/tmp/pro-co/463599.txt
[ 09:22:27 ] : Iteration 4
[ 09:22:28 ] : PRODUCED /var/tmp/pro-co/423055.txt
[ 09:22:28 ] : Iteration 5
[ 09:22:28 ] : PRODUCED /var/tmp/pro-co/233909.txt


I’m a huge fan of code reuse, and I tend to trust other people’s public code more than my own private code. After all, if they put effort into making it public, they must have put a lot of thought into it, and more than likely it is their speciality. That’s why I don’t (for example) implement my own webservers :) Having said that, I’ve reimplemented more wheels than I care to admit. Here is me, providing my own version of identity, a built-in emacs function. How embarrassing. Almost put me off blogging, that did.

So, I try to do a bit of due diligence when I’m thinking about writing a simple thing that someone surely has already done. What I want is an interprocess mutex. There must be a million ways of doing these things, so I have a quick look on CPAN. Searching for mutex brings up LockFile::NetLock which looks interesting (although slightly paranoid, and I don’t have my own ftp server).

IPC::Semaphore looks like it only supports SysV (i.e. not Windows) and the API is somewhat baroque. Win32::Semaphore has the opposite problem (I’m guessing). I did like the look of POSIX::RT::Semaphore but suspected it wouldn’t install on Windows.

dmake.EXE:  Error code 129, while making 'Semaphore.o'
  MJP/POSIX-RT-Semaphore-0.05.tar.gz
  C:\strawberry\c\bin\dmake.EXE -- NOT OK
Running make test
  Can't test without successful make
Running make install
  Make had returned bad status, install seems impossible

Yep, I was right. I’ll spare you the results from the other hour I spent googling various things like semaphore and process mutex.

Okay, let’s think about this. Maybe I could write my own simple little thing with a nice API, and if I find a decent module later I can delegate to it, or maybe delegate to Win32::Semaphore on Windows and the SysV version everywhere else. The ACE guys call that a wrapper facade. Does flock work on Windows?

So the plan is to call flock LOCK_EX on a file in the constructor, and then flock LOCK_UN in the destructor in a kinda RAII way. Famous last words, but what else could I possibly need?

package IPC::ProcessMutex;

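use strict;
use warnings;

use Carp;
use Fcntl qw(:DEFAULT :flock);

sub import { }

sub new
{
    my $class = shift;
    my $file = shift;

    # Open read-write (creating the file if needed): where flock is
    # emulated, an exclusive lock needs a handle opened with write intent
    my $fh;
    if (! sysopen($fh, $file, O_RDWR | O_CREAT)) {
        croak "Unable to sysopen $file: $!";
    }

    # Blocks until no other process holds the lock
    if (! flock($fh, LOCK_EX)) {
        croak "Unable to flock $file: $!";
    }

    my $self = { handle => $fh };
    bless $self, $class;

    return $self;
}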

sub DESTROY
{
    my $self = shift;

    if (exists $self->{handle}) {
        flock $self->{handle}, Fcntl::LOCK_UN;
        close $self->{handle};
        delete $self->{handle};
    } else {
        carp 'DESTROY - handle was not defined';
    }
}

1;
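One thing a blocking constructor cannot do is give up when the mutex is busy. A minimal sketch of a non-blocking variant follows, assuming flock's LOCK_NB flag behaves as documented on your platform. The function try_lock is hypothetical and is not part of the module above.

```perl
use strict;
use warnings;

use Fcntl qw(:DEFAULT :flock);
use File::Temp 'tempdir';

# Hypothetical helper (not part of IPC::ProcessMutex above): returns a
# locked filehandle, or undef if the lock is already held elsewhere.
# Closing the returned handle releases the lock.
sub try_lock
{
    my $file = shift;

    sysopen(my $fh, $file, O_RDWR | O_CREAT)
        or die "Unable to sysopen $file: $!";

    # LOCK_NB makes flock return false straight away instead of blocking
    return flock($fh, LOCK_EX | LOCK_NB) ? $fh : undef;
}

# Use a throwaway directory so the sketch leaves nothing behind
my $dir  = tempdir(CLEANUP => 1);
my $lock = try_lock("$dir/filename.lock");
print defined $lock ? "Got mutex\n" : "Mutex is busy\n";
```

The same scope-based release applies here: when the returned handle goes out of scope it is closed and the lock goes with it.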

And to avoid having to think too hard about whether the code actually works or not, I’ll check it with a little test ;)

use POSIX;
use FindBin;

use lib "$FindBin::Bin/lib/perl5";

use IPC::ProcessMutex;

sub my_log
{
    my $ts = POSIX::strftime('%H:%M:%S', localtime(time()));
    print "[ $ts ] : ", @_, "\n";
}

{
    my_log 'Getting mutex ...';
    my $mutex = IPC::ProcessMutex->new('.filename.lock');
    my_log 'Got mutex.  Sleeping 10 ...';
    sleep 10;
}

my_log 'Released mutex.';

Process 1

jared@win32 $ perl get-mutex.pl
[ 21:37:57 ] : Getting mutex ...
[ 21:37:57 ] : Got mutex.  Sleeping 10 ...
[ 21:38:07 ] : Released mutex.
jared@win32 $ perl get-mutex.pl
[ 21:38:15 ] : Getting mutex ...
[ 21:38:17 ] : Got mutex.  Sleeping 10 ...
[ 21:38:27 ] : Released mutex.
jared@win32 $

Process 2

jared@win32 $ perl get-mutex.pl
[ 21:38:02 ] : Getting mutex ...
[ 21:38:07 ] : Got mutex.  Sleeping 10 ...
[ 21:38:17 ] : Released mutex.
jared@win32 $ perl get-mutex.pl
[ 21:38:24 ] : Getting mutex ...
[ 21:38:27 ] : Got mutex.  Sleeping 10 ...
[ 21:38:37 ] : Released mutex.
jared@win32 $

Yes! Mission accomplished as they say. I’m tentatively going to go with that…

