Transform callbacks into a stream

Published 2019-08-03 00:27

Question:

In Perl, how can one transform a function that requires callbacks to a new function that returns a stream of results?

Imagine I have a fixed function I can't change:

sub my_fixed_handler {
    my $callback = shift;

    my $count = 1;
    while(1) {
       $callback->($count++);
    }
}

To print the running count of numbers, I could easily write this code:

my_fixed_handler( sub {
    my $num = shift;
    print "...$num\n";
});

But now I need another function, based on my_fixed_handler, that returns only the result of a single calculation step:

my $stream = my_wrapper( my_fixed_handler( ... ) );
$stream->next;  # 1
$stream->next;  # 2

Is this possible?

Answer 1:

Use the fact that a pipe blocks when full: run fixed_handler in a forked child process, with the callback writing back to the parent through a pipe. While the parent is processing what it has read, the full pipe stays blocked and the writer waits. To guarantee the pipe is full, the callback writes an extra filler string along with each value.

use warnings;
use strict;
use feature 'say';

sub fixed_handler {
    my $callback = shift;
    #state $count = 1; # would solve the problem
    my $count = 1;
    for (1..4) { $callback->($count++) }   
}

pipe my $reader, my $writer  or die "Can't open pipe: $!";
$writer->autoflush(1);
$reader->autoflush(1);

my $fill_buff = ' ' x 100_000;  # (64_656 - 3); # see text

my $iter = sub { 
    my $data = shift; 
    say "\twrite on pipe ... ($data)";
    say $writer $data;
    say $writer $fill_buff;     # (over)fill the buffer
};

my $pid = fork // die "Can't fork: $!";  #/

if ($pid == 0) {
    close $reader;
    fixed_handler($iter);
    close $writer;
    exit;
}

close $writer;
say "Parent: started kid $pid";

while (my $recd = <$reader>) {
    next if $recd !~ /\S/;      # throw out the filler
    chomp $recd;
    say "got: $recd";
    sleep 1;
}

my $gone = waitpid $pid, 0;
if    ($gone > 0) { say "Child $gone exited with: $?" }
elsif ($gone < 0) { say "No such process: $gone" }

Output

Parent: started kid 13555
        write on pipe ... (1)
got: 1
        write on pipe ... (2)
got: 2
        write on pipe ... (3)
got: 3
        write on pipe ... (4)
got: 4
Child 13555 exited with: 0

Without the filler, the writer would keep printing until it fills the buffer. Then, as the reader takes one line, the writer can put in another (or two, if the prints' lengths vary), and so on. If that behavior is acceptable, remove the say $writer $fill_buff; line; in the output we then see all the "write on pipe" lines first, followed by the parent's prints. A common pipe buffer size nowadays is 64K.

However, we are told that each step of fixed_handler takes time, so without the filler we would wait for thousands of such steps (depending on the size of each write) before the buffer fills, the writer starts blocking, and the parent can begin processing.

One way out of this is to write an extra string, long enough to fill the buffer, and discard it in the reader. I found it finicky to get the exact length right, though. For one thing, the buffer size found within the program by

$| = 1;   # unbuffer STDOUT so the \r counter display updates
my $cnt; while (1) { ++$cnt; print $writer ' '; print "\r$cnt" }  # reader sleeps

differs from the size found by running a similar probe on the command line. Even with this I still (sometimes) got "double writes." While that may not be a show stopper, I went with 100K to make sure the buffer fills.

See this post for discussion of buffer sizes, for example.
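On Linux specifically, the kernel pipe capacity can be queried (and changed) directly with fcntl. A minimal sketch of my own, assuming a Linux system and the $writer handle from the program above; the F_GETPIPE_SZ/F_SETPIPE_SZ values come from linux/fcntl.h and are not exported by Perl's Fcntl module:

# Linux-only: probe (and optionally resize) the kernel pipe buffer.
# Constant values taken from linux/fcntl.h; not portable.
my ($F_SETPIPE_SZ, $F_GETPIPE_SZ) = (1031, 1032);

my $size = fcntl($writer, $F_GETPIPE_SZ, 0)
    or die "fcntl F_GETPIPE_SZ failed: $!";
say "pipe capacity: $size bytes";    # typically 65536

# Shrinking the capacity (rounded up to a page) would make the
# writer block much sooner, reducing the need for the filler trick:
fcntl($writer, $F_SETPIPE_SZ, 4096)
    or die "fcntl F_SETPIPE_SZ failed: $!";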

Another way may be to set the pipe's buffer size using IO::Handle::setvbuf. However, I ran into "Not implemented on this architecture" (on production machines) and so I wouldn't consider that.
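For reference, this is roughly what that call would look like, assuming a perl where setvbuf is implemented (it needs stdio-based IO, which is why modern PerlIO builds report it as not implemented); the 512-byte size is an arbitrary illustration:

use IO::Handle '_IOFBF';    # constant for fully-buffered mode

# Must be called before any I/O on the handle; croaks with
# "not implemented on this architecture" where unsupported.
my $buffer_var;
$writer->setvbuf($buffer_var, _IOFBF, 512);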

Messing with buffering will of course slow down the communication a lot.

This implements the idea from melpomene's comments.
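As for the $stream->next interface the question asks for: the parent's read loop can be wrapped in a small iterator object. This is a sketch of my own (not part of the original answer), with a hypothetical PipeStream class; it assumes the $reader handle and the forked child are set up exactly as in the program above, and it replaces the parent's while loop:

package PipeStream;

sub new {
    my ($class, $fh) = @_;
    return bless { fh => $fh }, $class;
}

sub next {
    my $self = shift;
    while (defined( my $line = readline $self->{fh} )) {
        next if $line !~ /\S/;    # discard the filler writes
        chomp $line;
        return $line;
    }
    return;    # undef once the child closes its end of the pipe
}

package main;

my $stream = PipeStream->new($reader);
say $stream->next;    # 1
say $stream->next;    # 2

Since next returns undef at end-of-stream, the caller can also drain it with while (defined(my $num = $stream->next)) { ... } before calling waitpid.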


With "buffer" I refer to the pipe's buffer (the amount of data written before the pipe blocks, if no data is read on the other side). That are other buffers that get involved but are not as relevant here.