Posts Tagged ‘connecting software systems’

For the producer demonstrated last time, it is easy to make a consumer using AnyEvent. Not only that, I can borrow most of the code from my list_processes script.

The utility functions are the same as in the unix pipe producer/consumer.

use AnyEvent;
use AnyEvent::Handle;

sub consumer
{
    my ($host, $port, $cb) = @_;

    my $cv = AE::cv();

    my $handle; $handle = AnyEvent::Handle->new(
        connect  => [$host => $port],
        on_error => sub {
            my (undef, undef, $message) = @_;
            print "Connection error: $message\n";
            $handle->destroy();
            # Release the caller waiting in recv(), otherwise it blocks forever
            $cv->send();
        },
        on_eof => sub {
            print "Connection closed\n";
            $handle->destroy();
            $cv->send();
        }
    );

    # We need to consume the first line which contains the PID message
    $handle->push_read(line => sub {});

    $handle->on_read(sub {
        my $handle = shift;
        # Only consume complete lines; a partial line stays in the buffer
        # until the rest of it arrives
        while ($handle->{rbuf} =~ s/^([^\n]*)\n//) {
            my $line = $1;
            $line =~ tr/\r//d;
            $cb->($line);
        }
    });

    return $cv;
}

The callback function checks the subject is correct and then calls process_file(...). It should be fairly easy to see how to extend this for much more complex producers and consumers.

sub process_file
{
    my $file = shift;
    my_log "Processing file [$file]";
    # File processing logic here ...
}

sub handle_line
{
    my $line = shift;

    # Limit the split to two fields so a filename containing spaces survives
    my ($subject, $action) = split /\s+/, $line, 2;
    if (defined($subject) && $subject =~ m{^/producer/file-creator/new-file}) {
        process_file($action);
    } else {
        my_log "FROM PRODUCER [$line]";
    }
}

my $cv = consumer('localhost', 12345, \&handle_line);
$cv->recv();
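
One way the callback could grow to cover more subjects is a dispatch table keyed on the subject. This is only a sketch: the deleted-file subject and the dispatch_line helper are made up for illustration, and the handlers just record what they were given rather than doing real work.

```perl
use strict;
use warnings;

# Records what each handler was asked to do (stand-in for real work)
my @seen;

# Hypothetical subjects mapped to handlers; only new-file appears in the post
my %handler_for = (
    '/producer/file-creator/new-file'     => sub { push @seen, "process $_[0]" },
    '/producer/file-creator/deleted-file' => sub { push @seen, "forget $_[0]" },
);

# Returns 1 if a handler was found and called, 0 otherwise
sub dispatch_line
{
    my $line = shift;

    # Limit the split to two fields so the message may contain spaces
    my ($subject, $message) = split /\s+/, $line, 2;
    my $handler = defined($subject) ? $handler_for{$subject} : undef;
    return 0 unless $handler;

    $handler->($message);
    return 1;
}
```

Passing \&dispatch_line to consumer() in place of \&handle_line would then route each line by its subject, and adding a new message type becomes a one-line change to the table.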

Previously we looked at connecting one producer to one consumer using a unix pipe. If we want many-to-many connections, we can use the AnyEvent Notifier I mentioned in Connecting Software Systems.

Instead of echoing a notification to STDOUT when a work unit has been created, I write it to a socket connected to the notifier.

IO::Socket makes this absurdly easy.

use IO::Socket ':crlf';
use constant SHUTDOWN_SOCK_RW => 2;

my $sock = IO::Socket::INET->new(PeerAddr => 'localhost:12345')
    or die "Cannot connect to notifier: $@\n";

for (1..5) {
    my_log "Iteration $_";
    my $filename = create_file($top);
    print $sock "/producer/file-creator/new-file $filename" . CRLF;
    my_log "PRODUCED $filename";
}

$sock->shutdown(SHUTDOWN_SOCK_RW);
$sock->close();

I often use a subject<SPACE>message format so that clients can easily filter for the messages they care about.
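
As a rough sketch of what that filtering might look like on the client side, assuming the subject itself never contains a space. The names format_message, parse_message and wanted are invented for this example.

```perl
use strict;
use warnings;

# Build a "subject<SPACE>message" line (without the line terminator)
sub format_message
{
    my ($subject, $message) = @_;
    return "$subject $message";
}

# Split a line back into subject and message; the message may contain spaces
sub parse_message
{
    my $line = shift;
    my ($subject, $message) = split / /, $line, 2;
    return ($subject, $message);
}

# True if the subject of the line starts with the given prefix
sub wanted
{
    my ($line, $prefix) = @_;
    my ($subject) = parse_message($line);
    return (defined($subject) && index($subject, $prefix) == 0) ? 1 : 0;
}
```

A client interested only in file-creator events could call wanted($line, '/producer/file-creator/') and ignore everything else.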

I’ll demonstrate the consumer next time.

In my recent Connecting Software Systems post, I included a teaser:

There is a great trick to connect two processes together using the shell (hint: using pipes) that I’m going to talk about next time

If you need to connect a single producer to a single consumer, you can easily connect them using a unix pipe without changing much of the underlying code. The trick is to write to STDOUT whenever work is available for the consumer. The consumer reads work requests from STDIN.

Script Preamble

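The preamble is mostly common to both the producer and the consumer. I set STDOUT to autoflush so that each line reaches the pipe as soon as it is printed; otherwise the consumer would see nothing until the producer's output buffer filled or the producer exited.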

use 5.010;

use strict;
use warnings;

use File::Path 'make_path';
use IO::Handle;
use POSIX 'strftime';

use File::Slurp;

STDOUT->autoflush(1);

sub hms
{
    return strftime('%H:%M:%S', localtime(time()));
}

sub my_log
{
    print '[ ', hms(), ' ] : ', @_, "\n";
}

The Producer

The work files are created in /var/tmp/pro-co. Hopefully create_file would be doing something a bit more useful in a real application!

sub create_file
{
    my $top = shift;

    my $filename = $top . '/' . int(rand(1_000_000)) . '.txt';
    my $content = rand(); # or something more useful...
    # simulate taking some time for processing
    sleep rand(5);
    write_file($filename, $content);
    return $filename;
}

# --

my $top = '/var/tmp/pro-co';

make_path($top);

for (1..5) {
    my_log "Iteration $_";
    my $filename = create_file($top);
    my_log "PRODUCED $filename";
}

The Consumer

The consumer removes the timestamp from the producer output. Then, any line that indicates a unit of work (marked in the example by PRODUCED) is passed to process_file().

sub process_file
{
    my $file = shift;
    my_log "Processing file [$file]";
    # File processing logic here ...
}

# --

while (defined(my $line = <STDIN>)) {
    # Strip the timestamp
    $line =~ s{\[\s[0-9:]+\s\]\s:\s}{};
    chomp $line;

    if ($line =~ /^PRODUCED\s+(.+)/) {
        process_file($1);
    } else {
        my_log "FROM PRODUCER [$line]";
    }
}
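
The parsing in that loop can also be pulled into a function, which makes it easy to check against sample lines from producer.log. A small sketch; classify_line and its 'work'/'other' labels are names I made up for the example, and the timestamp pattern is anchored to the start of the line.

```perl
use strict;
use warnings;

# Returns ('work', $filename) for a PRODUCED line, ('other', $text) otherwise
sub classify_line
{
    my $line = shift;
    chomp $line;

    # Strip the leading "[ HH:MM:SS ] : " written by my_log
    $line =~ s{^\[\s[0-9:]+\s\]\s:\s}{};

    return ('work', $1) if $line =~ /^PRODUCED\s+(.+)/;
    return ('other', $line);
}
```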

The Example Run

As you can see from the output below, the consumer was able to process the work as it became available.

$ ./producer.pl | tee producer.log | ./consumer.pl
[ 09:22:20 ] : FROM PRODUCER [Iteration 1]
[ 09:22:20 ] : Processing file [/var/tmp/pro-co/944319.txt]
[ 09:22:20 ] : FROM PRODUCER [Iteration 2]
[ 09:22:23 ] : Processing file [/var/tmp/pro-co/141765.txt]
[ 09:22:23 ] : FROM PRODUCER [Iteration 3]
[ 09:22:27 ] : Processing file [/var/tmp/pro-co/463599.txt]
[ 09:22:27 ] : FROM PRODUCER [Iteration 4]
[ 09:22:28 ] : Processing file [/var/tmp/pro-co/423055.txt]
[ 09:22:28 ] : FROM PRODUCER [Iteration 5]
[ 09:22:28 ] : Processing file [/var/tmp/pro-co/233909.txt]
$ cat producer.log
[ 09:22:20 ] : Iteration 1
[ 09:22:20 ] : PRODUCED /var/tmp/pro-co/944319.txt
[ 09:22:20 ] : Iteration 2
[ 09:22:23 ] : PRODUCED /var/tmp/pro-co/141765.txt
[ 09:22:23 ] : Iteration 3
[ 09:22:27 ] : PRODUCED /var/tmp/pro-co/463599.txt
[ 09:22:27 ] : Iteration 4
[ 09:22:28 ] : PRODUCED /var/tmp/pro-co/423055.txt
[ 09:22:28 ] : Iteration 5
[ 09:22:28 ] : PRODUCED /var/tmp/pro-co/233909.txt

What has been on my mind recently is this: if I write some software to do a task, say process some files and dump some other files to disk, how can that process inform another process that the files are ready?

Using Shell

There are millions of ways to do this. The easiest is probably to wrap both processes in a shell script:

create_files
process_created_files

The magic of unix means that process_created_files is triggered as soon as create_files exits.

This has a couple of problems. What if there were any errors in create_files? What if create_files takes a long time, and we want to start processing as soon as the first file has been created?
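
The first problem, at least, can be handled by checking the exit status before moving on (in the shell itself, && or set -e does this). A minimal sketch of the same idea in Perl, assuming create_files and process_created_files are executables on the PATH; run_step is a hypothetical helper.

```perl
use strict;
use warnings;

# Run a command and report whether it exited with status 0
sub run_step
{
    my @command = @_;
    my $status = system(@command);
    return $status == 0 ? 1 : 0;
}

# Usage sketch (the commands are the placeholders from the text):
#   run_step('create_files')          or die "create_files failed\n";
#   run_step('process_created_files') or die "process_created_files failed\n";
```

This does nothing for the second problem: processing still cannot start until create_files has finished.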

There is a great trick to connect two processes together using the shell (hint: using pipes) that I’m going to talk about next time. For now, let’s focus on the more heavy-weight options.

Using a Filewatcher

There are a few modules on CPAN that make it easy to build a filewatcher. A couple of examples are:

Maybe it is just me, but watching for file changes seems a bit 1970s, so moving swiftly on…

Using Third-Party Mechanisms

For connecting 1 producer to 1 consumer, I really like message queues. Apache has an implementation (Qpid) of a crufty enterprise protocol called AMQP.

RestMQ is a queue built on top of Redis.

Or you could use the Pub/Sub mechanism built into Redis directly.

ZeroMQ pretty much solves my problem without having to drop down to raw sockets.

# --

Unfortunately, none of these options are available to me. That brings me on to the raw socket options.

Using Raw Sockets

If the producer provides a server for interested consumers to connect to, you can have a bunch of consumers listening for events. Even better, you can add one layer of indirection and allow for many-to-many interactions.

An AnyEvent Notifier

The core of this code was taken from my Emulating POSIX Signals post.

use 5.010;

use strict;
use warnings;

package Notifier;

use constant CTRL_D => 4;
use constant DEBUG => $ENV{NOTIFIER_DEBUG};

my $cr = "\015\012";

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;

sub new
{
    my $class = shift;
    my $self = {
        pid => 0,
        watchers => {},
    };
    bless $self, $class;
    return $self;
}

sub disconnect_socket
{
    my ($self, $handle, $pid) = @_;
    $handle->on_drain(sub {
        say "disconnecting PID $pid...";
        $handle->destroy();
    });
    delete $self->{watchers}{$pid};
}

sub get_other_handles
{
    my ($self, $pid) = @_;
    return map { $self->{watchers}{$_}{handle} }
           grep { $_ != $pid } keys %{$self->{watchers}};
}

sub broadcast_line
{
    my ($self, $handles_ref, $line) = @_;
    foreach my $handle (@$handles_ref) {
        $handle->push_write($line . ${cr});
    }
}

sub watch_socket
{
    my ($self, $sock, $host, $port) = @_;

    # Closure variables
    my $pid = ++$self->{pid};

    my $handle; $handle = AnyEvent::Handle->new(
        fh => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
        on_read => sub {
            if ($handle->{rbuf} eq chr(CTRL_D)) {
                $handle->{rbuf} = '';
                $handle->push_write('Received EOF.  ' .
                                    "Closing connection...${cr}");
                $self->disconnect_socket($handle, $pid);
                return;
            }

            my @handles = $self->get_other_handles($pid);
            # Consume complete lines only; a partial line stays in the
            # buffer until the rest of it arrives
            while ($handle->{rbuf} =~ s/^([^\n]*)\n//) {
                my $line = $1;
                $line =~ s/\r\z//;
                say "$host/$port : $line";
                # Match the whole line, so a message that merely
                # contains "exit" is still broadcast
                if ($line =~ /^\s*(?:quit|exit)\s*$/i) {
                    # Drop anything sent after the quit request
                    $handle->{rbuf} = '';
                    $self->disconnect_socket($handle, $pid);
                    return;
                }
                $self->broadcast_line(\@handles, $line);
            }
        }
    );

    $handle->keepalive(1);
    $self->{watchers}{$pid} = { handle => $handle };
    $handle->push_write("Connected.  (PID $pid)${cr}");
}

sub prepare_handler
{
    my ($fh, $host, $port) = @_;
    DEBUG && warn "Listening on $host:$port\n";
    # tcp_server uses the return value as the listen queue length;
    # 0 asks for the default
    return 0;
}

sub _accept_handler
{
    my $self = shift;

    return sub {
        my ($sock, $peer_host, $peer_port) = @_;

        DEBUG && warn "Accepted connection from $peer_host:$peer_port\n";
        if (! $sock) {
            warn '$sock undefined' . "\n";
            return;
        }

        $self->watch_socket($sock, $peer_host, $peer_port);
    };
}

sub start_listen
{
    my ($self, $host, $port) = @_;

    $self->{server} = tcp_server($host,
                                 $port,
                                 $self->_accept_handler(),
                                 \&prepare_handler);
}

package main;

my $host = undef;
my $port = 12345;

my $kernel = Notifier->new();
$kernel->start_listen($host, $port);

AE::cv->recv();
