Sunday, December 14, 2014

Crunch DoFn Example

All processing functions in Crunch extend DoFn, which contains the logic to communicate with the Hadoop API. For each input, a DoFn can produce zero or more (0...N) outputs.

How is it different from a MapFn?
If you read my post about MapFn, you may recall that MapFn is an implementation of DoFn that guarantees exactly one output for each input. A DoFn, on the other hand, can produce zero or more outputs for each input.
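Here is a small plain-Java sketch of that difference. It does not use Crunch: `ProcessFn`, `MapStyleFn`, and the `Consumer`-based emitter are hypothetical stand-ins for DoFn, MapFn, and Emitter, chosen so the example runs without a Hadoop cluster. It models a map-style function as a process-style function that always emits exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class DoFnVsMapFnSketch {

    // Hypothetical stand-in for DoFn: zero or more outputs per input, sent to an emitter.
    interface ProcessFn<I, O> {
        void process(I input, Consumer<O> emitter);
    }

    // Hypothetical stand-in for MapFn: exactly one output per input.
    interface MapStyleFn<I, O> extends ProcessFn<I, O> {
        O map(I input);

        // A map-style function is just a process-style function that always emits once.
        @Override
        default void process(I input, Consumer<O> emitter) {
            emitter.accept(map(input));
        }
    }

    // Applies a function to every input and collects everything it emits.
    static <I, O> List<O> applyAll(List<I> inputs, ProcessFn<I, O> fn) {
        List<O> outputs = new ArrayList<>();
        for (I input : inputs) {
            fn.process(input, outputs::add);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a b c", "", "d");

        // Exactly one output per input: 3 lines -> 3 lengths.
        MapStyleFn<String, Integer> lengths = String::length;

        // Zero or more outputs per input: the empty line emits nothing, "a b c" emits three words.
        ProcessFn<String, String> words = (line, emitter) -> {
            for (String word : line.split(" ")) {
                if (!word.isEmpty()) {
                    emitter.accept(word);
                }
            }
        };

        System.out.println(applyAll(lines, lengths)); // [5, 0, 1]
        System.out.println(applyAll(lines, words));   // [a, b, c, d]
    }
}
```

Three input lines give three outputs for the map-style function but four for the process-style one, and the empty line contributes none.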

Example:
Let's revisit the example from the MapFn post, where we read a text file as input and produce an instance of Song for each line in the file. For this example, assume the input is formatted as follows:
"song-id","song-name","artist-name"
Some songs have a single artist, while others include a featured artist:
"song-id-1","Mirrors","Justin Timberlake"
"song-id-2","Fancy","Iggy Azalea ft. Charli XCX"
For each input line, we need to create an instance of Song with songId, songTitle, and artistName. If the artist name includes a featured (ft.) artist, we need to build two Song instances: one with the main artist's name and another with the featured artist's name.

Creating a DoFn
Typically, we create a DoFn implementation by extending Crunch's DoFn class. We also need to specify the input and output types so that Crunch can serialize and deserialize the data. It is also important to make sure that all fields declared in a DoFn are serializable; fields that are not should be marked transient and initialized in the DoFn#initialize() method, which runs before processing begins.
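To see why non-serializable fields must be transient, here is a plain-Java sketch that uses Java serialization to imitate what happens when a function is shipped to the cluster. The classes `LineParser` and `ParsingFn` are hypothetical, and `initialize()` only plays the role of DoFn#initialize(); here it is called by hand.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TransientFieldSketch {

    // Hypothetical helper that does NOT implement Serializable, standing in for
    // any non-serializable dependency a function might need.
    static final class LineParser {
        String[] parse(String line) {
            return line.split(",");
        }
    }

    // Mimics the DoFn lifecycle: built on the client, serialized, deserialized on a
    // task, and then initialize() is called before any input is processed.
    static final class ParsingFn implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String label;          // serializable: travels with the function
        private transient LineParser parser; // skipped by Java serialization

        ParsingFn(String label) {
            this.label = label;
        }

        // Plays the role of DoFn#initialize(): rebuild transient state after deserialization.
        void initialize() {
            parser = new LineParser();
        }

        String label() {
            return label;
        }

        boolean isInitialized() {
            return parser != null;
        }

        int fieldCount(String line) {
            return parser.parse(line).length;
        }
    }

    // Serializes and deserializes a value, roughly what happens when a function is shipped to tasks.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ParsingFn fn = new ParsingFn("songs");
        fn.initialize();                                 // the parser exists on the client...
        ParsingFn shipped = roundTrip(fn);               // ...but is not serialized
        System.out.println(shipped.label());             // songs
        System.out.println(shipped.isInitialized());     // false
        shipped.initialize();
        System.out.println(shipped.fieldCount("a,b,c")); // 3
    }
}
```

If `parser` were not marked transient, `writeObject` would fail with a `NotSerializableException`, because `LineParser` does not implement Serializable.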

The core of a DoFn is the DoFn#process() method. In this method we take an input, apply the processing logic, and emit zero or more outputs through the Emitter.
package com.crunch.tutor.examples.musicstore;

import java.io.IOException;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

import au.com.bytecode.opencsv.CSVParser;

import com.crunch.tutor.api.music.store.Song;

/**
 * A {@link DoFn} implementation that takes in a String as input and return a {@link Song} instance.
 * Each line of input should be formatted of the form
 * {@code "songId","song title","artist"} (or) {@code "songId","song title","artist ft. artist"}. If
 * the artist name includes a featuring artist, then two instances of {@link Song} will be produced.
 */
public class SongExtractorDoFn extends DoFn<String, Song> {

    // DoFn implements Serializable, so declare an explicit serialVersionUID
    private static final long serialVersionUID = -1034942975963730842L;

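    // A default opencsv parser splits on commas and honors double quotes. It is static, so
    // it is not serialized with the DoFn; each task JVM creates it when the class loads.
    private static final CSVParser CSV_PARSER = new CSVParser();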

    @Override
    public void process(final String input, final Emitter<Song> emitter) {

        String[] parsedInputs;
        try {
            // For the sake of simplicity, let us assume that input file is properly formatted csv.
            parsedInputs = CSV_PARSER.parseLine(input);
        } catch (final IOException e) {
            e.printStackTrace();
            return;
        }

        // Again, for simplicity, let us assume the input data contains the right fields and we
        // do not fail with exception. But in a real use case, we have to check for incorrectly
        // formatted data, and handle exceptions appropriately.
        final Song.Builder songBuilder = Song.newBuilder();
        songBuilder.setSongId(parsedInputs[0]);
        songBuilder.setSongTitle(parsedInputs[1]);

        // Find artist and featured artist information. String#split takes a regular
        // expression, so the dot in "ft." is escaped and the surrounding whitespace is consumed.
        final String[] artistNames = parsedInputs[2].split("\\s+ft\\.\\s+");
        // Emit song object(s) as an output of this processing
        for (final String artistName : artistNames) {
            songBuilder.setArtistName(artistName.trim());
            // The emitter sends the output from a DoFn to the Hadoop API. It can be called any
            // number of times to produce the desired number of outputs
            emitter.emit(songBuilder.build());
        }
    }
}

Driver code:
Now let us see how to use a DoFn in a Crunch pipeline. We pass the DoFn to the PCollection#parallelDo() method, which tells the Crunch planner to apply the logic in the DoFn to every element of the input PCollection as part of a MapReduce job. The result is a new PCollection whose element type is the DoFn's output type. In our example we use this method:
pCollection.parallelDo(String, DoFn, PType)
and call it like this:
pCollection.parallelDo("describe the type of action it's doing", new SongExtractorDoFn(), Avros.records(Song.class));
Since we are using Avro, I used Avros#records() to create the PType. The following code reads a text file from HDFS, converts each line into one or more Song objects, and writes them back to HDFS.
package com.crunch.tutor.examples.pipelines;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.avro.AvroFileTarget;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.crunch.tutor.api.music.store.Song;
import com.crunch.tutor.examples.musicstore.SongExtractorDoFn;

public class SongExtractorFromTextFileUsingDoFn extends Configured implements Tool {

    public static void main(final String[] args) throws Exception {
        final int res = ToolRunner.run(new Configuration(),
                new SongExtractorFromTextFileUsingDoFn(), args);
        System.exit(res);
    }

    @Override
    public int run(final String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar <jar-file> input-path output-path");
            return 1;
        }
        final Configuration config = getConf();
        final Pipeline songExtractorPipeline = new MRPipeline(
                SongExtractorFromTextFileUsingDoFn.class, "SongExtractorFromTextFileUsingDoFn",
                config);

        // Read text file from HDFS
        final PCollection<String> songsAsStrings = songExtractorPipeline.readTextFile(args[0]);
        // Parse the input from text file and create Song objects. We use Avros#records to determine
        // the PType of the resulting collection. A PType is a Crunch way of representing the type
        // of a PCollection that will instruct the framework on how to read/write data from the
        // PCollection.
        final PCollection<Song> songsFromStrings = songsAsStrings.parallelDo(
                "Build Song Objects from text file", new SongExtractorDoFn(),
                Avros.records(Song.class));
        // Write all song objects to HDFS
        songExtractorPipeline.write(songsFromStrings, new AvroFileTarget(args[1]));

        // Execute the pipeline by calling Pipeline#done()
        final PipelineResult result = songExtractorPipeline.done();
        return result.succeeded() ? 0 : 1;
    }
}
Here is a sample of the input data:
"song-id-1","Happy","Pharrell Williams"
"song-id-2","Dark Horse","Katy Perry ft. Juicy J"
"song-id-3","Summer","Calvin Harris"
"song-id-4","Fancy","Iggy Azalea ft. Charli XCX"
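To check what output to expect from this sample, the DoFn's logic can be simulated on an in-memory list. This is a plain-Java sketch, not Crunch code: `Song` is a hypothetical stand-in for the Avro-generated class, the quoted-CSV parsing is simplified, and `extract()` only plays the role of parallelDo. Four input lines produce six songs because two lines contain a featured artist.

```java
import java.util.ArrayList;
import java.util.List;

final class SongExtractionSimulation {

    // Hypothetical plain-Java stand-in for the Avro-generated Song record.
    static final class Song {
        final String songId;
        final String songTitle;
        final String artistName;

        Song(String songId, String songTitle, String artistName) {
            this.songId = songId;
            this.songTitle = songTitle;
            this.artistName = artistName;
        }

        @Override
        public String toString() {
            return songId + ", " + songTitle + ", " + artistName;
        }
    }

    // Simplified CSV handling: assumes every field is double-quoted and contains no
    // embedded quotes (the real DoFn uses opencsv's CSVParser instead).
    static String[] parseQuotedCsv(String line) {
        String trimmed = line.trim();
        return trimmed.substring(1, trimmed.length() - 1).split("\",\"");
    }

    // Mirrors the DoFn's process() logic: one Song per artist, so a line can yield several outputs.
    static void process(String line, List<Song> emitter) {
        String[] fields = parseQuotedCsv(line);
        for (String artist : fields[2].split("\\s+ft\\.\\s+")) {
            emitter.add(new Song(fields[0], fields[1], artist.trim()));
        }
    }

    // Plays the role of parallelDo on an in-memory list.
    static List<Song> extract(List<String> lines) {
        List<Song> songs = new ArrayList<>();
        for (String line : lines) {
            process(line, songs);
        }
        return songs;
    }

    public static void main(String[] args) {
        List<String> input = List.of(
                "\"song-id-1\",\"Happy\",\"Pharrell Williams\"",
                "\"song-id-2\",\"Dark Horse\",\"Katy Perry ft. Juicy J\"",
                "\"song-id-3\",\"Summer\",\"Calvin Harris\"",
                "\"song-id-4\",\"Fancy\",\"Iggy Azalea ft. Charli XCX\"");
        List<Song> songs = extract(input);
        songs.forEach(System.out::println);
        System.out.println(input.size() + " lines -> " + songs.size() + " songs"); // 4 lines -> 6 songs
    }
}
```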
Command to run this program:
hadoop jar crunch-examples-1.0-SNAPSHOT.jar com.crunch.tutor.examples.pipelines.SongExtractorFromTextFileUsingDoFn crunch/examples/musicstore/extractor/v2/inputs/ crunch/examples/musicstore/extractor/v2/outputs
Source code links
https://github.com/nasokan/crunch-tutor
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/java/com/crunch/tutor/examples/musicstore/SongExtractorDoFn.java
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/java/com/crunch/tutor/examples/pipelines/SongExtractorFromTextFileUsingDoFn.java

Sunday, June 8, 2014

Crunch MapFn Example

In this post, we will look at processing data in Apache Crunch using a MapFn. A MapFn in Crunch is an implementation of DoFn that applies processing logic to each input and returns exactly one result for that input.

For example, if we read a text file that contains 10 lines, where each line is treated as one input, a MapFn applies the processing logic to each line and returns 10 outputs.

MapFn guarantees an output for each input, so we should avoid using a MapFn when some inputs may not produce an output. In such cases, use a DoFn instead.

Example:
In this example, let us prepare the data required for a music store. The music store will hold song information such as the song ID, song title, and artist name. Assume all the song information is stored in HDFS as text files in CSV format. We can use the Crunch API to read the text files, convert each line to a Song object, and write the results to a path in HDFS.

Create Song class:
I have used Avro to create my DTO because Avro works well with Hadoop and Crunch. Information on setting up a project to use Avro can be found here: http://blog.nithinasokan.com/2014/05/getting-started-with-avro.html
@namespace("com.crunch.tutor.api.music.store")
protocol SongProtocol {
    record Song {
        string songId;
        string songTitle;
        string artistName;
    }
}
If you do not wish to use Avro, you can use Writables, Thrift, or other formats that were also built to work well with Hadoop.

Creating a MapFn
Typically, we create a MapFn implementation by extending Crunch's MapFn class. We also need to specify the input and output types so that Crunch can serialize and deserialize the data. It is also important to make sure that all fields declared in a MapFn are serializable; fields that are not should be marked transient and initialized in the MapFn#initialize() method.

The core of the MapFn is the MapFn#map() method. In this method we take an input, apply processing logic and return an output.
package com.crunch.tutor.examples.musicstore;

import java.io.IOException;
import org.apache.crunch.MapFn;
import au.com.bytecode.opencsv.CSVParser;
import com.crunch.tutor.api.music.store.Song;

/**
 * A {@link MapFn} implementation that takes in a String as input and return a {@link Song}
 * instance. Each line of the input string should be formatted of the form
 * {@code "songId","song title","artist"}
 */
public class SongMapFn extends MapFn<String, Song> {

    // MapFn implements Serializable, so declare an explicit serialVersionUID
    private static final long serialVersionUID = -1034942975963730842L;

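    // A default opencsv parser splits on commas and honors double quotes. It is static, so
    // it is not serialized with the MapFn; each task JVM creates it when the class loads.
    private static final CSVParser CSV_PARSER = new CSVParser();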

    @Override
    public Song map(final String input) {

        final Song.Builder songBuilder = Song.newBuilder();
        try {

            // For the sake of simplicity, let us assume that input file is properly formatted csv.
            final String[] parsedInputs = CSV_PARSER.parseLine(input);

            // Again, for simplicity, let us assume the input data contains the right fields and we
            // do not fail with exception. But in a real use case, we have to check for incorrectly
            // formatted data, and handle exceptions appropriately.
            songBuilder.setSongId(parsedInputs[0]);
            songBuilder.setSongTitle(parsedInputs[1]);
            songBuilder.setArtistName(parsedInputs[2]);
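        } catch (final IOException e) {
            // A MapFn must return exactly one output per input. Building a Song with unset
            // required fields would fail anyway (Avro's build() rejects them), so fail fast.
            throw new IllegalArgumentException("Unable to parse line: " + input, e);
        }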
        return songBuilder.build();
    }
}
Driver code:
Now let us see how to use the MapFn in a Crunch pipeline. We pass the MapFn to the PCollection#parallelDo() method, which tells the Crunch planner to apply the logic in the MapFn to every element of the input PCollection as part of a MapReduce job. The result is a new PCollection whose element type is the MapFn's output type.

In our example we make use of this method:
pCollection.parallelDo(String, DoFn, PType)
and call it like this:
pCollection.parallelDo("describe the type of action it's doing", new SongMapFn(), Avros.records(Song.class));
Since we are using Avro, we use Avros#records() to create the PType. The following code reads a text file from HDFS, converts each line to a Song object, and writes the results to HDFS.
package com.crunch.tutor.examples.pipelines;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.avro.AvroFileTarget;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.crunch.tutor.api.music.store.Song;
import com.crunch.tutor.examples.musicstore.SongMapFn;

public class SongExtractorFromTextFile extends Configured implements Tool {

    public static void main(final String[] args) throws Exception {
        final int res = ToolRunner.run(new Configuration(), new SongExtractorFromTextFile(), args);
        System.exit(res);
    }

    @Override
    public int run(final String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar <jar-file> input-path output-path");
            return 1;
        }

        final Configuration config = getConf();

        final Pipeline songExtractorPipeline = new MRPipeline(SongExtractorFromTextFile.class,
                "SongExtractor", config);

        // Read text file from HDFS
        final PCollection<String> songsAsStrings = songExtractorPipeline.readTextFile(args[0]);

        // Parse the input from text file and create Song objects. We use Avros#records to determine
        // the PType of the resulting collection. A PType is a Crunch way of representing the type
        // of a PCollection that will instruct the framework on how to read/write data from the
        // PCollection.
        final PCollection<Song> songsFromStrings = songsAsStrings.parallelDo(
                "Build Song from text file", new SongMapFn(), Avros.records(Song.class));

        // Write all song objects to HDFS
        songExtractorPipeline.write(songsFromStrings, new AvroFileTarget(args[1]));

        // Execute the pipeline by calling Pipeline#done()
        final PipelineResult result = songExtractorPipeline.done();

        return result.succeeded() ? 0 : 1;
    }
}
Here is a sample of my input data
"song-id-1","Happy","Pharrell Williams"
"song-id-2","Dark Horse","Katy Perry Featuring Juicy J"
"song-id-3","Summer","Calvin Harris"
Command to run this program:
hadoop jar crunch-examples-1.0-SNAPSHOT.jar com.crunch.tutor.examples.pipelines.SongExtractorFromTextFile /crunch/examples/musicstore/extractor/v1/inputs/ /crunch/examples/musicstore/extractor/v1/outputs/
Here are the source code links
https://github.com/nasokan/crunch-tutor
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/java/com/crunch/tutor/examples/musicstore/SongMapFn.java
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/java/com/crunch/tutor/examples/pipelines/SongExtractorFromTextFile.java
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/resources/musicStore/songs.txt

Thursday, May 1, 2014

How to fix hadoop NoClassDefFoundError

Have you ever had a Hadoop program fail with an exception similar to this one?
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/crunch/Pipeline
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at org.apache.hadoop.util.RunJar.main(RunJar.java:201)
Caused by: java.lang.ClassNotFoundException: org.apache.crunch.Pipeline
 at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
 ... 3 more
A NoClassDefFoundError is thrown when a class that the code depends on cannot be found by the JVM at runtime. The JVM expects third-party jars and classes to be available on its classpath during execution.
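A quick way to confirm a missing-class problem is to ask the JVM whether it can load the class. This is a small diagnostic sketch (the class name `ClasspathCheck` is hypothetical); it only inspects the JVM it runs in, that is, the client side, since task JVMs on the cluster have their own classpath.

```java
final class ClasspathCheck {

    // True if this JVM can load the class. initialize=false avoids running static
    // initializers; LinkageError covers NoClassDefFoundError for broken dependencies.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The classpath this JVM was started with.
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
        String[] classNames = args.length > 0 ? args : new String[] {"org.apache.crunch.Pipeline"};
        for (String className : classNames) {
            System.out.println(className + " loadable: " + isLoadable(className));
        }
    }
}
```

To be useful, it should run with the same classpath as the failing command, for example packaged in your job jar and launched through `hadoop jar`.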

MapReduce tasks run in separate JVMs on the cluster's worker nodes (launched by the TaskTracker in MRv1 or the NodeManager in YARN), so how do we guarantee that third-party jars are available on those JVMs' classpaths?

Distributed Cache and HADOOP_CLASSPATH
Hadoop requires all third-party/external jars to be available on both the client JVM classpath and the remote task JVM classpath. Two steps are needed to accomplish this:

1. Update HADOOP_CLASSPATH to include the location of the third-party jars so that the client JVM can use them.
export HADOOP_CLASSPATH=/path/file1.jar:/path/file2.jar:$HADOOP_CLASSPATH

2. Run the hadoop command with the -libjars option. This instructs Hadoop to place the third-party jars in the DistributedCache; from there, the Hadoop framework adds them to the classpath of each task JVM. However, this only works when your program implements the Tool interface and is launched through ToolRunner, so that Hadoop can parse the -libjars option. Generic options such as -libjars go after the main class and before your program's own arguments.

hadoop jar <jar-file> MainClass -libjars /path/file1.jar,/path/file2.jar arg1 arg2
By running the export command first and passing the -libjars option with your hadoop command, MapReduce jobs should be able to use the third-party jars.
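The two settings use different separators, and the generic options must be placed where ToolRunner can see them. The sketch below just assembles the strings; the helper names are hypothetical, ':' assumes a Unix-like system, and the command assumes the jar's manifest does not name a main class (so the class is passed explicitly, as in the commands in these posts).

```java
import java.util.ArrayList;
import java.util.List;

final class LibJarsCommandSketch {

    // HADOOP_CLASSPATH is an ordinary classpath string: entries joined with ':'
    // (the Unix path separator), prepended to any existing value.
    static String hadoopClasspath(List<String> jars, String existing) {
        String joined = String.join(":", jars);
        return (existing == null || existing.isEmpty()) ? joined : joined + ":" + existing;
    }

    // -libjars takes one comma-separated list. ToolRunner parses generic options from
    // the start of the program's arguments, so they follow the main class and come
    // before the application's own arguments.
    static List<String> hadoopJarCommand(String appJar, String mainClass,
            List<String> jars, List<String> appArgs) {
        List<String> command = new ArrayList<>(List.of("hadoop", "jar", appJar, mainClass));
        if (!jars.isEmpty()) {
            command.add("-libjars");
            command.add(String.join(",", jars));
        }
        command.addAll(appArgs);
        return command;
    }

    public static void main(String[] args) {
        List<String> jars = List.of("/path/file1.jar", "/path/file2.jar");
        System.out.println("export HADOOP_CLASSPATH="
                + hadoopClasspath(jars, System.getenv("HADOOP_CLASSPATH")));
        System.out.println(String.join(" ",
                hadoopJarCommand("app.jar", "MainClass", jars, List.of("arg1", "arg2"))));
        // hadoop jar app.jar MainClass -libjars /path/file1.jar,/path/file2.jar arg1 arg2
    }
}
```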

Alternatively, you can build an uber jar that packages all third-party dependencies inside the main jar. Here is a guide on how to build a shaded (or uber) jar: http://blog.nithinasokan.com/2014/05/create-shaded-uber-jar-for-maven-multi.html


Wednesday, April 30, 2014

Crunch program to read/write text files

In this post I will demonstrate how to add the Apache Crunch dependency to a project and write a basic MapReduce application that reads a text file from HDFS (Hadoop Distributed File System) and writes it back to HDFS.

Prerequisites
A Hadoop installation running in fully distributed or pseudo-distributed mode. (If you do not have Hadoop installed, you can follow Cloudera's instructions to install Hadoop in pseudo-distributed mode.)

Crunch maven dependency
Apache Crunch can be added to Maven projects with the following artifact information:
<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>${crunch.version}</version>
</dependency>
Release versions are available here. Note that your choice of Crunch version is constrained by its dependency on a particular HBase version. Since Crunch is actively developed and ships improvements and features regularly, it is a good idea to follow the official recommendation when choosing a version; you can find it on the Crunch Getting Started page. For this example, I'm using 0.9.0-hadoop2 as my Crunch version.

Example program
The goal of this program is to verify that the Crunch dependency works with our Hadoop installation. We will read a text file and write its contents to a different path in HDFS.
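package com.crunch.tutor.examples.demo;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

public class ReadWriteExample {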

    public static void main(final String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar <jar-file> input-path output-path");
            System.exit(1);
        }

        // Get input and output paths
        final String inputPath = args[0];
        final String outputPath = args[1];

        // Create a Pipeline, passing a class from the job jar (used to locate the jar) and the job configuration.
        final Pipeline pipeline = new MRPipeline(ReadWriteExample.class, new Configuration());

        // Read a text file
        final PCollection<String> inputLines = pipeline.readTextFile(inputPath);

        // write the text file
        pipeline.writeTextFile(inputLines, outputPath);

        // Execute the pipeline by calling Pipeline#done()
        final PipelineResult result = pipeline.done();

        System.exit(result.succeeded() ? 0 : 1);
    }
}
Pipeline#readTextFile(String) expects a path in HDFS. The path can point to a single file if you want the MapReduce program to read just that file. If the path points to a folder, Hadoop reads all the files inside the folder.
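The file-versus-folder behavior can be illustrated on the local file system. This sketch uses java.nio only, not the Hadoop FileSystem API; the skipping of names that begin with '_' or '.' mirrors the default hidden-file filter in Hadoop's FileInputFormat, and the file names in `demo()` are made up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class InputPathSketch {

    // Local-filesystem analogy of how an input path is interpreted: a file is read
    // as-is; for a directory, the regular files directly inside it are read. Names
    // starting with '_' or '.' are skipped, like Hadoop's default hidden-file filter
    // (which is why a _SUCCESS marker is not treated as input).
    static List<Path> resolveInputs(Path path) {
        if (!Files.isDirectory(path)) {
            return List.of(path);
        }
        try (Stream<Path> children = Files.list(path)) {
            return children
                    .filter(Files::isRegularFile)
                    .filter(p -> !isHidden(p.getFileName().toString()))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static boolean isHidden(String name) {
        return name.startsWith("_") || name.startsWith(".");
    }

    // Builds a throwaway input folder and returns the names of the files that would be read.
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("inputs");
            for (String name : List.of("songs-1.txt", "songs-2.txt", "_SUCCESS", ".hidden")) {
                Files.writeString(dir.resolve(name), "line\n");
            }
            return resolveInputs(dir).stream()
                    .map(p -> p.getFileName().toString())
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [songs-1.txt, songs-2.txt]
    }
}
```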

To test this program, you need to create your input file in HDFS. One quick way to do this is to copy files from the local file system to HDFS from a terminal/command prompt:
hadoop fs -put /path/to/local-file-or-folder/ /path/to/destination/
Folders can be created using:
hadoop fs -mkdir /path/to/folderName/
To run the program, build a jar file (you may use Maven) and issue this command:
hadoop jar <jar-file-location> ReadWriteExample /inputFile.txt /output-folder/

Here is the set of commands I used to run this program
> hadoop fs -mkdir readWriteExample
> hadoop fs -mkdir readWriteExample/inputs
> hadoop fs -put /home/nithin/inputFile.txt readWriteExample/inputs/
> hadoop jar crunch-examples-1.0-SNAPSHOT.jar com.crunch.tutor.examples.demo.ReadWriteExample readWriteExample/inputs/ readWriteExample/outputs/

Verify the results:
> hadoop fs -ls readWriteExample/outputs/
> hadoop fs -cat readWriteExample/outputs/part-m-00000
The source code is available on GitHub.

Make use of comments section if you have a question.

Sunday, April 20, 2014

Crunch Pipelines

Pipeline

A Crunch pipeline executes a series of MapReduce jobs, created by the Crunch planner, that read, write, and process data. The planner chains the jobs in order of execution, identifies jobs that can run independently, and launches a job once its parent jobs have completed.
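The idea of running independent jobs together and starting a job once its parents finish can be sketched as topological layering of a dependency graph. This is a conceptual illustration under the assumption that the plan is a DAG of named jobs; it is not Crunch's planner, and the job names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class JobWavesSketch {

    // Groups jobs into "waves": every job in a wave has all of its parents in earlier
    // waves, so jobs in the same wave do not depend on each other and could run
    // concurrently. (A simplification: a real scheduler can start each job as soon as
    // its own parents finish, without waiting for the whole previous wave.)
    static List<Set<String>> waves(Map<String, Set<String>> parents) {
        List<Set<String>> result = new ArrayList<>();
        Set<String> done = new HashSet<>();
        while (done.size() < parents.size()) {
            Set<String> ready = new TreeSet<>(); // sorted for stable output
            for (Map.Entry<String, Set<String>> job : parents.entrySet()) {
                if (!done.contains(job.getKey()) && done.containsAll(job.getValue())) {
                    ready.add(job.getKey());
                }
            }
            if (ready.isEmpty()) {
                throw new IllegalArgumentException("Job graph has a cycle or an unknown parent");
            }
            result.add(ready);
            done.addAll(ready);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> parents = new LinkedHashMap<>();
        parents.put("readSongs", Set.of());
        parents.put("readArtists", Set.of());
        parents.put("joinSongsWithArtists", Set.of("readSongs", "readArtists"));
        parents.put("writeResults", Set.of("joinSongsWithArtists"));
        System.out.println(waves(parents));
        // [[readArtists, readSongs], [joinSongsWithArtists], [writeResults]]
    }
}
```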

The Pipeline interface provides a set of methods that make it possible to read/write data, control pipeline execution, update configurations, and cleanup temporary files.

Types of Pipeline:

At the time of writing, there are three types of Pipeline:
  1. MRPipeline
  2. SparkPipeline
  3. MemPipeline
MRPipeline
The most commonly used Pipeline for executing MapReduce jobs. MRPipeline also provides an MRPipeline#plan() method that runs the planning phase of the pipeline and generates the execution plan in DOT format. The generated DOT can be visualized with external tools (for example, Graphviz) to understand the execution strategy of the pipeline.
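For a feel of what DOT looks like, here is a sketch that renders a small job dependency graph as DOT text. It is illustrative only; the plan Crunch produces contains far more detail, and the job names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PlanDotSketch {

    // Renders a job dependency graph (job -> parent jobs) as Graphviz DOT text:
    // one node statement per job and one edge from each parent to its child.
    static String toDot(Map<String, List<String>> parents) {
        StringBuilder dot = new StringBuilder("digraph plan {\n");
        for (Map.Entry<String, List<String>> job : parents.entrySet()) {
            dot.append("  \"").append(job.getKey()).append("\";\n");
            for (String parent : job.getValue()) {
                dot.append("  \"").append(parent).append("\" -> \"")
                   .append(job.getKey()).append("\";\n");
            }
        }
        return dot.append("}\n").toString();
    }

    public static void main(String[] args) {
        Map<String, List<String>> parents = new LinkedHashMap<>();
        parents.put("readSongs", List.of());
        parents.put("groupByArtist", List.of("readSongs"));
        parents.put("writeResults", List.of("groupByArtist"));
        System.out.print(toDot(parents));
    }
}
```

Saved to a file such as plan.dot, the text can be rendered with Graphviz, for example `dot -Tpng plan.dot -o plan.png`.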

SparkPipeline
A pipeline implementation that converts a Crunch pipeline into a series of Spark jobs using the Apache Spark API.

MemPipeline
An in-memory pipeline that runs on the client machine. A MemPipeline does not submit any MapReduce jobs to an actual Hadoop cluster, which makes it effective for testing user code. A typical use case is unit testing, where users can test pipeline execution locally. Since MemPipeline runs in memory, the input dataset should be relatively small.

Unlike MRPipeline, MemPipeline provides a set of methods to create PCollections and PTables on the fly. However, note that PCollections and PTables created with a MemPipeline should not be used in an MRPipeline or SparkPipeline.

Additionally, MemPipeline does not offer all of the features of the other pipelines (for example, the DistributedCache), mainly because of the nature of an in-memory pipeline. The idea is to use it on smaller data that can be processed quickly in memory.

Thursday, April 17, 2014

Introduction to Apache Crunch

What is Apache Crunch?


From the Apache Crunch website:
The Apache Crunch Java library provides a framework for writing, testing, and running MapReduce pipelines. Its goal is to make pipelines that are composed of many user-defined functions simple to write, easy to test, and efficient to run.
Apache Crunch is a Java API that works on top of Hadoop and Apache Spark. I have been using Crunch for more than a year now, and I find it neat and simple for writing MapReduce programs. I would say its main advantages are rapid development, the ease with which complex operations can be expressed, the absence of boilerplate code, and an excellent planner for pipeline execution.

Some of the features of Crunch include:
  • support for complex data operations like joins, unions
  • support for reading/writing data via HBase
  • support for Avro, Protocol Buffers and Thrift 
  • managing pipeline execution

How is it different?

Many people may wonder how Crunch differs from other APIs such as Cascading and Apache Pig. There is an excellent post on Quora that describes the differences.

Crunch DataTypes

At the time of writing, the Crunch API supports three data types that represent distributed data in HDFS (Hadoop Distributed File System):

  • PCollection<T>
    • Crunch's representation of a distributed, immutable collection that holds elements of type T.
  • PTable<K,V>
    • Crunch's representation of a table: a distributed, unordered map of keys to values. It allows duplicate keys and values. A PTable is a sub-interface of PCollection (a PTable<K, V> is a PCollection<Pair<K, V>>), with the difference that each element holds a key (K) and a corresponding value (V).
  • PGroupedTable<K, V>
    • Crunch's representation of a grouped table: a distributed, sorted map of keys (K) to an Iterable of values (V). A PGroupedTable is created by calling PTable#groupByKey(), which triggers the shuffle and sort phase of a MapReduce job. To create it, Crunch groups the PTable's entries by key and loads the values for each key into an Iterable<V>. Note that the keys are sorted because they go through the shuffle and sort phase, and that the Iterable<V> for a key should be iterated only once; subsequent iterations would fail.
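The grouping semantics can be approximated locally with a sorted map. In this plain-Java sketch, `groupByKey()` is a hypothetical analogue of PTable#groupByKey(), and `OnceIterable` imitates the single-pass rule by throwing on a second iteration; the song IDs in `main` are made up.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GroupByKeySketch {

    // Wraps a list so it can be iterated only once, echoing the rule that the
    // Iterable<V> for a key in a PGroupedTable should be traversed a single time.
    static final class OnceIterable<V> implements Iterable<V> {
        private final List<V> values;
        private boolean used;

        OnceIterable(List<V> values) {
            this.values = values;
        }

        @Override
        public Iterator<V> iterator() {
            if (used) {
                throw new IllegalStateException("Values for this key were already iterated");
            }
            used = true;
            return values.iterator();
        }
    }

    // Local analogue of groupByKey(): keys come out sorted, values are collected per key.
    static <K extends Comparable<K>, V> Map<K, Iterable<V>> groupByKey(List<Map.Entry<K, V>> table) {
        Map<K, List<V>> grouped = new TreeMap<>();
        for (Map.Entry<K, V> entry : table) {
            grouped.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
        }
        Map<K, Iterable<V>> result = new TreeMap<>();
        grouped.forEach((key, values) -> result.put(key, new OnceIterable<>(values)));
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> songsByArtist = List.of(
                Map.entry("Katy Perry", "song-id-2"),
                Map.entry("Calvin Harris", "song-id-3"),
                Map.entry("Katy Perry", "song-id-5"));
        Map<String, Iterable<String>> grouped = groupByKey(songsByArtist);
        for (Map.Entry<String, Iterable<String>> entry : grouped.entrySet()) {
            StringBuilder line = new StringBuilder(entry.getKey()).append(" ->");
            for (String songId : entry.getValue()) {
                line.append(' ').append(songId);
            }
            System.out.println(line);
        }
        // Calvin Harris -> song-id-3
        // Katy Perry -> song-id-2 song-id-5
    }
}
```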
We will discuss Pipelines, PCollections, and PTables in more detail in the next few posts.