Sunday, June 17, 2012

Hadoop & Humongus XML files

Hadoop brings distributed computing which bring amazing performance. In this post, we will look at how we can parse humongus XML file's using Hadoop.

Hadoop's documentation is bleak and does not cover much on this topic. I tried different things and thought this would be worth sharing(lessons learned).

Hadoop provides different input formats but none for XML.
Some of the common ways to parse XML in hadoop are -
1) Java MR - Custom mapreduce java programs using Mahout's XmlInputFormat (third party jar)
2) Hadoop Streaming - Custom mapreduce programs in a programming language other than java like perl, python, ruby, groovy, etc.
In case if you are using hadoop platforms which generate mapreduce -
3) Pig - XMLLoader function from the piggybank library to extract the XML within start and end tags.  Then using regular expressions or custom transform (mapper\reducer) to parse the Pig tuple.
4) Hive - A custom SerDe (Serializer or DeSerializer) or custom transform (mapper\reducer) 

I prefer using option 1 and 2, they provide more flexibility to control performance, control output formats, etc. I enjoy coding in pig and hive but they do not offer a clean solution.

Java MR  -
Pros - Provides superior performance. In hadoop everything boils down to java.
Cons - Development and testing takes time. For a project involving large number of different XML sources, it's time consuming process.
Usecase - To process daily XML load\feeds in an environment where performance is critical

Hadoop Streaming  -
Pros - Development and testing takes less time.
Cons - Performance slower than the Java MR implementation.
Usecase - One time XML parsing for historic data and then moving data into Hive or any DB. Also useful for applications where xml sources are periodically changing.

Nice tips on the Cloudera blog on what to choose as per the situation and what it supports.

Recently had to process enormous XML file's with historic data and had to use hadoop streaming (write some mapreduce). I will cover a detailed demo and use\impact of the following options on mapreduce performance
- Number of Reduce tasks
- Partition
- Comparator
- OutputFormat
- Compression

Performance also depends upon hadoop cluster configuration, config parameters, compressing map outputs, using a custom combiner(if applicable), etc. (cannot be covered in a single blogpost).

For a demo purpose will be using a 7gb XML file containing weather information.
The file is not humongous from any standards but this a demo. Yes, we will NOT be able to predict even the next rainfall from this data :-)
Goal is to parse the xml  file and output a csv file for each state.



I will be using python as the streaming language, you can choose the language of your choice.

---------------------------
The Mapper  -
---------------------------

The code below uses the python's ElementTree iterparse function to parse the XML. You can also use any other xml parser like lxml , XMLTreeBuilder, etc.  The mapper will parse the xml and output the will be in csv format.

------------------------------------------------------------

#!/usr/bin/env python
import sys
import cStringIO
import csv
from xml.etree.ElementTree import iterparse

def process(xmldata):
    xmldatafile=cStringIO.StringIO(xmldata)
    for (event, node) in iterparse(xmldatafile, events=['start']):
        if   node.tag.upper()== 'ZIPCODE':
                ZIPCODE=node.text
        elif node.tag.upper()== 'STATE':
                STATE=node.text
        elif node.tag.upper()== 'CITY':
                CITY=node.text.upper()
        elif node.tag.upper()== 'FORECASTDATE':
                FORECASTDATE=node.text
        elif node.tag.upper()=='DESCRIPTIVEFORECAST':
                attribupper = dict((key.upper(),value) for key,value in node.attrib.items())
                GENERALDESC=''
                if attribupper.get('GENERALDESC'):
                   GENERALDESC=attribupper['GENERALDESC']
        elif node.tag.upper()== 'DAYTIMEDESC':
                DAYTIMEDESC=node.text
        elif node.tag.upper()== 'NIGHTTIMEDESC':
                NIGHTTIMEDESC=node.text
        elif node.tag.upper()== 'TEMPLOW':
                TEMPLOW=node.text
        elif node.tag.upper()== 'TEMPHIGH':
                TEMPHIGH=node.text
        elif node.tag.upper()== 'PRECIPNIGHTTIME':
                PRECIPNIGHTTIME=node.text
        elif node.tag.upper()== 'PRECIPDAYTIME':
                PRECIPDAYTIME=node.text
        elif node.tag.upper()== 'SUNRISETIME':
                SUNRISETIME=node.text
        elif node.tag.upper()== 'SUNSETTIME':
                SUNSETTIME=node.text
                rowwriter.writerow((STATE,CITY,ZIPCODE,FORECASTDATE,GENERALDESC,
               TEMPLOW, TEMPHIGH,PRECIPNIGHTTIME,PRECIPDAYTIME,SUNRISETIME,
               SUNSETTIME,DAYTIMEDESC,NIGHTTIMEDESC))
        else:
               pass


if __name__ == '__main__':
    ### Create your own custom dialect like pipe delimited, tab delimited, etc
    csv.register_dialect('customdialect', quotechar='''"''',quoting=csv.QUOTE_ALL,delimiter=',')
    customdialect = csv.get_dialect('customdialect')
    rowwriter = csv.writer(sys.stdout, dialect=customdialect)
    xmlcollect = None
    xmlfind = False
    for line in sys.stdin:
        line = line.strip()
        if  line.find('<WeatherDetails>') != -1:
            xmlfind = True
            xmlcollect = cStringIO.StringIO()
            xmlcollect.write(line)
        elif line.find('
</WeatherDetails>') != -1:
            xmlcollect.write(line)
            xml = xmlcollect.getvalue()
            xmlcollect.close()
            xmlcollect = None
            xmlfind = False
            process(xml)
        else:
            if xmlfind:
               xmlcollect.write(line)

------------------------------------------------------------

Mapreduce jobs can be run without a reducer. The output of mapper becomes the output of your job, (an output file per mapper).
In the case of parsing xml, a reducer is not needed unless data needs to be sorted or some operations needs to performed on parsed data. The entire row is treated as a key.

The command below uses
1) "-D mapred.reduce.tasks=0" - to notify no reducer will be used
2) "-inputreader" uses hadoop's StreamXmlRecordReader class to split the file and pass the data between the xml tags to the mapper.
3) "-file" option makes sure the mapper program is distributed to all the hadoop nodes


$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
    -D mapred.reduce.tasks=0   \
    -D mapred.job.name="weather_parse_all"  \
    -file   /tmp/weather_parse/weather_parse_mapper.py  \
    -mapper /tmp/weather_parse/weather_parse_mapper.py  \
    -input /user/hduser/inputxmlweather/weatherall.xml  \
    -output   /user/hduser/outputweathercsv \
    -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>"


Output -



---------------------------
The Reducer -
---------------------------

We will use the reducer to just sort the data.

------------------------------------------------------------

#!/usr/bin/env python
import sys
if __name__ == '__main__':
    for line in sys.stdin:
        print line.strip()

------------------------------------------------------------

Various other things can be performed like sum,max,min on the data like getting max,min,average,etc  of temperatures for a particular zipcode,city,state over a period of time.
Hadoop's Partition & Shuffle phase will sort the data based on the key and send it to reducer. In this case the entire row.

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
  -D mapred.reduce.tasks=1  \
  -D mapred.job.name="weather_parse_all" \
  -file   /tmp/weather_parse/weather_parse_mapper.py  \
  -mapper /tmp/weather_parse/weather_parse_mapper.py     \
  -input /user/hduser/inputxmlweather/weatherall.xml \
  -output   /user/hduser/outputweathercsv  \
  -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>" \
  -file   /tmp/weather_parse/weather_parse_reducer.py   \
  -reducer /tmp/weather_parse/weather_parse_reducer.py 





--------------------------------------
Number of Reduce Tasks -
-------------------------------------

The more the number of reduce tasks  better the performance. The data is split between different reducers and runs across different nodes. Each reduce tasks writes output to a different file (Output format, if used plays a role on how the output is written and can effect number of output files).  The default partition phase which runs before the reducer computes hash value for key and sends it to a reducer.

---------------------------
Partition -
---------------------------

In most of the cases we want part of keys with same value to go a single reducer.
As in this case , we want all the records for a single zipcode\state\city going to a single reducer. If you are preforming calculations in reducer then you need to have all the values related keys in the same reducer.

The command below uses
1) "-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner" - to use the default hadoop partitioner class. This partitioner class outputs the key value pair seperated by tab. The reducer needs to be modified
------------------------------------------------------------

#!/usr/bin/env python
import sys
if __name__ == '__main__':
    for line in sys.stdin:
        lines = line.split('\t',1)
        print lines[0] + lines[1]

------------------------------------------------------------
 Note: A custom partitioner class can be created and used depending upon the data.
 2) "-D stream.num.map.output.key.fields=4" - to use first 4 fields of the row as keys and rest as the value, creating a key value pair.
3) "-D mapred.text.key.partitioner.options=-k1,1" - to partition on the first key , all the state records are send to same reducer. All the 4 key fields will be used for sorting.

 $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
  -D mapred.reduce.tasks=5  \
  -D mapred.job.name="weather_parse_all" \
  -D stream.map.output.field.separator=,     \
  -D stream.num.map.output.key.fields=4  \
  -D map.output.key.field.separator=, \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -file   /tmp/weather_parse/weather_parse_mapper.py  \
  -mapper /tmp/weather_parse/weather_parse_mapper.py     \
  -input /user/hduser/inputxmlweather/weatherall.xml \
  -output   /user/hduser/outputweathercsv  \
  -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>" \
  -file   /tmp/weather_parse/weather_parse_reducer.py   \
  -reducer /tmp/weather_parse/weather_parse_reducer.py  \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner


Every reducer generates a unique file with format as part-00000,part-00001,part-00002 and so on.
We see how to change the output file name by using custom output format (later in this post).

---------------------------
Comparator - 
---------------------------

Sometimes a custom sorting is needed on keys. Comparator class "org.apache.hadoop.mapred.lib.KeyFieldBasedComparator" can easily help achieve it. We will sort the data based on forecast date descending and state,city,zipcode ascending.

The command below uses
-D mapred.text.key.comparator.options="-k1,3 -k4r"  to sort first 3 keys ascending and 4 key in reverse order (descending).

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
  -D mapred.reduce.tasks=5  \
  -D mapred.job.name="weather_parse_all" \
  -D stream.map.output.field.separator=,     \
  -D stream.num.map.output.key.fields=4  \
  -D map.output.key.field.separator=, \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -D mapred.text.key.comparator.options="-k1,3 -k4r" \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \

  -file   /tmp/weather_parse/weather_parse_mapper.py  \
  -mapper /tmp/weather_parse/weather_parse_mapper.py     \
  -input /user/hduser/inputxmlweather/weatherall.xml \
  -output   /user/hduser/outputweathercsv  \
  -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>" \
  -file   /tmp/weather_parse/weather_parse_reducer.py   \
  -reducer /tmp/weather_parse/weather_parse_reducer.py  \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner  

---------------------------
Output Format -
---------------------------

Hadoop is a batch system. It's task is to do the heavy lifting and feed the output to target systems. Output format is the process to generate the output as desired by other systems. For any target which exposes its input format a custom output format can be created. It can be only written in java.

For example, if parsing output of an XML are both master and detail records, then a custom output format can be coded to write master records and details records into separate files.

In our case, we would like to generate a file with name as .csv. 

The class below extends hadoop's  "org.apache.hadoop.mapred.lib.MultipleTextOutputFormat" class.

------------------------------------------------------------

package customoutputformat.myformat.weatherparser;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class GenerateCustomFilename extends MultipleTextOutputFormat {

     @Override
     protected String generateFileNameForKeyValue(Text key, Text value, String name) {
                             String[] keys = key.toString().split(",");
                             String filename = keys[0];                 
                             filename = filename.replaceAll("\"","");      
                             return filename + ".csv";
     }

}


------------------------------------------------------------

The command below uses
1) libjars - to specify path of the custom output jar file
2) outputformat - to specify the class of the custom output format


 $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
  -libjars /tmp/weather_parse/customJobOutformat/weather_parse_outformat.jar \
  -D mapred.reduce.tasks=5  \
  -D mapred.job.name="weather_parse_all" \
  -D stream.map.output.field.separator=,     \
  -D stream.num.map.output.key.fields=4  \
  -D map.output.key.field.separator=, \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -D mapred.text.key.comparator.options="-k1,3 -k4r" \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -file   /tmp/weather_parse/weather_parse_mapper.py  \
  -mapper /tmp/weather_parse/weather_parse_mapper.py     \
  -input /user/hduser/inputxmlweather/weatherall.xml \
  -output   /user/hduser/outputweathercsv  \
  -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>" \
  -file   /tmp/weather_parse/weather_parse_reducer.py   \
  -reducer /tmp/weather_parse/weather_parse_reducer.py  \
  -outputformat customoutputformat.myformat.weatherparser.GenerateCustomFilename \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner


---------------------------
Compression -
---------------------------

Compression always comes at a price - more cpu cycles to compress\decompress the data.
Choosing the right compression type is important, if the output will be processed by some other mapreduce job. Compressions - Some are fast, some are slow, some save lot of space.
Choosing a compression which is splittable is most important. Hadoop can split the compressed file and send it to reducers. If a compression type is not splittable, the data will be processed (mapper) at a single node and hadoop cannot parallel process the data, all the performance is lost.

The command below uses
1) "-D mapred.output.compression.codec" - to specify the class used for compression, in this case its Bzip2 compression.

  $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
  -libjars /tmp/weather_parse/customJobOutformat/weather_parse_outformat.jar \
  -D mapred.reduce.tasks=5 \
  -D mapred.job.name="weather_parse_all" \
  -D stream.map.output.field.separator=,     \
  -D stream.num.map.output.key.fields=4  \
  -D map.output.key.field.separator=, \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -D mapred.text.key.comparator.options="-k1,3 -k4r" \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapred.output.compress=true \
  -D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \
  -file   /tmp/weather_parse/weather_parse_mapper.py  \
  -mapper /tmp/weather_parse/weather_parse_mapper.py     \
  -input /user/hduser/inputxmlweather/weatherall.xml \
  -output   /user/hduser/outputweathercsv  \
  -inputreader  "StreamXmlRecordReader,begin=<WeatherDetails>,end=</WeatherDetails>" \
  -file   /tmp/weather_parse/weather_parse_reducer.py   \
  -reducer /tmp/weather_parse/weather_parse_reducer.py  \
  -outputformat customoutputformat.myformat.weatherparser.GenerateCustomFilename \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner  

Output -



---------------------------
Conclusion -
---------------------------

There is No Silver Bullet when it comes to parsing Hierarchical data like XML in hadoop.


Wednesday, June 1, 2011

ODI - Manual Commits

--

In this post, we will look at one of the ways to manually commit data in ODI.
There are cases during data loads where you want to commit data when all your interfaces have run successfully. If anyone fails , commit should not be issued.

This is in context of a database only.

Most of the KM's provide a "COMMIT" option which can be either set to true or false.

What happens when the COMMIT option is set to false ? Does it commits the data or it does not?

Note: All the cases below have interfaces with COMMIT option set to false.

Case 1)  Executing an Interface or scenario for an Interface -

Lets a take simple example of a KM - IKM SQL Control Append applied to an Interface EMP_LOAD.


Note: TRUNCATE option is set to true.

Executing the scenario for the interface EMP_LOAD.


The COMMIT task was not executed.  The only task that were executed were Truncate and Insert.
But the data in the table was still populated.


This clearly indicates that ODI will issue COMMIT at the end of a successful session.

Case 2) Where an Interface successfully completes and other interface fails in an ODI Package - 

Here is an example of an ODI package with three interfaces

Dropped the target table for interface JOBS_LOAD so that the package fails.

Executing the package

DEPT_LOAD interface executed sucessfully and the JOBS_LOAD interface failed. The session failed.
The data was inserted into Department table by DEPT_LOAD interface but it was not committed as the session failed.


Case 3) Where all Interface successfully completes in an ODI Package -

Executing the package after fixing the Interface.

The ODI package executed successfully , no commits were issued by any of the Interface.
But looking into any of  the table, data is populated. As seen in Case 1 , ODI issues commit at end of an successful session.


Conclusion - Manual Commits can be achieved by having all the interfaces within an ODI package with COMMIT option set to false.

Few things to consider:
1) Adding scenarios of the interface to the package will not work as ODI will launch a new session for each
    scenario.
2) DDL's cannot be reversed.
3) Performance can degrade, infact manual commits should be avoided whenever possible.


--

Monday, May 23, 2011

ODI Session Restart - Scenarios & Load Plans

--


In this post we will look at ODI session restarts. Basically, how ODI behaves when you restart a session and different restart options for Load Plans.

We will consider three cases:

Case 1) Restarting a Scenario with no sub-scenarios -

I have here is a "LOAD_ALL" package which is executing three steps (interfaces using IKM SQL Control Append). I have dropped the target table so that JOBS_LOAD interface fails.
Executing the ODI package



ODI generated a session number 99045 and the interface JOBS_LOAD failed at task "Truncate target table".

ODI stores all the task (commands in your KM or procedure)execution details in the work repository table SNP_SESS_TASK_LOG


After fixing the error and restarting the session 99045. ODI restarts the session from the failed task which is the "Truncate target table" for interface JOBS_LOAD. If the interface had failed while executing the "Insert new rows" task then ODI would have restarted it from the same task.



In cases where there are no sub-scenarios inside an scenario , ODI will restart from the failed task rather than the failed step.
If you create a scenario for an ODI object(like Interface,Procedure) , restarting the failed scenario will result in same behavior.

Case 2) Restarting a Scenario with sub-scenarios -

Created a scenario for each of the three interfaces and added them in a package LOAD_ALL_SCEN.
Again before executing the package , i will drop the table to fail the interface JOBS_LOAD.



Executing the scenario for the package

ODI created session for each scenario.  The Parent session 101045 (scenario for package LOAD_ALL_SCEN ) spawns a new session (102045,103045) for each sub-scenario (interfaces). The session 103045 failed (scenario for JOBS_LOAD interface).

Fixed the error and restarted the parent session 101045.

The parent session starts from the failed step. It does not starts from the failed task.. In this case it created a new session 104045 for the JOBS_LOAD scenario.
Note: Sessions can be restarted at the failed sub-scenario(step). This should be avoided because they execute the current failed step and stop. Restarts should be done at top most parent session.

View of the SNP_SESS_TASK_LOG -


Case 3) Restarting Load Plans -

To explain the restart concept , we will take an example of simple load plans with no exception handling ( If anything fails load plans fails ).

Load plans provides different restart options.  Executing a load plan creates a session.

Load plans consists of steps (Serial,Parallel,Case,When,Else). Steps can have sub-steps and scenarios. The scenarios are the lowest entity in the Load Plan.

The first step is always a Serial step. In the example above its named as the ROOT_STEP.

If a scenario under a step fails then the step itself fails.

Restart options at the Step level of Load Plans
1)  Restart from failure -
     If the load plan is restarted then the scenario which failed will be the place where restart will initiate.
     Look  at the restart options at Scenario level.
2)  Restart all children -
     This will re-excute(new sessions) all the successfully executed scenarios, the scenario which failed will be
     restarted as per the restart option set at the scenario level.


Restart options at the Scenario level of Load Plans
Note: The "Step" discussed here is this section is a step within an ODI Package which is executed as a scenario.
1) Restart from new Session -
    The failed scenario will be re-executed with a new session.
    If the scenario has sub-scenarios as steps, a new session will be launched again for each scenario.
2) Restart from failed Step -
    If the failed scenario executes other scenarios (sub-scenarios) as steps then a new session will be launched
    for failed sub-scenario as discussed in "Case 2".
    If there is no sub-scenario as step then restart will start from the failed step.
3) Restart from failed Task -
    If the failed scenario has sub-scenario as steps then a new session will be launched for failed sub-scenario.
    If there is no sub-scenario as step then restart will start from the failed task as discussed in "Case 1" above.

Load plans provide additional benefits of restart which are not provided by packages.
A good practice is always create scenarios for each and every object.

--