Feeds:
Posts
Comments

Posts Tagged ‘sockets’

Previously we looked at connecting one producer to one consumer using a unix pipe. If we want to do many to many connections, we can use the AnyEvent Notifier I mentioned in Connecting Software Systems.

Instead of echoing a notification that a work unit has been created to STDOUT I write it to a socket connected to the notifier.

IO::Socket makes this absurdly easy.

use IO::Socket ':crlf';
use constant SHUTDOWN_SOCK_RW => 2;

my $sock = IO::Socket::INET->new(PeerAddr => 'localhost:12345');

for (1..5) {
    my_log "Iteration $_";
    my $filename = create_file($top);
    print $sock "/producer/file-creator/new-file $filename" . CRLF;
    my_log "PRODUCED $filename";
}

$sock->shutdown(SHUTDOWN_SOCK_RW);
$sock->close();

I often use a subject<SPACE>message format to make it easy for clients to filter between important messages.

I’ll demonstrate the consumer next time.

Read Full Post »

What has been on my mind recently is if I write some software to do a task, say process some files and dump some other files to disk, how can that process inform another process that the files are ready?

Using Shell

There are millions of ways to do this. The easiest is probably to wrap both processes in a shell script:

create_files
process_created_files

The magic of unix means that process_created_files is triggered as soon as create_files exits.

This has a couple of problems. What if there were any errors in create_files? What if create_files takes a long time, and we want to start processing as soon as the first file has been created?

There is a great trick to connect two processes together using the shell (hint: using pipes) that I’m going to talk about next time. For now, let’s focus on the more heavy-weight options.

Using a Filewatcher

There are a few modules on CPAN that make it easy to build a filewatcher. A couple of examples are:

Maybe it is just me, but watching for file changes seems a bit 1970s, so moving swiftly on…

Using Third-Party Mechanisms

For connecting 1 producer to 1 consumer, I really like message queues. Apache has a crufy enterprise protocol called AMQP

RestMQ is a queue built on top of Redis.

Or you could use the Pub/Sub mechanism built into Redis directly.

ZeroMQ pretty much solves my problem without having to drop down to raw sockets.

# —

Unfortunately, none of these options are available to me. That brings me on to the raw socket options.

Using Raw Sockets

If the producer provides a server for interested consumers to connect to, you can have a bunch of consumers listening for events. Even better, you can add one layer of indirection and allow for many to many interactions.

An AnyEvent Notifier

The core of this code was taken from my Emulating POSIX Signals post.

use 5.010;

use strict;
use warnings;

package Notifier;

use constant CTRL_D => 4;
use constant DEBUG => $ENV{NOTIFIER_DEBUG};

my $cr = "\015\012";

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;

sub new
{
    my $class = shift;
    my $self = {
        pid => 0,
        watchers => {},
    };
    bless $self, $class;
    return $self;
}

sub disconnect_socket
{
    my ($self, $handle, $pid) = @_;
    $handle->on_drain(sub {
        say "disconnecting PID $pid...";
        $handle->destroy();
    });
    delete $self->{watchers}{$pid};
}

sub get_other_handles
{
    my ($self, $pid) = @_;
    return map { $self->{watchers}{$_}{handle} }
           grep { $_ != $pid } keys %{$self->{watchers}};
}

sub broadcast_line
{
    my ($self, $handles_ref, $line) = @_;
    foreach my $handle (@$handles_ref) {
        $handle->push_write($line . ${cr});
    }
}

sub watch_socket
{
    my ($self, $sock, $host, $port) = @_;

    # Closure variables
    my $pid = ++$self->{pid};

    my $handle; $handle = AnyEvent::Handle->new(
        fh => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
        on_read => sub {
            my $buffer = $handle->rbuf();
            $handle->rbuf() = '';

            if (length($buffer) == 1 and ord($buffer) == CTRL_D) {
                $handle->push_write('Received EOF.  ' .
                                    "Closing connection...${cr}");
                $self->disconnect_socket($handle, $pid);
            } else {
                my @handles = $self->get_other_handles($pid);
                foreach my $line (split /\r?\n/, $buffer) {
                    say "$host/$port : $line";
                    if (lc($line) =~ /quit|exit/) {
                        $self->disconnect_socket($handle, $pid);
                        return;
                    } else {
                        $self->broadcast_line(\@handles, $line);
                    }
                }
            }
        }
    );

    $handle->keepalive();
    $self->{watchers}{$pid} = { handle => $handle };
    $handle->push_write("Connected.  (PID $pid)${cr}");
}

sub prepare_handler
{
    my ($fh, $host, $port) = @_;
    DEBUG && warn "Listening on $host:$port\n";
}

sub _accept_handler
{
    my $self = shift;

    return sub {
        my ($sock, $peer_host, $peer_port) = @_;

        DEBUG && warn "Accepted connection from $peer_host:$peer_port\n";
        if (! $sock) {
            warn '$sock undefined' . "\n";
            return;
        }

        $self->watch_socket($sock, $peer_host, $peer_port);
    };
}

sub start_listen
{
    my ($self, $host, $port) = @_;

    $self->{server} = tcp_server($host,
                                 $port,
                                 $self->_accept_handler(),
                                 \&prepare_handler);
}

package main;

my $host = undef;
my $port = 12345;

my $kernel = Notifier->new();
$kernel->start_listen($host, $port);

AE::cv->recv();

Read Full Post »

Follow

Get every new post delivered to your Inbox.