Home
Welcome to PyCascading! PyCascading is a light wrapper around Cascading that lets you write Python code suitable for MapReduce-like execution. Data may reside on any backend that Cascading supports, most importantly Hadoop and the local filesystem. The workflow is defined in Python, and the functions that operate on the data are also written in Python. PyCascading is open source, and I'd like to thank Twitter for supporting me in this work.
The equivalent of the "Hello, world!" program in MapReduce is counting the words in a text file, and this is what it looks like in PyCascading:
from pycascading.helpers import *

def main():
    flow = Flow()
    input = flow.source(Hfs(TextLine(), 'input_file.txt'))
    output = flow.sink(Hfs(TextDelimited(), 'output_folder'))

    @udf
    def split_words(tuple):
        for word in tuple.get(1).split():
            yield [word]

    input | map_replace(split_words, 'word') | group_by('word', native.count()) | output

    flow.run(num_reducers=10)
PyCascading works with data flows by applying various operations to the flow. The operations can broadly be grouped into three classes. Map-like operations apply a Python function to each record of the data stream, join-like operations join two or more streams in the manner of an SQL JOIN, and group-like operations apply aggregations to groups of records having the same value in a given field (GROUP BY in SQL).
Since PyCascading is built on Cascading, it is very well worth knowing how Cascading works. The user guide covers everything. You can get started without reading the full Cascading guide, but if you are new to Cascading, at the very least read the chapter on [data processing](http://www.cascading.org/1.2/userguide/htmlsingle/#N20106), with particular attention to pipes, field algebra, and sources and sinks. It's interesting!
PyCascading needs to be built before it can be used (it's not too hard, though). The requirements for building the 'master' branch are:
- Cascading 2.0 or Cascading 1.2.*
- Jython 2.5.2+
- Hadoop 0.20.2+, the version you build with preferably matching the Hadoop runtime
- A Java JDK (PyCascading is developed with Sun's version)
- Ant
After cloning the repo, edit the java/dependencies.properties file to point its variables to the locations where the Cascading, Jython, and Hadoop frameworks were downloaded. Note that only Jython needs to be "installed"; the Cascading and Hadoop archives can simply be extracted somewhere.
After this, still in the java folder, invoke ant. If the Java JDK is all right, this should result in a successful build within a few seconds. The result is two archive files created in the build directory. One of them is a Java jar (pycascading.jar) with the PyCascading, Cascading, and Jython classes and jars needed to run a script. The other is a tarball (build/pycascading.tgz) with the PyCascading sources and other Python system libraries.
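As a recap, once java/dependencies.properties has been edited as described, the whole build boils down to two commands, run from the top of the PyCascading checkout:

cd java     # the Ant build file lives here
ant         # creates the pycascading.jar and pycascading.tgz archives in the build directory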
PyCascading scripts can be run in remote or local mode. The two modes of execution are described below.
In remote mode the PyCascading scripts are run on a Hadoop cluster that the user has SSH access to. In general, a Hadoop job is a jar file that needs to be submitted to Hadoop for execution. In the case of PyCascading, this jar contains the PyCascading and Cascading classes, and even if we make changes to the Python scripts, this jar file does not change. So we can create the jar once, copy it to the server, and just use it locally on the server whenever we submit a new job.
The remote_deploy.sh script, included with PyCascading, can copy this master jar to the user's home directory on the Hadoop server. It is also used to copy the PyCascading scripts to the server to be run, after you have edited them on your computer.
After building PyCascading, deploy the master jar to the Hadoop server like this:
remote_deploy.sh -m -s <user@hadoop_server>
user@hadoop_server is the SSH account for the Hadoop server. The -m option specifies that the master archives should be copied to the server. They are approximately 12MB, so it shouldn't take much time for this to complete. The master archives will be stored in ~/pycascading/master by default, but this and the other default options (Hadoop server, SSH keys, etc.) are defined at the beginning of the remote_deploy.sh script and may be changed there.
Once the master archives are on the Hadoop server, the deployment of PyCascading scripts will be very fast and use very little bandwidth.
To run a PyCascading script, we first invoke
remote_deploy.sh -s <user@hadoop_server> <script.py>
to copy the sources to the server, and to set up a few helper scripts. If all went well, there will be a temporary folder created for the script in ~/pycascading/deploys by default (this location can be changed in remote_deploy.sh), and you will also see the path to the run.sh shell script that was generated on the server. This shell script can be run to submit the PyCascading job to Hadoop. Note that you may pass in command line parameters for the script, and they can be read in the usual way from sys.argv.
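As an illustration of the last point, the word count example could take its input path from the command line like this (only the changed lines of main() are shown; whether the first user-supplied parameter lands in sys.argv[1] is an assumption, so adjust the index to whatever your setup actually passes through):

import sys  # in addition to the pycascading.helpers import

def main():
    # Assumption: sys.argv[1] holds the first command-line parameter.
    input_path = sys.argv[1]
    flow = Flow()
    input = flow.source(Hfs(TextLine(), input_path))
    # ... the rest of the flow is built exactly as in the word count example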
After this you can SSH into the server and start the run.sh script to invoke Hadoop with the PyCascading framework. I use screen to run the jobs, so even if the network connection goes down, the job won't get killed. run.sh also assumes that the hadoop command is on the search path, so if it isn't, set PATH so that hadoop can be found.
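Putting the remote workflow together, the commands look roughly like this; the deploy directory and the Hadoop location below are placeholders (remote_deploy.sh prints the actual run.sh path), and screen is optional:

# one-time: copy the master archives to the Hadoop server
remote_deploy.sh -m -s user@hadoop_server

# each time a script is (re)edited: copy it to the server
remote_deploy.sh -s user@hadoop_server script.py

# on the server: make sure hadoop is on the PATH, then submit the job
ssh user@hadoop_server
export PATH="$PATH:/path/to/hadoop/bin"        # placeholder Hadoop location
screen                                         # optional: survives dropped connections
~/pycascading/deploys/<deploy_dir>/run.sh      # actual path is printed by remote_deploy.sh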
For testing, an alternative to using a full-blown Hadoop cluster is to run Hadoop in pseudo-distributed mode on your laptop or workstation. Performance is not great, as jobs won't benefit from parallel processing, but for quick tests and smallish files it's OK. For more information on setting up Hadoop in pseudo-distributed mode, see the Hadoop documentation.
Local mode means that we are not using HDFS at all, and all source and sink taps are bound to local files. By default, if you do not specify file:// or hdfs:// prefixes for your taps, the files and folders will be looked up on the default file system, which in the case of local execution is the local file system. I tend not to use any prefixes when specifying input/output paths, so the same PyCascading script can be run without changes in either local or remote mode.
Local mode is great for experimenting with and testing scripts quickly, as execution is very fast compared to the time it takes to set up proper MR jobs on a Hadoop cluster. In particular, if you are trying out the PyCascading examples, local mode is the best way to see results in the shortest time. Scripts are run in local mode with local_run.sh, like this:
local_run.sh examples/word_count.py
PyCascading scripts are simple Python scripts that use the PyCascading libraries, and are executed with Hadoop. The script's purpose is two-fold: 1) it creates the workflow for the Cascading job; 2) it defines all the Python functions that will be executed by the mappers and reducers as filters, aggregators, etc.
They won't run directly with python, as you would expect from normal Python scripts, but are instead executed through remote_deploy.sh or local_run.sh. One reason for this is that some setup and bookkeeping needs to happen before the script can start. The other reason is that PyCascading scripts contain the definitions of the functions applied to the data, so the source is imported by the map/reduce processes on the Hadoop cluster. Thus the scripts cannot contain anything substantial in their main body (outside of function and class definitions), otherwise execution would start there every time the script is imported.
In particular, the Cascading flow is created and executed in the main() function of the script. This way, when the mappers and reducers import the script source, they won't accidentally try to build and run the flow again. But these are technical details; it suffices to know that building the flow should happen in the main() function. When we have finished building the flow, we submit it to the Cascading executor with flow.run().
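A minimal skeleton that respects these rules might look like the sketch below; the paths and the pass-through UDF are placeholders for illustration, not part of any shipped example:

from pycascading.helpers import *

# Only imports and function/class definitions at module level: this file is
# imported by the map/reduce processes, so nothing substantial may run here.

def main():
    flow = Flow()
    input = flow.source(Hfs(TextLine(), 'some_input'))        # placeholder path
    output = flow.sink(Hfs(TextDelimited(), 'some_output'))   # placeholder path

    @udf
    def passthrough(tuple):
        # emit the text of each line unchanged
        yield [tuple.get(1)]

    input | map_replace(passthrough, 'line') | output

    # The flow is built above and only submitted here, inside main()
    flow.run(num_reducers=1)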
The way Cascading works is with flows: we choose some source files as input, apply a series of atomic operations to them in sequence, and write the resulting output to some other HDFS files. This process is best described as building data processing pipelines: data flows through these pipes from sources to sinks, and along the way it is modified, and pipes are split and merged according to some rules.
We will use the word count example above to see how we construct pipelines from basic operations, and then take a look at what kinds of operations we have available in detail a bit later.
First, we need to import everything from the pycascading.helpers module. This is the easiest way to have everything available to the script without having to use module names. pycascading.helpers also imports frequently used Cascading classes such as Fields, Tuple, and TupleEntry, as well as the primitive types from java.lang (Integer etc.). These are useful to have handy when reading and writing HDFS files.
As mentioned, PyCascading executes the main() function of your script, and thus that is where we define and run the Cascading flow. The first statement of main() creates a Flow object, which corresponds to a Cascading flow. In a flow, data comes from sources, goes through various pipe operations, and the results are finally written into one or more sinks. Sources and sinks are bound to flows, and they are created next with
input = flow.source(Hfs(TextLine(), 'input_file.txt'))
output = flow.sink(Hfs(TextDelimited(), 'output_folder'))
Flow.source and Flow.sink both take Cascading taps as arguments, which in this case we specify using Hfs. Hfs is a standard Cascading Java class, and we pass in its two required parameters: a Scheme describing the formatting of the data, and the location of the files (input_file.txt is a normal file, and output_folder is the name where the part-* files will be generated by the reducers). Note that we didn't specify resource locators in the file names: if we run the script with remote_deploy.sh, they default to HDFS, and if we run it with local_run.sh, they will be local.
input and output are the head and the tail of the data flow pipeline, respectively. input will be used as the source from which data comes, and the results will be written into output. The operations are chained together with the pipe operator |:
input | map_replace(split_words, 'word') | group_by('word', native.count()) | output
This means that records from input are mapped to new records using the Python function split_words. split_words splits the incoming lines on whitespace using Python's split(), and it produces a single column, which we will call word. map_replace takes some fields from the input tuples (in the example it takes all the fields), and replaces these fields with the fields of the output. In the example, the incoming fields are offset and line, and these are replaced with the output of split_words, which we call word.
We then group the records by word (that's what group_by does on the word field), and count how many times each word occurs. The second parameter to group_by is the aggregator we apply to each group, in this case the built-in Cascading Count aggregator. The aggregator's output field is appended to the grouping field, and this is what we pipe into output: the words and the number of times they appeared in the text.
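Since the sink uses TextDelimited, each part-* file will contain tab-separated lines of the word followed by its count, along the lines of (values invented purely for illustration):

hello	3
world	1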