A new feature of Mojolicious, as of 7.49, is the implementation of the Promises/A+ specification. In this posting, we're going to use promises to implement non-blocking, parallel fetching of a number of web pages.

Background

"Normal" Perl code runs synchronously: it does each step it is told to, one at a time, and only that. This is also known as "blocking", since the program cannot do anything else.

The essence of a non-blocking code framework is that if you are waiting for something, you can register with the framework what to do when that thing happens. It can then do other processing tasks in the meantime. This means you don't have lots of processes (or possibly threads) sitting there, hogging operating-system resources, just blocked waiting for something else to finish; only the bare minimum of information is kept, about what to wait for, and what to do then.

Originally this was done just using callbacks, but this lead to what is known as "callback hell": each callback contains the next callback, at an increasing level of indentation. Even harder to keep track of is if the functions are kept separate. Avoiding this lead to the development of Promises, then Promises/A+.

Promises are used to easily add processing steps to a transaction: one can keep adding code for what to do "then" - after a previous stage has finished. Best of all, each "callback" is small and separate, with each one placed in succession. The resulting code reads like sequential, synchronous code, even though it runs asynchronously.

First let's get web pages, one after the other, synchronously. Obviously, that means the code will block anything else while it's running.

# refers to a previously-set-up @urls
sub fetchpages {
  while (my $url = shift @urls) {
    # Fetch, show title
    say $ua->get($url)->result->dom->at('title')->text;
  }
}

With a callback

This you could realistically have running as part of a web service, since with any kind of callback it will run asynchronously, therefore non-blocking, as discussed above.

sub fetchpages {
  # Stop if there are no more URLs
  return unless my $url = shift @urls;
  # Fetch the next title
  $ua->get($url, sub {
    my ($tx) = @_;
    say "$url: ", $tx->result->dom->at('title')->text;
    fetchpages();
  });
}

Promises

With promises, we're going to split the processing, still in a single "stream" of activity, into two steps, to show the use of then. Notice the first then doesn't return a Promise, just a normal object. When using then, that's fine!

sub fetchpages {
  # Stop if there are no more URLs
  return unless my $url = shift @urls;
  # Fetch the next title
  $ua->get_p($url)->then(sub {
    my ($tx) = @_;
    $tx->result;
  })->then(sub {
    my ($result) = @_;
    say "$url: ", $result->dom->at('title')->text;
    fetchpages(); # returns a promise, but of a whole new page to process
  });
}

Here you'll see we're using get_p. This is just like get, but instead of taking a callback, it returns a Promise.

Promises with two streams

Given that a Promise is a single chain of processing steps, how can we have a number of them running concurrently, without making all the requests at once? We'll use two ideas: chaining (shown above - the key is each "then" returns a new Promise), and Mojo::Promise->all - it will wait until all the promises it's given are finished. Combining them gives us multiple streams of concurrent, but sequenced, activity.

sub fetchpages {
  # Stop if there are no more URLs
  return unless my $url = shift @urls;
  # Fetch the next title
  $ua->get_p($url)->then(sub {
    my ($tx) = @_;
    $tx->result;
  })->then(sub {
    my ($result) = @_;
    say "$url: ", $result->dom->at('title')->text;
    fetchpages(); # returns a promise, but of a whole new page to process
  });
}

# Process two requests at a time
my @promises = map fetchpages(), 1 .. 2;
Mojo::Promise->all(@promises)->wait if @promises;

Another option for dealing with a number of concurrent activities, if you just want the first one that completes, is race.

What if something doesn't work?

In the above, we assumed that everything worked. What if it doesn't? Promises as a standard offer two other methods: catch, and finally.

catch is given a code-ref, which will be called when a Promise is "rejected". When things work as above, each Promise is "resolved". That means the value it was resolved with gets passed to the next then. If it is "rejected", then the error it is rejected with gets passed to the next catch in the chain, however far along that is. E.g.:

sub fetchpage {
  $ua->get_p($url)->then(sub { ... })->then(sub { ... })->catch(sub {
    # either log, or report, or something else
  });
}

If either the initial get_p, or either of the thens get rejected, then execution will skip to the catch. Another way to get this behaviour is to give a second code-ref to then.

finally is given a code-ref which will be called with either the successful (i.e. resolved) value, or the failure (i.e. the rejection) value.

The task at hand

We have to synchronise the work between the multiple "streams" of execution, so that nothing gets missed, or done twice. Luckily, in the asynchronous but single-threaded context we have here, we can just pass around a reference to a single "queue", a Perl array. Let's build that array, at the start of our script:

#!/usr/bin/env perl

# cut down from https://stackoverflow.com/questions/15152633/perl-mojo-and-json-for-simultaneous-requests/15166898#15166898
sub usage { die "Usage: bulkget-delay urlbase outdir suffixesfile\n", @_ };
# each line of suffixesfile is a suffix
# it gets appended to urlbase, then requested non-blocking
# output in outdir with suffix as filename

use Mojo::Base -strict;
use Mojo::UserAgent;
use Mojo::Promise;
use Mojo::File 'path';

my $MAXREQ = 20;

my ($urlbase, $outdir, $suffixesfile) = @ARGV;
usage "No URL" if !$urlbase;
usage "$outdir: $!" if ! -d $outdir;
usage "$suffixesfile: $!" if ! -f $suffixesfile;

my $outpath = path($outdir);
my @suffixes = getsuffixes($suffixesfile, $outpath);
my $ua = Mojo::UserAgent->new;

sub getsuffixes {
  my ($suffixesfile, $outpath) = @_;
  open my $fh, '<', $suffixesfile or die $!;
  grep { !-f $outpath->child($_); } map { chomp; $_ } <$fh>;
}

We also want a procedure to handle results that are ready, to store them in a file if successful:

sub handle_result {
  my ($outpath, $tx, $s) = @_;
  if ($tx->res->is_success) {
    print "got $s\n";
    $outpath->child($s)->spurt($tx->res->body);
  } else {
    print "error $s\n";
  }
}

And now, the Promise part:

my @promises = map makepromise($urlbase, $ua, \@suffixes, $outpath), (1..$MAXREQ);
Mojo::Promise->all(@promises)->wait if @promises;

sub makepromise {
  my ($urlbase, $ua, $suffixes, $outpath) = @_;
  my $s = shift @$suffixes;
  return if !defined $s;
  my $url = $urlbase . $s;
  print "getting $url\n";
  $ua->get_p($url)->then(sub {
    my ($tx) = @_;
    handle_result($outpath, $tx, $s);
    makepromise($urlbase, $ua, $suffixes, $outpath);
  });
}

Once each stream runs out of suffixes to process, it will finish. If we wanted to add the ability to add to the queue that could keep as many streams as we started, we would restructure so that each stream is subscribed to a queue, and if the queue is empty, to wait (asynchronously!) until it is not. That's absolutely idiomatic for Promises, but we'll look at that another time!

See also

Image by mariadelajuana, CC BY 2.0.

Tagged in : advent, non-blocking, promises

author image
Ed J

Ed J (aka "mohawk" on IRC) has been using Perl for a long time. He is currently porting the reference GraphQL implementation from the JavaScript version to Perl. Find out more by joining the #graphql-perl channel on irc.perl.org!