In my recent Connecting Software Systems post, I included a teaser:
If you need to connect a single producer to a single consumer, you can easily connect them using a unix pipe without changing much of the underlying code. The trick is to write to STDOUT whenever work is available for the consumer. The consumer reads work requests from STDIN.
Script Preamble
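The preamble is mostly common to both the producer and the consumer. I set STDOUT to autoflush so that each line is written to the pipe as soon as it is printed. Without it, output sent to a pipe is block-buffered, and the consumer would not see any work until the buffer filled up or the producer exited.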
    use 5.010;
    use strict;
    use warnings;

    use File::Path 'make_path';
    use IO::Handle;
    use POSIX 'strftime';
    use File::Slurp;

    STDOUT->autoflush(1);

    sub hms {
        return strftime('%H:%M:%S', localtime(time()));
    }

    sub my_log {
        print '[ ', hms(), ' ] : ', @_, "\n";
    }
The Producer
The work files are created in /var/tmp/pro-co. Hopefully create_file would be doing something a bit more useful in a real application!
    sub create_file {
        my $top = shift;
        my $filename = $top . '/' . int(rand(1_000_000)) . '.txt';
        my $content = rand(); # or something more useful...

        # simulate taking some time for processing
        sleep rand(5);

        write_file($filename, $content);
        return $filename;
    }

    # --

    my $top = '/var/tmp/pro-co';
    make_path($top);

    for (1..5) {
        my_log "Iteration $_";
        my $filename = create_file($top);
        my_log "PRODUCED $filename";
    }
The Consumer
The consumer removes the timestamp from the producer output. Then, any line that indicates a unit of work (marked in the example by PRODUCED) is passed to process_file().
    sub process_file {
        my $file = shift;
        my_log "Processing file [$file]";

        # File processing logic here ...
    }

    # --

    while (defined(my $line = <STDIN>)) {
        # Strip the timestamp
        $line =~ s{\[\s[0-9:]+\s\]\s:\s}{};
        chomp $line;

        if ($line =~ /^PRODUCED\s+(.+)/) {
            process_file($1);
        }
        else {
            my_log "FROM PRODUCER [$line]";
        }
    }
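To see the parsing step in isolation, here is a minimal sketch that runs the same two regular expressions against sample lines instead of STDIN. The helper name parse_line and the 'work' / 'log' labels are my own for illustration and are not part of the scripts above.

```perl
use strict;
use warnings;

# Hypothetical helper (not in the original consumer): strip the leading
# "[ HH:MM:SS ] : " timestamp, then classify the line as a unit of work
# or as plain log output from the producer.
sub parse_line {
    my ($line) = @_;
    chomp $line;
    $line =~ s{\[\s[0-9:]+\s\]\s:\s}{};

    if ($line =~ /^PRODUCED\s+(.+)/) {
        return ('work', $1);
    }
    return ('log', $line);
}

# Sample lines in the format produced by my_log
my @samples = (
    "[ 09:22:20 ] : Iteration 1\n",
    "[ 09:22:20 ] : PRODUCED /var/tmp/pro-co/944319.txt\n",
);

for my $sample (@samples) {
    my ($kind, $payload) = parse_line($sample);
    print "$kind: $payload\n";
}
```

Pulling the parsing into a function like this would make it easy to check the regular expressions on their own, without having to run the producer.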
The Example Run
As you can see from the output below, the consumer was able to process the work as it became available.
    $ ./producer.pl | tee producer.log | ./consumer.pl
    [ 09:22:20 ] : FROM PRODUCER [Iteration 1]
    [ 09:22:20 ] : Processing file [/var/tmp/pro-co/944319.txt]
    [ 09:22:20 ] : FROM PRODUCER [Iteration 2]
    [ 09:22:23 ] : Processing file [/var/tmp/pro-co/141765.txt]
    [ 09:22:23 ] : FROM PRODUCER [Iteration 3]
    [ 09:22:27 ] : Processing file [/var/tmp/pro-co/463599.txt]
    [ 09:22:27 ] : FROM PRODUCER [Iteration 4]
    [ 09:22:28 ] : Processing file [/var/tmp/pro-co/423055.txt]
    [ 09:22:28 ] : FROM PRODUCER [Iteration 5]
    [ 09:22:28 ] : Processing file [/var/tmp/pro-co/233909.txt]

    $ cat producer.log
    [ 09:22:20 ] : Iteration 1
    [ 09:22:20 ] : PRODUCED /var/tmp/pro-co/944319.txt
    [ 09:22:20 ] : Iteration 2
    [ 09:22:23 ] : PRODUCED /var/tmp/pro-co/141765.txt
    [ 09:22:23 ] : Iteration 3
    [ 09:22:27 ] : PRODUCED /var/tmp/pro-co/463599.txt
    [ 09:22:27 ] : Iteration 4
    [ 09:22:28 ] : PRODUCED /var/tmp/pro-co/423055.txt
    [ 09:22:28 ] : Iteration 5
    [ 09:22:28 ] : PRODUCED /var/tmp/pro-co/233909.txt