Posts Tagged ‘message queues’

What has been on my mind recently is if I write some software to do a task, say process some files and dump some other files to disk, how can that process inform another process that the files are ready?

Using Shell

There are millions of ways to do this. The easiest is probably to wrap both processes in a shell script:


The magic of unix means that process_created_files is triggered as soon as create_files exits.

This has a couple of problems. What if there were any errors in create_files? What if create_files takes a long time, and we want to start processing as soon as the first file has been created?

There is a great trick to connect two processes together using the shell (hint: using pipes) that I’m going to talk about next time. For now, let’s focus on the more heavy-weight options.

Using a Filewatcher

There are a few modules on CPAN that make it easy to build a filewatcher. A couple of examples are:

Maybe it is just me, but watching for file changes seems a bit 1970s, so moving swiftly on…

Using Third-Party Mechanisms

For connecting 1 producer to 1 consumer, I really like message queues. Apache has a crufy enterprise protocol called AMQP

RestMQ is a queue built on top of Redis.

Or you could use the Pub/Sub mechanism built into Redis directly.

ZeroMQ pretty much solves my problem without having to drop down to raw sockets.

# —

Unfortunately, none of these options are available to me. That brings me on to the raw socket options.

Using Raw Sockets

If the producer provides a server for interested consumers to connect to, you can have a bunch of consumers listening for events. Even better, you can add one layer of indirection and allow for many to many interactions.

An AnyEvent Notifier

The core of this code was taken from my Emulating POSIX Signals post.

use 5.010;

use strict;
use warnings;

package Notifier;

use constant CTRL_D => 4;
use constant DEBUG => $ENV{NOTIFIER_DEBUG};

my $cr = "\015\012";

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;

sub new
    my $class = shift;
    my $self = {
        pid => 0,
        watchers => {},
    bless $self, $class;
    return $self;

sub disconnect_socket
    my ($self, $handle, $pid) = @_;
    $handle->on_drain(sub {
        say "disconnecting PID $pid...";
    delete $self->{watchers}{$pid};

sub get_other_handles
    my ($self, $pid) = @_;
    return map { $self->{watchers}{$_}{handle} }
           grep { $_ != $pid } keys %{$self->{watchers}};

sub broadcast_line
    my ($self, $handles_ref, $line) = @_;
    foreach my $handle (@$handles_ref) {
        $handle->push_write($line . ${cr});

sub watch_socket
    my ($self, $sock, $host, $port) = @_;

    # Closure variables
    my $pid = ++$self->{pid};

    my $handle; $handle = AnyEvent::Handle->new(
        fh => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            delete $self->{watchers}{$pid};
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
        on_read => sub {
            my $buffer = $handle->rbuf();
            $handle->rbuf() = '';

            if (length($buffer) == 1 and ord($buffer) == CTRL_D) {
                $handle->push_write('Received EOF.  ' .
                                    "Closing connection...${cr}");
                $self->disconnect_socket($handle, $pid);
            } else {
                my @handles = $self->get_other_handles($pid);
                foreach my $line (split /\r?\n/, $buffer) {
                    say "$host/$port : $line";
                    if (lc($line) =~ /quit|exit/) {
                        $self->disconnect_socket($handle, $pid);
                    } else {
                        $self->broadcast_line(\@handles, $line);

    $self->{watchers}{$pid} = { handle => $handle };
    $handle->push_write("Connected.  (PID $pid)${cr}");

sub prepare_handler
    my ($fh, $host, $port) = @_;
    DEBUG && warn "Listening on $host:$port\n";

sub _accept_handler
    my $self = shift;

    return sub {
        my ($sock, $peer_host, $peer_port) = @_;

        DEBUG && warn "Accepted connection from $peer_host:$peer_port\n";
        if (! $sock) {
            warn '$sock undefined' . "\n";

        $self->watch_socket($sock, $peer_host, $peer_port);

sub start_listen
    my ($self, $host, $port) = @_;

    $self->{server} = tcp_server($host,

package main;

my $host = undef;
my $port = 12345;

my $kernel = Notifier->new();
$kernel->start_listen($host, $port);


Read Full Post »