So at Digg, we have been working our own Hadoop cluster using Cloudera‘s distribution. One of the things we have been working through is how can we split our large compressed data and run them in parallel on Hadoop? One of the biggest drawbacks from compression algorithms like Gzip is that you can’t split them into multiple mappers. This is where LZO comes in.
The LZO library implements a number of algorithms with the following features:
- Compression is comparable in speed to deflate compression.
- On modern architectures, decompression is very fast; in non-trivial cases able to exceed the speed of a straight memory-to-memory copy due to the reduced memory-reads.
- Requires an additional buffer during compression (of size 8 kB or 64 kB, depending on compression level).
- Requires no additional memory for decompression other than the source and destination buffers.
- Allows the user to adjust the balance between compression quality and compression speed, without affecting the speed of decompression.
This is great until you start trying to actually get LZO working on Hadoop..First off, it gets really confusing when its now removed from Hadoop 0.20+ because of GPL restrictions.
I first came across a blog post by Johan Oskarsson that discussed this. Unfortunately when you dive into HADOOP-4640 you find out it’s against 0.20. Cloudera’s distribution uses a modified version of 0.18.3. The patch from HADOOP-4640 applies pretty cleanly besides a few things. On top of this, you need HADOOP-2664 which enables LZOP codec. You actually need this because the compressor on most Linux systems is `lzop` and that differs from the traditional LzoCodec bundled in 0.18.
So how do we get all of this working? First off grab both modified patches from my Github account.
Once you have those, apply the patches to your Cloudera distribution. Then be sure to rebuild. After that’s done and you have redeployed to your clients and production cluster you need to modify your hadoop-site.xml on the client side.
<property> <name>io.compression.codecs</name> <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.LzopCodec</value> <description>A list of the compression codec classes that can be used for compression/decompression.</description> </property>
Once that is completed, go ahead and upload your large LZO file to your Hadoop cluster.
So lets say you uploaded the file:
$ hadoop fs -put large_file.lzo /tmp/large_file.lzo
The next step is you need to index your LZO file, so that hadoop knows how to split the file into multiple mappers.
The Indexer.jar in the my Github account will be used for this process. Now you need to run the Indexer.jar and tell it what file to generate an index file for.
$ hadoop jar Indexer.jar /tmp/large_file.lzo
After that’s completed, you’re almost there! The index file will be created in /tmp. Now all you need to do is run a map/reduce job and your set! Don’t forget to set the -inputFormat parameter. Here is a code snippet using wordcount example:
#!/bin/sh HADOOP_HOME=/usr/lib/hadoop $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.18.3-7-streaming.jar \ -input /tmp/large_file.lzo \ -output wc_test \ -inputformat org.apache.hadoop.mapred.LzoTextInputFormat \ -mapper 'cat' \ -reducer 'wc -l'