Thread::Queue returns nothing

Posted 2019-07-31 08:14

I am working on the script below, in which I split a file's content into @file_splitted and try to use Thread::Queue to speed up the processing. But $result is empty at the end. Can you please check what is happening?

my $NUM_WORKERS = 5;

my $q = Thread::Queue->new();

sub worker {
   my ($job) = @_;

   print "@_  \n################\n";
   my ($sub_name, @args) = @$job;
   my $sub_ref = \&Subroutine;
   $sub_ref->(@args);
}

{
    my $q = Thread::Queue->new();

    my @workers;

    for (1..$NUM_WORKERS) {
        push @workers, async {
            while (my $job = $q->dequeue()) {
            worker($job);
#           print "$job \n";
            }
        };
    }

    $q->enqueue($_) for @file_splitted;
    $q->end();

    for my $t(@workers){
        (my @getit)= $t->join();
        my $tmp = join '', @getit; 
        $result .= $tmp;
        print "$result\n";
    }

}

1 answer
仙女界的扛把子
Reply #2 · 2019-07-31 08:47

This here is your current code, but tidied up a bit, and commented:

my $NUM_WORKERS = 5;

my $q = Thread::Queue->new();

{
    # ok, so here we create a new queue for some reason that shadows the outer $q
    my $q = Thread::Queue->new();

    my @workers;
    for (1 .. $NUM_WORKERS) {
        push @workers, async {
            while (my $job = $q->dequeue()) {
                my ($sub_name, @args) = @$job; # so the $job is an arrayref
                Subroutine(@args);  # what is "Subroutine"?
            }
            # this worker does not explicitly return any values
        };
    }

    # what is @file_splitted? Does it contain arrayrefs?
    $q->enqueue($_) for @file_splitted;
    $q->end();

    for my $t (@workers){
        # where is $result declared? And why are you using a return value when you
        # don't explicitly return anything from your threads?
        $result .= join '', $t->join;
        print "$result\n";
    }

}

The problem is that you aren't actually returning anything useful from your threads – note that I removed the worker subroutine above because it doesn't add anything to this discussion, and probably confused you.
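
For comparison, here is a minimal sketch of a worker that does return something through join. It assumes a perl built with thread support and a Thread::Queue recent enough to provide end; process_chunk is a hypothetical stand-in for Subroutine, and the job list is made up. The two things that matter are the explicit return at the end of the async block and the fact that the thread is created in list context (push imposes it), which is what lets join hand back a list.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $q = Thread::Queue->new;

# Hypothetical stand-in for the question's Subroutine.
sub process_chunk {
    my ($chunk) = @_;
    return uc $chunk;
}

my @workers;
for (1 .. 2) {
    # push calls async in list context, so join returns a list later.
    push @workers, async {
        my @out;
        while (defined(my $job = $q->dequeue)) {
            push @out, process_chunk($job);
        }
        return @out;    # explicit return: this is what join hands back
    };
}

$q->enqueue(qw(a b c d));    # made-up jobs
$q->end;

# Each join yields the list returned by that worker.
my @results = map { $_->join } @workers;
print join(',', sort @results), "\n";    # prints "A,B,C,D"
```

Results arrive grouped per worker rather than in job order, which is why the sketch sorts them before printing.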

Most likely, you will want a second queue through which the threads can return their results:

my $job_queue = Thread::Queue->new;
my $result_queue = Thread::Queue->new;
my @workers;
for (1 .. $NUM_WORKERS) {
    push @workers, async {
        while(defined(my $job = $job_queue->dequeue)) {
            my $result = Subroutine($job);  # or something like this
            $result_queue->enqueue($result);
        }
        $result_queue->enqueue(undef);
    };
}

$job_queue->enqueue(@jobs);
$job_queue->end;

my $waiting = $NUM_WORKERS;
my @results;
while ($waiting) {
    if (defined(my $result = $result_queue->dequeue)) {
        # do something with the results
        push @results, $result;
    }
    else {
        $waiting--;
    }
}

$_->join for @workers;

If you only want to collect all results at the end, you could do something like this instead:

my $job_queue = Thread::Queue->new;
my $result_queue = Thread::Queue->new;
my @workers;
for (1 .. $NUM_WORKERS) {
    push @workers, async {
        while(defined(my $job = $job_queue->dequeue)) {
            my $result = Subroutine($job);  # or something like this
            $result_queue->enqueue($result);
        }
    };
}

$job_queue->enqueue(@jobs);
$job_queue->end;

$_->join for @workers;

my @results = $result_queue->dequeue($result_queue->pending);
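
One caveat with the last line: as far as I know, dequeue rejects a count of zero, so it is worth guarding against the case where no results were produced. A sketch of one way to sidestep that, assuming all workers have already been joined: end the result queue and drain it one item at a time. The job list and the square helper here are made up for illustration.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $NUM_WORKERS = 3;
my @jobs        = (1 .. 10);    # placeholder jobs

# Hypothetical stand-in for the question's Subroutine.
sub square { return $_[0] * $_[0] }

my $job_queue    = Thread::Queue->new;
my $result_queue = Thread::Queue->new;

my @workers;
for (1 .. $NUM_WORKERS) {
    push @workers, async {
        while (defined(my $job = $job_queue->dequeue)) {
            $result_queue->enqueue(square($job));
        }
    };
}

$job_queue->enqueue(@jobs);
$job_queue->end;
$_->join for @workers;

# Every producer has finished, so close the result queue. After end,
# dequeue returns undef once the queue is empty instead of blocking.
$result_queue->end;
my @results;
while (defined(my $r = $result_queue->dequeue)) {
    push @results, $r;
}

my $total = 0;
$total += $_ for @results;
print "collected ", scalar(@results), " results, sum $total\n";
```

This only works if undef is never a legitimate result, since undef is what ends the drain loop.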