Sunday, June 8, 2014

Crunch MapFn Example

In this post, we will look at processing data in Apache Crunch using a MapFn. A MapFn in Crunch is a specialization of DoFn that applies processing logic to each input and returns exactly one result per input.

For example, if we read a text file that contains 10 lines, where each line is treated as an input, a MapFn that applies our processing logic will return exactly 10 outputs.

A MapFn guarantees an output for each input, so we should avoid using a MapFn if some inputs may not produce an output. In such cases we can use a DoFn, whose Emitter allows zero, one, or many outputs per input, as in the sketch below.
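
A minimal sketch of such a DoFn (the class name NonBlankLineFn is hypothetical, not part of this project): it drops blank lines, producing zero outputs for some inputs, which a MapFn cannot do.
package com.crunch.tutor.examples.musicstore;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

public class NonBlankLineFn extends DoFn<String, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void process(final String input, final Emitter<String> emitter) {
        // Emit only non-blank lines; blank lines produce no output at all
        if (input != null && !input.trim().isEmpty()) {
            emitter.emit(input);
        }
    }
}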

Example:
In this example, let us prepare the data required for a music store. The music store will host song information such as song id, song title, and artist name. Let us say all the song information is stored in HDFS as a text file in CSV format. We can use the Crunch API to read the text file, apply our processing logic to convert each line into a Song object, and write the results to a path in HDFS.

Create Song class:
I have used Avro to create my DTO, since Avro records work well with Hadoop and Crunch. Information on setting up a project to use Avro can be found here: http://blog.nithinasokan.com/2014/05/getting-started-with-avro.html
@namespace("com.crunch.tutor.api.music.store")
protocol SongProtocol {
    record Song {
        string songId;
        string songTitle;
        string artistName;
    }
}
However, if you do not wish to use Avro, you may use Writables, Thrift, etc., which were also built to work well with Hadoop.
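
For instance, Crunch ships Writable-backed PTypes in the Writables class. A minimal, hypothetical sketch (WritableTypeExample is not part of this project):
package com.crunch.tutor.examples.musicstore;

import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;

public class WritableTypeExample {
    public static void main(final String[] args) {
        // Writables.strings() is the Writable-backed counterpart of Avros.strings();
        // a custom record would implement org.apache.hadoop.io.Writable and use
        // Writables.records(...) in place of Avros.records(...)
        final PType<String> stringType = Writables.strings();
        System.out.println(stringType);
    }
}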

Creating a MapFn:
Typically we create an implementation of MapFn by extending the Crunch MapFn class, specifying the input type and the output type so that Crunch can serialize and deserialize the data. It is also important to make sure that every field of a MapFn is serializable; any field that is not should be marked transient. Transient fields can be initialized in the MapFn#initialize() method, as in the sketch below.
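
A minimal sketch of the transient-field pattern (the class name TransientFieldFn is hypothetical; the real example below keeps its parser in a static field instead):
package com.crunch.tutor.examples.musicstore;

import java.io.IOException;

import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;

import au.com.bytecode.opencsv.CSVParser;

public class TransientFieldFn extends MapFn<String, String> {

    private static final long serialVersionUID = 1L;

    // CSVParser is not serializable, so it must not be serialized with the function
    private transient CSVParser csvParser;

    @Override
    public void initialize() {
        // Runs on each task before map() is called, after the function is deserialized
        csvParser = new CSVParser();
    }

    @Override
    public String map(final String input) {
        try {
            // Return just the first column of the csv line
            return csvParser.parseLine(input)[0];
        } catch (final IOException e) {
            throw new CrunchRuntimeException(e);
        }
    }
}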

The core of the MapFn is the MapFn#map() method. In this method we take an input, apply processing logic and return an output.
package com.crunch.tutor.examples.musicstore;

import java.io.IOException;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
import au.com.bytecode.opencsv.CSVParser;
import com.crunch.tutor.api.music.store.Song;

/**
 * A {@link MapFn} implementation that takes a String as input and returns a {@link Song}
 * instance. Each line of the input should be formatted as
 * {@code "songId","song title","artist"}
 */
public class SongMapFn extends MapFn<String, Song> {

    // Generated because MapFn implements Serializable
    private static final long serialVersionUID = -1034942975963730842L;

    // A default parser that splits on commas; static fields are not serialized
    // with the function
    private static final CSVParser CSV_PARSER = new CSVParser();

    @Override
    public Song map(final String input) {

        final Song.Builder songBuilder = Song.newBuilder();
        try {

            // For the sake of simplicity, let us assume that input file is properly formatted csv.
            final String[] parsedInputs = CSV_PARSER.parseLine(input);

            // Again, for simplicity, let us assume the input data contains the right fields. In a
            // real use case, we would have to check for incorrectly formatted data and handle
            // exceptions appropriately.
            songBuilder.setSongId(parsedInputs[0]);
            songBuilder.setSongTitle(parsedInputs[1]);
            songBuilder.setArtistName(parsedInputs[2]);
        } catch (final IOException e) {
            // Fail the task instead of emitting a half-built record; real code
            // should handle malformed lines more gracefully
            throw new CrunchRuntimeException(e);
        }
        return songBuilder.build();
    }
}
Driver code:
Now let us see how to make use of the MapFn in a Crunch pipeline. We pass the MapFn to the PCollection#parallelDo() method, which tells the Crunch planner to apply the logic in the MapFn to each element of the input PCollection; the MapReduce job itself runs only when the pipeline is executed. The result of the operation is a new PCollection whose element type is the output type specified in the MapFn.

In our example we make use of this method:
pCollection.parallelDo(String, DoFn, PType)
and invoke it as:
pCollection.parallelDo("describe the type of action it's doing", new SongMapFn(), Avros.records(Song.class));
Since we are using Avro records, we use Avros#records() to generate the PType. The following code reads a text file from HDFS, converts each line to a Song object, and writes the songs to HDFS.
package com.crunch.tutor.examples.pipelines;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.avro.AvroFileTarget;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.crunch.tutor.api.music.store.Song;
import com.crunch.tutor.examples.musicstore.SongMapFn;

public class SongExtractorFromTextFile extends Configured implements Tool {

    public static void main(final String[] args) throws Exception {
        final int res = ToolRunner.run(new Configuration(), new SongExtractorFromTextFile(), args);
        System.exit(res);
    }

    @Override
    public int run(final String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar <jar-file> "
                    + SongExtractorFromTextFile.class.getName() + " <input-path> <output-path>");
            return 1;
        }

        final Configuration config = getConf();

        final Pipeline songExtractorPipeline = new MRPipeline(SongExtractorFromTextFile.class,
                "SongExtractor", config);

        // Read text file from HDFS
        final PCollection<String> songsAsStrings = songExtractorPipeline.readTextFile(args[0]);

        // Parse each line from the text file and create Song objects. We use Avros#records to
        // determine the PType of the resulting collection. A PType is Crunch's way of representing
        // the type of a PCollection; it instructs the framework on how to serialize and
        // deserialize the data in the PCollection.
        final PCollection<Song> songsFromStrings = songsAsStrings.parallelDo(
                "Build Song from text file", new SongMapFn(), Avros.records(Song.class));

        // Write all song objects to HDFS
        songExtractorPipeline.write(songsFromStrings, new AvroFileTarget(args[1]));

        // Execute the pipeline by calling Pipeline#done()
        final PipelineResult result = songExtractorPipeline.done();

        return result.succeeded() ? 0 : 1;
    }
}
Here is a sample of my input data:
"song-id-1","Happy","Pharrell Williams"
"song-id-2","Dark Horse","Katy Perry Featuring Juicy J"
"song-id-3","Summer","Calvin Harris"
Command line to run this program:
hadoop jar crunch-examples-1.0-SNAPSHOT.jar com.crunch.tutor.examples.pipelines.SongExtractorFromTextFile /crunch/examples/musicstore/extractor/v1/inputs/ /crunch/examples/musicstore/extractor/v1/outputs/
Here are the source code links:
https://github.com/nasokan/crunch-tutor
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/java/com/crunch/tutor/examples/musicstore/SongMapFn.java
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/java/com/crunch/tutor/examples/pipelines/SongExtractorFromTextFile.java
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/resources/musicStore/songs.txt