Posts Tagged ‘anyevent’

For the producer demonstrated last time, it is easy to make a consumer using AnyEvent. Not only that, I can borrow most of the code from my list_processes script.

The utility functions are the same as in the unix pipe producer/consumer.

sub consumer
{
    my ($host, $port, $cb) = @_;

    my $cv = AE::cv();

    my $handle; $handle = AnyEvent::Handle->new(
        connect  => [$host => $port],
        on_error => sub {
            print("Connection error: $!\n");
            $handle->destroy();
            # Wake up the caller, otherwise recv() would block forever
            $cv->send();
        },
        on_eof => sub {
            print "Connection closed\n";
            $handle->destroy();
            $cv->send();
        }
    );

    # We need to consume the first line which contains the PID message
    $handle->push_read(line => sub {});

    $handle->on_read(sub {
        my $handle = shift;
        # Hand over complete lines only; a partial line stays in the
        # read buffer until the rest of it arrives
        while ($handle->{rbuf} =~ s/^([^\n]*)\n//) {
            (my $line = $1) =~ tr/\r//d;
            $cb->($line);
        }
    });

    return $cv;
}

The callback function checks the subject is correct and then calls process_file(...). It should be fairly easy to see how to extend this for much more complex producers and consumers.

sub process_file
{
    my $file = shift;
    my_log "Processing file [$file]";
    # File processing logic here ...
}

sub handle_line
{
    my $line = shift;

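    # Split on the first run of whitespace only, so that the message
    # (here a filename) may itself contain spaces
    my ($subject, $action) = split /\s+/, $line, 2;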
    if ($subject =~ m{^/producer/file-creator/new-file}) {
        process_file($action);
    } else {
        my_log "FROM PRODUCER [$line]";
    }
}

my $cv = consumer('localhost', 12345, \&handle_line);
$cv->recv();
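
One way to extend the callback to more subjects is a dispatch table keyed on the subject. This is only a sketch in plain Perl: the deleted-file subject and both handlers are made up for illustration, and only new-file appears in the producer.

```perl
use 5.010;
use strict;
use warnings;

# Hypothetical subjects and handlers, for illustration only
my @processed;
my %dispatch = (
    '/producer/file-creator/new-file'     => sub { push @processed, "process $_[0]" },
    '/producer/file-creator/deleted-file' => sub { push @processed, "forget $_[0]" },
);

sub dispatch_line
{
    my $line = shift;

    # Limit the split to two fields so the message may contain spaces
    my ($subject, $message) = split /\s+/, $line, 2;
    return 0 unless defined $subject;                # blank line

    my $handler = $dispatch{$subject} or return 0;   # unknown subject
    $handler->($message // '');
    return 1;
}

dispatch_line('/producer/file-creator/new-file /tmp/a file.txt');
dispatch_line('Connected.  (PID 3)');
print "$_\n" for @processed;
```

Passing \&dispatch_line to consumer(...) instead of \&handle_line would be the only change needed at the call site.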

Read Full Post »

Previously we looked at connecting one producer to one consumer using a unix pipe. If we want many-to-many connections, we can use the AnyEvent Notifier I mentioned in Connecting Software Systems.

Instead of echoing a notification that a work unit has been created to STDOUT, I write it to a socket connected to the notifier.

IO::Socket makes this absurdly easy.

use IO::Socket ':crlf';
use constant SHUTDOWN_SOCK_RW => 2;

my $sock = IO::Socket::INET->new(PeerAddr => 'localhost:12345')
    or die "Cannot connect to notifier: $@\n";

for (1..5) {
    my_log "Iteration $_";
    my $filename = create_file($top);
    print $sock "/producer/file-creator/new-file $filename" . CRLF;
    my_log "PRODUCED $filename";
}

$sock->shutdown(SHUTDOWN_SOCK_RW);
$sock->close();

I often use a subject<SPACE>message format to make it easy for clients to filter for the messages that matter to them.
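
As a rough sketch of that format in plain Perl (the function names format_message, parse_message and wanted are invented for this example), the producer side joins subject and message with a single space, and the consumer side splits on the first space only:

```perl
use strict;
use warnings;

my $CRLF = "\015\012";

# Build one wire message. The subject must not contain whitespace,
# otherwise the consumer cannot tell where it ends.
sub format_message
{
    my ($subject, $message) = @_;
    die "subject must not contain whitespace\n" if $subject =~ /\s/;
    return "$subject $message$CRLF";
}

# Split a received line back into subject and message
sub parse_message
{
    my $line = shift;
    $line =~ s/\015?\012\z//;                       # strip the line ending
    my ($subject, $message) = split / /, $line, 2;  # first space only
    return ($subject, $message);
}

# A consumer that only cares about one family of subjects
sub wanted
{
    my $subject = shift;
    return index($subject, '/producer/file-creator/') == 0;
}

my $wire = format_message('/producer/file-creator/new-file', '/tmp/work unit 1.dat');
my ($subject, $message) = parse_message($wire);
print "subject=$subject message=$message\n" if wanted($subject);
```

Filtering on a subject prefix keeps the consumer indifferent to whatever else is flowing through the notifier.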

I’ll demonstrate the consumer next time.

Read Full Post »

What has been on my mind recently is this: if I write some software to do a task, say process some files and dump some other files to disk, how can that process inform another process that the files are ready?

Using Shell

There are millions of ways to do this. The easiest is probably to wrap both processes in a shell script:

create_files
process_created_files

The magic of unix means that process_created_files is triggered as soon as create_files exits.

This has a couple of problems. What if there were any errors in create_files? What if create_files takes a long time, and we want to start processing as soon as the first file has been created?
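
The first of those problems, at least, can be handled by checking the exit status before moving on. A minimal sketch in Perl rather than shell: run_ok is a made-up helper, and the two commands are stand-ins that call the running perl ($^X) so the example does not depend on any real create_files program.

```perl
use strict;
use warnings;

# Run a command and report whether it exited successfully
sub run_ok
{
    my @command = @_;
    my $status = system(@command);
    return 0 if $status == -1;       # could not be started at all
    return 0 if $status & 127;       # killed by a signal
    return ($status >> 8) == 0;      # normal exit: success means exit code 0
}

# Stand-ins for create_files and process_created_files
my @create_files          = ($^X, '-e', 'exit 0');
my @process_created_files = ($^X, '-e', 'print qq{processing\n}');

if (run_ok(@create_files)) {
    run_ok(@process_created_files) or warn "processing failed\n";
} else {
    warn "create_files failed, skipping processing\n";
}
```

This does nothing for the second problem: processing still cannot start until create_files has finished.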

There is a great trick to connect two processes together using the shell (hint: using pipes) that I’m going to talk about next time. For now, let’s focus on the more heavy-weight options.

Using a Filewatcher

There are a few modules on CPAN that make it easy to build a filewatcher. A couple of examples are:

Maybe it is just me, but watching for file changes seems a bit 1970s, so moving swiftly on…

Using Third-Party Mechanisms

For connecting 1 producer to 1 consumer, I really like message queues. Apache has an implementation (Qpid) of a crufty enterprise protocol called AMQP.

RestMQ is a queue built on top of Redis.

Or you could use the Pub/Sub mechanism built into Redis directly.

ZeroMQ pretty much solves my problem without having to drop down to raw sockets.

Unfortunately, none of these options are available to me. That brings me on to the raw socket options.

Using Raw Sockets

If the producer provides a server for interested consumers to connect to, you can have a bunch of consumers listening for events. Even better, you can add one layer of indirection and allow for many-to-many interactions.

An AnyEvent Notifier

The core of this code was taken from my Emulating POSIX Signals post.

use 5.010;

use strict;
use warnings;

package Notifier;

use constant CTRL_D => 4;
use constant DEBUG => $ENV{NOTIFIER_DEBUG};

my $cr = "\015\012";

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;

sub new
{
    my $class = shift;
    my $self = {
        pid => 0,
        watchers => {},
    };
    bless $self, $class;
    return $self;
}

sub disconnect_socket
{
    my ($self, $handle, $pid) = @_;
    $handle->on_drain(sub {
        say "disconnecting PID $pid...";
        $handle->destroy();
    });
    delete $self->{watchers}{$pid};
}

sub get_other_handles
{
    my ($self, $pid) = @_;
    return map { $self->{watchers}{$_}{handle} }
           grep { $_ != $pid } keys %{$self->{watchers}};
}

sub broadcast_line
{
    my ($self, $handles_ref, $line) = @_;
    foreach my $handle (@$handles_ref) {
        $handle->push_write($line . ${cr});
    }
}

sub watch_socket
{
    my ($self, $sock, $host, $port) = @_;

    # Closure variables
    my $pid = ++$self->{pid};

    my $handle; $handle = AnyEvent::Handle->new(
        fh => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
        on_read => sub {
            my $buffer = $handle->rbuf();
            $handle->rbuf() = '';

            if (length($buffer) == 1 and ord($buffer) == CTRL_D) {
                $handle->push_write('Received EOF.  ' .
                                    "Closing connection...${cr}");
                $self->disconnect_socket($handle, $pid);
            } else {
                my @handles = $self->get_other_handles($pid);
                foreach my $line (split /\r?\n/, $buffer) {
                    say "$host/$port : $line";
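                    # Match the whole line, so that a message which merely
                    # contains "quit" or "exit" is still broadcast
                    if (lc($line) =~ /^\s*(?:quit|exit)\s*$/) {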
                        $self->disconnect_socket($handle, $pid);
                        return;
                    } else {
                        $self->broadcast_line(\@handles, $line);
                    }
                }
            }
        }
    );

    $handle->keepalive(1);
    $self->{watchers}{$pid} = { handle => $handle };
    $handle->push_write("Connected.  (PID $pid)${cr}");
}

sub prepare_handler
{
    my ($fh, $host, $port) = @_;
    DEBUG && warn "Listening on $host:$port\n";
}

sub _accept_handler
{
    my $self = shift;

    return sub {
        my ($sock, $peer_host, $peer_port) = @_;

        DEBUG && warn "Accepted connection from $peer_host:$peer_port\n";
        if (! $sock) {
            warn '$sock undefined' . "\n";
            return;
        }

        $self->watch_socket($sock, $peer_host, $peer_port);
    };
}

sub start_listen
{
    my ($self, $host, $port) = @_;

    $self->{server} = tcp_server($host,
                                 $port,
                                 $self->_accept_handler(),
                                 \&prepare_handler);
}

package main;

my $host = undef;
my $port = 12345;

my $kernel = Notifier->new();
$kernel->start_listen($host, $port);

AE::cv->recv();
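
The many-to-many behaviour comes down to get_other_handles and broadcast_line: every line is copied to every connection except the one it came from. A sketch of just that logic in plain Perl, with array references standing in for the AnyEvent::Handle objects (an assumption made so that it runs without a network):

```perl
use strict;
use warnings;

# Each "connection" is an outbox collecting the lines written to it;
# in the notifier these would be AnyEvent::Handle objects.
my %watchers = (
    1 => { outbox => [] },   # a producer
    2 => { outbox => [] },   # a consumer
    3 => { outbox => [] },   # another consumer
);

# Every connection except the sender
sub other_outboxes
{
    my $sender = shift;
    return map  { $watchers{$_}{outbox} }
           grep { $_ != $sender }
           sort { $a <=> $b } keys %watchers;
}

# Copy one line to each of the given connections
sub broadcast_line
{
    my ($outboxes, $line) = @_;
    push @$_, $line for @$outboxes;
}

broadcast_line([ other_outboxes(1) ], '/producer/file-creator/new-file /tmp/a');

for my $pid (sort { $a <=> $b } keys %watchers) {
    printf "PID %d received %d line(s)\n",
           $pid, scalar @{ $watchers{$pid}{outbox} };
}
```

The sender (PID 1) receives nothing, while both consumers receive the line once.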

Read Full Post »

$ date
Sun May  2 09:22:09 GMTDT 2010
$ perl -MPOSIX -le 'print "Week ",
>                   POSIX::strftime("%V", gmtime(time))'
Week 17

One of the differences between summarising emacs news and perl news is that there are many sources of perl news. This makes a good summary more valuable. For emacs news you only need to visit A Better Planet Emacs. For perl, there are hundreds of sources, including:

And many others I have missed.

Eweek Slideshow (Padre)

Eweek has a slideshow with a list of 25 reasons why Perl keeps rising in the enterprise. The interesting points are:

  • it is extensible and flexible, i.e. has good libraries including Catalyst, DBIx::Class and Plack
  • the new release 5.12 has improved Unicode and datetime support
  • everyone uses it: "used in virtually 100% of the Fortune 500"
  • it is built into all major OSes (apart from Windows)
  • it has good IDE support, including Komodo and Padre, a new Perl IDE
  • it is in the top 10 of the TIOBE index (why do people still think that TIOBE is relevant?)

Sam Crawley also talks about Padre so I thought I’d better install it. Conclusion: it is great, but obviously as an emacs fan it isn’t for me. Having said that, if I could customise Padre as easily with perl as I can customise emacs with lisp, I would consider switching.

AnyEvent

My own posts have mainly covered AnyEvent.

Other News

Brian Kelley decides to stick with Perl over PowerShell on Windows due to the availability of libraries and examples. (That’s where we beat everyone, no?)

xenoterracide has started a series on teaching perl. Here is the prologue and week 1a and 1b.

Pythian has another perl news roundup.

And finally, one you’ve probably seen on all the news links already. chromatic has a great post on volunteers and civility. It is advice I should often bear in mind myself.

Read Full Post »

This is part 4 in my occasional AnyEvent series

When I tried to move my AnyEvent-based code over to Windows today, I got a slightly unpleasant surprise.

$ perl kernel.pl
fcntl is not implemented at c:/strawberry/perl/site/lib/AnyEvent/Util.pm line 362.

(Actually, that path is not accurate – it is from simulating the problem on a different computer. The actual path is a network share.)

Drat, I thought, maybe AnyEvent isn’t as portable as I had assumed. But hang on a minute. When you doubt a widely used library ahead of your own code, it is usually your code that is at fault, in my experience anyhow. And besides, Twiggy runs on Windows. What have I done wrong?

So I had a look at the offending line of code.

# Sets the blocking state of the given filehandle (true == nonblocking,
# false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
# broken (i.e. windows) platforms.

BEGIN {
   *fh_nonblocking = AnyEvent::WIN32
      ? sub($$) {
          ioctl $_[0], 0x8004667e, pack "L", $_[1]; # FIONBIO
        }
      : sub($$) {
          fcntl $_[0], AnyEvent::F_SETFL, $_[1] ? AnyEvent::O_NONBLOCK : 0;
        }
   ;
}

While I was there, a quick glance down the comments gave me a picture of someone who enjoys working on Windows as much as I do :)

# perl's socketpair emulation fails on many vista machines, because
# vista returns fantasy port numbers.

...

# vista has completely broken peername/sockname that return
# fantasy ports. this combo seems to work, though.

...

# vista example (you can't make this shit up...):

I quickly tracked the root of the problem down to a file called constants.pl. Uh oh, take a look at the WIN32 subroutine.

package AnyEvent;
sub CYGWIN () { 0 }
sub WIN32 () { 0 }
...

That is actually a file generated from a Unix install of AnyEvent. constants.pl is generated at make time from a file called constants.pl.PL that looks like this.

sub i($$) {
   print "sub $_[0] () { ", $_[1]*1, " }\n";
}

print "package AnyEvent;\n";

our $WIN32 = $^O =~ /mswin32/i;

i CYGWIN => $^O =~ /cygwin/i;
i WIN32  => $WIN32;
...

And this is down to my company’s perl policy. A number of pure perl modules are installed on a Windows network share which is samba mounted onto the unix servers. Because constants.pl was generated once, on unix, the shared copy hard-codes WIN32 as 0 even when it is loaded by a perl running on Windows. We’re primarily a C++/Java shop and Perl is something of a second-class citizen, but there are a good number of modules available (including Moose, AnyEvent, POE, etc.).

The downside is that it is hard to get modules installed that aren’t there already, and we are not permitted to download them ourselves. Most of the time, this isn’t a problem.

Anyway, I tried to fix this in my own script using variants on:

package AnyEvent;
use constant WIN32 => 1;
package Kernel;

and

sub AnyEvent::WIN32 () { 1 }

both before and after the use AnyEvent lines. I couldn’t get round it though. This happened when the definitions were after the use lines.

$ perl kernel.pl
Constant subroutine AnyEvent::WIN32 redefined at c:/strawberry/perl/lib/constant.pm line 131.
Constant subroutine AnyEvent::WIN32 redefined at kernel.pl line 23.
fcntl is not implemented at c:/strawberry/perl/site/lib/AnyEvent/Util.pm line 362.

And this is when it was placed before.

$ perl kernel.pl
Constant subroutine WIN32 redefined at c:/strawberry/perl/site/lib/AnyEvent/constants.pl line 3.
fcntl is not implemented at c:/strawberry/perl/site/lib/AnyEvent/Util.pm line 362.

How frustrating!
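
A likely explanation, judging from the warnings above, is timing. Perl inlines constant subs when the calling code is compiled, and AnyEvent::Util picks its implementation inside a BEGIN block, so a definition placed after the use lines arrives too late, while a definition placed before them is overwritten when constants.pl is loaded. The sketch below reproduces the first case in plain Perl; the Demo package and its names are made up and merely stand in for AnyEvent.

```perl
use strict;
use warnings;

package Demo;

# Stands in for the constants.pl generated on a unix install
sub WIN32 () { 0 }

# Stands in for AnyEvent::Util: the choice is made once, in a BEGIN
# block, using whatever WIN32 returns at that moment
our $set_nonblocking;
BEGIN {
    $set_nonblocking = WIN32 ? sub { 'ioctl' } : sub { 'fcntl' };
}

package main;

# Overriding the constant afterwards changes the sub itself...
{
    no warnings 'redefine';
    *Demo::WIN32 = sub () { 1 };
}

# ...but not the decision that was already taken at compile time
print 'WIN32 is now ', Demo->can('WIN32')->(), "\n";
print 'but the chosen implementation is still ', $Demo::set_nonblocking->(), "\n";
```

The constant is looked up through can() in the last lines because a direct Demo::WIN32() call would itself have been inlined as 0 when the file was compiled.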

Read Full Post »

list_processes aka ps

This is part 3 in my occasional AnyEvent series

Now that we have our userspace kernel, it is time to write the utilities to make it more pleasant to use. On Linux, I can simulate all of the commands using telnet, but my Windows box doesn’t have telnet installed. I will therefore need a list_processes command, a signal command and a library to help with registering new processes.

First up is list_processes which is modelled on the http_get function presented in AnyEvent::Intro. It simply needs to send:

list_processes\015\012

and then call the passed-in function with each line of the response. The result might be something like the following.

$ ./list_processes.pl
1: ./register.pl a b c
2: ./register.pl

list_processes.pl

use 5.010;

use strict;
use warnings;

use constant DEBUG => $ENV{KERNEL_DEBUG};

my $cr = "\015\012";

use AnyEvent::Handle;

sub list_processes
{
    my ($host, $port, $cb) = @_;

    my $cv = AE::cv();

    my $handle; $handle = AnyEvent::Handle->new(
        connect  => [$host => $port],
        on_error => sub {
            say("Connection error: $!");
            $handle->destroy();
            # Wake up the caller, otherwise recv() would block forever
            $cv->send();
        },
        on_eof => sub {
            DEBUG && say 'Connection closed';
            $handle->destroy();
            $cv->send();
        }
    );

    $handle->push_write('list_processes' . ${cr});

    $handle->on_read(sub {
        my $handle = shift;
        # Hand over complete lines only; a partial line stays in the
        # read buffer until the rest of it arrives
        while ($handle->{rbuf} =~ s/^([^\n]*)\n//) {
            (my $line = $1) =~ tr/\r//d;
            $cb->($line);
        }
    });

    return $cv;
}

my $cv = list_processes('localhost', 12345, sub { say @_ } );
$cv->recv();

Read Full Post »

This is part 2 in my occasional AnyEvent series

As we already discovered, signals don’t work in Strawberry Perl [on Windows]. As they are so useful, we really want a way to emulate them. I can think of a few approaches, but the most obvious is to have a userspace "kernel" sitting on a socket which all processes register with.

The kernel will respond to a few commands

register: [args]
list_processes           # ps :)
signal: [pid] [message]  # similar to kill, but more flexible

A quick disclaimer: for the sake of brevity, I’ve omitted authentication and error handling.
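
To make the three commands concrete, here is a small sketch of a parser for them in plain Perl. It uses the same patterns as the kernel code below, but parse_command itself and the sample arguments (./worker.pl, reload) are made up for illustration.

```perl
use strict;
use warnings;

# Parse one command line into a name and its arguments.
# Returns an empty list for anything unrecognised.
sub parse_command
{
    my $command = shift;

    return ('register', $1)   if $command =~ /^register:\s+(.+)/;
    return ('list_processes') if $command eq 'list_processes';
    return ('signal', $1, $2) if $command =~ /^signal:\s+(\d+)\s+(.+)/;
    return;
}

for my $line ('register: ./worker.pl a b', 'list_processes',
              'signal: 2 reload', 'bogus') {
    my ($name, @args) = parse_command($line);
    print defined $name ? "$name(@args)" : "invalid [$line]", "\n";
}
```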

Kernel Code

The server again uses AnyEvent, and start_listen(), prepare_handler() and _accept_handler() should look pretty familiar.

Twiggy uses AE::io and checks errno for its conditions. I guess this is done for speed as AnyEvent::Handle is much easier to use.

sub watch_socket
{
    my ($self, $sock) = @_;

    # Closure variables
    my $pid = -1;

    my $handle; $handle = AnyEvent::Handle->new(
        fh  => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            # Forget the process, otherwise a dead handle stays registered
            delete $self->{watchers}{$pid};
            $handle->destroy;
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy;
        },
    );

    $handle->push_read(line => sub {
        my ($handle, $command) = @_;

        DEBUG && say "Received [$command]";

        # valid commands are:
        #     register: [args]
        #     list_processes
        #     signal: [pid] [message]

        if ($command =~ /^register:\s+(.+)/) {
            my $args = $1;
            $pid = ++$self->{pid};
            $self->register_watcher($handle, $pid, $args);
            # Don't set on_drain(...)
            return;
        } elsif ($command eq 'list_processes') {
            $self->list_processes($handle);
        } elsif ($command =~ /^signal:\s+(\d+)\s+(.+)/) {
            my ($pid, $message) = ($1, $2);
            $self->send_signal($handle, $pid, $message);
        } else {
            say 'Error: unrecognised command';
            $handle->push_write("Invalid command [$command]" . ${cr});
        }

        # For all commands apart from register, we want to close the
        # socket after sending the response which we do by setting
        # on_drain(...)
        $handle->on_drain(sub { $handle->destroy(); });
    });
}

register_watcher

register_watcher stores the handle and the information about the process in the object, then tells the process which PID it has been given. (I am still considering having the registering process send $$ to use as the PID.)

sub register_watcher
{
    my ($self, $handle, $pid, $args) = @_;
    DEBUG && say "Registering [$args] (PID $pid)";

    $handle->keepalive(1);
    $self->{watchers}{$pid} = { handle => $handle, args => $args };
    $handle->push_write("Connected.  (PID $pid)" . ${cr});
    # on_read(...) needs to be set, otherwise disconnects from the
    # client side are not always detected
    $handle->on_read(sub { });
}

list_processes

list_processes lists the processes that are registered with the kernel. It is similar to ps(1), but uglier.

sub list_processes
{
    my ($self, $handle) = @_;

    foreach my $pid (sort { $a <=> $b } keys %{$self->{watchers}}) {
        my $args = $self->{watchers}{$pid}{args};
        $handle->push_write("$pid: $args" . ${cr});
    }
}
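
The numeric comparison in that sort matters once ten or more processes have registered, because a plain sort compares the PIDs as strings. A quick illustration with twelve made-up PIDs:

```perl
use strict;
use warnings;

# Twelve hypothetical registered PIDs
my %watchers = map { $_ => 1 } 1 .. 12;

my @as_strings = sort keys %watchers;
my @as_numbers = sort { $a <=> $b } keys %watchers;

print "string order:  @as_strings\n";   # 1 10 11 12 2 3 4 5 6 7 8 9
print "numeric order: @as_numbers\n";   # 1 2 3 4 5 6 7 8 9 10 11 12
```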

send_signal

send_signal looks up the handle belonging to the process in $self->{watchers}. If it is registered, it is possible to send any message to the process, otherwise it returns an error message. I’ve added special handling for a disconnect message, allowing the kernel to drop the registration and close the connection once the message has been written.

This is the dangerous bit. kill(1) has some protection in that you are only able to send signals to processes you own (unless you are root). The implementation, as presented here, allows anyone who can telnet to the kernel port to send any message to any process that is registered. It would be necessary to add a layer of authentication and process ownership if the goal really was to provide something similar to unix signals.
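
As a very rough sketch of what an ownership check could look like, each registration below carries a shared secret that the sender must present. Everything here is hypothetical: the secret field, send_signal_checked and the inbox arrays are not part of the kernel presented in this post, and a secret sent over a plain socket is not real security.

```perl
use strict;
use warnings;

# Hypothetical registry: each registered process records a secret
my %watchers = (
    1 => { secret => 's3cret-a', inbox => [] },
    2 => { secret => 's3cret-b', inbox => [] },
);

# Deliver a message only if the sender knows the target's secret
sub send_signal_checked
{
    my ($pid, $secret, $message) = @_;

    my $target = $watchers{$pid} or return "PID $pid does not exist";
    return "permission denied for PID $pid"
        unless defined $secret && $secret eq $target->{secret};

    push @{ $target->{inbox} }, "signal: $message";
    return 'ok';
}

print send_signal_checked(1, 's3cret-a', 'reload'), "\n";
print send_signal_checked(2, 's3cret-a', 'disconnect'), "\n";
print send_signal_checked(9, 's3cret-a', 'reload'), "\n";
```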

sub send_signal
{
    my ($self, $handle, $pid, $message) = @_;

    if (! exists($self->{watchers}{$pid})) {
        say "Error: PID $pid does not exist";
        $handle->push_write("PID $pid does not exist" . ${cr});
    } else {
        my $process = $self->{watchers}{$pid}{handle};
        $process->push_write('signal: ' . $message . ${cr});
        if ($message eq 'disconnect') {
            delete $self->{watchers}{$pid};
            $process->on_drain(sub { $process->destroy(); });
        }
    }
}

kernel.pl – complete source

#!perl

use 5.010;

use strict;
use warnings;

package Kernel;

use constant DEBUG => $ENV{KERNEL_DEBUG};

my $cr = "\015\012";

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;

sub new
{
    my $class = shift;
    my $self = { pid => 0, watchers => {} };
    bless $self, $class;
    return $self;
}

sub register_watcher
{
    my ($self, $handle, $pid, $args) = @_;
    DEBUG && say "Registering [$args] (PID $pid)";

    $self->{watchers}{$pid} = { handle => $handle, args => $args };
    $handle->push_write("Connected.  (PID $pid)" . ${cr});
    $handle->on_read(sub { });
}

sub list_processes
{
    my ($self, $handle) = @_;

    foreach my $pid (sort { $a <=> $b } keys %{$self->{watchers}}) {
        my $args = $self->{watchers}{$pid}{args};
        $handle->push_write("$pid: $args" . ${cr});
    }
}

sub send_signal
{
    my ($self, $handle, $pid, $message) = @_;

    if (! exists($self->{watchers}{$pid})) {
        say "Error: PID $pid does not exist";
        $handle->push_write("PID $pid does not exist" . ${cr});
    } else {
        my $process = $self->{watchers}{$pid}{handle};
        $process->push_write('signal: ' . $message . ${cr});
        if ($message eq 'disconnect') {
            delete $self->{watchers}{$pid};
            $process->on_drain(sub { $process->destroy(); });
        }
    }
}

sub watch_socket
{
    my ($self, $sock) = @_;

    # Closure variables
    my $pid = -1;

    my $handle; $handle = AnyEvent::Handle->new(
        fh  => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            # Forget the process, otherwise a dead handle stays registered
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
    );

    $handle->push_read(line => sub {
        my ($handle, $command) = @_;

        DEBUG && say "Received [$command]";

        if ($command =~ /^register:\s+(.+)/) {
            my $args = $1;
            $pid = ++$self->{pid};
            $handle->keepalive(1);
            $self->register_watcher($handle, $pid, $args);
            return;
        } elsif ($command eq 'list_processes') {
            $self->list_processes($handle);
        } elsif ($command =~ /^signal:\s+(\d+)\s+(.+)/) {
            my ($pid, $message) = ($1, $2);
            $self->send_signal($handle, $pid, $message);
        } else {
            say 'Error: unrecognised command';
            $handle->push_write("Invalid command [$command]" . ${cr});
        }

        $handle->on_drain(sub { $handle->destroy(); });
    });
}

sub prepare_handler
{
    my ($fh, $host, $port) = @_;
    DEBUG && warn "Listening on $host:$port\n";
}

sub _accept_handler
{
    my $self = shift;

    return sub {
        my ($sock, $peer_host, $peer_port) = @_;

        DEBUG && warn "Accepted connection from $peer_host:$peer_port\n";
        if (! $sock) {
            warn '$sock undefined' . "\n";
            return;
        }

        $self->watch_socket($sock);
    };
}

sub start_listen
{
    my ($self, $host, $port) = @_;

    $self->{server} = tcp_server($host,
                                 $port,
                                 $self->_accept_handler(),
                                 \&prepare_handler);
}

package main;

my $host = undef;
my $port = 12345;

my $kernel = Kernel->new();
$kernel->start_listen($host, $port);

AE::cv->recv();

Read Full Post »
