
Posts Tagged ‘networking’

list_processes aka ps

This is part 3 in my occasional AnyEvent series

Now that we have our userspace kernel, it is time to write the utilities to make it more pleasant to use. On Linux, I can simulate all of the commands using telnet, but my Windows box doesn’t have telnet installed. I will therefore need a list_processes command, a signal command and a library to help with registering new processes.

First up is list_processes, which is modelled on the http_get function presented in AnyEvent::Intro. It simply needs to send:

list_processes\015\012

and then call the passed-in callback with each line of the response. The result might look something like this:

$ ./list_processes.pl
1: ./register.pl a b c
2: ./register.pl
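
If the callback wants structured data rather than raw text, each response line can be split into a PID and its arguments. This is only a sketch: parse_process_line is a hypothetical helper that is not part of list_processes.pl, and it assumes the "PID: args" layout shown in the sample output above.

```perl
use strict;
use warnings;

# Hypothetical helper: split one "PID: args" response line.
# Returns (pid, args), or an empty list if the line does not match.
sub parse_process_line
{
    my ($line) = @_;
    $line =~ tr/\r//d;                      # tolerate a stray CR
    return $line =~ /^(\d+):\s*(.*)$/ ? ($1, $2) : ();
}

my %processes;
foreach my $line ('1: ./register.pl a b c', '2: ./register.pl') {
    my ($pid, $args) = parse_process_line($line) or next;
    $processes{$pid} = $args;
}

# Print the table in numeric PID order
printf "%d => %s\n", $_, $processes{$_}
    foreach sort { $a <=> $b } keys %processes;
```

A callback such as sub { my ($pid, $args) = parse_process_line($_[0]) } could then be passed to list_processes in place of sub { say @_ }.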

list_processes.pl

use 5.010;

use strict;
use warnings;

use constant DEBUG => $ENV{KERNEL_DEBUG};

my $cr = "\015\012";

use AnyEvent::Handle;

sub list_processes
{
    my ($host, $port, $cb) = @_;

    my $cv = AE::cv();

    my $handle; $handle = AnyEvent::Handle->new(
        connect  => [$host => $port],
        on_error => sub {
            say("Connection error: $!");
            $handle->destroy();
            # Wake the caller, otherwise recv() would block forever
            $cv->send();
        },
        on_eof => sub {
            DEBUG && say 'Connection closed';
            $handle->destroy();
            $cv->send();
        }
    );

    $handle->push_write('list_processes' . ${cr});

    $handle->on_read(sub {
        my $handle = shift;
        # Consume only complete lines; a partial line stays in the
        # read buffer until the rest of it arrives
        while ($handle->{rbuf} =~ s/^([^\n]*)\n//) {
            (my $line = $1) =~ tr/\r//d;
            $cb->($line);
        }
    });

    return $cv;
}

my $cv = list_processes('localhost', 12345, sub { say @_ } );
$cv->recv();


This is part 2 in my occasional AnyEvent series

As we already discovered, signals don’t work in Strawberry Perl [on Windows]. As they are so useful, we really want a way to emulate them. I can think of a few approaches, but the most obvious is to have a userspace "kernel" sitting on a socket which all processes register with.

The kernel will respond to a few commands:

register: [args]
list_processes           # ps :)
signal: [pid] [message]  # similar to kill, but more flexible

A quick disclaimer: for the sake of brevity, I’ve omitted authentication and error handling.

Kernel Code

The server again uses AnyEvent, so start_listen(), prepare_handler() and _accept_handler() should look pretty familiar.

Twiggy uses AE::io and checks errno itself to decide what happened on the socket. I guess that is done for speed, because AnyEvent::Handle, which I use below, is much easier to work with.

sub watch_socket
{
    my ($self, $sock) = @_;

    # Closure variables
    my $pid = -1;

    my $handle; $handle = AnyEvent::Handle->new(
        fh  => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            $handle->destroy;
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy;
        },
    );

    $handle->push_read(line => sub {
        my ($handle, $command) = @_;

        DEBUG && say "Received [$command]";

        # valid commands are:
        #     register: [args]
        #     list_processes
        #     signal: [pid] [message]

        if ($command =~ /^register:\s+(.+)/) {
            my $args = $1;
            $pid = ++$self->{pid};
            $self->register_watcher($handle, $pid, $args);
            # Don't set on_drain(...)
            return;
        } elsif ($command eq 'list_processes') {
            $self->list_processes($handle);
        } elsif ($command =~ /^signal:\s+(\d+)\s+(.+)/) {
            my ($pid, $message) = ($1, $2);
            $self->send_signal($handle, $pid, $message);
        } else {
            say 'Error: unrecognised command';
            $handle->push_write("Invalid command [$command]" . ${cr});
        }

        # For all commands apart from register, we want to close the
        # socket after sending the response which we do by setting
        # on_drain(...)
        $handle->on_drain(sub { $handle->destroy(); });
    });
}
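
The command matching in the read callback can be tried out without any sockets. The following is a minimal sketch, not part of kernel.pl: parse_command is a hypothetical helper that reuses the same three patterns and reports which branch a line would take.

```perl
use strict;
use warnings;

# Hypothetical helper: classify one command line using the same
# patterns as the kernel's read callback. No sockets involved.
sub parse_command
{
    my ($command) = @_;

    if ($command =~ /^register:\s+(.+)/) {
        return { type => 'register', args => $1 };
    } elsif ($command eq 'list_processes') {
        return { type => 'list_processes' };
    } elsif ($command =~ /^signal:\s+(\d+)\s+(.+)/) {
        return { type => 'signal', pid => $1, message => $2 };
    }
    return { type => 'invalid', command => $command };
}

foreach my $line ('register: ./register.pl a b c',
                  'list_processes',
                  'signal: 1 disconnect',
                  'signal: one disconnect') {
    my $parsed = parse_command($line);
    print "$line => $parsed->{type}\n";
}
```

The last line is reported as invalid because the signal pattern only accepts a numeric PID.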

register_watcher

register_watcher stores the handle and the information about the process in the object. It then tells the process which PID it has been given. (I am still considering having the registering process send $$ to use as its PID.)

sub register_watcher
{
    my ($self, $handle, $pid, $args) = @_;
    DEBUG && say "Registering [$args] (PID $pid)";

    $handle->keepalive(1);
    $self->{watchers}{$pid} = { handle => $handle, args => $args };
    $handle->push_write("Connected.  (PID $pid)" . ${cr});
    # on_read(...) needs to be set, otherwise disconnects from the
    # client side are not always detected
    $handle->on_read(sub { });
}

list_processes

list_processes lists the processes that are registered with the kernel. It is similar to, but uglier than, ps(1).

sub list_processes
{
    my ($self, $handle) = @_;

    foreach my $pid (sort { $a <=> $b } keys %{$self->{watchers}}) {
        my $args = $self->{watchers}{$pid}{args};
        $handle->push_write("$pid: $args" . ${cr});
    }
}
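
The explicit numeric comparison in the sort matters as soon as there are ten or more processes. A small illustration with made-up PIDs (the %watchers hash here is a stand-in, not the kernel's real table):

```perl
use strict;
use warnings;

# Stand-in for $self->{watchers}: only the keys matter here
my %watchers = map { $_ => 1 } (1, 2, 10);

my @as_strings = sort keys %watchers;                # default: string comparison
my @as_numbers = sort { $a <=> $b } keys %watchers;  # numeric comparison

print "string order:  @as_strings\n";   # 1 10 2
print "numeric order: @as_numbers\n";   # 1 2 10
```

Hash keys are always strings in Perl, so a plain sort would list PID 10 before PID 2.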

send_signal

send_signal looks up the handle belonging to the process in $self->{watchers}. If it is registered, it is possible to send any message to the process, otherwise it returns an error message. I’ve added special handling for a disconnect message: the kernel removes the process from its table and closes the connection once the message has been written.

This is the dangerous bit. kill(1) has some protection in that you are only able to send signals to processes you own (unless you are root). The implementation, as presented here, allows anyone who can telnet to the kernel port to send any message to any process that is registered. It would be necessary to add a layer of authentication and process ownership if the goal really was to provide something similar to unix signals.

sub send_signal
{
    my ($self, $handle, $pid, $message) = @_;

    if (! exists($self->{watchers}{$pid})) {
        say "Error: PID $pid does not exist";
        $handle->push_write("PID $pid does not exist" . ${cr});
    } else {
        my $process = $self->{watchers}{$pid}{handle};
        $process->push_write('signal: ' . $message . ${cr});
        if ($message eq 'disconnect') {
            delete $self->{watchers}{$pid};
            $process->on_drain(sub { $process->destroy(); });
        }
    }
}

kernel.pl – complete source

#!perl

use 5.010;

use strict;
use warnings;

package Kernel;

use constant DEBUG => $ENV{KERNEL_DEBUG};

my $cr = "\015\012";

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;

sub new
{
    my $class = shift;
    my $self = { pid => 0, watchers => {} };
    bless $self, $class;
    return $self;
}

sub register_watcher
{
    my ($self, $handle, $pid, $args) = @_;
    DEBUG && say "Registering [$args] (PID $pid)";

    $self->{watchers}{$pid} = { handle => $handle, args => $args };
    $handle->push_write("Connected.  (PID $pid)" . ${cr});
    $handle->on_read(sub { });
}

sub list_processes
{
    my ($self, $handle) = @_;

    foreach my $pid (sort { $a <=> $b } keys %{$self->{watchers}}) {
        my $args = $self->{watchers}{$pid}{args};
        $handle->push_write("$pid: $args" . ${cr});
    }
}

sub send_signal
{
    my ($self, $handle, $pid, $message) = @_;

    if (! exists($self->{watchers}{$pid})) {
        say "Error: PID $pid does not exist";
        $handle->push_write("PID $pid does not exist" . ${cr});
    } else {
        my $process = $self->{watchers}{$pid}{handle};
        $process->push_write('signal: ' . $message . ${cr});
        if ($message eq 'disconnect') {
            delete $self->{watchers}{$pid};
            $process->on_drain(sub { $process->destroy(); });
        }
    }
}

sub watch_socket
{
    my ($self, $sock) = @_;

    # Closure variables
    my $pid = -1;

    my $handle; $handle = AnyEvent::Handle->new(
        fh  => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            $handle->destroy();
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
    );

    $handle->push_read(line => sub {
        my ($handle, $command) = @_;

        DEBUG && say "Received [$command]";

        if ($command =~ /^register:\s+(.+)/) {
            my $args = $1;
            $pid = ++$self->{pid};
            $handle->keepalive(1);
            $self->register_watcher($handle, $pid, $args);
            return;
        } elsif ($command eq 'list_processes') {
            $self->list_processes($handle);
        } elsif ($command =~ /^signal:\s+(\d+)\s+(.+)/) {
            my ($pid, $message) = ($1, $2);
            $self->send_signal($handle, $pid, $message);
        } else {
            say 'Error: unrecognised command';
            $handle->push_write("Invalid command [$command]" . ${cr});
        }

        $handle->on_drain(sub { $handle->destroy(); });
    });
}

sub prepare_handler
{
    my ($fh, $host, $port) = @_;
    DEBUG && warn "Listening on $host:$port\n";
}

sub _accept_handler
{
    my $self = shift;

    return sub {
        my ($sock, $peer_host, $peer_port) = @_;

        DEBUG && warn "Accepted connection from $peer_host:$peer_port\n";
        if (! $sock) {
            warn '$sock undefined' . "\n";
            return;
        }

        $self->watch_socket($sock);
    };
}

sub start_listen
{
    my ($self, $host, $port) = @_;

    $self->{server} = tcp_server($host,
                                 $port,
                                 $self->_accept_handler(),
                                 \&prepare_handler);
}

package main;

my $host = undef;
my $port = 12345;

my $kernel = Kernel->new();
$kernel->start_listen($host, $port);

AE::cv->recv();


This is part 1 in my occasional AnyEvent series

I’ve been playing around with Plack and Twiggy recently and that motivated me to take a look at AnyEvent, the eventing library that Twiggy is built upon.

Now, it seems to me that AnyEvent is useful in a similar problem-space to POE (or Coro, on Unix at least). POE has more documentation, but AnyEvent can use a couple of event loops that are implemented in C: EV, and libevent, which is the backbone of the hugely successful memcached. Sounds good to me.

As usual, first things first. How do you make a simple TCP server? I took a look at how Twiggy does it – the code is available in Twiggy::Server. I won’t need everything in there of course.

The Preamble

I like the way that Twiggy sets a constant called DEBUG from an environment variable. Now I can call the script like this: $ SERVER_DEBUG=1 ./ae.pl and get debugging output.

#!/usr/bin/perl

use 5.010;

use strict;
use warnings;

package Server;

use constant CTRL_D => 4;
use constant DEBUG => $ENV{SERVER_DEBUG};

use IO::Handle;
use Socket qw(IPPROTO_TCP TCP_NODELAY);
use Errno qw(EAGAIN EINTR);

use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Util qw(WSAEWOULDBLOCK);

Basic Perl Objects

The watch variables only watch while they are in scope. If we have a simple object, we can dump them into the underlying blessed hash reference to keep ‘em in scope. I’m lamenting the non-standardness of Moose here, but that’s another post.

sub new
{
    my $class = shift;
    my $self = {};
    bless $self, $class;
    return $self;
}

Bottom-up or top-down? I’m never quite sure how to present my code. Hmmm…

AnyEvent::tcp_server

So, we’re assuming here that a server object will be created (with my $server = Server->new()) and then we will call $server->start_listen(...).

The example tcp_server call in the AnyEvent::Socket documentation rather unhelpfully demonstrates closing the socket the moment it has connected, with a "The internet is full, $host:$port. Go away!" message. Oh well, maybe my examples are equally flawed. Thank goodness for Twiggy.

sub start_listen
{
    my ($self, $host, $port) = @_;

    $self->{server} = tcp_server($host,
                                 $port,
                                 $self->_accept_handler(),
                                 \&prepare_handler);
}

prepare_handler just logs a basic message on start-up. The accept handler returns a closure to maintain access to $self. The closure sets the socket options and then creates a watcher using watch_socket.

sub prepare_handler
{
    my ($fh, $host, $port) = @_;
    DEBUG && warn "Listening on $host:$port\n";
}

sub _accept_handler
{
    my $self = shift;

    return sub {
        my ($sock, $peer_host, $peer_port) = @_;

        DEBUG && warn "$sock Accepted connection from $peer_host:$peer_port\n";
        return unless $sock;

        setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, 1)
            or die "setsockopt(TCP_NODELAY) failed: $!";
        $sock->autoflush(1);

        # my $socket = IO::Socket::INET->new_from_fd($sock, 'r+');
        # $socket->autoflush(1);
        # $socket->blocking(0);

        $self->watch_socket($sock);
    };
}

AnyEvent IO watcher

The watcher is set up to echo whatever it receives back to the sender. If it receives EOF (sent when a telnet client hits CTRL-D), then it terminates the connection.

Now, I didn’t manage to get this working immediately. If the watcher goes out of scope, it doesn’t end up watching anything. And I originally omitted the undef $headers_io_watcher statements. As the closure wasn’t referring to the watcher variable, it went out of scope immediately. Adding them added a reference which stopped that happening. Nice, if a little subtle.

sub watch_socket
{
    my ($self, $sock) = @_;

    my $headers_io_watcher;

    $headers_io_watcher = AE::io $sock, 0, sub {
        while (defined(my $line = <$sock>)) {
            $line =~ s/\r?\n$//;
            say "Received: [$line] " . length($line) . ' ' . ord($line);

            if (length($line) == 1 and ord($line) == CTRL_D) {
                print $sock "Received EOF.  Closing connection...\r\n";
                undef $headers_io_watcher;
            } else {
                print $sock "You sent [$line]\r\n";
            }
        }

        if ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK ) {
            undef $headers_io_watcher;
            die $!;
        } elsif (!$!) {
            undef $headers_io_watcher;
            die "client disconnected";
        }
    };
}
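
The scoping subtlety described above can be reproduced without AnyEvent. This is a sketch under assumptions: Guard is a hypothetical stand-in for a watcher that simply records when it is destroyed, and the outer $callback variable stands in for whatever holds on to the real callback.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a watcher: it records when it is destroyed.
package Guard;

sub new     { my ($class, $log) = @_; return bless { log => $log }, $class }
sub DESTROY { my $self = shift; push @{ $self->{log} }, 'destroyed' }

package main;

my @log;

{
    my $guard    = Guard->new(\@log);
    my $callback = sub { return 'callback ran' };   # never mentions $guard
}
# Nothing referred to $guard, so it died with its scope
print 'After first block:  ', scalar(@log), " destroyed\n";

my $callback;
{
    my $guard = Guard->new(\@log);
    $callback = sub { undef $guard };   # mentioning $guard captures it
}
# The closure still refers to $guard, so it is alive
print 'After second block: ', scalar(@log), " destroyed\n";

$callback->();                          # the closure releases the guard
print 'After callback:     ', scalar(@log), " destroyed\n";
```

The counts printed are 1, 1 and 2: the second guard survives its block only because the closure mentions it, which is the same effect the undef $headers_io_watcher statements have in watch_socket.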

main()

package main;

my $host = undef;
my $port = 12345;

my $server = Server->new();
$server->start_listen($host, $port);

AE::cv->recv();

And here is the result.

$ telnet localhost 12345
Trying ::1...
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
You sent [hello]
Received EOF.  Closing connection...
Connection closed by foreign host.


A little while ago we were talking about clients and servers. And I’ve finally got back to the interesting thing which is playing with emacs networking.

The comment from Gabriel nicely pointed out emacs’ low-level socket facilities.

(open-network-stream), which is synchronous, and
(make-network-process), which can be asynchronous if the :nowait parameter is set. I haven’t been able to make the last one work, but check it out yourself.

So, er, I’m not 100% sure how to simulate a crappy internet connection. Maybe making my server pause 2 seconds between line reads will be sufficient.

sub process_request
{
    while (<STDIN>) {
        s/\r?\n$//;
        print STDERR "Received [$_]\n";
        last if /quit/i;
        sleep 2;
    }
}

I’m also curious about when the connection is made and broken, so I log both events.

sub post_accept_hook
{
    print STDERR "Accepted new connection\n";
}

sub post_process_request_hook
{
    print STDERR "Client disconnected\n";
}

Did I mention I think Perl’s bad press is a little unfair?

So what I need now is a basic API for connecting, disconnecting and sending messages. As you can probably infer from the name, make-network-process makes a process. For my purposes a single client is enough, so I wrap the lookup of the current process in a function. Layering in this way means I can add error handling at the appropriate point later on, and it should also be fairly easy to generalise client-process to choose between different connections.

(defconst client-process-name "*client*")
(defsubst client-process () (get-process client-process-name))

For maximum asynchronicity we add a sentinel function that will be called back when the connection has completed, either successfully or unsuccessfully.

It turns out that a perl-style chomp function is really handy!

(defun chomp (str)
  (if (and (stringp str) (string-match "\r?\n$" str))
      (replace-match "" t nil str)
    str))

(defun client-notify-connect (&rest args)
  (message "Connection message [%s]" (mapcar #'chomp args)))

I added a fairly hacky (sit-for 1) after opening the client as otherwise I could send messages before it was ready due to the sentinel and nowait. Future versions will probably get the sentinel to set an "opened" variable which will determine whether the send message and close functions work.

(defun client-open (host port)
  (make-network-process :name client-process-name
                        :host host
                        :service port
                        :nowait t
                        :sentinel #'client-notify-connect)
  (sit-for 1))

Closing the connection is a simple matter of deleting the process. I confirmed this against my test perl server.

(defun client-close ()
  (delete-process (client-process)))

(defun client-send-string (str)
  (process-send-string (client-process) (concat str "\r\n")))

My test program simply sends 10 hello messages to the server at the same time as inserting 10 hellos in the buffer. The fact that the buffer insertions completed almost instantaneously whereas the server took around 20 seconds to display the messages indicates there is some degree of separation.

(progn
  (client-open 'local 8080)
  (dotimes (var 10)
    (client-send-string (format "Hello %s" var))
    (insert (format "Hello %s\n" var)))
  (client-close))

Next time I’ll look at setting up a filter to receive messages that the server sends back. If you have any questions or comments, let me know.



So much for good intentions. I get myself all geared up to
the problem I need to solve today? Well, I want to find out about how emacs networking works.

So first, I need a simple server to play with. Now normally when I think server I automatically think Apache, but this time I want something a bit more basic. And if I was thinking enterprise[1] I might reach for C++/ACE. However, for something basic, Perl is ideal.

I’ve just upgraded to Ubuntu 9.04 on this box and Perl is unused so let’s see if it has what I need.

06.52 Ubuntu finishes booting

I waste a few minutes on the internet.

06.57 I start Emacs

and remind myself just how gorgeous emacs-23 looks.

06.59 I check for the Net::Server package

$ perldoc Net::Server
You need to install the perl-doc package to use this program.
$ sudo apt-get install perl-doc

$ perldoc Net::Server
No documentation found for "Net::Server".

It is not installed. I could install it using apt (it is called libnet-server-perl) but I’ve got in the habit of using Perl’s CPAN module which provides package management facilities too. The advantage is that it is somewhat consistent across platforms.

$ sudo perl -MCPAN -e shell

cpan[1]> install YAML
cpan[2]> install CPAN
cpan[3]> reload CPAN
cpan[4]> install Bundle::CPAN

The CPAN bundle installs quite a bit so I go for breakfast.

07.15 Back from breakfast

cpan[5]> install Net::Server

I took this code pretty much straight from perldoc Net::Server.

#!/usr/bin/perl

use strict;
use warnings;

package SimpleServer;

use base qw(Net::Server);

sub process_request
{
    while (<STDIN>) {
        s/\r?\n$//;
        print STDERR "Received [$_]\n";
        last if /quit/i;
    }
}

SimpleServer->run(port => 8080);

I tested it using telnet localhost 8080 to confirm it does what I need.
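
The two regexes in process_request can also be checked on their own. A small sketch, assuming nothing beyond core Perl; clean_line and wants_quit are hypothetical helper names that do not appear in the server.

```perl
use strict;
use warnings;

# Hypothetical helpers mirroring the two regexes in process_request.
sub clean_line
{
    my ($line) = @_;
    $line =~ s/\r?\n$//;      # strip a trailing LF or CRLF (telnet sends CRLF)
    return $line;
}

sub wants_quit
{
    my ($line) = @_;
    return $line =~ /quit/i ? 1 : 0;   # unanchored and case-insensitive
}

foreach my $raw ("hello\r\n", "QUIT\n", "don't quit yet\r\n") {
    my $line = clean_line($raw);
    printf "[%s] quit=%d\n", $line, wants_quit($line);
}
```

Because the pattern is unanchored, any line that merely contains "quit" ends the session. That is fine for a test server; something like /^quit$/i would be stricter.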

07.20 All done

Presumably Python and Ruby have similar incantations that will get a server up and running quickly.

And unfortunately at this point I have to go to work. But later on, I can begin experimenting with emacs networking.


[1] i.e. something that grows humongous over a period of two years and then no-one wants to work with it anymore.
