Wednesday, April 30, 2014

Crunch program to read/write text files

In this post I will demonstrate how to add the Apache Crunch dependency to a project and write a basic MapReduce application that reads a text file from HDFS (Hadoop Distributed File System) and writes it back to HDFS.

Prerequisites
A Hadoop distribution running in distributed or pseudo-distributed mode. (If you do not have Hadoop installed, you can follow Cloudera's instructions to install Hadoop in pseudo-distributed mode.)

Crunch maven dependency
Apache Crunch can be added to Maven projects with the following artifact information:
<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>${crunch.version}</version>
</dependency>
Release versions are available here. Note that there are certain constraints when choosing a Crunch version because of its dependency on the HBase version. Since Crunch adds improvements and features on a regular basis, it is a good idea to check the official recommendation when choosing a version; you can find this information on the Crunch Getting Started page. For this example, I'm using 0.9.0-hadoop2 as my Crunch version.
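For completeness, the ${crunch.version} placeholder used above can be pinned with a Maven property; a minimal sketch (the property name simply mirrors the placeholder and is not mandated by Crunch) looks like this:
<properties>
    <!-- Version used in this post; pick the release recommended for your Hadoop/HBase setup -->
    <crunch.version>0.9.0-hadoop2</crunch.version>
</properties>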

Example program
The goal of this program is to verify that the Crunch dependency plays well with our Hadoop installation. We will read a text file and write its contents to a different path in HDFS.
package com.crunch.tutor.examples.demo;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

public class ReadWriteExample {

    public static void main(final String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar <jar-file> input-path output-path");
            System.exit(1);
        }

        // Get input and output paths
        final String inputPath = args[0];
        final String outputPath = args[1];

        // Create an instance of Pipeline by providing the class name and the job configuration.
        final Pipeline pipeline = new MRPipeline(ReadWriteExample.class, new Configuration());

        // Read a text file
        final PCollection<String> inputLines = pipeline.readTextFile(inputPath);

        // write the text file
        pipeline.writeTextFile(inputLines, outputPath);

        // Execute the pipeline by calling Pipeline#done()
        final PipelineResult result = pipeline.done();

        System.exit(result.succeeded() ? 0 : 1);
    }
}
The Pipeline#readTextFile(String) method expects a path that lives in HDFS. The path can denote a single file if you want the MapReduce program to read only that file. However, if the path denotes a folder, Hadoop will read all the text files inside that folder as input.

Now, to test this program you need to create your input file in HDFS. One quick way to create files in HDFS is to copy them from the local filesystem, which can be done from the terminal/command prompt:
hadoop fs -put /path/to/local-file-or-folder/ /path/to/destination/
Folders can be created using:
hadoop fs -mkdir /path/to/folderName/
To run the program, you have to build a jar file (you may use Maven) and issue this command:
hadoop jar <jar-file-location> com.crunch.tutor.examples.demo.ReadWriteExample /inputFile.txt /output-folder/

Here is the set of commands I used to run this program
> hadoop fs -mkdir readWriteExample
> hadoop fs -mkdir readWriteExample/inputs
> hadoop fs -put /home/nithin/inputFile.txt readWriteExample/inputs/
> hadoop jar crunch-examples-1.0-SNAPSHOT.jar com.crunch.tutor.examples.demo.ReadWriteExample readWriteExample/inputs/ readWriteExample/outputs/

Verify the results:
> hadoop fs -ls readWriteExample/outputs/
> hadoop fs -cat readWriteExample/outputs/part-m-00000
Source code available on GitHub.

Make use of the comments section if you have a question.

Sunday, April 20, 2014

Crunch Pipelines

Pipeline

A Crunch pipeline executes a series of MapReduce jobs, created by the Crunch planner, that read, write, and process data. The planner essentially chains MapReduce jobs in their order of execution, identifies jobs that can run independently, and launches jobs as their parent jobs complete.

The Pipeline interface provides a set of methods that make it possible to read/write data, control pipeline execution, update configurations, and clean up temporary files.
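As a rough illustration (not taken from the Crunch docs), the sketch below strings these methods together; the paths and the queue-name setting are placeholder assumptions, and it uses the Source/Target helpers in org.apache.crunch.io.From and org.apache.crunch.io.To:
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.hadoop.conf.Configuration;

public class PipelineMethodsSketch {

    public static void main(final String[] args) {
        final Pipeline pipeline = new MRPipeline(PipelineMethodsSketch.class, new Configuration());

        // Read data: From.textFile builds a Source for a text file (or folder) in HDFS.
        final PCollection<String> lines = pipeline.read(From.textFile("/tmp/pipeline-sketch/input"));

        // Write data: To.textFile builds a Target for the output folder.
        pipeline.write(lines, To.textFile("/tmp/pipeline-sketch/output"));

        // Update the configuration before the jobs are launched.
        pipeline.getConfiguration().set("mapreduce.job.queuename", "default");

        // Control execution: done() runs any outstanding jobs and cleans up temporary files.
        final PipelineResult result = pipeline.done();
        System.exit(result.succeeded() ? 0 : 1);
    }
}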

Types of Pipeline:

There are three different types of Pipeline (at the time of writing)
  1. MRPipeline
  2. SparkPipeline
  3. MemPipeline
MRPipeline
The most commonly used Pipeline for executing MapReduce jobs. MRPipeline also provides an MRPipeline#plan() method that runs the planning phase of the pipeline and generates the plan (in DOT format) for the pipeline execution. The generated DOT output can be visualized with external tools to understand the execution strategy for that Pipeline.
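As a hedged sketch (not from this post), one way to capture the DOT plan at runtime is through the PipelineExecution returned by Pipeline#runAsync(); this assumes PipelineExecution#getPlanDotFile() is available in your Crunch version, and note that runAsync() actually launches the jobs rather than only planning them:
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineExecution;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

public class PlanDotSketch {

    public static void main(final String[] args) throws Exception {
        final Pipeline pipeline = new MRPipeline(PlanDotSketch.class, new Configuration());
        final PCollection<String> lines = pipeline.readTextFile(args[0]);
        pipeline.writeTextFile(lines, args[1]);

        // runAsync() plans and launches the jobs without blocking the client.
        final PipelineExecution execution = pipeline.runAsync();

        // Dump the DOT representation of the plan so it can be rendered with e.g. Graphviz.
        Files.write(Paths.get("pipeline-plan.dot"), execution.getPlanDotFile().getBytes("UTF-8"));

        // Block until the pipeline finishes, then clean up.
        execution.waitUntilDone();
        pipeline.done();
    }
}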

SparkPipeline
A pipeline implementation that converts a Crunch pipeline into a series of Spark jobs using the Apache Spark API.
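As a minimal sketch (assuming the crunch-spark artifact is on the classpath and the two-argument Spark-master/app-name constructor), constructing a SparkPipeline looks much like constructing an MRPipeline:
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.spark.SparkPipeline;

public class SparkReadWriteSketch {

    public static void main(final String[] args) {
        // "local" runs Spark in-process; on a cluster this would be the Spark master URL.
        final Pipeline pipeline = new SparkPipeline("local", "spark-read-write-sketch");
        final PCollection<String> lines = pipeline.readTextFile(args[0]);
        pipeline.writeTextFile(lines, args[1]);
        pipeline.done();
    }
}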

MemPipeline
An in-memory pipeline that can be used on a client machine. A MemPipeline will not submit any MapReduce jobs to an actual Hadoop cluster, but it can be used effectively for testing user code. A typical use case is unit testing, where the user can test pipeline execution. Since a MemPipeline runs in-memory, the input dataset should be relatively small to keep memory usage in check.

A MemPipeline provides a set of methods to create PCollections and PTables on the fly, unlike an MRPipeline. However, note that PCollections and PTables created using a MemPipeline should not be used in an MRPipeline or SparkPipeline.
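As a rough sketch of the kind of test this enables (the class name and data below are made up), the static factory methods on MemPipeline build a PCollection in memory, and materialize() brings the results back for assertions:
import java.util.Locale;

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class MemPipelineSketch {

    public static void main(final String[] args) {
        // Build a PCollection directly in memory; no cluster and no HDFS involved.
        final PCollection<String> words = MemPipeline.typedCollectionOf(
                Writables.strings(), "alpha", "beta", "gamma");

        // Apply the same kind of user code you would run in an MRPipeline.
        final PCollection<String> upper = words.parallelDo(new MapFn<String, String>() {
            @Override
            public String map(final String input) {
                return input.toUpperCase(Locale.ENGLISH);
            }
        }, Writables.strings());

        // materialize() returns an Iterable we can inspect or assert on in a unit test.
        for (final String word : upper.materialize()) {
            System.out.println(word);
        }
    }
}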

Additionally, a MemPipeline does not offer all the features of the other pipelines (for example, the DistributedCache), mainly because of the nature of the pipeline: the idea is to use it on smaller data that runs faster in-memory.

Thursday, April 17, 2014

Introduction to Apache Crunch

What is Apache Crunch?


From the Apache Crunch website:
The Apache Crunch Java library provides a framework for writing, testing, and running MapReduce pipelines. Its goal is to make pipelines that are composed of many user-defined functions simple to write, easy to test, and efficient to run.
Apache Crunch is a Java API that works on top of Hadoop and Apache Spark. I have been using Crunch for more than a year now, and I find it really neat and simple for writing MapReduce programs. I would say the main advantages include rapid development, the ease with which complex operations can be achieved, the absence of boilerplate code, and an incredible planner for pipeline execution.

Some of the features of Crunch include:
  • support for complex data operations like joins and unions (see the sketch after this list)
  • support for reading/writing data via HBase
  • support for Avro, Protocol Buffers and Thrift 
  • managing pipeline execution
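To make the first two bullets concrete, here is a small hedged sketch (not from this post) that uses the in-memory pipeline; it assumes the MemPipeline.typedCollectionOf/typedTableOf helpers and org.apache.crunch.lib.Join.join:
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.lib.Join;
import org.apache.crunch.types.writable.Writables;

public class JoinUnionSketch {

    public static void main(final String[] args) {
        // union: concatenates two PCollections of the same type.
        final PCollection<String> a = MemPipeline.typedCollectionOf(Writables.strings(), "apple", "banana");
        final PCollection<String> b = MemPipeline.typedCollectionOf(Writables.strings(), "cherry");
        System.out.println(a.union(b).materialize());

        // join: matches two PTables on their keys and pairs up the corresponding values.
        final PTable<String, Long> counts = MemPipeline.typedTableOf(
                Writables.tableOf(Writables.strings(), Writables.longs()),
                "apple", 3L, "cherry", 1L);
        final PTable<String, String> colors = MemPipeline.typedTableOf(
                Writables.tableOf(Writables.strings(), Writables.strings()),
                "apple", "red", "cherry", "dark red");
        final PTable<String, Pair<Long, String>> joined = Join.join(counts, colors);
        System.out.println(joined.materialize());
    }
}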

How is it different?

A lot of people may wonder how this is different from other APIs like Cascading and Apache Pig. There is an excellent post on Quora that describes the differences.

Crunch DataTypes

The Crunch API supports three different data types (at the time of writing) that represent distributed data in HDFS (Hadoop Distributed File System).

  • PCollection<T>
    • the Crunch representation of a distributed, immutable collection that can hold data of type T.
  • PTable<K,V>
    • the Crunch representation of a table: a distributed, unordered map of keys to values that allows duplicate keys. A PTable is really a sub-interface of PCollection, with the difference that each element of a PTable holds a key (K) and a corresponding value (V).
  • PGroupedTable<K, V>
    • the Crunch representation of a grouped table: a distributed, sorted map of keys (K) to an Iterable of values (V). A PGroupedTable is created by calling PTable#groupByKey(), which triggers the sort and shuffle phase of a MapReduce job. To create a PGroupedTable, Crunch groups the distinct keys of the PTable and loads the values for each key into an Iterable<V> corresponding to that key. Note that the keys are in order, since the data goes through the sort and shuffle phase of a MapReduce job, and that the Iterable<V> for a key should be iterated only once; subsequent iterations will fail. (A short sketch follows this list.)
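As a rough end-to-end sketch (not from this post) of how the three types relate, the word-count-style snippet below turns a PCollection into a PTable and then into a PGroupedTable; it runs in-memory via MemPipeline, and the class and variable names are made up:
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class CrunchTypesSketch {

    public static void main(final String[] args) {
        // PCollection<String>: an immutable, distributed collection of lines.
        final PCollection<String> lines = MemPipeline.typedCollectionOf(
                Writables.strings(), "a b a", "b c");

        // PTable<String, Long>: emit one (word, 1) pair per word; duplicate keys are allowed.
        final PTable<String, Long> ones = lines.parallelDo(new DoFn<String, Pair<String, Long>>() {
            @Override
            public void process(final String line, final Emitter<Pair<String, Long>> emitter) {
                for (final String word : line.split("\\s+")) {
                    emitter.emit(Pair.of(word, 1L));
                }
            }
        }, Writables.tableOf(Writables.strings(), Writables.longs()));

        // PGroupedTable<String, Long>: groupByKey triggers the sort/shuffle phase on a real cluster.
        final PGroupedTable<String, Long> grouped = ones.groupByKey();

        // Iterate each key's values exactly once, here by summing them into word counts.
        final PTable<String, Long> counts = grouped.combineValues(Aggregators.SUM_LONGS());
        System.out.println(counts.materialize());
    }
}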
We will discuss Pipelines, PCollections, and PTables in more detail in the next few posts.