Booting the Analytics Application: Events -> Ruby -> Avro -> Pig -> Voldemort -> Sinatra -> Web Browser -> User

bottom-img

Complete working code for this example is available at https://github.com/rjurney/Booting-the-Analytics-Application

The first step to building analytics applications with Hadoop is to plumb your application from end to end: from collecting raw data to displaying something on the users’ screen. This is important, because models can get complex fast, and you need user feedback plugged into the equation from the start.

Collecting data in Avro is convenient, because a simple JSON schema is included with each pile of records. Apache Avro is a data serialization system with rich formats, simple integration with dynamic languages and support in Apache Pig.

Using Avro with Ruby and Pig is easy:

Install the avro ruby gem. Works in JRuby too.

gem install avro

Get and build Pig 0.9.1 and then piggybank (takes a while)

cd
wget http://mirror.olnevhost.net/pub/apache//pig/pig-0.9.1/pig-0.9.1.tar.gz
tar -xvzf pig-0.9.1.tar.gz
cd pig-0.9.1
ant
cd contrib/piggybank/java
ant
cd ../../..

Decide on a schema for your records and create some avro records in your application.

require 'rubygems'
require 'avro'


SCHEMA = <<-JSON
{ "type": "record",
  "name": "Email",
  "fields" : [
    {"name": "message_id", "type": "int"},
    {"name": "topic", "type": "string"},
    {"name": "user_id", "type": "int"}
  ]}
JSON


file = File.open('~/tmp/messages.avro', 'wb')
schema = Avro::Schema.parse(SCHEMA)
writer = Avro::IO::DatumWriter.new(schema)
dw = Avro::DataFile::Writer.new(file, writer, schema)
dw << {"message_id" => 11, "topic" => "Hello World", "user_id" => 1}
dw << {"message_id" => 12, "topic" => "Jim is silly!", "user_id" => 1}
dw << {"message_id" => 13, "topic" => "I like apples.", "user_id" => 2}
dw << {"message_id" => 24, "topic" => "Round the world...", "user_id" => 2}
dw << {"message_id" => 35, "topic" => "How do I sent a message?", "user_id" => 45}
dw.close

To setup your environment, add these lines to .bash_profile:

pig_version=0.9.1
export CLASSPATH=$CLASSPATH:~/pig-${pig_version}/build/ivy/lib/Pig/avro-1.4.1.jar\
:~/pig-${pig_version}/build/ivy/lib/Pig/json-simple-1.1.jar\
:~/pig-${pig_version}/contrib/piggybank/java/piggybank.jar\
:~/pig-${pig_version}/build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar\
:~/pig-${pig_version}/build/ivy/lib/Pig/jackson-mapper-asl-1.6.0.jar
And re-run it via:
source ~/.bash_profile

This script via ‘bin/pig -l /tmp -x local’ will get you working with your records:

REGISTER ./build/ivy/lib/Pig/avro-1.4.1.jar
REGISTER ./build/ivy/lib/Pig/json-simple-1.1.jar
REGISTER  contrib/piggybank/java/piggybank.jar
REGISTER ./build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar
REGISTER ./build/ivy/lib/Pig/jackson-mapper-asl-1.6.0.jar


DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();


messages = LOAD '/tmp/messages.avro' USING AvroStorage();

DESCRIBE lists the schema of your data - if it is known. When you’re winging it, sometimes you might not have a schema. Remember: 'Pigs eat anything,’ so no schema required! In practice, you’ll usually want schemas for application development.

grunt> DESCRIBE messages

avros: {message_id: int,topic: chararray}

DUMP is simple, but be careful: 'big data’ across your screen is not always useful, and you’ll have to ctrl-c to stop it, which ends your grunt session.
grunt> DUMP messages
(11,Hello World,1)
(12,Jim is silly!,1)
(13,I like apples.,2)
(24,Round the world...,2)
(35,How do I sent a message?,45)
In practice, you’ll often want to SAMPLE/LIMIT/DUMP
grunt> A = SAMPLE messages 0.5;
grunt> B = LIMIT messages 2;
grunt> DUMP B
(11,Hello World,1)
(12,Jim is silly!,1)

Or better yet… ILLUSTRATE! I <3 ILLUSTRATE.


grunt> ILLUSTRATE messages
-----------------------------------------------------
| avros     | message_id:int   | topic:chararray    | 
-----------------------------------------------------
|           | 12               | Jim is silly!     | 
-----------------------------------------------------

ILLUSTRATE is useful because it generates sample data for your pig script immediately, without waiting on a hadoop batch job. Debugging code while waiting minutes or hours on Hadoop jobs during test runs will tank productivity fast. ILLUSTRATE’s sample data ’illustrates the semantics of the operators while keeping the example data small.’ This makes Pig very powerful.

Lets try some Pig operations on our data, to see what I’m talking about. Suppose we want to look at messages per user, but only active users… say those that have posted more than one message:


grunt> user_groups = GROUP messages by user_id;
grunt> per_user = FOREACH user_groups GENERATE group AS user_id, 
       SIZE($1) AS message_count;
grunt> gt_one = FILTER per_user BY message_count > 1;
grunt> active_users = FOREACH gt_one GENERATE user_id;

grunt> DESCRIBE active_users
active_users: {user_id: int}
Now lets employ ILLUSTRATE to see what happened as records flowed through our simple data pipeline:
grunt> ILLUSTRATE active_users
-----------------------------------------------------------------------------
| messages     | message_id:int     | topic:chararray     | user_id:int     | 
-----------------------------------------------------------------------------
|              | 24                 | Round the world...  | 2               | 
|              | 13                 | I like apples.      | 2               | 
-----------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------
| user_groups     | group:int     | messages:bag{:tuple(message_id:int,topic:chararray,user_id:int)}                     | 
--------------------------------------------------------------------------------------------------------------------------
|                 | 2             | {(24, Round the world..., 2), (13, I like apples., 2)}                               | 
--------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------
| per_user     | user_id:int     | message_count:long     | 
-----------------------------------------------------------
|              | 2               | 2                      | 
-----------------------------------------------------------
---------------------------------------------------------
| gt_one     | user_id:int     | message_count:long     | 
---------------------------------------------------------
|            | 2               | 2                      | 
---------------------------------------------------------
--------------------------------------
| active_users     | user_id:int     | 
--------------------------------------
|                  | 2               | 
--------------------------------------

Note how we can see the changes happening to the data as it flows through our pig script, without waiting on Hadoop to execute it. Note also, that the answer ILLUSTRATE gives is not definitive, but is there to guide us as we write code: our DUMP has two records in active_users, not one.

grunt> DUMP active_users
(1)
(2)

Now lets try creating something we’d like to actually consume in a web app, via a key/value store. Say we want to access all messages per user, sorted newest to oldest. Lets create a suitable key and create these records:

grunt> user_groups = GROUP messages by user_id;
per_user = FOREACH user_groups {                
    sorted = ORDER messages BY message_id DESC;     
    GENERATE CONCAT('messages_per_user_id:', (chararray)group) AS user_key, sorted.$0 AS messages;
}
grunt> DESCRIBE per_user
per_user: {user_key: chararray,messages: {(message_id: int)}}

Note: There was a six hour detour at this point to fix a bug in the AvroStorage UDF for persisting these kinds of records. JIRA created and patch submitted here: https://issues.apache.org/jira/browse/PIG-2411. This kind of detour is common when you first setup an app, and you just have to bare the Java and fight through it.

grunt> STORE per_user INTO '/tmp/per_user.avro' USING AvroStorage();
It works! Note how ILLUSTRATE has become more useful, as it displays the result of our casting the user_id from integer to chararray, and concatenating it with a record identifier to create a voldemort key that allows sharing the store with other record types:
ILLUSTRATE per_user

...

--------------------------------------------------------------------------------------------------------------------------------
| per_user     | user_key:chararray     | messages:bag{:tuple(message_id:int,topic:chararray,user_id:int)}                     | 
--------------------------------------------------------------------------------------------------------------------------------
|              | messages_per_user_id:1 | {(12, Jim is silly!, 1), (11, Hello World, 1)}                                       | 
--------------------------------------------------------------------------------------------------------------------------------

This works for EvalFunc UDFs (User Defined Functions) too, which is really helpful.

Our processed data is ready to go. How do we publish it to a key/value store for consumption?

With a few records, we care about ease of use. As our data grows, we care about scaling. Project Voldemort is distributed key/value storage system which is easy to use and which scales well too. Voldemort supports building read-only stores on Hadoop and loading them directly from the hadoop cluster to the voldemort cluster.

Initially, we will write individual records to a Voldemort BDB store. As our data grows, we can build and push entire read-only stores from hadoop to an operational cluster with a few commands.

Download and unpack voldemort:

cd
wget --no-check-certificate https://github.com/downloads/voldemort/voldemort/voldemort-0.90.1.tar.gz
tar -xvzf voldemort-0.90.1.tar.gz
cd voldemort-0.90.1

Follow the instructions here to start voldemort:

bin/voldemort-server.sh config/single_node_cluster > /tmp/voldemort.log &

Install the voldemort-rb gem. Works with JRuby too:

gem install nokogiri
gem install ruby_protobuf
gem install voldemort-rb

Now in ruby we connect to our voldemort cluster, loop through our avro records and insert a new entry for each:


require 'rubygems'
require 'avro'
require 'voldemort-rb'
require 'json'

client = VoldemortClient.new("test", "localhost:6666")

file = File.open('/tmp/per_user.avro/part-r-00000.avro', 'r+')
dr = Avro::DataFile::Reader.new(file, Avro::IO::DatumReader.new)
dr.each do |record| 
  client.put record["user_key"], JSON(record["messages"])
end

Finally… we want to see our data in the browser. A one-page Sinatra web app gets us there. Install Sinatra:

gem install sinatra
Our sinatra app looks like:
require 'rubygems'
require 'sinatra'
require 'avro'
require 'voldemort-rb'
require 'json'

# connect to voldemort
client = VoldemortClient.new("test", "localhost:6666")

get '/messages_per_user_id/:user_id' do |user_id|
  client.get "messages_per_user_id:#{user_id}"
end

We can see our plumbed data at http://localhost:4567/messages_per_user_id/1 or http://localhost:4567/messages_per_user_id/2

The data at http://localhost:4567/messages_per_user_id/1 looks like this:

[{"message_id":12},{"message_id":11}]

Note that we haven’t touched Hadoop yet. :) It is simply not neccessary to do so, in order to get started using the tools. We’ve made platform choices that will let us aggregate and mine data at scale, and we’ve gotten them up and running in a few minutes on our local machines.

In future posts, we’ll extend this to work at scale on Amazon Web Services and Heroku.