Showing posts with label mempipeline. Show all posts

Wednesday, April 30, 2014

Crunch program to read/write text files

In this post I will demonstrate how to add the Apache Crunch dependency to a project and write a basic MapReduce application that reads a text file from HDFS (Hadoop Distributed File System) and writes it back to HDFS.

Prerequisites
A Hadoop distribution available in distributed or pseudo-distributed mode. (If you do not have Hadoop installed, you may follow the instructions from Cloudera to install Hadoop in pseudo-distributed mode.)

Crunch maven dependency
Apache Crunch can be added to Maven projects with the following artifact information:
<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>${crunch.version}</version>
</dependency>
Release versions are available here. Note that the choice of Crunch version is constrained by its dependency on a particular HBase version. Since Crunch receives improvements and new features on a regular basis, it is a good idea to check the official recommendation before choosing a version. You can find this information on the Crunch Getting Started page. For this example, I'm using 0.9.0-hadoop2 as the value of crunch.version.

Example program
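The goal of this program is to verify that the Crunch dependency plays well with our Hadoop installation. We will read a text file and write its contents to a different path in HDFS.
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

public class ReadWriteExample {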

    public static void main(final String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar <jar-file> input-path output-path");
            System.exit(1);
        }

        // Get input and output paths
        final String inputPath = args[0];
        final String outputPath = args[1];

        // Create an instance of Pipeline by providing ClassName and the job configuration.
        final Pipeline pipeline = new MRPipeline(ReadWriteExample.class, new Configuration());

        // Read a text file
        final PCollection<String> inputLines = pipeline.readTextFile(inputPath);

        // Write the lines to the output path as text
        pipeline.writeTextFile(inputLines, outputPath);

        // Execute the pipeline by calling Pipeline#done()
        final PipelineResult result = pipeline.done();

        System.exit(result.succeeded() ? 0 : 1);
    }
}
Pipeline#readTextFile(String) expects a path that lives in HDFS. The path can point to a single file if you want the MapReduce program to read just that file. If the path points to a folder, Hadoop reads all text files inside the folder and writes the results to the output path.

Now, to test this program you need to create your input file in HDFS. A quick way to do this is to copy files from the local filesystem to HDFS. This can be done from a terminal/command prompt:
hadoop fs -put /path/to/local-file-or-folder/ /path/to/destination/
Folders can be created using
hadoop fs -mkdir /path/to/folderName/
To run the program, build a jar file (you may use Maven) and issue this command
hadoop jar <jar-file-location> ReadWriteExample /inputFile.txt /output-folder/

Here is the set of commands I used to run this program
> hadoop fs -mkdir readWriteExample
> hadoop fs -mkdir readWriteExample/inputs
> hadoop fs -put /home/nithin/inputFile.txt readWriteExample/inputs/
> hadoop jar crunch-examples-1.0-SNAPSHOT.jar com.crunch.tutor.examples.demo.ReadWriteExample readWriteExample/inputs/ readWriteExample/outputs/

Verify results
> hadoop fs -ls readWriteExample/outputs/
> hadoop fs -cat readWriteExample/outputs/part-m-00000
Source code is available on GitHub.

Use the comments section if you have a question.

Sunday, April 20, 2014

Crunch Pipelines

Pipeline

A Crunch pipeline executes a series of MapReduce jobs, created by the Crunch planner, that read, write and process data. The planner chains the MapReduce jobs in order of execution and identifies jobs that can run independently of each other. It also launches each job once its parent jobs have completed.
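To make the idea of chaining jobs and spotting independent ones more concrete, here is a small conceptual sketch in plain Java. It is not Crunch's planner and uses no Crunch API; the class name, method name and job names are all made up for illustration. It only shows how jobs with known parents can be grouped into "waves", where every job in a wave may start as soon as the previous wave has finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Conceptual illustration only: hypothetical names, not part of Apache Crunch.
public class JobWaveSketch {

    // Groups jobs into waves. Every job in a wave has all of its parents in
    // earlier waves, so jobs inside the same wave are independent of each other.
    static List<List<String>> waves(Map<String, List<String>> parentsByJob) {
        List<List<String>> result = new ArrayList<List<String>>();
        Set<String> finished = new HashSet<String>();
        while (finished.size() < parentsByJob.size()) {
            List<String> ready = new ArrayList<String>();
            for (Map.Entry<String, List<String>> entry : parentsByJob.entrySet()) {
                boolean notRunYet = !finished.contains(entry.getKey());
                boolean parentsDone = finished.containsAll(entry.getValue());
                if (notRunYet && parentsDone) {
                    ready.add(entry.getKey());
                }
            }
            if (ready.isEmpty()) {
                // No job can start: a parent is missing or the graph has a cycle.
                throw new IllegalStateException("Cyclic or unknown dependency");
            }
            finished.addAll(ready);
            result.add(ready);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical job graph: two counting jobs both depend on "clean".
        Map<String, List<String>> jobs = new LinkedHashMap<String, List<String>>();
        jobs.put("clean", Collections.<String>emptyList());
        jobs.put("countWords", Arrays.asList("clean"));
        jobs.put("countLines", Arrays.asList("clean"));
        jobs.put("report", Arrays.asList("countWords", "countLines"));

        System.out.println(waves(jobs));
    }
}
```

In this made-up graph, countWords and countLines end up in the same wave because neither depends on the other, while report has to wait for both.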

The Pipeline interface provides a set of methods that make it possible to read/write data, control pipeline execution, update configurations, and clean up temporary files.

Types of Pipeline:

There are three different types of Pipeline (at the time of writing):
  1. MRPipeline
  2. SparkPipeline
  3. MemPipeline
MRPipeline
The most commonly used Pipeline for executing MapReduce jobs. MRPipeline also provides an MRPipeline#plan() method that runs the planning phase of the pipeline and generates the execution plan in DOT format. The generated DOT can be visualized with external tools to understand the execution strategy for that Pipeline.

SparkPipeline
A pipeline implementation that converts a Crunch pipeline into a series of Spark pipelines using the Apache Spark API.

MemPipeline
An in-memory pipeline that can be used on a client machine. A MemPipeline does not submit any MapReduce jobs to an actual Hadoop cluster, which makes it well suited to testing user code. A typical use case is unit testing, where the user can test pipeline execution. Since a MemPipeline runs in memory, the input dataset should be relatively small to keep memory usage low.

Unlike MRPipeline, a MemPipeline provides a set of methods to create PCollections and PTables on the fly. Note, however, that PCollections and PTables created using a MemPipeline should not be used in an MRPipeline or SparkPipeline.

Additionally, a MemPipeline does not offer all the features of the other pipelines (DistributedCache, for example), mainly because of its in-memory nature. The idea is to use this pipeline on smaller data that can be processed faster in memory.
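As a rough sketch of the unit-testing use case described above, the snippet below builds a PCollection on the fly with MemPipeline and pushes it through a simple function. The class name MemPipelineSketch and the toUpperCase and run helpers are hypothetical and exist only for this illustration; the code assumes crunch-core and the Hadoop client libraries are on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

// Hypothetical example class, not part of the original post's source code.
public class MemPipelineSketch {

    // Stand-in for the user code under test: upper-cases every line.
    static PCollection<String> toUpperCase(final PCollection<String> lines) {
        return lines.parallelDo(new MapFn<String, String>() {
            @Override
            public String map(final String line) {
                return line.toUpperCase();
            }
        }, Writables.strings());
    }

    // Creates an in-memory PCollection, applies the function and
    // copies the materialized result into a plain list.
    static List<String> run(final String... lines) {
        final PCollection<String> input =
                MemPipeline.typedCollectionOf(Writables.strings(), lines);
        final List<String> output = new ArrayList<String>();
        for (final String line : toUpperCase(input).materialize()) {
            output.add(line);
        }
        return output;
    }

    public static void main(final String[] args) {
        System.out.println(run("hello", "crunch"));
    }
}
```

Because no job is submitted to a cluster, a check like this can live in an ordinary unit test and compare the returned list against the expected lines.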