Processing files from S3 with Cascading

   Cascading is a Hadoop ecosystem framework that provides a higher-level abstraction over MapReduce. I recently worked on a Cascading prototype that would read log files from an Amazon Web Services S3 bucket, apply a minor transform, land the output in HDFS, and then move the source files to another S3 bucket configured for archiving.

Figuring out how to get Cascading to read a stream of data from S3 turned out to be a bit tricky, since neither the documentation nor the example applications bring all the pieces together explicitly, so this article captures what I've learned.

The first thing to understand is that Cascading's S3 support is really just an extension of Hadoop's own support for S3.
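
To make this concrete, here is a minimal sketch (not part of the prototype; the bucket name is a placeholder and the credentials come from the command line) showing that plain Hadoop code can already list a bucket over the s3n:// scheme using the same credential properties the Cascading job below relies on:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3ListingCheck {

    public static void main(String[] args) throws Exception {
        // The same credential properties the Cascading job passes through to Hadoop
        Configuration conf = new Configuration();
        conf.set("fs.s3n.awsAccessKeyId", args[0]);
        conf.set("fs.s3n.awsSecretAccessKey", args[1]);

        // "my-bucket" is a placeholder bucket name
        FileSystem s3 = FileSystem.get(URI.create("s3n://my-bucket/"), conf);

        // Print every object Hadoop's native S3 file system can see in the bucket
        for (FileStatus status : s3.listStatus(new Path("s3n://my-bucket/"))) {
            System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
        }
    }
}

Cascading's Hfs tap hands its URI and properties to this same Hadoop FileSystem layer, which is why no Cascading-specific S3 configuration turns out to be needed.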

Secondly, as noted in the Hadoop S3 wiki page, Hadoop supports two of what I'll call "formats": block files and "native" files. The former is a file stored in the same block format HDFS uses, while the latter is what can be thought of as a "plain old file". In the prototypes I've worked on, I needed to access native files, which were gzipped, delimited files (Hadoop can process gzipped files natively, and Cascading offers an extension that supports zip files too).
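
As a rough illustration (the bucket and key names below are made up), the two flavours map onto two different URI schemes:

// Block-based storage: the object is written in HDFS's block format and is
// only readable through Hadoop (the s3:// scheme)
String blockUri  = "s3://my-bucket/some-block-store-path";

// Native storage: a "plain old file" that any S3 client can read,
// which is what the prototype needed (the s3n:// scheme)
String nativeUri = "s3n://my-bucket/my-log.gz";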

In the end, the tricky part was finding the right properties and understanding the two different URI schemes. Once that was sorted out, streaming files from S3 into a Cascading job turned out to be very simple: just set up a Tap with an S3 URI:


import java.util.Properties;

import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

// ...
public class Main {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        String accessKey = args[0];
        String secretKey = args[1];

        // Credentials for Hadoop's native S3 file system (s3n:// scheme)
        properties.setProperty("fs.s3n.awsAccessKeyId", accessKey);
        properties.setProperty("fs.s3n.awsSecretAccessKey", secretKey);

        properties.setProperty("fs.defaultFS", "hdfs://localhost:8020/");
        properties.setProperty("fs.permissions.umask-mode", "007");

        AppProps.setApplicationJarClass( properties, Main.class );

        HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

        // Source tap: a gzipped, tab-delimited log file streamed directly from S3
        String input = "s3n://my-bucket/my-log.gz";
        Tap inTap = new Hfs( new TextDelimited( false, "\t" ), input );

        // Sink tap: a comma-delimited copy in HDFS, resolved against fs.defaultFS
        Pipe copyPipe = new Pipe( "copy" );
        Tap outTap = new Hfs( new TextDelimited( false, "," ), "/tmp/output" );

        FlowDef flowDef = FlowDef.flowDef()
            .addSource( copyPipe, inTap )
            .addTailSink( copyPipe, outTap );

        flowConnector.connect( flowDef ).complete();
    }
}

The code above streams a tab-delimited file directly from S3 and writes it out to the HDFS directory /tmp/output as a comma-separated file.
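
The archive step mentioned in the introduction (moving the processed source files to a second S3 bucket) isn't part of the flow above. One way to handle it, assuming the same credentials and made-up bucket names, is a plain Hadoop FileUtil copy-and-delete once the flow completes:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class ArchiveToS3 {

    public static void archive(String accessKey, String secretKey) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.s3n.awsAccessKeyId", accessKey);
        conf.set("fs.s3n.awsSecretAccessKey", secretKey);

        // Placeholder paths; substitute the real source and archive buckets
        Path source  = new Path("s3n://my-bucket/my-log.gz");
        Path archive = new Path("s3n://my-archive-bucket/my-log.gz");

        FileSystem srcFs = source.getFileSystem(conf);
        FileSystem dstFs = archive.getFileSystem(conf);

        // Copy to the archive bucket, then delete the original (deleteSource = true)
        FileUtil.copy(srcFs, source, dstFs, archive, true, conf);
    }
}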

I should also note that all of this can be run on Elastic MapReduce as well, so the data never has to leave the AWS cloud.
