For the producer demonstrated last time, it is easy to make a consumer using AnyEvent. Not only that, I can borrow most of the code from my list_processes script.
The utility functions are the same as in the unix pipe producer/consumer.
    sub consumer {
        my ($host, $port, $cb) = @_;

        my $cv = AE::cv();

        my $handle;
        $handle = AnyEvent::Handle->new(
            connect  => [$host => $port],
            on_error => sub {
                print("Connection error: $!\n");
                $handle->destroy();
                # Signal the condvar here too, otherwise recv() never returns
                $cv->send();
            },
            on_eof => sub {
                print "Connection closed\n";
                $handle->destroy();
                $cv->send();
            }
        );

        # We need to consume the first line which contains the PID message
        $handle->push_read(line => sub {});

        $handle->on_read(sub {
            my $handle = shift;
            # Take complete lines only; a partial line stays in the buffer
            # until the rest of it arrives
            while ($handle->{rbuf} =~ s/^([^\n]*)\n//) {
                (my $line = $1) =~ tr/\r//d;
                $cb->($line);
            }
        });

        return $cv;
    }
The callback function checks whether the subject is the one we are interested in and, if so, calls process_file(...); any other line is simply logged. It should be fairly easy to see how to extend this for much more complex producers and consumers.
    sub process_file {
        my $file = shift;
        my_log "Processing file [$file]";
        # File processing logic here ...
    }

    sub handle_line {
        my $line = shift;
        my ($subject, $action) = split /\s+/, $line;
        if ($subject =~ m{^/producer/file-creator/new-file}) {
            process_file($action);
        }
        else {
            my_log "FROM PRODUCER [$line]";
        }
    }

    my $cv = consumer('localhost', 12345, \&handle_line);
    $cv->recv();
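One way the callback might grow to cover more subjects is a dispatch table keyed on the subject. This is only a sketch under some assumptions: the deleted-file subject and both handlers are hypothetical, dispatch_line is a name made up for illustration, and subjects are matched exactly rather than by prefix as handle_line does.

```perl
use strict;
use warnings;

# Records what the hypothetical handlers were asked to do.
my @seen;

# Subject => handler. Only the new-file subject comes from the producer
# described here; the deleted-file subject is invented for illustration.
my %dispatch = (
    '/producer/file-creator/new-file'     => sub { push @seen, "process $_[0]" },
    '/producer/file-creator/deleted-file' => sub { push @seen, "forget $_[0]" },
);

# Returns 1 if a handler was found and called, 0 otherwise.
sub dispatch_line {
    my $line = shift;
    # Limit of 2 keeps any whitespace inside the argument intact.
    my ($subject, $action) = split /\s+/, $line, 2;
    return 0 unless defined $subject;
    my $handler = $dispatch{$subject} or return 0;
    $handler->($action);
    return 1;
}
```

The callback passed to consumer(...) could then call dispatch_line($line) and fall back to logging the line whenever it returns 0.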
Hi Jared, many thanks for these articles. I have been planning to look at AnyEvent for a long time, and your articles are pretty good use cases.
Hi Roman,
You’re very welcome and thanks for the nice comment.