Source:
Paul Graham, Revenge of the Nerds.
Jython UDFs were added to Pig in version 0.8, and are pretty stable in the current version, 0.9.2. They are highly convenient, and a major timesaver.
Using Jython UDFs is simple. Create a UDF inudfs.py
:
# Extracts the hour from an iso8601 datetime string
@outputSchema("hour:chararray")
def hour(iso_string):
tuple_time = time.strptime(iso_string, "%Y-%m-%dT%H:%M:%S")
return str(tuple_time[2])
Import and run the UDF in Pig via:
grunt> register 'udfs.py' using jython as myfuncs;
2012-02-13 16:19:33,325 [main] INFO org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myfuncs.hour
grunt> describe times;
times: {date: chararray}
grunt> hours = foreach times generate date, myfuncs.hour(date) as hour;
grunt> dump hours
(2011-12-08T00:03:07-08:00,8)
(2011-12-08T00:08:05-08:00,8)
(2011-12-08T07:33:40+00:00,8)
(2011-12-08T08:28:32+00:00,8)
(2011-12-08T09:26:25+01:00,8)
(2011-12-08T13:02:32+05:30,8)
(2011-12-08T13:20:27+05:30,8)
(2011-12-08T13:40:44+05:30,8)
(2011-12-08T13:54:54+05:30,8)
(2011-12-08T13:59:47+05:30,8)
Jython can operate on complex types as well. Lets prepare a grouped relation, containing email subjects between pairs of email addresses, and then run it through a Jython UDF to get a word count.
The UDF computes word counts given a bag of subjects and a tuple of (from, to) pairs:
# Given from, to email address pairs and a bag of their subjects, return from, to and a bag of word counts
# Input format: {group: (from: chararray,to: chararray),subjects: {(subject: chararray)}}
@outputSchema("t:(from:chararray, to:chararray, word_counts:bag{t2:(word:chararray, total:int)})")
def word_count_subjects(group, subjects):
to = group[0]
_from = group[1]
word_counts = {}
for subject in subjects:
words = subject[0].split()
for word in words:
word_counts[word] = word_counts.get(word, 0) + 1
return to, _from, sorted(word_counts.items(), key=lambda word_count: word_count[1], reverse=True)
Import the UDF in Pig and run it against our grouped relation:
grunt> register 'udfs.py' using jython as myfuncs;
grunt> emails = limit emails 100;
grunt> emails = filter emails by (from is not null) and (to is not null);
grunt> pairs = foreach emails generate flatten(from) as from, flatten(to) as to, subject;
grunt> gft = group pairs by (from, to);
grunt> gft = foreach gft generate group, pairs.(subject) as subjects;
grunt> describe gft
gft: {group: (from: chararray,to: chararray),subjects: {(subject: chararray)}}
grunt> to_from_word_counts = foreach gft generate myfuncs.word_count_subjects(group, subjects);
grunt> store to_from_word_counts into '/tmp/jython_test.txt';
Our result:
(tim@nada1.de,user@jruby.codehaus.org,{(file,2),([jruby-user],2),(.so,2),(a,2),(Problems,2),(Re:,2),(loading,2)})
(victor@x.com,user@avro.apache.org,{(in,1),(avdl,1),(of,1),(classpath,1),(Importing,1),(RE:,1),(from,1),(project,1)})
(info@meetup.com,russell.jurney@gmail.com,{(Dave,1),(Stories,1),(comment,1),(a,1),(War,1),(posted,1),(Data,1),(Big,1),(for,1),(Nielsen,1)})
(jira@apache.org,russell.jurney@gmail.com,{([jira],8),(in,8),(a,8),(bag,6),(fails,6),(Avro,4),(as,4),(of,4),(STORE,4),(single-field,4),((PIG-2411),4),(tuples,4),(to,4),(UDF,4),(AvroStorage,4),(PiggyBank,4),(when,4),(arrays,4),([Updated],3),([Commented],3),(UDF),2),(used,2),(SQL,2),(with,2),(java.lang.NullPointerException,2),(interface,2),(Util.getSchemaFromString,2),([Created],2),(MongoStorage,2),(for,2),(name,2),(tuple,2),((as,2),(Pig,2),((PIG-824),2),(no,2),(has,2),((PIG-2509),2)})
Once you get the hang of processing tuples in Jython, any UDF is a breeze!