Integrates Storm and Cassandra by providing a generic and configurable backtype.storm.Bolt
implementation that writes Storm Tuple
objects to a Cassandra Column Family.
How the Storm Tuple
data is written to Cassandra is dynamically configurable -- you
provide classes that "determine" a column family, row key, and column name/values, and the
bolt will write the data to a Cassandra cluster.
Primary development of storm-cassandra will take place at: https://github.com/ptgoetz/storm-cassandra
Point/stable (non-SNAPSHOT) release souce code will be pushed to: https://github.com/nathanmarz/storm-contrib
Maven artifacts for releases will be available on maven central.
$ mvn install
Basic Usage
CassandraBolt
expects that a Cassandra hostname, port, and keyspace be set in the Storm topology configuration:
Config config = new Config();
config.put(CassandraBolt.CASSANDRA_HOST, "localhost");
config.put(CassandraBolt.CASSANDRA_PORT, 9160);
config.put(CassandraBolt.CASSANDRA_KEYSPACE, "testKeyspace");
The CassandraBolt
class provides a convenience constructor that takes a column family name, and row key field value as arguments:
IRichBolt cassandraBolt = new CassandraBolt("columnFamily", "rowKey");
The above constructor will create a CassandraBolt
that writes to the "columnFamily
" column family, and will look for/use a field
named "rowKey
" in the backtype.storm.tuple.Tuple
objects it receives as the Cassandra row key.
For each field in the backtype.storm.Tuple
received, the CassandraBolt
will write a column name/value pair.
For example, given the constructor listed above, a tuple value of:
{rowKey: 12345, field1: "foo", field2: "bar}
Would yield the following Cassandra row (as seen from cassandra-cli
):
RowKey: 12345
=> (column=field1, value=foo, timestamp=1321938505071001)
=> (column=field2, value=bar, timestamp=1321938505072000)
The "examples" directory contains two examples:
-
CassandraReachTopology
-
PersistentWordCount
The CassandraReachTopology
example is a Storm Distributed RPC example
that is essentially a clone of Nathan Marz' ReachTopology
,
that instead of using in-memory data stores is backed by a Cassandra database and uses generic
storm-cassandra bolts to query the database.
The sample PersistentWordCount
topology illustrates the basic usage of the Cassandra Bolt implementation. It reuses the TestWordSpout
spout and TestWordCounter
bolt from the Storm tutorial, and adds an instance of CassandraBolt
to persist the results.
In order to run the examples, you will need a Cassandra database running on localhost:9160
.
$ cd examples
$ mvn install
Install and run Apache Cassandra.
Create the sample schema using cassandra-cli
:
$ cd schema
$ cassandra-cli -h localhost -f cassandra_schema.txt
To run the CassandraReachTopology
execute the following maven command:
$ mvn exec:java -Dexec.mainClass=backtype.storm.contrib.cassandra.example.CassandraReachTopology
Among the output, you should see the following:
Reach of http://github.com/hmsonline: 3
Reach of http://github.com/nathanmarz: 3
Reach of http://github.com/ptgoetz: 4
Reach of http://github.com/boneill: 0
To enable logging of all tuples sent within the topology, run the following command:
$ mvn exec:java -Dexec.mainClass=backtype.storm.contrib.cassandra.example.CassandraReachTopology -Ddebug=true
The PersistentWordCount
example build the following topology:
TestWordSpout ==> TestWordCounter ==> CassandraBolt
Data Flow
TestWordSpout
emits words at random from a pre-defined list.TestWordCounter
receives a word, updates a counter for that word, and emits a tuple containing the word and corresponding count ("word", "count").- The
CassandraBolt
receives the ("word", "count") tuple and writes it to the Cassandra database using the word as the row key.
Run the PersistentWordCount
topology:
$ mvn exec:java -Dexec.mainClass=backtype.storm.contrib.cassandra.example.PersistentWordCount
View the end result in cassandra-cli
:
$ cassandra-cli -h localhost
[default@unknown] use stormks;
[default@stromks] list stormcf;
The output should resemble the following:
Using default limit of 100
-------------------
RowKey: nathan
=> (column=count, value=22, timestamp=1322332601951001)
=> (column=word, value=nathan, timestamp=1322332601951000)
-------------------
RowKey: mike
=> (column=count, value=11, timestamp=1322332600330001)
=> (column=word, value=mike, timestamp=1322332600330000)
-------------------
RowKey: jackson
=> (column=count, value=17, timestamp=1322332600633001)
=> (column=word, value=jackson, timestamp=1322332600633000)
-------------------
RowKey: golda
=> (column=count, value=31, timestamp=1322332602155001)
=> (column=word, value=golda, timestamp=1322332602155000)
-------------------
RowKey: bertels
=> (column=count, value=16, timestamp=1322332602257000)
=> (column=word, value=bertels, timestamp=1322332602255000)
5 Rows Returned.
Elapsed time: 8 msec(s).