Feeds:
Posts
Comments

Posts Tagged ‘server’

This is part 2 in my occasional AnyEvent series

As we already discovered, signals don’t work in Strawberry Perl [on Windows]. As they are so useful, we really want a way to emulate them. I can think of a few approaches, but the most obvious is to have a userspace "kernel" sitting on a socket which all processes register with.

The kernel will respond to a few commands

register: [args]
list_processes           # ps :)
signal: [pid] [message]  # similar to kill, but more flexible

A quick disclaimer: for the sake of brevity, I’ve omitted authentication and error handling.

Kernel Code

The server again uses AnyEvent and start_listen(), prepare_handler() and _accept_handler() should look pretty familiar.

Twiggy uses AE::io and checks errno for its conditions. I guess this is done for speed as AnyEvent::Handle is much easier to use.

sub watch_socket
{
    my ($self, $sock) = @_;

    # Closure variables
    my $pid = -1;

    my $handle; $handle = AnyEvent::Handle->new(
        fh  => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            $handle->destroy;
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy;
        },
    );

    $handle->push_read(line => sub {
        my ($handle, $command) = @_;

        DEBUG && say "Received [$command]";

        # valid commands are:
        #     register: [args]
        #     list_processes
        #     signal: [pid] [message]

        if ($command =~ /^register:\s+(.+)/) {
            my $args = $1;
            $pid = ++$self->{pid};
            $self->register_watcher($handle, $pid, $args);
            # Don't set on_drain(...)
            return;
        } elsif ($command eq 'list_processes') {
            $self->list_processes($handle);
        } elsif ($command =~ /^signal:\s+(\d+)\s+(.+)/) {
            my ($pid, $message) = ($1, $2);
            $self->send_signal($handle, $pid, $message);
        } else {
            say 'Error: unrecognised command';
            $handle->push_write("Invalid command [$command]" . ${cr});
        }

        # For all commands apart from register, we want to close the
        # socket after sending the response which we do by setting
        # on_drain(...)
        $handle->on_drain(sub { $handle->destroy(); });
    });
}

register_watcher

register_watcher stores the handle and the information about the process in the object. It informs the process which PID has been given to the process. (I am still considering the registering process sending $$ to use as the PID).

sub register_watcher
{
    my ($self, $handle, $pid, $args) = @_;
    DEBUG && say "Registering [$args] (PID $pid)";

    $handle->keepalive(1);
    $self->{watchers}{$pid} = { handle => $handle, args => $args };
    $handle->push_write("Connected.  (PID $pid)" . ${cr});
    # on_read(...) needs to be set, otherwise disconnects from the
    # client side are not always detected
    $handle->on_read(sub { });
}

list_processes

list_processes gives a process list of processes that are registered with the kernel. This is similar, but uglier to ps(1).

sub list_processes
{
    my ($self, $handle) = @_;

    foreach my $pid (sort { $a <=> $b } keys %{$self->{watchers}}) {
        my $args = $self->{watchers}{$pid}{args};
        $handle->push_write("$pid: $args" . ${cr});
    }
}

send_signal

send_signal looks up the handle belonging to the process in $self->{watchers}. If it is registered, it is possible to send any message to the process, otherwise it returns an error message. I’ve added special handling to a disconnect message, allowing

This is the dangerous bit. kill(1) has some protection in that you are only able to send signals to processes you own (unless you are root). The implementation, as presented here, allows anyone who can telnet to the kernel port to send any message to any process that is registered. It would be necessary to add a layer of authentication and process ownership if the goal really was to provide something similar to unix signals.

sub send_signal
{
    my ($self, $handle, $pid, $message) = @_;

    if (! exists($self->{watchers}{$pid})) {
        say "Error: PID $pid does not exist";
        $handle->push_write("PID $pid does not exist" . ${cr});
    } else {
        my $process = $self->{watchers}{$pid}{handle};
        $process->push_write('signal: ' . $message . ${cr});
        if ($message eq 'disconnect') {
            delete $self->{watchers}{$pid};
            $process->on_drain(sub { $process->destroy(); });
        }
    }
}

kernel.pl – complete source

#!perl

use 5.010;

use strict;
use warnings;

package Kernel;

use constant DEBUG => $ENV{KERNEL_DEBUG};

my $cr = "\015\012";

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;

sub new
{
    my $class = shift;
    my $self = { pid => 0, watchers => {} };
    bless $self, $class;
    return $self;
}

sub register_watcher
{
    my ($self, $handle, $pid, $args) = @_;
    DEBUG && say "Registering [$args] (PID $pid)";

    $self->{watchers}{$pid} = { handle => $handle, args => $args };
    $handle->push_write("Connected.  (PID $pid)" . ${cr});
    $handle->on_read(sub { });
}

sub list_processes
{
    my ($self, $handle) = @_;

    foreach my $pid (sort keys %{$self->{watchers}}) {
        my $args = $self->{watchers}{$pid}{args};
        $handle->push_write("$pid: $args" . ${cr});
    }
}

sub send_signal
{
    my ($self, $handle, $pid, $message) = @_;

    if (! exists($self->{watchers}{$pid})) {
        say "Error: PID $pid does not exist";
        $handle->push_write("PID $pid does not exist" . ${cr});
    } else {
        my $process = $self->{watchers}{$pid}{handle};
        $process->push_write('signal: ' . $message . ${cr});
        if ($message eq 'disconnect') {
            delete $self->{watchers}{$pid};
            $process->on_drain(sub { $process->destroy(); });
        }
    }
}

sub watch_socket
{
    my ($self, $sock) = @_;

    # Closure variables
    my $pid = -1;

    my $handle; $handle = AnyEvent::Handle->new(
        fh  => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            $handle->destroy();
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
    );

    $handle->push_read(line => sub {
        my ($handle, $command) = @_;

        DEBUG && say "Received [$command]";

        if ($command =~ /^register:\s+(.+)/) {
            my $args = $1;
            $pid = ++$self->{pid};
            $handle->keepalive();
            $self->register_watcher($handle, $pid, $args);
            return;
        } elsif ($command eq 'list_processes') {
            $self->list_processes($handle);
        } elsif ($command =~ /^signal:\s+(\d+)\s+(.+)/) {
            my ($pid, $message) = ($1, $2);
            $self->send_signal($handle, $pid, $message);
        } else {
            say 'Error: unrecognised command';
            $handle->push_write("Invalid command [$command]" . ${cr});
        }

        $handle->on_drain(sub { $handle->destroy(); });
    });
}

sub prepare_handler
{
    my ($fh, $host, $port) = @_;
    DEBUG && warn "Listening on $host:$port\n";
}

sub _accept_handler
{
    my $self = shift;

    return sub {
        my ($sock, $peer_host, $peer_port) = @_;

        DEBUG && warn "Accepted connection from $peer_host:$peer_port\n";
        if (! $sock) {
            warn '$sock undefined' . "\n";
            return;
        }

        $self->watch_socket($sock);
    };
}

sub start_listen
{
    my ($self, $host, $port) = @_;

    $self->{server} = tcp_server($host,
                                 $port,
                                 $self->_accept_handler(),
                                 \&prepare_handler);
}

package main;

my $host = undef;
my $port = 12345;

my $kernel = Kernel->new();
$kernel->start_listen($host, $port);

AE::cv->recv();

Read Full Post »

This is part 1 in my occasional AnyEvent series

I’ve been playing around with Plack and Twiggy recently and that motivated me to take a look at AnyEvent, the eventing library that Twiggy is built upon.

Now, it seems to me that AnyEvent is useful in a similar problem-space to POE (or Coro, on Unix at least). POE has more documentation but AnyEvent can actually use a couple of Event Loops that were implemented in C: EV and libevent which is the backbone of the hugely successful memcached. Sounds good to me.

As usual, first things first. How do you make a simple TCP server? I took a look at how Twiggy does it – the code is available in Twiggy::Server. I won’t need everything in there of course.

The Preamble

I like the way that Twiggy sets a constant called DEBUG from an environment variable. Now I can call the script like this: $ SERVER_DEBUG=1 ./ae.pl and get debugging output.

#!/usr/bin/perl

use 5.010;

use strict;
use warnings;

package Server;

use constant CTRL_D => 4;
use constant DEBUG => $ENV{SERVER_DEBUG};

use IO::Handle;
use Socket qw(IPPROTO_TCP TCP_NODELAY);
use Errno qw(EAGAIN EINTR);

use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Util qw(WSAEWOULDBLOCK);

Basic Perl Objects

The watch variables only watch while they are in scope. If we have a simple object, we can dump them into the underlying blessed hash reference to keep ‘em in scope. I’m lamenting the non-standardness of Moose here, but that’s another post.

sub new
{
    my $class = shift;
    my $self = {};
    bless $self, $class;
    return $self;
}

Bottom-up or Top-down I’m never quite sure how to present my code. Hmmm…

AnyEvent::tcp_server

So, we’re assuming here that a server object will be called (with my $server Server->new()=) and then we will call $server->start_listen(...).

The example tcp_server call in the AnyEvent::Socket documentation rather unhelpfully demonstrates closing the socket the moment it has connected with a The internet is full, $host:$port. Go away! message. Oh well, maybe my examples are equally flawed. Thank goodness for Twiggy.

sub start_listen
{
    my ($self, $host, $port) = @_;

    $self->{server} = tcp_server($host,
                                 $port,
                                 $self->_accept_handler(),
                                 \&prepare_handler);
}

prepare_handler just logs a basic message on start-up. The accept handler returns a closure to maintain access to $self. The closure sets the socket options and then creates a watcher using watch_socket.

sub prepare_handler
{
    my ($fh, $host, $port) = @_;
    DEBUG && warn "Listening on $host:$port\n";
}

sub _accept_handler
{
    my $self = shift;

    return sub {
        my ($sock, $peer_host, $peer_port) = @_;

        DEBUG && warn "$sock Accepted connection from $peer_host:$peer_port\n";
        return unless $sock;

        setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, 1)
            or die "setsockopt(TCP_NODELAY) failed: $!";
        $sock->autoflush(1);

        # my $socket = IO::Socket::INET->new_from_fd($sock, 'r+');
        # $socket->autoflush(1);
        # $socket->blocking(0);

        $self->watch_socket($sock);
    };
}

AnyEvent IO watcher

The watcher is setup to echo whatever it received, back to the sender. If it receives EOF (sent when a telnet client hits CTRL-D), then it terminates the connection.

Now, I didn’t manage to get this working immediately. If the watcher goes out of scope, it doesn’t end up watching anything. And I originally omitted the undef $headers_io_watcher statements. As the closure wasn’t referring to the watcher variable, it went out of scope immediately. Adding them added a reference which stopped that happening. Nice, if a little subtle.

sub watch_socket
{
    my ($self, $sock) = @_;

    my $headers_io_watcher;

    $headers_io_watcher = AE::io $sock, 0, sub {
        while (defined(my $line = <$sock>)) {
            $line =~ s/\r?\n$//;
            say "Received: [$line] " . length($line) . ' ' . ord($line);

            if (length($line) == 1 and ord($line) == CTRL_D) {
                print $sock "Received EOF.  Closing connection...\r\n";
                undef $headers_io_watcher;
            } else {
                print $sock "You sent [$line]\r\n";
            }
        }

        if ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK ) {
            undef $headers_io_watcher;
            die $!;
        } elsif (!$!) {
            undef $headers_io_watcher;
            die "client disconnected";
        }
    };
}

main()

package main;

my $host = undef;
my $port = 12345;

my $server = Server->new();
$server->start_listen($host, $port);

AE::cv->recv();

And here is the result.

$ telnet localhost 12345
Trying ::1...
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
You sent [hello]
Received EOF.  Closing connection...
Connection closed by foreign host.

Read Full Post »

Follow

Get every new post delivered to your Inbox.