Understanding async in Perl on a specific example

I have to write a script that fetches some URLs in parallel and does some work with the responses. In the past I have always used Parallel::ForkManager for such things, but now I want to learn something new and try asynchronous programming with AnyEvent (and AnyEvent::HTTP or AnyEvent::Curl::Multi). However, I'm having trouble understanding AnyEvent and writing a script that should:

  • open a file (every line is a separate URL)
  • (from now on in parallel, but with a limit of e.g. 10 concurrent requests)
  • read the file line by line (I don't want to load the whole file into memory, as it might be big)
  • make an HTTP request for that URL
  • read the response
  • update a MySQL record accordingly
  • (next file line)

I have read many manuals and tutorials, but it's still hard for me to understand the differences between blocking and non-blocking code. I found a similar script at http://perlmaven.com/fetching-several-web-pages-in-parallel-using-anyevent, where Mr. Szabo explains the basics, but I still can't understand how to implement something like:

...
open my $fh, "<", $file;
while ( my $line = <$fh> )
{
# http request, read response, update MySQL
}
close $fh
...

... and add a concurrency limit in this case.

I would be very grateful for help ;)

UPDATE

Following Ikegami's advice I gave Net::Curl::Multi a try, and I'm very pleased with the results. After years of using Parallel::ForkManager just for concurrently grabbing thousands of URLs, Net::Curl::Multi seems to be awesome. Here is my code with a while loop on the filehandle. It seems to work as it should, but since it's my first time writing something like this, I would like to ask more experienced Perl users to take a look and tell me if there are potential bugs, something I missed, etc. Also, if I may ask: as I don't fully understand how Net::Curl::Multi's concurrency works, please tell me whether I should expect any problems with putting a MySQL UPDATE command (via DBI) inside the RESPONSE loop (besides the obviously higher server load; I expect the final script to run with about 50 concurrent N::C::M workers, maybe more).

#!/usr/bin/perl

use Net::Curl::Easy  qw( :constants );
use Net::Curl::Multi qw( );

sub make_request {
    my ( $url ) = @_;
    my $easy = Net::Curl::Easy->new();
    $easy->{url} = $url;
    $easy->setopt( CURLOPT_URL,        $url );
    $easy->setopt( CURLOPT_HEADERDATA, \$easy->{head} );
    $easy->setopt( CURLOPT_FILE,       \$easy->{body} );
    return $easy;
}

my $maxWorkers = 10;

my $multi = Net::Curl::Multi->new();
my $workers = 0;

my $i = 1;
open my $fh, "<", "urls.txt";
LINE: while ( my $url = <$fh> )
{
    chomp( $url );
    $url .= "?$i";
    print "($i) $url\n";
    my $easy = make_request( $url );
    $multi->add_handle( $easy );
    $workers++;

    my $running = 0;
    do {
        my ($r, $w, $e) = $multi->fdset();
        my $timeout = $multi->timeout();
        select $r, $w, $e, $timeout / 1000
        if $timeout > 0;

        $running = $multi->perform();
        RESPONSE: while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
            $multi->remove_handle( $easy );
            $workers--;
            printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
        }

        # dont max CPU while waiting
        select( undef, undef, undef, 0.01 );
    } while ( $workers == $maxWorkers || ( eof && $running ) );
    $i++;
}
close $fh;
Meritocracy answered 27/4, 2016 at 18:56 Comment(0)

Net::Curl is a rather good library that's extremely fast. Furthermore, it can handle parallel requests too! I'd recommend using this instead of AnyEvent.

use strict;
use warnings;

use Net::Curl::Easy  qw( :constants );
use Net::Curl::Multi qw( );

# Build an easy handle that stores the response headers and body
# in the handle itself instead of printing them.
sub make_request {
    my ( $url ) = @_;
    my $easy = Net::Curl::Easy->new();
    $easy->{url} = $url;
    $easy->setopt( CURLOPT_URL,        $url );
    $easy->setopt( CURLOPT_HEADERDATA, \$easy->{head} );
    $easy->setopt( CURLOPT_FILE,       \$easy->{body} );
    return $easy;
}

my $max_running = 10;
my @urls = ( 'http://www.google.com/' );

my $multi = Net::Curl::Multi->new();
my $running = 0;
while (1) {
    # Top up the pool until the concurrency limit is reached.
    while ( @urls && $running < $max_running ) {
       my $easy = make_request( shift( @urls ) );
       $multi->add_handle( $easy );
       ++$running;
    }

    # Nothing queued and nothing in flight: done.
    last if !$running;

    # Wait for socket activity, for at most the timeout libcurl suggests (ms).
    my ( $r, $w, $e ) = $multi->fdset();
    my $timeout = $multi->timeout();
    select( $r, $w, $e, $timeout / 1000 )
        if $timeout > 0;

    # Drive the transfers, then collect the ones that have finished.
    $running = $multi->perform();
    while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
        $multi->remove_handle( $easy );
        printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
    }
}
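
If the URLs come from a file that is too large to load into memory, the same loop can be topped up from a filehandle instead of an array. What follows is only a sketch under a few assumptions: `next_url` and `run` are hypothetical helper names, the file path is taken from the command line, and a 0.1-second wait is used when `$multi->timeout()` returns a negative value (no suggestion from libcurl), so the loop does not spin.

```perl
#!/usr/bin/perl
use strict;
use warnings;

use Net::Curl::Easy  qw( :constants );
use Net::Curl::Multi qw( );

# Hypothetical helper: return the next non-empty line of $fh (chomped),
# or undef once the end of the file is reached.
sub next_url {
    my ( $fh ) = @_;
    while ( defined( my $line = <$fh> ) ) {
        chomp $line;
        return $line if length $line;
    }
    return;
}

sub make_request {
    my ( $url ) = @_;
    my $easy = Net::Curl::Easy->new();
    $easy->{url} = $url;
    $easy->setopt( CURLOPT_URL,        $url );
    $easy->setopt( CURLOPT_HEADERDATA, \$easy->{head} );
    $easy->setopt( CURLOPT_FILE,       \$easy->{body} );
    return $easy;
}

# Hypothetical driver: fetch every URL listed in $file,
# keeping at most $max_running transfers in flight.
sub run {
    my ( $file, $max_running ) = @_;
    open( my $fh, '<', $file ) or die "Cannot open $file: $!";

    my $multi   = Net::Curl::Multi->new();
    my $running = 0;
    while (1) {
        # Read only as many lines as there are free slots.
        while ( $running < $max_running ) {
            my $url = next_url( $fh );
            last if !defined $url;
            $multi->add_handle( make_request( $url ) );
            ++$running;
        }

        last if !$running;

        my ( $r, $w, $e ) = $multi->fdset();
        my $timeout = $multi->timeout();
        # Assumption: wait 0.1 s when libcurl gives no timeout (negative value).
        my $wait = $timeout < 0 ? 0.1 : $timeout / 1000;
        select( $r, $w, $e, $wait ) if $wait > 0;

        $running = $multi->perform();
        while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
            $multi->remove_handle( $easy );
            # Per-response work goes here. Anything blocking (for example a
            # DBI UPDATE) pauses this loop until it returns.
            printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
        }
    }
    close $fh;
    return;
}

# Only run when a file name is given on the command line.
run( $ARGV[0], 10 ) if @ARGV;
```

Saved under a name of your choice, it would be invoked as `perl fetch.pl urls.txt`. Because lines are read only when a slot is free, the script holds at most `$max_running` URLs at any time.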
Penutian answered 27/4, 2016 at 19:39 Comment(6)
I'm getting a lot of "callback function is not set" warnings. They seem to appear when the URL host is a domain name; I don't get them if I use an IP address. Also, if I put e.g. print "got it!"; where the # process $easy comment is, the page content is automatically printed. - Meritocracy
Fixed it so the content is stored in $easy rather than printed. I don't get the callback error you're getting. Try it with the change; it might be related. - Penutian
Thanks for the help. Unfortunately I still get "callback function is not set". Actually, I get it 4 times, then your printf output. I don't know where it comes from. - Meritocracy
metacpan.org/changes/release/SYP/LWP-Protocol-Net-Curl-0.016 - minor fix: "callback function is not set" warning when libcurl has the AsynchDNS feature - same author. - Meritocracy
Recompiling curl without --enable-ares helped, and now I don't get "callback function is not set". - Meritocracy
I updated my "question" post with the final script. If I may ask, please take a look ;). - Meritocracy

This does exactly what you want, asynchronously, by using YADA, which wraps Net::Curl in a safe fashion:

#!/usr/bin/env perl

package MyDownloader;
use strict;
use warnings qw(all);

use Moo;

extends 'YADA::Worker';

has '+use_stats'=> (default => sub { 1 });
has '+retry'    => (default => sub { 10 });

after init => sub {
    my ($self) = @_;

    $self->setopt(
        encoding            => '',
        verbose             => 1,
    );
};

after finish => sub {
    my ($self, $result) = @_;

    if ($self->has_error) {
        print "ERROR: $result\n";
    } else {
        # do the interesting stuff here
        printf "Finished downloading %s: %d bytes\n", $self->final_url, length ${$self->data};
    }
};

around has_error => sub {
    my $orig = shift;
    my $self = shift;

    return 1 if $self->$orig(@_);
    return 1 if $self->getinfo('response_code') =~ m{^5[0-9]{2}$}x;
};

1;

package main;
use strict;
use warnings qw(all);

use Carp;

use YADA;

my $q = YADA->new(
    max     => 8,
    timeout => 30,
);

open(my $fh, '<', 'file_with_urls_per_line.txt')
    or croak "can't open queue: $!";
while (my $url = <$fh>) {
    chomp $url;

    $q->append(sub {
        MyDownloader->new($url)
    });
}
close $fh;
$q->wait;
Outwardbound answered 28/4, 2016 at 10:45 Comment(1)
Although your idea is great and meets all my requirements, Ikegami's solution is much more understandable and readable for me. Thanks for your input. It's great to see many ways to achieve the same goal. - Meritocracy
