Jython UDFs in Pig

“the more powerful the language, the shorter the program”


Jython UDFs were added to Pig in version 0.8, and are pretty stable in the current version, 0.9.2. They are highly convenient, and a major timesaver.

Using Jython UDFs is simple. Create a UDF in udfs.py:
# Extracts the hour from an ISO 8601 datetime string
import time

@outputSchema("hour:chararray")
def hour(iso_string):
  # Parse only the first 19 characters (YYYY-MM-DDTHH:MM:SS); strptime would reject the trailing UTC offset
  tuple_time = time.strptime(iso_string[:19], "%Y-%m-%dT%H:%M:%S")
  # Index 3 of the struct_time is the hour (index 2 is the day of the month)
  return str(tuple_time[3])
Import and run the UDF in Pig via:
grunt> register 'udfs.py' using jython as myfuncs;

2012-02-13 16:19:33,325 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myfuncs.hour

grunt> describe times;
times: {date: chararray}
grunt> hours = foreach times generate date, myfuncs.hour(date) as hour;
grunt> dump hours;

(2011-12-08T00:03:07-08:00,0)
(2011-12-08T00:08:05-08:00,0)
(2011-12-08T07:33:40+00:00,7)
(2011-12-08T08:28:32+00:00,8)
(2011-12-08T09:26:25+01:00,9)
(2011-12-08T13:02:32+05:30,13)
(2011-12-08T13:20:27+05:30,13)
(2011-12-08T13:40:44+05:30,13)
(2011-12-08T13:54:54+05:30,13)
(2011-12-08T13:59:47+05:30,13)
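
It can help to exercise a UDF in a plain Python interpreter before registering it in Pig. The sketch below assumes that Pig supplies the outputSchema decorator when the script is registered, so a stand-in is defined here for local runs; the stand-in and the sample strings are illustrative, not part of Pig. The hour returned is the local hour as written in the string, since the UTC offset is ignored.

```python
import time


def outputSchema(schema):
    """Local stand-in (an assumption) for the decorator Pig provides at registration.

    It only records the schema string on the function and returns it unchanged.
    """
    def wrap(func):
        func.outputSchema = schema
        return func
    return wrap


@outputSchema("hour:chararray")
def hour(iso_string):
    # Keep only YYYY-MM-DDTHH:MM:SS; the trailing UTC offset is dropped, not applied
    parsed = time.strptime(iso_string[:19], "%Y-%m-%dT%H:%M:%S")
    return str(parsed.tm_hour)


if __name__ == "__main__":
    for sample in ["2011-12-08T00:03:07-08:00", "2011-12-08T13:02:32+05:30"]:
        print(sample, hour(sample))
```

Remove the stand-in decorator before registering the file in Pig, so that it does not shadow the one Pig provides.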

Jython UDFs can operate on complex types as well. Let's prepare a grouped relation containing the email subjects exchanged between pairs of email addresses, then run it through a Jython UDF to get word counts.

The UDF takes the (from, to) group tuple and the bag of subjects for that pair, and returns the pair along with a bag of word counts:

# Given a (from, to) email address pair and a bag of their subjects, return from, to and a bag of word counts
# Input format: {group: (from: chararray,to: chararray),subjects: {(subject: chararray)}}
@outputSchema("t:(from:chararray, to:chararray, word_counts:bag{t2:(word:chararray, total:int)})")
def word_count_subjects(group, subjects):
  # The group tuple is (from, to); 'from' is a Python keyword, hence _from
  _from = group[0]
  to = group[1]
  word_counts = {}
  for subject in subjects:
    # Each element of the bag is a one-field tuple holding the subject string
    words = subject[0].split()
    for word in words:
      word_counts[word] = word_counts.get(word, 0) + 1
  # Return the most frequent words first
  return _from, to, sorted(word_counts.items(), key=lambda word_count: word_count[1], reverse=True)
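
To see how the complex types look from the Python side, the UDF can be called locally with ordinary Python values. This is a minimal sketch, assuming a Pig tuple arrives as a Python tuple and a bag as a list of tuples; the stand-in outputSchema decorator, the addresses, and the subjects are made up for illustration.

```python
def outputSchema(schema):
    """Local stand-in (an assumption) for the decorator Pig provides at registration."""
    def wrap(func):
        func.outputSchema = schema
        return func
    return wrap


@outputSchema("t:(from:chararray, to:chararray, word_counts:bag{t2:(word:chararray, total:int)})")
def word_count_subjects(group, subjects):
    # The group tuple is (from, to)
    _from, to = group[0], group[1]
    word_counts = {}
    for subject in subjects:
        # Each bag element is a one-field tuple: (subject,)
        for word in subject[0].split():
            word_counts[word] = word_counts.get(word, 0) + 1
    # Most frequent words first
    ranked = sorted(word_counts.items(), key=lambda wc: wc[1], reverse=True)
    return _from, to, ranked


if __name__ == "__main__":
    group = ("alice@example.com", "bob@example.com")               # hypothetical pair
    subjects = [("Re: loading file",), ("Problems loading file",)]  # bag as list of tuples
    print(word_count_subjects(group, subjects))
```

The returned Python tuple and list of (word, total) tuples correspond to the tuple and bag declared in the output schema.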

Import the UDF in Pig and run it against our grouped relation:

grunt> register 'udfs.py' using jython as myfuncs;
grunt> emails = limit emails 100;
grunt> emails = filter emails by (from is not null) and (to is not null);
grunt> pairs = foreach emails generate flatten(from) as from, flatten(to) as to, subject;
grunt> gft = group pairs by (from, to);
grunt> gft = foreach gft generate group, pairs.(subject) as subjects;
grunt> describe gft;

gft: {group: (from: chararray,to: chararray),subjects: {(subject: chararray)}}
grunt> to_from_word_counts = foreach gft generate myfuncs.word_count_subjects(group, subjects);
grunt> store to_from_word_counts into '/tmp/jython_test.txt';
Our result:
(tim@nada1.de,user@jruby.codehaus.org,{(file,2),([jruby-user],2),(.so,2),(a,2),(Problems,2),(Re:,2),(loading,2)})
(victor@x.com,user@avro.apache.org,{(in,1),(avdl,1),(of,1),(classpath,1),(Importing,1),(RE:,1),(from,1),(project,1)})
(info@meetup.com,russell.jurney@gmail.com,{(Dave,1),(Stories,1),(comment,1),(a,1),(War,1),(posted,1),(Data,1),(Big,1),(for,1),(Nielsen,1)})
(jira@apache.org,russell.jurney@gmail.com,{([jira],8),(in,8),(a,8),(bag,6),(fails,6),(Avro,4),(as,4),(of,4),(STORE,4),(single-field,4),((PIG-2411),4),(tuples,4),(to,4),(UDF,4),(AvroStorage,4),(PiggyBank,4),(when,4),(arrays,4),([Updated],3),([Commented],3),(UDF),2),(used,2),(SQL,2),(with,2),(java.lang.NullPointerException,2),(interface,2),(Util.getSchemaFromString,2),([Created],2),(MongoStorage,2),(for,2),(name,2),(tuple,2),((as,2),(Pig,2),((PIG-824),2),(no,2),(has,2),((PIG-2509),2)})
Once you get the hang of processing tuples in Jython, any UDF is a breeze!