Sunday, December 14, 2014

Crunch DoFn Example

All processing functions in Crunch extend DoFn, which contains the logic to communicate with the Hadoop API. For each input, a DoFn is capable of producing 0...N outputs.

How is it different from a MapFn?
If you happened to read my post about MapFn, you may remember that MapFn is an implementation of DoFn that guarantees exactly one output for each input. A DoFn, on the other hand, can produce zero or more outputs for each input.
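To make the difference concrete, here is a small plain-Java sketch that runs without Crunch or Hadoop. The Emitter and SimpleDoFn interfaces and the parallelDo helper are hypothetical stand-ins that only mimic the shape of Crunch's Emitter, DoFn and PCollection#parallelDo(); they work on an in-memory list. The MapFn-style function emits exactly once per input, while the DoFn-style function emits nothing for an empty line and one output per comma-separated token otherwise.

```java
import java.util.ArrayList;
import java.util.List;

public class DoFnVsMapFnSketch {

    // Hypothetical stand-in for Crunch's Emitter: receives each output value.
    interface Emitter<T> {
        void emit(T value);
    }

    // Hypothetical stand-in for a DoFn: process() may call emit() any number of times.
    interface SimpleDoFn<S, T> {
        void process(S input, Emitter<T> emitter);
    }

    // In-memory analogue of parallelDo(): runs the function on every input, in order,
    // and collects everything it emits.
    static <S, T> List<T> parallelDo(List<S> inputs, SimpleDoFn<S, T> fn) {
        List<T> outputs = new ArrayList<>();
        for (S input : inputs) {
            fn.process(input, outputs::add);
        }
        return outputs;
    }

    // MapFn-style: exactly one output (the line length) per input.
    static List<Integer> lengthsMapFnStyle(List<String> lines) {
        return parallelDo(lines, (line, emitter) -> emitter.emit(line.length()));
    }

    // DoFn-style: zero outputs for an empty line, one output per comma-separated token otherwise.
    static List<String> tokensDoFnStyle(List<String> lines) {
        return parallelDo(lines, (line, emitter) -> {
            if (line.isEmpty()) {
                return; // emit nothing for this input
            }
            for (String token : line.split(",")) {
                emitter.emit(token);
            }
        });
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a", "", "b,c");
        System.out.println(lengthsMapFnStyle(lines)); // [1, 0, 3]
        System.out.println(tokensDoFnStyle(lines));   // [a, b, c]
    }
}
```

Three inputs give three outputs in the MapFn style, but the same three inputs give three outputs of a different shape in the DoFn style: zero from the empty line and two from "b,c".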

Example:
Let's revisit the example from the MapFn post, where we read a text file as input and produce an instance of Song for each line in the text file. For the purpose of this example, let us assume that our input is formatted this way:
"song-id","song-name","artist-name"
A song may have a single artist or a featured artist:
"song-id-1","Mirrors","Justin Timberlake"
"song-id-2","Fancy","Iggy Azalea ft. Charli XCX"
For each input line, we need to create an instance of Song with a song id, a song title, and an artist name. If the artist name includes a featured ("ft.") artist, we need to build two Song instances: one with the primary artist name and another with the featured artist name.
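The rule can be sketched in plain Java, independent of Crunch. The parseFields helper is a hypothetical and deliberately naive stand-in for a real CSV parser (it assumes every field is double-quoted and that no field contains the sequence ","), and each output is a formatted string rather than a Song object. The sketch also assumes the featuring separator is "ft." surrounded by whitespace.

```java
import java.util.ArrayList;
import java.util.List;

public class FeaturingSplitSketch {

    // Hypothetical, deliberately naive parser: assumes every field is wrapped in double quotes
    // and that no field contains the sequence "," (a real job should use a CSV library).
    static String[] parseFields(String line) {
        String trimmed = line.trim();
        String inner = trimmed.substring(1, trimmed.length() - 1); // drop the outer quotes
        return inner.split("\",\"");
    }

    // One output per artist: "ft." (with surrounding whitespace) separates the primary
    // artist from the featured artist.
    static List<String> toSongs(String line) {
        String[] fields = parseFields(line);
        List<String> songs = new ArrayList<>();
        for (String artist : fields[2].split("\\s+ft\\.\\s+")) {
            songs.add(fields[0] + " | " + fields[1] + " | " + artist.trim());
        }
        return songs;
    }

    public static void main(String[] args) {
        // [song-id-1 | Mirrors | Justin Timberlake]
        System.out.println(toSongs("\"song-id-1\",\"Mirrors\",\"Justin Timberlake\""));
        // [song-id-2 | Fancy | Iggy Azalea, song-id-2 | Fancy | Charli XCX]
        System.out.println(toSongs("\"song-id-2\",\"Fancy\",\"Iggy Azalea ft. Charli XCX\""));
    }
}
```

The first line produces one output and the second produces two, which is exactly the 0...N behaviour a MapFn cannot express.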

Creating a DoFn
Typically we create a DoFn implementation by extending Crunch's DoFn class. We also need to specify the input type and the output type so that Crunch is able to serialize and deserialize the data. Because a DoFn is serialized and shipped to the tasks that run it, it is important to make sure that all fields declared in the DoFn are serializable; fields that are not should be marked as transient. Transient fields can be initialized in the DoFn#initialize() method.
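The effect of transient can be seen without Crunch by using plain Java serialization to imitate shipping a function object to a task. In this sketch, ExampleFn and Parser are hypothetical classes, and the explicit call to initialize() stands in for Crunch calling DoFn#initialize() before processing starts. The serializable field survives the round trip; the transient field comes back as null and has to be recreated.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {

    // Hypothetical helper that does not implement Serializable.
    static class Parser {
        String[] parse(String line) {
            return line.split(",");
        }
    }

    // Hypothetical function object with one serializable field and one transient field.
    static class ExampleFn implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String name;        // serializable: survives the round trip
        private transient Parser parser;  // not serialized: null after the round trip

        ExampleFn(String name) {
            this.name = name;
            this.parser = new Parser();
        }

        // Plays the role of DoFn#initialize(): recreate transient state on the task side.
        void initialize() {
            parser = new Parser();
        }

        String name() {
            return name;
        }

        boolean hasParser() {
            return parser != null;
        }

        int countFields(String line) {
            return parser.parse(line).length;
        }
    }

    // Serializes and deserializes a value, roughly what happens when a function is shipped to a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ExampleFn shipped = roundTrip(new ExampleFn("song-parser"));
        System.out.println(shipped.name() + " " + shipped.hasParser()); // song-parser false
        shipped.initialize();
        System.out.println(shipped.countFields("a,b,c"));               // 3
    }
}
```

Without the transient modifier, writeObject() would fail with a NotSerializableException, because Parser does not implement Serializable.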

The core of a DoFn is the DoFn#process() method. In this method we take an input, apply the processing logic, and emit 0..N outputs through the Emitter.
package com.crunch.tutor.examples.musicstore;

import java.io.IOException;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

import au.com.bytecode.opencsv.CSVParser;

import com.crunch.tutor.api.music.store.Song;

/**
 * A {@link DoFn} implementation that takes a String as input and emits one or more {@link Song}
 * instances. Each line of input should be of the form
 * {@code "songId","song title","artist"} (or) {@code "songId","song title","artist ft. artist"}. If
 * the artist name includes a featuring artist, then two instances of {@link Song} will be produced.
 */
public class SongExtractorDoFn extends DoFn<String, Song> {

    // Generate this since DoFn implements Serializable
    private static final long serialVersionUID = -1034942975963730842L;

    // Matches the " ft. " separator between the primary and the featured artist. The dot is
    // escaped because String#split() treats its argument as a regular expression.
    private static final String FEATURING_SEPARATOR = "\\s+ft\\.\\s+";

    // The parser is not shipped with the serialized DoFn; it is created in initialize() instead.
    private transient CSVParser csvParser;

    @Override
    public void initialize() {
        // Default parser that splits on comma (,) and handles double-quoted fields
        csvParser = new CSVParser();
    }

    @Override
    public void process(final String input, final Emitter<Song> emitter) {

        String[] parsedInputs;
        try {
            // For the sake of simplicity, let us assume that the input file is properly formatted CSV.
            parsedInputs = csvParser.parseLine(input);
        } catch (final IOException e) {
            // Skip lines that cannot be parsed
            e.printStackTrace();
            return;
        }

        // Again, for simplicity, let us assume the input data contains the right fields and we
        // do not fail with exception. But in a real use case, we have to check for incorrectly
        // formatted data, and handle exceptions appropriately.
        final Song.Builder songBuilder = Song.newBuilder();
        songBuilder.setSongId(parsedInputs[0]);
        songBuilder.setSongTitle(parsedInputs[1]);

        // Find artist and featuring artist information
        final String[] artistNames = parsedInputs[2].split(FEATURING_SEPARATOR);
        // Emit song object(s) as an output of this processing
        for (final String artistName : artistNames) {
            songBuilder.setArtistName(artistName.trim());
            // The emitter sends the output from a DoFn to the Hadoop API. It can be called any
            // number of times to produce the desired number of outputs
            emitter.emit(songBuilder.build());
        }
    }
}
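A note on splitting the artist field: String#split() treats its argument as a regular expression, so in the pattern "ft. " the dot matches any character, and the space in front of "ft" stays attached to the primary artist. The plain-Java check below compares that pattern with an escaped one that also consumes the surrounding whitespace. "Fifty Cent" is only an illustrative string, chosen because it contains "fty "; "Katy Perry ft. Juicy J" comes from the sample input data.

```java
import java.util.Arrays;

public class FeaturingRegexCheck {

    // Unescaped pattern: '.' matches any character, and the space before "ft" is not consumed.
    static String[] splitLoose(String artist) {
        return artist.split("ft. ");
    }

    // Escaped pattern: requires a literal "ft." surrounded by whitespace, which is also consumed.
    static String[] splitStrict(String artist) {
        return artist.split("\\s+ft\\.\\s+");
    }

    public static void main(String[] args) {
        // "Fifty Cent" contains "fty ", which the loose pattern treats as a separator.
        System.out.println(Arrays.toString(splitLoose("Fifty Cent")));   // [Fi, Cent]
        System.out.println(Arrays.toString(splitStrict("Fifty Cent")));  // [Fifty Cent]

        // The loose pattern also leaves a trailing space on the primary artist.
        System.out.println(Arrays.toString(splitLoose("Katy Perry ft. Juicy J")));  // [Katy Perry , Juicy J]
        System.out.println(Arrays.toString(splitStrict("Katy Perry ft. Juicy J"))); // [Katy Perry, Juicy J]
    }
}
```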

Driver code:
Now let us see how to use a DoFn in a Crunch pipeline. We pass the DoFn to PCollection#parallelDo(), which tells the Crunch planner to apply the logic in the DoFn to every element of the input PCollection; the planner runs this logic as part of a MapReduce job when the pipeline executes. The result is a new PCollection whose element type is the output type of the DoFn. In our example we use this method:
pCollection.parallelDo(String, DoFn, PType)
and call it like this:
pCollection.parallelDo("describe the type of action it's doing", new SongExtractorDoFn(), Avros.records(Song.class));
Since we serialize Song with Avro, we use Avros#records() to create its PType. The following code reads a text file from HDFS, converts each line of the text file to one or more Song objects, and writes them to HDFS.
package com.crunch.tutor.examples.pipelines;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.avro.AvroFileTarget;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.crunch.tutor.api.music.store.Song;
import com.crunch.tutor.examples.musicstore.SongExtractorDoFn;

public class SongExtractorFromTextFileUsingDoFn extends Configured implements Tool {

    public static void main(final String[] args) throws Exception {
        final int res = ToolRunner.run(new Configuration(),
                new SongExtractorFromTextFileUsingDoFn(), args);
        System.exit(res);
    }

    @Override
    public int run(final String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar <jar-file> input-path output-path");
            // Return a non-zero code instead of exiting, so ToolRunner/main decide how to exit
            return 1;
        }
        final Configuration config = getConf();
        final Pipeline songExtractorPipeline = new MRPipeline(
                SongExtractorFromTextFileUsingDoFn.class, "SongExtractorFromTextFileUsingDoFn",
                config);

        // Read text file from HDFS
        final PCollection<String> songsAsStrings = songExtractorPipeline.readTextFile(args[0]);
        // Parse the input from the text file and create Song objects. We use Avros#records() to
        // determine the PType of the resulting collection. A PType is Crunch's way of representing
        // the type of a PCollection; it tells the framework how to read and write the data in the
        // PCollection.
        final PCollection<Song> songsFromStrings = songsAsStrings.parallelDo(
                "Build Song Objects from text file", new SongExtractorDoFn(),
                Avros.records(Song.class));
        // Write all song objects to HDFS
        songExtractorPipeline.write(songsFromStrings, new AvroFileTarget(args[1]));

        // Execute the pipeline by calling Pipeline#done()
        final PipelineResult result = songExtractorPipeline.done();
        return result.succeeded() ? 0 : 1;
    }
}
Here is a sample of the input data:
"song-id-1","Happy","Pharrell Williams"
"song-id-2","Dark Horse","Katy Perry ft. Juicy J"
"song-id-3","Summer","Calvin Harris"
"song-id-4","Fancy","Iggy Azalea ft. Charli XCX"
Command to run this program:
hadoop jar crunch-examples-1.0-SNAPSHOT.jar com.crunch.tutor.examples.pipelines.SongExtractorFromTextFileUsingDoFn crunch/examples/musicstore/extractor/v2/inputs/ crunch/examples/musicstore/extractor/v2/outputs
Source code links
https://github.com/nasokan/crunch-tutor
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/java/com/crunch/tutor/examples/musicstore/SongExtractorDoFn.java
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/java/com/crunch/tutor/examples/pipelines/SongExtractorFromTextFileUsingDoFn.java