In Perl, how can one transform a function that requires callbacks to a new function that returns a stream of results?
Imagine I have a fixed function I can't change:
sub my_fixed_handler {
    my $callback = shift;
    my $count = 1;
    while (1) {
        $callback->($count++);
    }
}
To print the running count of numbers I could easily write this code:
my_fixed_handler( sub {
    my $num = shift;
    print "...$num\n";
});
But now I need another function, based on my_fixed_handler,
that returns only the result of one calculation step at a time:
my $stream = my_wrapper( my_fixed_handler( ... ) );
$stream->next; # 1
$stream->next; # 2
Is this possible?
Use the fact that a pipe blocks when full: run the fixed handler
in a forked process, with the callback writing back to the parent via a pipe. While the parent is processing what it has read, the pipe is full and the writer blocks, waiting. To ensure this, have the callback write an extra blank string long enough to fill the pipe.
use warnings;
use strict;
use feature 'say';
sub fixed_handler {
    my $callback = shift;
    #state $count = 1;  # would solve the problem
    my $count = 1;
    for (1..4) { $callback->($count++) }
}
pipe my $reader, my $writer or die "Can't open pipe: $!";
$writer->autoflush(1);
$reader->autoflush(1);
my $fill_buff = ' ' x 100_000; # (64_656 - 3); # see text
my $iter = sub {
    my $data = shift;
    say "\twrite on pipe ... ($data)";
    say $writer $data;
    say $writer $fill_buff;  # (over)fill the buffer
};
my $pid = fork // die "Can't fork: $!"; #/
if ($pid == 0) {
    close $reader;
    fixed_handler($iter);
    close $writer;
    exit;
}
close $writer;
say "Parent: started kid $pid";
while (my $recd = <$reader>) {
    next if $recd !~ /\S/;  # throw out the filler
    chomp $recd;
    say "got: $recd";
    sleep 1;
}
my $gone = waitpid $pid, 0;
if ($gone > 0) { say "Child $gone exited with: $?" }
elsif ($gone < 0) { say "No such process: $gone" }
Output
Parent: started kid 13555
write on pipe ... (1)
got: 1
write on pipe ... (2)
got: 2
write on pipe ... (3)
got: 3
write on pipe ... (4)
got: 4
Child 13555 exited with: 0
Without the filler, at first the writer would keep printing until it fills the buffer.† Then, as the reader takes out one line, the writer puts in another (or two, if the prints' lengths vary), and so on. If that is acceptable, remove the say $writer $fill_buff; line; in the output we then see all "write on pipe" lines first, followed by the parent's prints. A common buffer size nowadays is 64K.
However, we are told that each step of fixed_handler
takes time, so we would wait through thousands of such steps (depending on the size of each write) before the buffer fills, the writer starts blocking at each write, and the parent can begin processing.
One way out of this is to write an extra string, long enough to fill the buffer, and discard it in the reader. I found it finicky to get the exact length for this, though. For one, the buffer size found inside the program by
my $cnt; while (1) { ++$cnt; print $writer ' '; print "\r$cnt" }  # reader sleeps
differs from the one found by a similar probe on the command line. Even with the measured size I still (sometimes) got "double writes." While that may not be a show stopper, I went with 100K
to make sure the buffer fills.
See this post for discussion of buffer sizes, for example.
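For reference, the pipe's capacity can also be measured directly by switching the write end to non-blocking mode and writing until the write would block. This is a sketch of my own, not from the probe above; on a typical Linux it reports 65536:

```perl
use strict;
use warnings;
use Fcntl;   # exports F_GETFL, F_SETFL, O_NONBLOCK

pipe my $reader, my $writer or die "Can't open pipe: $!";

# Make the write end non-blocking so a full pipe returns EAGAIN
# instead of blocking the process.
my $flags = fcntl $writer, F_GETFL, 0       or die "F_GETFL: $!";
fcntl $writer, F_SETFL, $flags | O_NONBLOCK or die "F_SETFL: $!";

my $capacity = 0;
while (1) {
    my $wrote = syswrite $writer, ' ' x 4096;
    last if not defined $wrote;   # undef: the pipe is full (EAGAIN)
    $capacity += $wrote;          # may be a partial write near the end
}
print "pipe capacity: $capacity bytes\n";
```

This measures only the kernel's pipe buffer, without the extra stdio layers that make the in-program probe above disagree with the command-line one.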
Another way may be to set the pipe's buffer size using IO::Handle::setvbuf.
However, I ran into "Not implemented on this architecture" (on the production machines), so I wouldn't consider that.
Messing with buffering will of course slow down the communication a lot.
This implements the idea from melpomene's comments.
† By "buffer" I mean the pipe's buffer: the amount of data that can be written before the pipe blocks, if no data is read on the other side. There are other buffers involved, but they are not as relevant here.
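The question's $stream->next interface can be built on top of the same mechanism. Here is a minimal sketch of my own (the Stream package and its method names are mine, not an established module), wrapping the fork-and-pipe idea from above:

```perl
use strict;
use warnings;
use IO::Handle;   # for autoflush on the lexical filehandle

package Stream {
    # Takes a callback-driven producer; runs it in a child process and
    # exposes its callback arguments one at a time via ->next.
    sub new {
        my ($class, $producer) = @_;
        pipe my $reader, my $writer or die "Can't open pipe: $!";
        my $pid = fork // die "Can't fork: $!";
        if ($pid == 0) {                    # child: drive the producer
            close $reader;
            $writer->autoflush(1);
            my $fill = ' ' x 100_000;       # keep the pipe full between reads
            $producer->(sub {
                print {$writer} "$_[0]\n";  # one result per line
                print {$writer} "$fill\n";  # blocks until the parent reads
            });
            close $writer;
            exit 0;
        }
        close $writer;
        return bless { reader => $reader, pid => $pid }, $class;
    }

    # Returns the next result, or undef when the producer is done.
    # Assumes real data matches /\S/, since blank lines are the filler.
    sub next {
        my ($self) = @_;
        while (defined(my $line = readline $self->{reader})) {
            next if $line !~ /\S/;          # discard the filler
            chomp $line;
            return $line;
        }
        waitpid $self->{pid}, 0;
        return undef;
    }
}

sub my_fixed_handler {
    my $callback = shift;
    my $count = 1;
    $callback->($count++) for 1 .. 4;       # bounded here for the demo
}

my $stream = Stream->new(\&my_fixed_handler);
print $stream->next, "\n" for 1 .. 4;
```

Each ->next call drains the pipe enough to unblock the child for roughly one more callback invocation, which is what gives the pull-style stepping the question asks for.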