MapReduce Pipeline in Perl

Perl's attitude is to be as unobtrusive as possible. Or at least that is my approach.

For using MapReduce inside a Perl application, the following API should suffice:

use Parallel::MapReduce;
my $mri = Parallel::MapReduce->new (...);

my $B = {1 => 'this is something',
         2 => 'this is something else',
         3 => 'something else completely'};
my $C = $mri->mapreduce (
                         sub { ... mapper here ... },
                         sub { ... reducer here ... },
                         $B);

The mapreduce call uses the provided functions to transform the hash (reference) $B into a new hash (reference). Any setup, including in the parallel, distributed case, is handled by the $mri object.

Closure'd MapReducers

One rather obvious extension is to let the user prefabricate a mapreducer which can later be used over and over again, without having to respecify the mapper and reducer functions each time.

To set one up which does the word counting trick:

my $lines2wc = $mri->mapreducer (
               sub {                                  # mapper: emit each word with count 1
                    my ($k, $v) = (shift, shift);
                    return map { $_ => 1 } split /\s+/, $v;
               },
               sub {                                  # reducer: sum the counts for one word
                    my ($k, $v) = (shift, shift);
                    my $sum = 0;
                    $sum += $_ for @$v;
                    return $sum;
               }
               );

And to use that you only need to procure the incoming hash:

my $C = &$lines2wc ($B);

Internally this is trivial to implement with Perl's closures:

package Parallel::MapReduce;
...
sub mapreducer {
    my ($self, $map, $reduce) = @_;

    return sub {
        return $self->mapreduce ($map, $reduce, @_);
    };
}

It's so easy that it almost hurts.
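To watch the closure at work without the real module, here is a minimal, purely sequential stand-in. The package Sequential::MapReduce and its internals are made up for illustration only; it merely implements the same mapreduce/mapreducer interface on a single machine:

```perl
package Sequential::MapReduce;   # hypothetical stand-in, NOT the real Parallel::MapReduce
use strict;
use warnings;

sub new { my $class = shift; return bless {}, $class; }

# sequential mapreduce: run the mapper over every pair, group the
# emitted pairs by key, then run the reducer once per key
sub mapreduce {
    my ($self, $map, $reduce, $h) = @_;
    my %grouped;
    while (my ($k, $v) = each %$h) {
        my @emitted = $map->($k, $v);              # flat list of key/value pairs
        while (@emitted) {
            my ($ek, $ev) = (shift @emitted, shift @emitted);
            push @{ $grouped{$ek} }, $ev;
        }
    }
    return +{ map { $_ => $reduce->($_, $grouped{$_}) } keys %grouped };
}

# the closure trick: capture $self, $map and $reduce for later use
sub mapreducer {
    my ($self, $map, $reduce) = @_;
    return sub { return $self->mapreduce ($map, $reduce, @_); };
}

package main;

my $mri = Sequential::MapReduce->new;
my $lines2wc = $mri->mapreducer (
    sub { my ($k, $v) = (shift, shift);
          return map { $_ => 1 } split /\s+/, $v; },
    sub { my ($k, $v) = (shift, shift);
          my $sum = 0; $sum += $_ for @$v;
          return $sum; });

my $B = {1 => 'this is something',
         2 => 'this is something else',
         3 => 'something else completely'};
my $C = &$lines2wc ($B);                           # e.g. $C->{something} == 3
```

The grouping step collects emitted values into array references, which is why the reducer above receives @$v rather than a single number.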

MapReduce Pipelines

As we have molded our interface along a transformer paradigm (and not an object-oriented one), we can harvest another benefit: pipelining transformers.

The transformer $lines2wc we defined above expects a hash where the lines occupy individual key/value pairs; the key could be, for example, the original line number.

If you are confronted with a text file, then you would have to read the stream first, then split it along line breaks before injecting it into the MapReduce infrastructure.

Alternatively, you could let MapReduce do the job with another transformer, one which takes a hash with only a single entry

my $A = {1 => 'this is something
this is something else
something else completely'
};

and which does the splitting and line counting for us:

my $s2lines = $mri->mapreducer (
              sub {                                  # mapper: split the text into numbered lines
                   my ($k, $v) = (shift, shift);
                   my $c = 0;
                   return map { ++$c => $_ } split /\n/, $v;
              },
              sub {                                  # reducer: each line number holds exactly one line
                   my ($k, $v) = (shift, shift);
                   return join "", @$v;
              }
              );

The two transformers can be chained into a pipeline

my $s2wc = $mri->pipeline ($s2lines, $lines2wc);

and that can be used like any other transformer:

my $C = &$s2wc ($A);

Pipelines are doubleplus good.
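The post does not show how pipeline is implemented, but given the closure machinery above it is presumably plain function composition. A sketch under that assumption; the package name Pipeline::Demo and the toy transformers are made up for illustration:

```perl
package Pipeline::Demo;   # hypothetical package, for illustration only
use strict;
use warnings;

# pipeline: fold a list of hash transformers into one transformer;
# the output hash of each stage is fed into the next
sub pipeline {
    my ($self, @transformers) = @_;
    return sub {
        my $h = shift;
        $h = $_->($h) for @transformers;
        return $h;
    };
}

package main;

# two toy transformers standing in for $s2lines and $lines2wc
my $double = sub { my $h = shift; return +{ map { $_ => 2 * $h->{$_} } keys %$h }; };
my $inc    = sub { my $h = shift; return +{ map { $_ => 1 + $h->{$_} } keys %$h }; };

my $both = Pipeline::Demo->pipeline ($double, $inc);
my $C    = &$both ({a => 1, b => 2});              # {a => 3, b => 5}
```

Note that nothing here is specific to MapReduce: any hash-in, hash-out transformer can participate in such a pipeline.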

Intended Side Effects

Apart from the convenience of reusability, pipelines have another important consequence:

Obviously, when executing a pipeline, we are not interested in the intermediate hash results. From a MapReduce perspective this means that there is no need to transfer these hash fragments, scattered over hundreds of nodes, back to the master.

Only the end result is actually collected; the intermediate hashes remain only virtual.


It's not yet on the CPAN,

It's not yet on the CPAN, therefore it doesn't exist. Please rectify. :)

Anonymous (not verified) | Mon, 07/14/2008 - 22:03

Re: It's not yet on the CPAN,

It's not yet on the CPAN, therefore it doesn't exist. Please rectify. :)

It will :-) I'm really close, but can only work on weekends on it.

\rho

rho | Tue, 07/15/2008 - 06:13
rho | Mon, 07/21/2008 - 07:54

Parallel::Iterator

Have a look at Parallel::Iterator. It's just the map part of MR really - but it might give you something you can work with. It's fork-based and uses Storable to marshal the data that's sent to the map closure - so it's only useful in cases where the map function is expensive compared to those operations. Given those limitations I think you should trivially be able to implement MR using it :)

Andy Armstrong (not verified) | Tue, 07/15/2008 - 12:41

Re: Parallel::Iterator

I think you should trivially be able to implement MR using it :)

Andy,

Thanks for the hint!

I have looked at it, but I wonder how it would help me distribute the tasks to the worker machines. And forking is too heavy for me, as I have to maintain channels to the worker machines (and that is done via forked ssh tunnels).

But I have other ideas when/how to use your package!

\rho

rho | Tue, 07/15/2008 - 19:30
Anonymous (not verified) | Wed, 07/23/2008 - 10:45

Re: mapreduce using HTTP as transport

Hmmm, certainly worth a try. Maybe there is a way to merge this with Parallel::MapReduce. The package structure is still in flux and will be for some time.

rho | Wed, 07/23/2008 - 11:22