Perl: concurrent download too slow with both HTTP::Async and Net::Async::HTTP

2020-03-02 04:30发布

I'm trying to GET about 7 dozen URLs in parallel with two scripts: the first is below, using HTTP::Async, and the second one is on pastebin, using Net::Async::HTTP. The problem is that I'm getting pretty much the same timing results - about 8..14 seconds for the whole URL list. That is unacceptably slow compared to curl+xargs started from the shell, which gets everything in less than 3 seconds with 10-20 "threads". For example, Devel::Timer in the first script shows that the max queue length is even less than 6 ($queue->in_progress_count<=5, $queue->to_send_count=0 always). So, it looks like the foreach with $queue->add is executing too slowly, and I don't know why. I got pretty much the same situation with Net::Async::HTTP (second script, on pastebin), which is even slower than the first.

So, please, does anybody know, what I'm doing wrong? How can I get concurrent download speed at least compared to curl+xargs started from shell?

#!/usr/bin/perl -w
use utf8;
use strict;
use POSIX qw(ceil);
use XML::Simple;
use Data::Dumper;
use HTTP::Request;
use HTTP::Async;
use Time::HiRes qw(usleep time);
use Devel::Timer;

#settings
# Basic-auth password and User-Agent header sent with every request.
use constant passwd => 'ultramegahypapassword';
use constant agent => 'supa agent dev.alpha';
# Per-request timeout (seconds) and number of parallel connection slots.
use constant timeout => 10;
use constant slots => 10;
use constant debug => 1;

my @qids;   # request ids returned by $queue->add()
my @xmlz;   # NOTE(review): never filled in this excerpt - presumably by process_responses(); verify
# NOTE(review): poll_interval => 0.0001 makes select() spin very frequently;
# per the HTTP::Async docs the default is 0.05s - confirm the tiny value is intentional.
my $queue = HTTP::Async->new(slots => slots,max_request_time => 10, timeout => timeout, poll_interval => 0.0001);
my %responses;
# The ~75 REST API urls to fetch (products, stock, prices, options, images).
# Note some product_option_values urls repeat - presumably intentional; verify.
my @urlz = (
'http://testpodarki.afghanet/api/products/4577',
'http://testpodarki.afghanet/api/products/4653',
'http://testpodarki.afghanet/api/products/4652',
'http://testpodarki.afghanet/api/products/4571',
'http://testpodarki.afghanet/api/products/4572',
'http://testpodarki.afghanet/api/products/4666',
'http://testpodarki.afghanet/api/products/4576',
'http://testpodarki.afghanet/api/products/4574',
'http://testpodarki.afghanet/api/products/4651',
'http://testpodarki.afghanet/api/stock_availables/?display=full&filter[id_product]=[3294]',
'http://testpodarki.afghanet/api/specific_prices/?display=full&filter[id_product]=[3294]',
'http://testpodarki.afghanet/api/combinations/?display=full&filter[id_product]=[4577]',
'http://testpodarki.afghanet/api/stock_availables/?display=full&filter[id_product]=[4577]',
'http://testpodarki.afghanet/api/specific_prices/?display=full&filter[id_product]=[4577]',
'http://testpodarki.afghanet/api/product_option_values/188',
'http://testpodarki.afghanet/api/product_option_values/191',
'http://testpodarki.afghanet/api/product_option_values/187',
'http://testpodarki.afghanet/api/product_option_values/190',
'http://testpodarki.afghanet/api/product_option_values/189',
'http://testpodarki.afghanet/api/stock_availables/?display=full&filter[id_product]=[4653]',
'http://testpodarki.afghanet/api/specific_prices/?display=full&filter[id_product]=[4653]',
'http://testpodarki.afghanet/api/images/products/4577/12176',
'http://testpodarki.afghanet/api/stock_availables/?display=full&filter[id_product]=[4652]',
'http://testpodarki.afghanet/api/specific_prices/?display=full&filter[id_product]=[4652]',
'http://testpodarki.afghanet/api/images/products/4653/12390',
'http://testpodarki.afghanet/api/combinations/?display=full&filter[id_product]=[4571]',
'http://testpodarki.afghanet/api/stock_availables/?display=full&filter[id_product]=[4571]',
'http://testpodarki.afghanet/api/specific_prices/?display=full&filter[id_product]=[4571]',
'http://testpodarki.afghanet/api/images/products/4652/12388',
'http://testpodarki.afghanet/api/product_option_values/175',
'http://testpodarki.afghanet/api/product_option_values/178',
'http://testpodarki.afghanet/api/product_option_values/179',
'http://testpodarki.afghanet/api/product_option_values/180',
'http://testpodarki.afghanet/api/product_option_values/181',
'http://testpodarki.afghanet/api/images/products/3294/8965',
'http://testpodarki.afghanet/api/product_option_values/176',
'http://testpodarki.afghanet/api/product_option_values/177',
'http://testpodarki.afghanet/api/combinations/?display=full&filter[id_product]=[4572]',
'http://testpodarki.afghanet/api/stock_availables/?display=full&filter[id_product]=[4572]',
'http://testpodarki.afghanet/api/specific_prices/?display=full&filter[id_product]=[4572]',
'http://testpodarki.afghanet/api/product_option_values/176',
'http://testpodarki.afghanet/api/product_option_values/181',
'http://testpodarki.afghanet/api/product_option_values/180',
'http://testpodarki.afghanet/api/images/products/4571/12159',
'http://testpodarki.afghanet/api/product_option_values/177',
'http://testpodarki.afghanet/api/product_option_values/179',
'http://testpodarki.afghanet/api/product_option_values/175',
'http://testpodarki.afghanet/api/product_option_values/178',
'http://testpodarki.afghanet/api/stock_availables/?display=full&filter[id_product]=[4666]',
'http://testpodarki.afghanet/api/combinations/?display=full&filter[id_product]=[4576]',
'http://testpodarki.afghanet/api/specific_prices/?display=full&filter[id_product]=[4666]',
'http://testpodarki.afghanet/api/stock_availables/?display=full&filter[id_product]=[4576]',
'http://testpodarki.afghanet/api/specific_prices/?display=full&filter[id_product]=[4576]',
'http://testpodarki.afghanet/api/images/products/4572/12168',
'http://testpodarki.afghanet/api/product_option_values/185',
'http://testpodarki.afghanet/api/product_option_values/182',
'http://testpodarki.afghanet/api/product_option_values/184',
'http://testpodarki.afghanet/api/product_option_values/183',
'http://testpodarki.afghanet/api/product_option_values/186',
'http://testpodarki.afghanet/api/images/products/4666/12413',
'http://testpodarki.afghanet/api/combinations/?display=full&filter[id_product]=[4574]',
'http://testpodarki.afghanet/api/stock_availables/?display=full&filter[id_product]=[4574]',
'http://testpodarki.afghanet/api/specific_prices/?display=full&filter[id_product]=[4574]',
'http://testpodarki.afghanet/api/product_option_values/177',
'http://testpodarki.afghanet/api/product_option_values/181',
'http://testpodarki.afghanet/api/images/products/4576/12174',
'http://testpodarki.afghanet/api/product_option_values/176',
'http://testpodarki.afghanet/api/product_option_values/180',
'http://testpodarki.afghanet/api/product_option_values/179',
'http://testpodarki.afghanet/api/product_option_values/175',
'http://testpodarki.afghanet/api/product_option_values/178',
'http://testpodarki.afghanet/api/specific_prices/?display=full&filter[id_product]=[4651]',
'http://testpodarki.afghanet/api/images/products/4574/12171',
'http://testpodarki.afghanet/api/stock_availables/?display=full&filter[id_product]=[4651]',
'http://testpodarki.afghanet/api/images/products/4651/12387'
);

my $timer = Devel::Timer->new();


# Build and enqueue one GET request per url, marking the timer after each
# add so the queue counters can be inspected in the final report.
foreach my $el (@urlz) {
    my $request = HTTP::Request->new(GET => $el);
    $request->header(User_Agent => agent);
    $request->authorization_basic(passwd,''); 
    push @qids,$queue->add($request);
    $timer->mark("pushed [$el], to_send=".$queue->to_send_count().", to_return=".$queue->to_return_count().", in_progress=".$queue->in_progress_count());
}

$timer->mark('requestz pushed');

# Busy-wait until no request is in flight any more.
# NOTE(review): the HTTP::Async docs recommend draining the queue with
# wait_for_next_response()/next_response() instead of a usleep+poke loop;
# this polling loop may itself contribute to the slowness asked about - verify.
while ($queue->in_progress_count) {
    usleep(2000);
    $queue->poke();
}

$timer->mark('requestz complited');

# NOTE(review): process_responses() is not defined in this excerpt -
# presumably it drains $queue and fills @xmlz/%responses; confirm against
# the full script.
process_responses();


$timer->mark('responzez processed');

# Debug dump of the parsed responses (currently commented out).
foreach my $q (@xmlz) {
#    print ">>>>>>".Dumper($q)."<<<<<<<<\n";
}

$timer->report();
print "\n\n";

4条回答
相关推荐>>
2楼-- · 2020-03-02 04:38

My best results with HTTP::Async are well over 4 and up to over 5 seconds. As I understand this approach isn't required, and here is a simple forking example that takes a little over 2 and at most below 3 seconds.

It uses Parallel::ForkManager and LWP::UserAgent for downloads.

use warnings;
use strict;
use Path::Tiny;
use LWP::UserAgent;
use Parallel::ForkManager;

# Fetch the url list from the paste, then download every url in a forked
# child process, writing each body to its own numbered file.
my @urls = @{ get_urls('https://pastebin.com/raw/VyhMEB3w') };

# Direct method call: the indirect-object form "new Parallel::ForkManager(60)"
# is a well-known Perl parsing trap and is discouraged.
my $pm = Parallel::ForkManager->new(60);    # max of 60 processes at a time
my $ua = LWP::UserAgent->new;
print "Downloading ", scalar @urls, " files.\n";

# No trailing slash here: it used to produce "downloaded_files//file_N.txt".
my $dir = 'downloaded_files';
if (not -d $dir) {
    # Fail loudly instead of silently continuing with no output directory.
    mkdir $dir or die "Cannot mkdir $dir: $!";
}

my $cnt = 0;
foreach my $link (@urls)
{
    my $file = "$dir/file_" . ++$cnt . '.txt';

    $pm->start and next;                        # child process

    # add code needed for actual pages (authorization etc)
    my $response = $ua->get($link);
    if ($response->is_success) {
        path($file)->spew_utf8($response->decoded_content);
    }
    else { warn $response->status_line }

    $pm->finish;                                # child exit
}
$pm->wait_all_children;

# Fetch the paste that holds the url list and pull out the http urls from
# its quoted, comma-separated lines. Returns an array reference of urls.
sub get_urls {
    my ($list_url) = @_;
    my $page    = LWP::UserAgent->new->get($list_url);
    my @links   = grep /^http:/, split /\s*'?,?\s*\n\s*'?/, $page->decoded_content;
    return \@links;
}

The files are written using Path::Tiny. Its path builds an object and spew routines write the file.

For reference, the sequential downloads take around 26 seconds.

With the maximum number of processes set to 30 this takes over 4 seconds, and with 60 it is a little over 2 seconds, about the same as with (up to) 90. There are 70 urls in this test.

Tested at a 4-core laptop with a decent network connection. (Here the CPU isn't all that important.) The tests were run repeatedly, at multiple times and on multiple days.


A comparison with the approach from the question

The best HTTP::Async results are slower than the above by around a factor of two. They are with 30-40 "slots", since for higher numbers the time goes up, which puzzles me. The module uses select to multiplex, via Net::HTTP::NB (a non-blocking version of Net::HTTP). While select "does not scale well", that concern applies to hundreds of sockets, and I'd expect to be able to use more than 40 on this network-bound problem. The simple forked approach does.

Also, select is considered to be a slow method to monitor sockets while forks don't even need that, as each process has its own url. (This may result in module's overhead when there are many connections?) Fork's inherent overhead is fixed and dwarfed by network access. If we were after (many) hundreds of downloads the system may get strained by processes, but select wouldn't fare well either.

Finally, select based methods download strictly one file at a time, and the effect is seen by printing as requests are added -- we can see the delay. The forked downloads go in parallel (in this case all 70 at the same time without a problem). Then there'll be a network or disk bottleneck but that is tiny in comparison to the gain.

Update: I pushed this to double the number of sites and processes, saw no signs of OS/CPU strain, and retained the average speed.

So I'd say, if you need to shave off every second use forks. But if this is not critical and there are other benefits of HTTP::Async (or such) then be content with (just a bit) longer downloads.


The HTTP::Async code that performs well ended up being simply

foreach my $link ( @urls ) {  
    $async->add( HTTP::Request->new(GET => $link) );
}    
while ( my $response = $async->wait_for_next_response ) { 
    # write file (or process otherwise)
}

I have also tried to tweak headers and timings. (This included dropping keep-alive as suggested, by $request->header(Connection => 'close'), to no effect.)

查看更多
聊天终结者
3楼-- · 2020-03-02 04:41

For explaining my comment. I was curious, because never used the Net::Async::HTTP before, wanted to try your script locally. So, created this minimalist Plack app.psgi:

use 5.014;
use warnings;

use Plack::Builder;
use Plack::Request;
use Plack::Response;
use Time::HiRes qw(usleep);

# Minimal PSGI test app: GET /<microseconds>/<id> sleeps the given number of
# microseconds (faking processing time), then replies with the request id and
# the PID of the worker that served it.
my $handler = sub {
    my ($env) = @_;
    my $request = Plack::Request->new($env);

    # First number in the path = sleep time, second number = request id.
    my ($ms, $id) = $request->path_info =~ /(\d+)/g;
    $ms //= 0;
    $id //= "NOID";

    # Fake some processing slowness.
    usleep($ms);

    my $response = Plack::Response->new(200);
    $response->content_type('text/plain');
    $response->body("req# $id served by PID $$ fakewait: $ms\n");
    return $response->finalize;
};

builder {
    # enable "Auth::Basic", authenticator => \&auth;
    $handler;
};

# sub auth { return $_[0] eq 'me' && $_[1] eq 'me' }

The server understand URL's in a form GET /sleep_time/reqID, where

  • the sleep time is in microseconds for the usleep - and the server sleep the given time before responds. E.g. it fakes some "processing time".
  • id - any number for req-identifying...

E.g. requesting GET /1000000/1, the server will sleep 1second before responds. In response is included the PID of the responding process.

In one terminal window run the above using the Starman preforking server with its default 20 workers.

plackup -s Starman

And in the another window the results using xargs:

time seq 20 | xargs -n1 -P10 -I% curl http://localhost:5000/1000000/%

so, sending 20 requests, where each response takes 1s of processing time.

req# 1 served by PID 28163 fakewait: 1000000
req# 2 served by PID 28161 fakewait: 1000000
req# 3 served by PID 28162 fakewait: 1000000
req# 4 served by PID 28160 fakewait: 1000000
req# 5 served by PID 28159 fakewait: 1000000
...
real    0m4,032s
user    0m0,092s
sys     0m0,074s

So, 20 requests = 4 seconds. It is visible that the responding PIDs are different - i.e. the responses are sent by different workers.

Now using your script async.pl (slightly shortened/modified):

#!/usr/bin/perl
use 5.014;
use warnings;

use HTTP::Request;
use IO::Async::Loop;
use Net::Async::HTTP;
use Future::Utils qw(fmap_void);

# Usage: async.pl [sleep_microseconds] [number_of_requests]
my $sleep = $ARGV[0] // 0;
my $numreq = $ARGV[1] // 20;

my $loop = IO::Async::Loop->new();

# NOTE(review): per the Net::Async::HTTP docs, max_connections_per_host => 0
# removes the per-host connection limit, yet the behaviour observed below is
# still sequential over one keep-alive connection - verify against the
# installed module version.
my $http = Net::Async::HTTP->new( timeout => 10, max_connections_per_host => 0, pipeline => 0, ip_tos => 0x10 );
$loop->add( $http );

# fmap_void runs the block once per generated url and combines the resulting
# futures; awaiting the combined future below drives them all to completion.
my $future = fmap_void {
    (my  $url ) = @_;
    my $request = HTTP::Request->new(GET => $url);
    #$request->authorization_basic('me','me');
    $http->do_request( request => $request )
        ->on_done( sub {
            my $response = shift;
            # Keep only the first line of the body for the log line (/r
            # returns the modified copy without touching the response).
            my($body) = $response->content =~ s/\n.*//sr;
            print "$url [", $response->code, "] --> $body\n";
        } )
        ->on_fail( sub {
            my $failure = shift;
            print "$url failed: $failure\n";
        } );
} foreach => [map { "http://localhost:5000/$sleep/$_" } (1 .. $numreq)];
$loop->await( $future );

command

time perl async.pl 1000000 20

result

http://localhost:5000/1000000/1 [200] --> req# 1 served by PID 28160 fakewait: 1000000
http://localhost:5000/1000000/2 [200] --> req# 2 served by PID 28160 fakewait: 1000000
http://localhost:5000/1000000/3 [200] --> req# 3 served by PID 28160 fakewait: 1000000
http://localhost:5000/1000000/4 [200] --> req# 4 served by PID 28160 fakewait: 1000000
http://localhost:5000/1000000/5 [200] --> req# 5 served by PID 28160 fakewait: 1000000
http://localhost:5000/1000000/6 [200] --> req# 6 served by PID 28160 fakewait: 1000000
...
real    0m20,309s
user    0m0,183s
sys     0m0,053s

Same 20 requests = 20 seconds, and each request is served by the same PID. Like pure sequential processing. :(

This is probably because the requests are reusing the connection (i.e. keep-alive).

Finally - unfortunately, as I said - I don't have any experience with this module, so I have no idea how to force the module not to reuse the opened connection.

查看更多
【Aperson】
4楼-- · 2020-03-02 04:43

Async will be slower than parallel downloading: the async code will yield to other calls only while waiting for a response, but downloading happens sequentially in a single process, while the curl+xargs will work 100% (well, almost 100%, and as long as you don't saturate the cores) in parallel, same as when using forked workers.

please, google for "concurrency is not parallelism"

查看更多
▲ chillily
5楼-- · 2020-03-02 04:56

So, I finally got a working sample (full script). It uses Furl and fork_call from AnyEvent::Util. This example returns in ~3 secs, which is good enough. If you need basic HTTP auth, just use a URI with credentials, like this: username:password@hostdomain.org/path?param1=val1&param2=val2. You'd better add use EV; before using AnyEvent, because EV is the fastest event loop.

#!/usr/bin/perl
use strict;
use warnings;
use Data::Dumper;
use Devel::Timer;
use Furl;
use EV;
use AnyEvent;
use AnyEvent::Util 'fork_call';

#get full script
# NOTE(review): "(...)" is a placeholder - the real url list comes from the
# full script linked in the answer; this line will not compile as written.
my @urls = (...);

# Download one url with Furl and return its (code, msg, headers, body).
# A fresh client is built per call, since each call runs in its own
# forked child process.
sub fetch {
    my ($url) = @_;
    my $client = Furl::HTTP->new(agent => 'Furl/0.31', timeout => 3);
    print "start $url\n";
    my ($ver, $code, $msg, $headers, $body) = $client->get($url);
    my $size = length $body;
    print "finished $url, $size bytes\n";
    return ($code, $msg, $headers, $body);
}

my %resps;    # url => { code, msg, headers, body }

my $timer = Devel::Timer->new();
$timer->mark('foreach');
$AnyEvent::Util::MAX_FORKS = 20;    # cap the number of worker processes
my $cv = AE::cv;
foreach my $url (@urls) {
    $timer->mark('next foreach');
    $cv->begin;
    # Run fetch() in a forked child; the trailing sub receives the child's
    # return values back in the parent process.
    fork_call {
        print "getting $url... ";
        my ($code, $msg, $headers, $body) = fetch($url);
        print "[$code]\n";
        return ($url, $code, $msg, $headers, $body);
        }
        sub {
            print "adding 2 %resps\n";
            my ($url, $code, $msg, $headers, $body) = @_;
            # Store all four response parts at once via a hash slice.
            @{ $resps{$url} }{qw(code msg headers body)}
                = ($code, $msg, $headers, $body);
            $cv->end;
        };
}
$cv->recv;    # wait until every begin() has been matched by an end()
$timer->mark('end');

print "\nall data is ready, press <ENTER>:";
<STDIN>;
# Pass a reference: Dumper(%resps) would flatten the hash into a list of
# alternating keys and values and dump each element as a separate $VARn,
# losing the hash structure.
print Dumper(\%resps);
print "\n<PRESS ENTER>to print timer report\n";
<STDIN>;
$timer->report();
sleep(3);
查看更多
登录 后发表回答