Kafka or Flume?

A question that keeps popping up is “Should we use Kafka or Flume to load data to Hadoop clusters?”

This question implies that Kafka and Flume are interchangeable components. It makes as much sense to me as “Should we use cars or umbrellas?”. Sure, you can hide from the rain in your car and you can use your umbrella when moving from place to place. But in general, these are different tools intended for different use-cases.

Flume’s main use-case is to ingest data into Hadoop. It is tightly integrated with Hadoop’s monitoring system, file system, file formats, and utilities such as Morphlines. A lot of the Flume development effort goes into maintaining compatibility with Hadoop. Sure, Flume’s design of sources, sinks and channels means that it can be used to move data between other systems flexibly, but the important feature is its Hadoop integration.

Kafka’s main use-case is to serve as a distributed publish-subscribe messaging system. Most of the development effort goes into allowing subscribers to read exactly the messages they are interested in, and into making sure the distributed system is scalable and reliable under many different conditions. It was not written to stream data specifically into Hadoop, and using it to read and write data to Hadoop is significantly more challenging than it is with Flume.

To summarize:
Use Flume if you have non-relational data sources, such as log files, that you want to stream into Hadoop.
Use Kafka if you need a highly reliable and scalable enterprise messaging system to connect multiple systems, one of which is Hadoop.


Quick Tip on Using Sqoop Action in Oozie

Another Oozie tip blog post.

If you use the Sqoop action in Oozie, you probably know that you can use the “command” format, with the entire Sqoop command line in a single element:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirsthivejob">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <command>import  --connect jdbc:hsqldb:file:db.hsqldb --table TT --target-dir hdfs://localhost:8020/user/tucu/foo -m 1</command>
        </sqoop>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

This is convenient, but it can be difficult to read and maintain. I prefer the “arg” syntax, with each argument on its own line:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirsthivejob">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <arg>import</arg>
            <arg>--connect</arg>
            <arg>jdbc:hsqldb:file:db.hsqldb</arg>
            <arg>--table</arg>
            <arg>TT</arg>
            <arg>--target-dir</arg>
            <arg>hdfs://localhost:8020/user/tucu/foo</arg>
            <arg>-m</arg>
            <arg>1</arg>
        </sqoop>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

As you can see, each argument here is in its own “arg” tag. Even two arguments that belong together, like “--table” and “TT”, go in two separate tags.
If you try to put them together for readability, as I did, Sqoop will throw its entire user manual at you. It took me a while to figure out why this is an issue.

When you call Sqoop from the command line, all the arguments you pass are sent as a String[] array, and the shell splits them on spaces into array elements. So if you call Sqoop with “--table TT”, it arrives as two elements, “--table” and “TT”.
When you use “arg” tags in Oozie, you are basically generating the same array in XML. Oozie turns the XML argument list into an array and passes it to Sqoop just the way the shell would on the command line, and Sqoop then parses it in exactly the same way.
So every space-separated item on the command line must go in its own tag in Oozie.

It’s simple and logical once you figure out why 🙂
If you want to dig a bit deeper into how Sqoop parses its arguments, it uses Apache Commons CLI with GnuParser. You can read all about it.
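
To see this in action, here is a minimal, hypothetical sketch. This is not Sqoop’s own code, just Apache Commons CLI’s GnuParser (the parser Sqoop uses), and it assumes a 1.x commons-cli jar on the classpath, since GnuParser is deprecated in newer releases:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class ArgSplitDemo {
    public static void main(String[] args) {
        Options options = new Options();
        options.addOption(null, "table", true, "table to import");

        try {
            // Two separate array elements, the way two <arg> tags (or the shell) deliver them: parses fine.
            CommandLine good = new GnuParser().parse(options, new String[]{"--table", "TT"});
            System.out.println("Parsed table = " + good.getOptionValue("table"));

            // One element containing a space, the way a combined <arg>--table TT</arg> would deliver it:
            // the parser treats the whole string as one unrecognized option and throws.
            new GnuParser().parse(options, new String[]{"--table TT"});
        } catch (ParseException e) {
            System.out.println("Combined argument failed to parse: " + e.getMessage());
        }
    }
}

The second parse fails for exactly the reason described above: as far as the parser is concerned, “--table TT” is a single unknown option, not an option plus its value.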


Why Oozie?

That’s a frequently asked question. Oozie is a workflow manager and scheduler. Most companies already have a workflow scheduler: Activebatch, Autosys, UC4, HP Orchestration. These workflow schedulers run jobs on all their existing databases: Oracle, Netezza, MySQL. Why does Hadoop need its own special workflow scheduler?

As usual, it depends. In general, you can keep using any workflow scheduler that works for you. No need to change, really.
However, Oozie does have some benefits that are worth considering:

  1. Oozie is designed to scale in a Hadoop cluster. Each job will be launched from a different datanode. This means that the workflow load will be balanced and no single machine will become overburdened by launching workflows. This also means that the capacity to launch workflows will grow as the cluster grows.
  2. Oozie is well integrated with Hadoop security. This is especially important in a kerberized cluster. Oozie knows which user submitted the job and will launch all actions as that user, with the proper privileges. It will handle all the authentication details for the user as well.
  3. Oozie is the only workflow manager with built-in Hadoop actions, making workflow development, maintenance and troubleshooting easier.
  4. Oozie UI makes it easier to drill down to specific errors in the data nodes. Other systems would require significantly more work to correlate jobtracker jobs with the workflow actions.
  5. Oozie is proven to scale in some of the world’s largest clusters. The white paper discusses a deployment at Yahoo! that can handle 1250 job submissions a minute.
  6. Oozie gets callbacks from MapReduce jobs so it knows when they finish and whether they hang without expensive polling. No other workflow manager can do this.
  7. Oozie Coordinator allows triggering actions when files arrive in HDFS. This will be challenging to implement anywhere else (see the sketch right after this list).
  8. Oozie is supported by Hadoop vendors. If there is ever an issue with how the workflow manager integrates with Hadoop – you can turn to the people who wrote the code for answers.
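
To illustrate #7, here is a rough, hypothetical sketch of a coordinator that waits for a day’s input directory to show up in HDFS and only then launches a workflow. The names, paths, dates and the ${workflowAppPath} property are placeholders, not something from a real deployment:

<coordinator-app name="file-trigger-demo" frequency="${coord:days(1)}"
                 start="2013-06-01T00:00Z" end="2013-12-31T00:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
    <datasets>
        <dataset name="input" frequency="${coord:days(1)}" initial-instance="2013-06-01T00:00Z" timezone="UTC">
            <uri-template>${nameNode}/data/incoming/${YEAR}/${MONTH}/${DAY}</uri-template>
            <done-flag>_SUCCESS</done-flag>
        </dataset>
    </datasets>
    <input-events>
        <data-in name="input-ready" dataset="input">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>
    <action>
        <workflow>
            <app-path>${workflowAppPath}</app-path>
        </workflow>
    </action>
</coordinator-app>

Each day the coordinator materializes an action that waits until the matching directory and its _SUCCESS flag exist, and only then submits the workflow.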

So, should you use Oozie? If you find these benefits compelling, then yes. Step out of your comfort zone and learn another new tool. It will be worth it.


Parameterizing Hive Actions in Oozie Workflows

A very common request I get from my customers is to parameterize the query executed by a Hive action in their Oozie workflow.
For example, the dates used in the query may depend on the result of a previous action. Or maybe they depend on something completely external to the system: the operator just decides to run the workflow on specific dates.

There are many ways to do this, including using EL expressions or capturing the output of a shell or Java action.
Here’s an example of how to pass the parameter through the command line. This assumes that whoever triggers the workflow (a human or an external system) already has the correct value and just needs to pass it to the workflow so the query can use it.

Here’s what the query looks like:

insert into test select * from test2 where dt=${MYDATE}

MYDATE is the parameter that allows me to run the query on a different date each time. When running this query directly in Hive, I’d use something like “set MYDATE='2011-10-10'” before running the query. But when I run it from Oozie, I need to pass the parameter to the Hive action.

Let’s assume I saved the query in a file called hive1.hql. Here’s what the Oozie workflow looks like:

<workflow-app name="cmd-param-demo" xmlns="uri:oozie:workflow:0.4">
	<start to="hive-demo"/>
	<action name="hive-demo">
		<hive xmlns="uri:oozie:hive-action:0.2">
			<job-tracker>${jobTracker}</job-tracker>
			<name-node>${nameNode}</name-node>
			<job-xml>${hiveSiteXML}</job-xml>
			<script>${dbScripts}/hive1.hql</script>
			<param>MYDATE=${MYDATE}</param>
		</hive>
		<ok to="end"/>
		<error to="kill"/>
	</action>
	<kill name="kill">
		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>
	<end name="end"/>
</workflow-app>

The important line is “MYDATE=${MYDATE}”. Here I translate an Oozie parameter into a parameter that will be used by the Hive script. Don’t forget to copy hive-site.xml and hive1.hql to HDFS! Oozie actions can run on any datanode and will not read files from the local file system.

And here’s how you call Oozie with the command-line parameter:
oozie job -oozie http://myserver:11000/oozie -config ~/workflow/job.properties -run -verbose -DMYDATE='2013-11-15'
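
In case it helps, here is roughly what the job.properties referenced above might contain. The hostnames and paths are placeholders; oozie.wf.application.path and oozie.use.system.libpath are standard Oozie properties, and the rest simply feed the ${...} variables in the workflow:

# hypothetical job.properties for the cmd-param-demo workflow
nameNode=hdfs://myserver:8020
jobTracker=myserver:8021
hiveSiteXML=${nameNode}/user/${user.name}/cmd-param-demo/hive-site.xml
dbScripts=${nameNode}/user/${user.name}/cmd-param-demo
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/cmd-param-demo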

That’s it!


Using Oozie in a Kerberized Cluster

In general, most Hadoop ecosystem tools work rather transparently in a kerberized cluster. Most of the time things “just work”. This includes Oozie. Still, when things don’t “just work”, they tend to fail with slightly alarming and highly ambiguous error messages. Here are a few tips for using Oozie when your Hadoop cluster is kerberized. Note that this is a client/user guide. I assume you already followed the documentation on how to configure the Oozie server in the kerberized cluster (or you are using Cloudera Manager, which magically configures it for you).

    1. As always, use “kinit” to authenticate with Kerberos and get your TGT before trying to run Oozie commands, and verify it worked with “klist”. Failure to do this will result in “No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)”.
    2. I always enable security debug messages. This helps me troubleshoot, and also helps when I need to ask support/mailing list for help.
      export HADOOP_ROOT_LOGGER=TRACE,console;
      export HADOOP_JAAS_DEBUG=true;
      export HADOOP_OPTS="-Dsun.security.krb5.debug=true"
    3. Your Oozie command typically contains a URL. Something like “oozie job -oozie http://myserver:11000/oozie -config job.properties -run”. The server name in the URL must match the host in the Oozie server’s Kerberos principal. If your principal is actually for “myserver.mydomain.com”, make sure you use that fully qualified name in the URL.
    4. If you decide to use curl to connect to your Oozie server, either for troubleshooting or for using the REST API, don’t forget to use “--negotiate -u foo:bar”. The actual username and password don’t matter (you are authenticating with your Kerberos ticket), but curl throws a fit if they are missing. There’s a small sketch at the end of this list.
    5. If you have a Hive action in your Oozie workflow, you need to define and use credentials. Here’s an example:
      <workflow-app xmlns="uri:oozie:workflow:0.2.5" name="example-wf">
              <credentials>
                      <credential name='hive_credentials' type='hcat'>
                              <property>
                                  <name>hcat.metastore.uri</name>
                                  <value>thrift://metastore_server:9083</value>
                              </property>
                              <property>
                                  <name>hcat.metastore.principal</name>
                                  <value>hive/_HOST@KERBDOM.COM</value>
                              </property>
                      </credential>
              </credentials>
      <start to="hive-example"/>
      <action name="hive-example" cred="hive_credentials">
              <hive xmlns="uri:oozie:hive-action:0.2">
                      <job-tracker>${jobTracker}</job-tracker>
                      <name-node>${nameNode}</name-node>
                      <job-xml>${hiveSiteXML}</job-xml>
                      <script>${dbScripts}/hive-example.hql</script>
              </hive>
              <ok to="end"/>
              <error to="fail"/>
      </action>
      <kill name="fail">
              <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
      </kill>
      <end name="end"/>
      </workflow-app>
      
    6. To make step #5 actually work (i.e. allow Oozie to run Hive actions), you will also need to do the following in Cloudera Manager:
      – go to “HDFS Service->Configuration->Service-Wide->Advanced->Cluster-wide Configuration Safety Valve for core-site.xml” and add:

      <property>
      <name>hadoop.proxyuser.oozie.hosts</name>
      <value>*</value>
      </property>
      <property>
      <name>hadoop.proxyuser.oozie.groups</name>
      <value>*</value>
      </property>
      

      – go to “Oozie service->Configuration->Oozie Server(default)->Advanced-> Oozie Server Configuration Safety Valve for oozie-site.xml” and add:

      <property>
      <name>oozie.credentials.credentialclasses</name>
      <value>hcat=org.apache.oozie.action.hadoop.HCatCredentials</value>
      </property>
      

      – Deploy client configuration and restart Hive service and Oozie service.

    7. Oozie doesn’t kinit a user for you on the node it launches the action on, and it doesn’t move principals and tickets around. Instead it uses delegation tokens. If you want to authenticate to Hadoop inside a shell or Java action, you’ll need to use the same tokens.

      In a shell action, it will be something like:

      # In an Oozie shell action, HADOOP_TOKEN_FILE_LOCATION points at the delegation tokens;
      # outside Oozie it is unset, so just run the query directly.
      if [ -z "${HADOOP_TOKEN_FILE_LOCATION}" ]
      then
          hive -e "select x from test" -S
      else
          hive -e "SET mapreduce.job.credentials.binary=$HADOOP_TOKEN_FILE_LOCATION; select x from test" -S
      fi
      

      In Java it will be:

      // jobConf is the Configuration of the job this action is about to submit; point it at
      // the delegation tokens Oozie provides instead of relying on a Kerberos ticket.
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
          jobConf.set("mapreduce.job.credentials.binary",
                      System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
      

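    As a follow-up to tip #4, here is a minimal, hypothetical sketch of calling the Oozie REST API with curl in a kerberized cluster. The hostname is a placeholder, the foo:bar credentials are dummies (the Kerberos ticket does the actual authentication), and /v1/admin/status is just a convenient read-only endpoint to test against:

      # get a ticket first, then let curl authenticate via SPNEGO using --negotiate
      kinit myuser@KERBDOM.COM
      curl --negotiate -u foo:bar "http://myserver.mydomain.com:11000/oozie/v1/admin/status"
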
    Hope this helps! Feel free to comment with questions, especially if you ran into errors that I did not address. I’ll be happy to add more tips to the list.


Hadoop Summit 2013 – The Schedule

90% of winning is planning. I learned this as a kid from my dad, and I validated this through many years of work in operations. This applies to everything in life, including conferences.

So in order to maximize fun, networking and learning at Hadoop Summit, I’m planning my schedule in advance. Even if only a few hours in advance. It’s the thought that counts.

In addition to social activities such as catching up with my former colleagues from Pythian, dining with my distributed solutions architecture team at Cloudera and participating in the Hadoop Summit bike ride, I’m also planning to attend a few sessions.

There are tons of good sessions at the conference, and it was difficult to pick. It is also very possible that I’ll change my plans at the last minute based on recommendations from other attendees. For the benefit of those who would like some recommendations, or who want to catch up with me at the conference, here’s where you can find me:

Wednesday:

11:20am: Securing the Hadoop Ecosystem – Security is important, but I’ll admit that I’m only attending this session because I’m a fan of ATM. Don’t judge.

12pm: LinkedIn Member Segmentation Platform: A Big Data Application – LinkedIn are integrating Hive and Pig with Teradata. Just the type of use case I’m interested in, from my favorite social media company.

2:05pm: How One Company Offloaded Data Warehouse ETL To Hadoop and Saved $30 Million – I’m a huge believer in offloading ETL to Hadoop and I know companies who saved big bucks that way. But $30M is more than even I’d expect, so I have to hear this story.

2:55pm: HDFS – What is New and Future – Everything in Hadoop relies on HDFS, so keeping up to date with new features is critical for Hadoop professionals. This should be a packed room.

4:05pm: Parquet: Columnar storage for the People – Parquet is a columnar storage format for Hadoop. I’m interested in learning more about Parquet, as it should enable a smoother transition of data warehouse workloads to Hadoop.

Thursday:

11:50am: Mahout and Scalable Natural Language Processing – This session promises so much data science content in one hour that I’m a bit worried my expectations are too high. I hope it doesn’t disappoint.

1:40pm: Hadoop Hardware @Twitter: Size does matter – I’m expecting an operations-level talk about sizing Hadoop clusters, something nearly every company adopting Hadoop is struggling with.

2:30pm: Video Analysis in Hadoop – A Case Study – My former colleagues at Pythian are presenting a unique Hadoop use-case they developed. I have to be there.

3:35pm: Large scale near real-time log indexing with Flume and SolrCloud – I’m encountering this type of architecture quite often now. Will be interesting to hear how Cisco are doing it.

5:15pm: Building a geospatial processing pipeline using Hadoop and HBase and how Monsanto is using it to help farmers increase their yield – Using Hadoop to analyze large amounts of geospatial data and help farmers. Sounds interesting. Also, Robert is a customer and a very sharp software architect, so worth attending.

Expect me to tweet every 5 minutes from one of the sessions. It’s my way of taking notes and sharing knowledge. If you are at the conference and want to get together, ping me here or on Twitter. Also ping me if you think I’m missing a not-to-be-missed session.

See you at Hadoop Summit!