Sunday, June 17, 2012

Hadoop & Humongus XML files

Hadoop brings distributed computing which bring amazing performance. In this post, we will look at how we can parse humongus XML file's using Hadoop.

Hadoop's documentation is bleak and does not cover much on this topic. I tried different things and thought this would be worth sharing(lessons learned).

Hadoop provides different input formats but none for XML.
Some of the common ways to parse XML in hadoop are -
1) Java MR - Custom mapreduce java programs using Mahout's XmlInputFormat (third party jar)
2) Hadoop Streaming - Custom mapreduce programs in a programming language other than java like perl, python, ruby, groovy, etc.
In case if you are using hadoop platforms which generate mapreduce -
3) Pig - XMLLoader function from the piggybank library to extract the XML within start and end tags.  Then using regular expressions or custom transform (mapper\reducer) to parse the Pig tuple.
4) Hive - A custom SerDe (Serializer or DeSerializer) or custom transform (mapper\reducer) 

I prefer using option 1 and 2, they provide more flexibility to control performance, control output formats, etc. I enjoy coding in pig and hive but they do not offer a clean solution.

Java MR  -
Pros - Provides superior performance. In hadoop everything boils down to java.
Cons - Development and testing takes time. For a project involving large number of different XML sources, it's time consuming process.
Usecase - To process daily XML load\feeds in an environment where performance is critical

Hadoop Streaming  -
Pros - Development and testing takes less time.
Cons - Performance slower than the Java MR implementation.
Usecase - One time XML parsing for historic data and then moving data into Hive or any DB. Also useful for applications where xml sources are periodically changing.

Nice tips on the Cloudera blog on what to choose as per the situation and what it supports.

Recently had to process enormous XML file's with historic data and had to use hadoop streaming (write some mapreduce). I will cover a detailed demo and use\impact of the following options on mapreduce performance
- Number of Reduce tasks
- Partition
- Comparator
- OutputFormat
- Compression

Performance also depends upon hadoop cluster configuration, config parameters, compressing map outputs, using a custom combiner(if applicable), etc. (cannot be covered in a single blogpost).

For a demo purpose will be using a 7gb XML file containing weather information.
The file is not humongous from any standards but this a demo. Yes, we will NOT be able to predict even the next rainfall from this data :-)
Goal is to parse the xml  file and output a csv file for each state.



I will be using python as the streaming language, you can choose the language of your choice.

---------------------------
The Mapper  -
---------------------------

The code below uses the python's ElementTree iterparse function to parse the XML. You can also use any other xml parser like lxml , XMLTreeBuilder, etc.  The mapper will parse the xml and output the will be in csv format.

------------------------------------------------------------

#!/usr/bin/env python
import sys
import cStringIO
import csv
from xml.etree.ElementTree import iterparse

def process(xmldata):
    xmldatafile=cStringIO.StringIO(xmldata)
    for (event, node) in iterparse(xmldatafile, events=['start']):
        if   node.tag.upper()== 'ZIPCODE':
                ZIPCODE=node.text
        elif node.tag.upper()== 'STATE':
                STATE=node.text
        elif node.tag.upper()== 'CITY':
                CITY=node.text.upper()
        elif node.tag.upper()== 'FORECASTDATE':
                FORECASTDATE=node.text
        elif node.tag.upper()=='DESCRIPTIVEFORECAST':
                attribupper = dict((key.upper(),value) for key,value in node.attrib.items())
                GENERALDESC=''
                if attribupper.get('GENERALDESC'):
                   GENERALDESC=attribupper['GENERALDESC']
        elif node.tag.upper()== 'DAYTIMEDESC':
                DAYTIMEDESC=node.text
        elif node.tag.upper()== 'NIGHTTIMEDESC':
                NIGHTTIMEDESC=node.text
        elif node.tag.upper()== 'TEMPLOW':
                TEMPLOW=node.text
        elif node.tag.upper()== 'TEMPHIGH':
                TEMPHIGH=node.text
        elif node.tag.upper()== 'PRECIPNIGHTTIME':
                PRECIPNIGHTTIME=node.text
        elif node.tag.upper()== 'PRECIPDAYTIME':
                PRECIPDAYTIME=node.text
        elif node.tag.upper()== 'SUNRISETIME':
                SUNRISETIME=node.text
        elif node.tag.upper()== 'SUNSETTIME':
                SUNSETTIME=node.text
                rowwriter.writerow((STATE,CITY,ZIPCODE,FORECASTDATE,GENERALDESC,
               TEMPLOW, TEMPHIGH,PRECIPNIGHTTIME,PRECIPDAYTIME,SUNRISETIME,
               SUNSETTIME,DAYTIMEDESC,NIGHTTIMEDESC))
        else:
               pass


if __name__ == '__main__':
    ### Create your own custom dialect like pipe delimited, tab delimited, etc
    csv.register_dialect('customdialect', quotechar='''"''',quoting=csv.QUOTE_ALL,delimiter=',')
    customdialect = csv.get_dialect('customdialect')
    rowwriter = csv.writer(sys.stdout, dialect=customdialect)
    xmlcollect = None
    xmlfind = False
    for line in sys.stdin:
        line = line.strip()
        if  line.find('<WeatherDetails>') != -1:
            xmlfind = True
            xmlcollect = cStringIO.StringIO()
            xmlcollect.write(line)
        elif line.find('
</WeatherDetails>') != -1:
            xmlcollect.write(line)
            xml = xmlcollect.getvalue()
            xmlcollect.close()
            xmlcollect = None
            xmlfind = False
            process(xml)
        else:
            if xmlfind:
               xmlcollect.write(line)

------------------------------------------------------------

Mapreduce jobs can be run without a reducer. The output of mapper becomes the output of your job, (an output file per mapper).
In the case of parsing xml, a reducer is not needed unless data needs to be sorted or some operations needs to performed on parsed data. The entire row is treated as a key.

The command below uses
1) "-D mapred.reduce.tasks=0" - to notify no reducer will be used
2) "-inputreader" uses hadoop's StreamXmlRecordReader class to split the file and pass the data between the xml tags to the mapper.
3) "-file" option makes sure the mapper program is distributed to all the hadoop nodes


$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
    -D mapred.reduce.tasks=0   \
    -D mapred.job.name="weather_parse_all"  \
    -file   /tmp/weather_parse/weather_parse_mapper.py  \
    -mapper /tmp/weather_parse/weather_parse_mapper.py  \
    -input /user/hduser/inputxmlweather/weatherall.xml  \
    -output   /user/hduser/outputweathercsv \
    -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>"


Output -



---------------------------
The Reducer -
---------------------------

We will use the reducer to just sort the data.

------------------------------------------------------------

#!/usr/bin/env python
import sys
if __name__ == '__main__':
    for line in sys.stdin:
        print line.strip()

------------------------------------------------------------

Various other things can be performed like sum,max,min on the data like getting max,min,average,etc  of temperatures for a particular zipcode,city,state over a period of time.
Hadoop's Partition & Shuffle phase will sort the data based on the key and send it to reducer. In this case the entire row.

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
  -D mapred.reduce.tasks=1  \
  -D mapred.job.name="weather_parse_all" \
  -file   /tmp/weather_parse/weather_parse_mapper.py  \
  -mapper /tmp/weather_parse/weather_parse_mapper.py     \
  -input /user/hduser/inputxmlweather/weatherall.xml \
  -output   /user/hduser/outputweathercsv  \
  -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>" \
  -file   /tmp/weather_parse/weather_parse_reducer.py   \
  -reducer /tmp/weather_parse/weather_parse_reducer.py 





--------------------------------------
Number of Reduce Tasks -
-------------------------------------

The more the number of reduce tasks  better the performance. The data is split between different reducers and runs across different nodes. Each reduce tasks writes output to a different file (Output format, if used plays a role on how the output is written and can effect number of output files).  The default partition phase which runs before the reducer computes hash value for key and sends it to a reducer.

---------------------------
Partition -
---------------------------

In most of the cases we want part of keys with same value to go a single reducer.
As in this case , we want all the records for a single zipcode\state\city going to a single reducer. If you are preforming calculations in reducer then you need to have all the values related keys in the same reducer.

The command below uses
1) "-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner" - to use the default hadoop partitioner class. This partitioner class outputs the key value pair seperated by tab. The reducer needs to be modified
------------------------------------------------------------

#!/usr/bin/env python
import sys
if __name__ == '__main__':
    for line in sys.stdin:
        lines = line.split('\t',1)
        print lines[0] + lines[1]

------------------------------------------------------------
 Note: A custom partitioner class can be created and used depending upon the data.
 2) "-D stream.num.map.output.key.fields=4" - to use first 4 fields of the row as keys and rest as the value, creating a key value pair.
3) "-D mapred.text.key.partitioner.options=-k1,1" - to partition on the first key , all the state records are send to same reducer. All the 4 key fields will be used for sorting.

 $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
  -D mapred.reduce.tasks=5  \
  -D mapred.job.name="weather_parse_all" \
  -D stream.map.output.field.separator=,     \
  -D stream.num.map.output.key.fields=4  \
  -D map.output.key.field.separator=, \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -file   /tmp/weather_parse/weather_parse_mapper.py  \
  -mapper /tmp/weather_parse/weather_parse_mapper.py     \
  -input /user/hduser/inputxmlweather/weatherall.xml \
  -output   /user/hduser/outputweathercsv  \
  -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>" \
  -file   /tmp/weather_parse/weather_parse_reducer.py   \
  -reducer /tmp/weather_parse/weather_parse_reducer.py  \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner


Every reducer generates a unique file with format as part-00000,part-00001,part-00002 and so on.
We see how to change the output file name by using custom output format (later in this post).

---------------------------
Comparator - 
---------------------------

Sometimes a custom sorting is needed on keys. Comparator class "org.apache.hadoop.mapred.lib.KeyFieldBasedComparator" can easily help achieve it. We will sort the data based on forecast date descending and state,city,zipcode ascending.

The command below uses
-D mapred.text.key.comparator.options="-k1,3 -k4r"  to sort first 3 keys ascending and 4 key in reverse order (descending).

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
  -D mapred.reduce.tasks=5  \
  -D mapred.job.name="weather_parse_all" \
  -D stream.map.output.field.separator=,     \
  -D stream.num.map.output.key.fields=4  \
  -D map.output.key.field.separator=, \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -D mapred.text.key.comparator.options="-k1,3 -k4r" \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \

  -file   /tmp/weather_parse/weather_parse_mapper.py  \
  -mapper /tmp/weather_parse/weather_parse_mapper.py     \
  -input /user/hduser/inputxmlweather/weatherall.xml \
  -output   /user/hduser/outputweathercsv  \
  -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>" \
  -file   /tmp/weather_parse/weather_parse_reducer.py   \
  -reducer /tmp/weather_parse/weather_parse_reducer.py  \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner  

---------------------------
Output Format -
---------------------------

Hadoop is a batch system. It's task is to do the heavy lifting and feed the output to target systems. Output format is the process to generate the output as desired by other systems. For any target which exposes its input format a custom output format can be created. It can be only written in java.

For example, if parsing output of an XML are both master and detail records, then a custom output format can be coded to write master records and details records into separate files.

In our case, we would like to generate a file with name as .csv. 

The class below extends hadoop's  "org.apache.hadoop.mapred.lib.MultipleTextOutputFormat" class.

------------------------------------------------------------

package customoutputformat.myformat.weatherparser;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class GenerateCustomFilename extends MultipleTextOutputFormat {

     @Override
     protected String generateFileNameForKeyValue(Text key, Text value, String name) {
                             String[] keys = key.toString().split(",");
                             String filename = keys[0];                 
                             filename = filename.replaceAll("\"","");      
                             return filename + ".csv";
     }

}


------------------------------------------------------------

The command below uses
1) libjars - to specify path of the custom output jar file
2) outputformat - to specify the class of the custom output format


 $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
  -libjars /tmp/weather_parse/customJobOutformat/weather_parse_outformat.jar \
  -D mapred.reduce.tasks=5  \
  -D mapred.job.name="weather_parse_all" \
  -D stream.map.output.field.separator=,     \
  -D stream.num.map.output.key.fields=4  \
  -D map.output.key.field.separator=, \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -D mapred.text.key.comparator.options="-k1,3 -k4r" \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -file   /tmp/weather_parse/weather_parse_mapper.py  \
  -mapper /tmp/weather_parse/weather_parse_mapper.py     \
  -input /user/hduser/inputxmlweather/weatherall.xml \
  -output   /user/hduser/outputweathercsv  \
  -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>" \
  -file   /tmp/weather_parse/weather_parse_reducer.py   \
  -reducer /tmp/weather_parse/weather_parse_reducer.py  \
  -outputformat customoutputformat.myformat.weatherparser.GenerateCustomFilename \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner


---------------------------
Compression -
---------------------------

Compression always comes at a price - more cpu cycles to compress\decompress the data.
Choosing the right compression type is important, if the output will be processed by some other mapreduce job. Compressions - Some are fast, some are slow, some save lot of space.
Choosing a compression which is splittable is most important. Hadoop can split the compressed file and send it to reducers. If a compression type is not splittable, the data will be processed (mapper) at a single node and hadoop cannot parallel process the data, all the performance is lost.

The command below uses
1) "-D mapred.output.compression.codec" - to specify the class used for compression, in this case its Bzip2 compression.

  $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
  -libjars /tmp/weather_parse/customJobOutformat/weather_parse_outformat.jar \
  -D mapred.reduce.tasks=5 \
  -D mapred.job.name="weather_parse_all" \
  -D stream.map.output.field.separator=,     \
  -D stream.num.map.output.key.fields=4  \
  -D map.output.key.field.separator=, \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -D mapred.text.key.comparator.options="-k1,3 -k4r" \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapred.output.compress=true \
  -D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \
  -file   /tmp/weather_parse/weather_parse_mapper.py  \
  -mapper /tmp/weather_parse/weather_parse_mapper.py     \
  -input /user/hduser/inputxmlweather/weatherall.xml \
  -output   /user/hduser/outputweathercsv  \
  -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>" \
  -file   /tmp/weather_parse/weather_parse_reducer.py   \
  -reducer /tmp/weather_parse/weather_parse_reducer.py  \
  -outputformat customoutputformat.myformat.weatherparser.GenerateCustomFilename \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner  

Output -



---------------------------
Conclusion -
---------------------------

There is No Silver Bullet when it comes to parsing Hierarchical data like XML in hadoop.


7 comments:

  1. Hi Suraj,

    Excellent. Thanks for covering in such a detail.

    -Jim

    ReplyDelete
  2. Thanks for such a detailed post.

    Can you show how to use Pig to parse the XML ?

    Thanks,
    Eric.

    ReplyDelete
  3. We have the same problem and want to parse a big xml in SQL SERVER 2008 table
    Did you have an export of your IKM.
    Best regards
    Thanks in advance

    ReplyDelete
  4. HI Suraj,
    can i use the python mapper as python UDF to parse the xml values in pig

    ReplyDelete
  5. Hi,

    Please provide how to process xml file via mapreduce and load them in hbase table using java.

    Thanks in advance.

    ReplyDelete
  6. Hi this is vijay..... Actually i m new to hadoop and was looking at ur blog very useful but couldnt find the dataset wat u used... can u mail me the dataset pls ...vijaykumar243@gmail.com
    thanks

    ReplyDelete