- First Steps with AnyEvent
I’ve been playing around with Plack and Twiggy recently and that motivated me to take a look at AnyEvent, the eventing library that Twiggy is built upon.
Now, it seems to me that AnyEvent is useful in a similar problem-space to POE (or Coro, on Unix at least). POE has more documentation, but AnyEvent can sit on top of a couple of event loops that are implemented in C: EV (built on libev) and Event::Lib (built on libevent, which is the backbone of the hugely successful memcached). Sounds good to me.
As usual, first things first. How do you make a simple TCP server? I took a look at how Twiggy does it – the code is available in Twiggy::Server. I won’t need everything in there of course.
The Preamble
I like the way that Twiggy sets a constant called DEBUG from an environment variable. Now I can call the script like this: $ SERVER_DEBUG=1 ./ae.pl and get debugging output.
    #!/usr/bin/perl
    use 5.010;
    use strict;
    use warnings;

    package Server;

    use constant CTRL_D => 4;
    use constant DEBUG  => $ENV{SERVER_DEBUG};

    use IO::Handle;
    use Socket qw(IPPROTO_TCP TCP_NODELAY);
    use Errno qw(EAGAIN EINTR);
    use AnyEvent;
    use AnyEvent::Socket;
    use AnyEvent::Util qw(WSAEWOULDBLOCK);
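To see the DEBUG trick on its own, here is a minimal sketch using only core Perl. The debug_line helper is my own invention for illustration, and I normalise the flag to 1 or 0, which the script above does not do.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Read the flag once, at compile time. Normalising to 1/0 is an assumption
# of this sketch; the original stores the raw environment value.
use constant DEBUG => $ENV{SERVER_DEBUG} ? 1 : 0;

# debug_line() is a hypothetical helper, not part of the server script.
sub debug_line {
    my ($message) = @_;
    return DEBUG ? "[debug] $message\n" : '';
}

# Because DEBUG is a compile-time constant, perl can drop this statement
# when the flag is off.
DEBUG && warn debug_line("Listening on port 12345");
```

Run it as `SERVER_DEBUG=1 ./script.pl` to see the message on stderr; without the variable it stays silent.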
Basic Perl Objects
The watch variables only watch while they are in scope. If we have a simple object, we can dump them into the underlying blessed hash reference to keep ‘em in scope. I’m lamenting the non-standardness of Moose here, but that’s another post.
    sub new {
        my $class = shift;
        my $self  = {};
        bless $self, $class;
        return $self;
    }

Bottom-up or top-down? I’m never quite sure how to present my code. Hmmm…
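The scoping point can be shown without AnyEvent at all. In this rough sketch, FakeWatcher and Holder are hypothetical stand-ins (a real watcher would come from AE::io or similar); the fake one simply counts how many instances are alive.

```perl
use strict;
use warnings;

# FakeWatcher is a hypothetical stand-in for an AnyEvent watcher: it is
# "active" from construction until it is destroyed.
package FakeWatcher;
our $active = 0;
sub new     { my $class = shift; $active++; return bless {}, $class }
sub DESTROY { $active-- }

package Holder;
sub new { my $class = shift; return bless {}, $class }

# The watcher lives only in a lexical, so it is destroyed on return.
sub start_and_forget { my $self = shift; my $w = FakeWatcher->new; return }

# The watcher is stored in the object, so it lives as long as $self does.
sub start_and_keep { my $self = shift; $self->{watcher} = FakeWatcher->new; return }

package main;

my @seen;
my $holder = Holder->new;

$holder->start_and_forget;
push @seen, $FakeWatcher::active;    # watcher already gone

$holder->start_and_keep;
push @seen, $FakeWatcher::active;    # watcher kept alive by $holder

undef $holder;
push @seen, $FakeWatcher::active;    # gone again, along with the object

print "active watchers at each step: @seen\n";
```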
AnyEvent::tcp_server
So, we’re assuming here that a server object will be created (with my $server = Server->new()) and then we will call $server->start_listen(...).
The example tcp_server call in the AnyEvent::Socket documentation rather unhelpfully demonstrates closing the socket the moment it has connected, with a "The internet is full, $host:$port. Go away!" message. Oh well, maybe my examples are equally flawed. Thank goodness for Twiggy.
    sub start_listen {
        my ($self, $host, $port) = @_;

        $self->{server} = tcp_server($host, $port,
            $self->_accept_handler(),
            \&prepare_handler);
    }
prepare_handler just logs a basic message on start-up. The accept handler returns a closure to maintain access to $self. The closure sets the socket options and then creates a watcher using watch_socket.
    sub prepare_handler {
        my ($fh, $host, $port) = @_;
        DEBUG && warn "Listening on $host:$port\n";
    }

    sub _accept_handler {
        my $self = shift;

        return sub {
            my ($sock, $peer_host, $peer_port) = @_;

            DEBUG && warn "$sock Accepted connection from $peer_host:$peer_port\n";
            return unless $sock;

            setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, 1)
                or die "setsockopt(TCP_NODELAY) failed: $!";
            $sock->autoflush(1);

            # my $socket = IO::Socket::INET->new_from_fd($sock, 'r+');
            # $socket->autoflush(1);
            # $socket->blocking(0);
            $self->watch_socket($sock);
        };
    }
AnyEvent IO watcher
The watcher is set up to echo whatever it receives back to the sender. If it receives a lone CTRL-D character (ASCII 4, which I’m treating as EOF from a telnet client), then it terminates the connection.
Now, I didn’t manage to get this working immediately. If the watcher goes out of scope, it stops watching. I originally omitted the undef $headers_io_watcher statements, so the closure never referred to the watcher variable and the watcher went out of scope as soon as watch_socket returned. Adding them made the closure hold a reference to the variable, which keeps the watcher alive. Nice, if a little subtle.
    sub watch_socket {
        my ($self, $sock) = @_;

        my $headers_io_watcher;
        $headers_io_watcher = AE::io $sock, 0, sub {
            while (defined(my $line = <$sock>)) {
                $line =~ s/\r?\n$//;
                say "Received: [$line] " . length($line) . ' ' . ord($line);

                if (length($line) == 1 and ord($line) == CTRL_D) {
                    print $sock "Received EOF. Closing connection...\r\n";
                    undef $headers_io_watcher;
                }
                else {
                    print $sock "You sent [$line]\r\n";
                }
            }

            if ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
                undef $headers_io_watcher;
                die $!;
            }
            elsif (!$!) {
                undef $headers_io_watcher;
                die "client disconnected";
            }
        };
    }
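The subtle bit about the closure can be reproduced in isolation. This is only a sketch in core Perl: FakeWatcher is a hypothetical stand-in that holds a callback the way an IO watcher would, and the two watch_* functions are made-up names for the "before" and "after" versions of the code.

```perl
use strict;
use warnings;

# Hypothetical stand-in for an IO watcher: it holds a callback and counts
# how many instances are currently alive.
package FakeWatcher;
our $active = 0;
sub new     { my ($class, $cb) = @_; $active++; return bless { cb => $cb }, $class }
sub DESTROY { $active-- }

package main;

# The callback never mentions $w, so nothing keeps the watcher alive once
# the function returns.
sub watch_without_reference {
    my $w;
    $w = FakeWatcher->new(sub { 1 });
    return;
}

# The callback mentions $w (to undef it), so the closure captures the
# variable and the watcher survives until the callback clears it.
sub watch_with_reference {
    my $w;
    $w = FakeWatcher->new(sub { undef $w });
    return $w->{cb};    # handed back only so this sketch can fire it by hand
}

my @seen;

watch_without_reference();
push @seen, $FakeWatcher::active;    # destroyed on return

my $callback = watch_with_reference();
push @seen, $FakeWatcher::active;    # still alive after return

$callback->();                       # simulates the "close connection" path
push @seen, $FakeWatcher::active;    # released by undef inside the closure

print "active watchers at each step: @seen\n";
```

The flip side is that a watcher kept alive this way is only released when the callback actually runs one of its undef branches.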
main()
    package main;

    my $host = undef;
    my $port = 12345;

    my $server = Server->new();
    $server->start_listen($host, $port);

    AE::cv->recv();
And here is the result.
    $ telnet localhost 12345
    Trying ::1...
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    hello
    You sent [hello]
    Received EOF. Closing connection...
    Connection closed by foreign host.
Interesting page, thanks for putting it up! I’m looking at AnyEvent and came across your page, and it was helpful in trying some new things out.
There’s a small bug, however, in watch_socket(). The handle is read like so:
while (defined(my $line = <sock>)) {
The bareword ‘sock’, as you have it, is undefined.
Hi Bill,
Thanks for that – should be fixed now.
I’ve amended your comment to what I think you had originally as WordPress would have swallowed <sock> thinking it was a tag. Let me know if it isn’t correct.
It does not accept multiple connections when I run it. It closes the current connection and then accepts the new one when another client connects to it.
Any ideas? Or any pointers to tutorials on AnyEvent?
Sorry guys, my mistake. It does accept multiple connections. The trick is not to overwrite $headers_io_watcher or let it go out of scope (as mentioned in the original post).
What I was trying to do was to save the $headers_io_watcher into $self and add a ‘cleanup’ function which goes and undefines each $headers_io_watcher, thus closing the connection to each client before shutting down. So now I have a hash in $self with all the $headers_io_watcher values, past and present. When a new connection arrives, I add the new $headers_io_watcher to the hash and remove any which are undef, i.e. whose connection has closed. At the end, when the cleanup function is called, it goes through this hash and undefs each value.
Still pointers to AnyEvent tutorials are needed. Thanks for the post, it was most useful for me.
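For anyone wanting to try the watcher-hash idea described above, here is one possible shape for it, sketched in core Perl. WatcherRegistry and its method names are hypothetical, the placeholder strings stand in for real AE::io watchers, and I am assuming each connection deletes its own entry on close rather than leaving an undef value behind.

```perl
use strict;
use warnings;

package WatcherRegistry;    # hypothetical name, not part of AnyEvent

sub new { my $class = shift; return bless { watchers => {} }, $class }

# Remember a watcher under a per-connection key (for example, the socket's
# file descriptor number).
sub add {
    my ($self, $key, $watcher) = @_;
    $self->{watchers}{$key} = $watcher;
    return;
}

# Forget one connection. With a real watcher, dropping the last reference
# is what cancels it.
sub remove {
    my ($self, $key) = @_;
    delete $self->{watchers}{$key};
    return;
}

sub count { my $self = shift; return scalar keys %{ $self->{watchers} } }

# Drop every remaining watcher at shutdown.
sub cleanup {
    my $self = shift;
    %{ $self->{watchers} } = ();
    return;
}

package main;

my $registry = WatcherRegistry->new;
$registry->add(5 => 'watcher for client A');    # placeholder values
$registry->add(6 => 'watcher for client B');
$registry->remove(5);                           # client A disconnected

my $open = $registry->count;
$registry->cleanup;
my $left = $registry->count;

print "open before cleanup: $open, after cleanup: $left\n";
```

One thing to watch with real watchers: the hash holds its own reference, so an undef on the lexical inside the callback is not enough to cancel the watcher; the entry has to be deleted from the hash as well.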