Sunday, March 15, 2015

Crunch FilterFn Example

A FilterFn is a specialized DoFn implementation that filters items out of a PCollection or PTable. While the same logic can be written with a plain DoFn, FilterFn is a convenient API that Crunch provides for choosing which elements are kept in a PCollection/PTable.

A FilterFn's accept method must return true or false. True means the item is kept in the PCollection for downstream processing; false means it is filtered out and not passed downstream.

Example:
The following example will demonstrate how a FilterFn can be used to choose records that should be part of the PCollection.

package com.crunch.tutor.examples.pipelines;

import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.crunch.tutor.api.music.store.Song;

public class PipelineWithFilterFn extends Configured implements Tool {

    public static void main(final String[] args) throws Exception {
        final int res = ToolRunner.run(new Configuration(), new PipelineWithFilterFn(), args);
        System.exit(res);
    }

    @Override
    public int run(final String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: hadoop jar <jar-file> input-path output-path");
            // return a non-zero exit code; main() passes it to System.exit
            return 1;
        }

        final String inputPath = args[0];
        final String outputPath = args[1];

        final Configuration config = getConf();
        final Pipeline pipeline = new MRPipeline(PipelineWithFilterFn.class,
                "PipelineWithFilterFn", config);
        // read Song objects that are persisted in HDFS as Avro
        final PCollection<Song> songsCollection = pipeline.read(From.avroFile(inputPath, Avros.records(Song.class)));
        // filter out songs that have a null genre
        final PCollection<Song> songsWithValidGenre = songsCollection.filter(
                "Filter out songs with a null genre", new SongsWithGenreFilterFn());
        // write the filtered song objects to HDFS
        pipeline.write(songsWithValidGenre, To.avroFile(outputPath));

        // Execute the pipeline by calling Pipeline#done()
        final PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    /**
     * A {@link FilterFn} implementation that keeps {@link Song} records with a non-null
     * {@link Song#getGenre()}; records with a null genre are filtered out.
     */
    private static class SongsWithGenreFilterFn extends FilterFn<Song> {

        private static final long serialVersionUID = 1673909907356321796L;

        @Override
        public boolean accept(final Song input) {

            // keep the song only if it has a genre
            return input.getGenre() != null;
        }
    }
}

The example keeps a Song in the output PCollection only if it has a non-null genre associated with it.
Here is a sample of my input data, represented as text for simplicity. The last field is the genre:
"song-id-1","Happy","Pharrell Williams","pop"
"song-id-2","Dark Horse","Katy Perry Featuring Juicy J","pop"
"song-id-3","Summer","Calvin Harris",null
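To make the expected result concrete, here is a minimal plain-Java sketch of what the filter does to these three rows. It does not use Crunch at all; SongRow, sampleRows and keptIds are hypothetical names I made up as stand-ins for the Avro-generated Song class and the pipeline, and only the null check mirrors SongsWithGenreFilterFn#accept from the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Crunch dependency) of the filter applied to the sample rows.
public class GenreFilterSketch {

    // Hypothetical stand-in for the Avro-generated Song class.
    static final class SongRow {
        final String id;
        final String title;
        final String artist;
        final String genre; // may be null

        SongRow(final String id, final String title, final String artist, final String genre) {
            this.id = id;
            this.title = title;
            this.artist = artist;
            this.genre = genre;
        }
    }

    // Same predicate as SongsWithGenreFilterFn#accept: keep rows with a non-null genre.
    static boolean accept(final SongRow song) {
        return song.genre != null;
    }

    // The three sample rows from the post.
    static List<SongRow> sampleRows() {
        return Arrays.asList(
                new SongRow("song-id-1", "Happy", "Pharrell Williams", "pop"),
                new SongRow("song-id-2", "Dark Horse", "Katy Perry Featuring Juicy J", "pop"),
                new SongRow("song-id-3", "Summer", "Calvin Harris", null));
    }

    // Collect the ids of the rows that the predicate accepts, in input order.
    static List<String> keptIds(final List<SongRow> rows) {
        final List<String> kept = new ArrayList<>();
        for (final SongRow row : rows) {
            if (accept(row)) {
                kept.add(row.id);
            }
        }
        return kept;
    }

    public static void main(final String[] args) {
        System.out.println(keptIds(sampleRows())); // prints [song-id-1, song-id-2]
    }
}
```

With this input, song-id-3 is dropped because its genre is null, so the real pipeline's output should contain only the first two songs.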
Command line to run this program:
hadoop jar crunch-examples-1.0-SNAPSHOT.jar com.crunch.tutor.examples.pipelines.PipelineWithFilterFn /crunch/examples/musicstore/filter/v1/inputs/ /crunch/examples/musicstore/filter/v1/outputs/
Here are the source code links
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/java/com/crunch/tutor/examples/pipelines/PipelineWithFilterFn.java
https://github.com/nasokan/crunch-tutor/blob/master/crunch-api/src/main/avro/musicStore/song.avdl
