A FilterFn's accept method must return true or false. True means the item in the PCollection is kept for downstream processing; false means it is filtered out and not passed downstream.
Example:
The following example demonstrates how a FilterFn can be used to choose which records should be part of the resulting PCollection.
package com.crunch.tutor.examples.pipelines;

import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.crunch.tutor.api.music.store.Song;

public class PipelineWithFilterFn extends Configured implements Tool {

    public static void main(final String[] args) throws Exception {
        final int res = ToolRunner.run(new Configuration(), new PipelineWithFilterFn(), args);
        System.exit(res);
    }

    @Override
    public int run(final String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: hadoop jar <jar-file> input-path output-path");
            System.exit(1);
        }
        final String inputPath = args[0];
        final String outputPath = args[1];
        final Configuration config = getConf();
        final Pipeline pipeline = new MRPipeline(PipelineWithFilterFn.class,
                "PipelineWithFilterFn", config);

        // Read Song objects that are persisted in HDFS as Avro
        final PCollection<Song> songsCollection = pipeline.read(From.avroFile(inputPath,
                Avros.records(Song.class)));

        // Filter out songs that have a null genre
        final PCollection<Song> songsWithValidGenre = songsCollection.filter(
                "Filter songs with invalid genre's", new SongsWithGenreFilterFn());

        // Write the filtered Song objects to HDFS
        pipeline.write(songsWithValidGenre, To.avroFile(outputPath));

        // Execute the pipeline by calling Pipeline#done()
        final PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    /**
     * A {@link FilterFn} implementation that keeps {@link Song} records that have a non-null
     * {@link Song#getGenre()}
     */
    private static class SongsWithGenreFilterFn extends FilterFn<Song> {

        private static final long serialVersionUID = 1673909907356321796L;

        @Override
        public boolean accept(final Song input) {
            // Include the song only if it has a genre
            return input.getGenre() != null;
        }
    }
}
The example keeps a Song in the output PCollection only if the Song has a non-null genre associated with it.
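To see the accept/reject behavior without a Hadoop cluster, here is a minimal plain-Java sketch of the same predicate. It does not use Crunch at all: the Song class, the Filter interface, and the filter helper are hypothetical stand-ins written only for this illustration (the real Song is generated from the Avro schema, and the real filtering is done by PCollection#filter). The three records mirror the sample input used in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterFnSketch {

    /** Hypothetical stand-in for the Avro-generated Song; only the fields needed here. */
    static final class Song {
        private final String id;
        private final String genre; // may be null, like the third sample record

        Song(final String id, final String genre) {
            this.id = id;
            this.genre = genre;
        }

        String getId() {
            return id;
        }

        String getGenre() {
            return genre;
        }
    }

    /** Hypothetical stand-in for FilterFn: a single accept method returning a boolean. */
    interface Filter<T> {
        boolean accept(T input);
    }

    /** Same rule as SongsWithGenreFilterFn: accept songs whose genre is non-null. */
    static final Filter<Song> HAS_GENRE = song -> song.getGenre() != null;

    /** Keeps the items the filter accepts and drops the rest (an in-memory analogy only). */
    static <T> List<T> filter(final List<T> items, final Filter<T> fn) {
        final List<T> kept = new ArrayList<>();
        for (final T item : items) {
            if (fn.accept(item)) {
                kept.add(item);
            }
        }
        return kept;
    }

    /** The three sample records, reduced to id and genre. */
    static List<Song> sampleSongs() {
        return Arrays.asList(
                new Song("song-id-1", "pop"),
                new Song("song-id-2", "pop"),
                new Song("song-id-3", null));
    }

    public static void main(final String[] args) {
        for (final Song song : filter(sampleSongs(), HAS_GENRE)) {
            System.out.println(song.getId() + " -> " + song.getGenre());
        }
    }
}
```

Running the sketch prints song-id-1 and song-id-2 only; song-id-3 is rejected because its genre is null.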
Here is a sample of my input data, represented as text for simplicity. The last field is the genre.
"song-id-1","Happy","Pharrell Williams","pop"
"song-id-2","Dark Horse","Katy Perry Featuring Juicy J","pop"
"song-id-3","Summer","Calvin Harris",null
Command line argument to run this program:
hadoop jar crunch-examples-1.0-SNAPSHOT.jar com.crunch.tutor.examples.pipelines.PipelineWithFilterFn /crunch/examples/musicstore/filter/v1/inputs/ /crunch/examples/musicstore/filter/v1/outputs/
Here are the source code links:
https://github.com/nasokan/crunch-tutor/blob/master/crunch-examples/src/main/java/com/crunch/tutor/examples/pipelines/PipelineWithFilterFn.java
https://github.com/nasokan/crunch-tutor/blob/master/crunch-api/src/main/avro/musicStore/song.avdl