AcroMUSASHI Stream は、Storm をベースとした、ストリームデータの分散処理プラットフォームです。
「ストリームデータ」とは、ビッグデータにおけるひとつのかたちであり、連続的に発生し続ける時系列順のデータのことを言います。AcroMUSASHI Stream を利用することで、多種多様なデバイス/センサー/サービスなどで発生するストリームデータをリアルタイムに処理するシステムを簡単に構築できるようになります。
HTTP/SNMP/JMSといった数十種類のプロトコルに対応したインタフェースや、ビッグデータ処理に欠かせないHadoop/HBase/Cassandraなどのデータストアとの連携機能を提供しており、「M2M」「ログ収集・分析」「SNSアクセス解析」等、データの解析にリアルタイム性を要するシステムを、迅速に立ち上げることが可能です。
AcroMUSASHI Stream を用いた実装の仕方については Examples を参照してください。
acromusashi-stream を用いて開発を行うためには、Mavenのビルド定義ファイルであるpom.xmlに以下の内容を記述してください。
<dependency>
<groupId>jp.co.acroquest.acromusashi</groupId>
<artifactId>acromusashi-stream</artifactId>
<version>0.6.2</version>
</dependency>
acromusashi-stream を利用して、StormのTopologyを開発してください。
acromusashi-stream を利用したシステムを実行するためには、以下の手順が必要です。
- Step1: Storm/必要となるミドルウェアのインストール
- Step2: acromusashi-stream を利用して開発したTopologyのデプロイ
- Step3: Topologyの起動
storm-installerを使用することで、簡単にStormに必要な環境を構築できます。
また、必要に応じて、メッセージキューやNoSQLなどのミドルウェアをインストールしてください。
acromusashi-stream を用いて開発したTopologyのクラスをjarファイルにまとめ、関連するjarファイルと共に、Supervisorにデプロイしてください(関連するjarファイルはSupervisorが動作しているホスト全てにデプロイが必要です)。
その際、下記のディレクトリに配置してください。
関連するjarファイル > /opt/storm/lib 配下
開発したTopologyのjar > /opt/storm 配下
予め、StormのNumbus/Supervisorを起動しておいてください。
# service storm-nimbus start
# service storm-supervisor start
Storm本体の起動が確認できたら、開発したTopologyを起動します。
# cd /opt/storm
# bin/storm jar mytopology-x.x.x-jar acromusashi.stream.example.MyTopology MyTopologyName
ストリームデータを処理するシステムを構築する際にはデータを受信/取得し、ストリーム処理システムに取り込むこと必要があります。 acromusashi-stream では、以下のようなデータに対応しています(ここでは、代表的な一部だけを示しています)。
- HTTP(JSON)
- MQTT
- SNMP Trap
- Syslog
SNMP Trap を受信し、Kestrelなどのキューに格納する処理を、設定だけで実現可能です。
利用方法については、SNMP Trap 受信機能の利用方法を確認してください。
ストリームデータを処理するシステムを構築する際にはデータを一時メッセージキューに格納することで瞬間的な負荷増大に対しても、欠損なく対応できるようになります。
そのため、ストリームデータ処理プラットフォームではメッセージキューからデータを取得するための機能が求められます。
acromusashi-stream では、以下のメッセージキューに対応しています。
Kestrelからデータを取得するためには、KestrelJsonSpoutを利用します。
KestrelからJSON形式のメッセージを取得し、Boltに送信するまでの処理を、シームレスに行えるようになります。
また、KestrelJsonSpoutを用いた場合、Boltにおいて処理に失敗/タイムアウトしたメッセージの再処理が可能です。
あらかじめKestrelをインストールしておく必要がありますので、Kestrelの利用方法を確認してインストールして使用してください。
KestrelJsonSpoutを使用する際にはコンストラクタの引数に以下の設定項目を設定してください。
- 第1引数(Kestrelの接続先情報リスト)
以下の例のように【Kestrelホスト:KestrelThriftポート】形式のStringのListを指定
(例)
- 192.168.0.1:2229
- 192.168.0.2:2229
- 192.168.0.3:2229
- 第2引数(Kestrelのメッセージキューベース名称)
以下の例のように取得対象となるKestrel上のキュー名のベースの名称を指定してください。
(例)MessageQueue
尚、ベース名称をMessageQueueとした場合、実際に取得対象となるキュー名は
MessageQueue_0、MessageQueue_1、MessageQueue_2・・・となります。
- 第3引数(Kestrelから取得したデータのデシリアライズ方式)
文字列として取得する場合、「new StringScheme())」を設定してください。
// Kestrelの接続先情報リスト
List<String> kestrelHosts = Lists.newArrayList("KestrelServer1:2229", "KestrelServer2:2229", "KestrelServer3:2229");
// Kestrelのメッセージキューベース名称
String kestrelQueueName = "MessageQueue";
// KestrelJsonSpoutの並列度
int kestrelPara = 3;
// KestrelJsonSpoutコンポーネントの生成
KestrelJsonSpout kestrelSpout = new KestrelJsonSpout(kestrelHosts, kestrelQueueName, new StringScheme());
// KestrelJsonSpoutをTopologyに登録
getBuilder().setSpout("KestrelJsonSpout", kestrelSpout, kestrelSpoutPara);
// ~~以後、BoltをTopologyに設定~~
RabbitMQからデータを取得するためには、RabbitMqSpoutを利用します。
RabbitMQから文字列形式のメッセージを取得し、グルーピング情報を抽出してBoltに送信するまでの処理を、シームレスに行えるようになります。
あらかじめRabbitMQをインストールしておく必要がありますので、RabbitMQの利用方法を確認してインストールして使用してください。
RabbitMqSpoutにはTopology生成時に以下のフィールドを設定してください。
その上で、rabbitmqClusterContext.xmlを取得して設定を行い、Supervisorが動作しているホスト全ての /opt/storm/conf 配下に配置してください。
MessageKeyExtractorはJsonExtractorを参考に作成してください。
- コンテキスト読込オブジェクト(ContextHelper)
RabbitMQのクラスタ設定を記述したコンテキストファイルを読み込む読込オブジェクト。
SpringContextHelperに以下の例のようにクラスパス上のコンテキストファイルのパスを設定してください。
(例)/rabbitmqClusterContext.xml
- RabbitMQのベースキュー名称(QueueName)
以下の例のように取得対象となるRabbitMQ上のキュー名のベースの名称を指定してください。
(例)MessageQueue
尚、ベース名称をMessageQueueとした場合、実際に取得対象となるキュー名は
MessageQueue0、MessageQueue1、MessageQueue2・・・となります。
- RabbitMQから取得したメッセージからキーを抽出するオブジェクト(MessageKeyExtractor)
RabbitMQから取得したメッセージからキー項目を抽出するオブジェクトをMessageKeyExtractorインタフェースを継承して作成し、設定してください。
// RabbitMQクラスタ設定ファイルパスの指定
String contextPath = "/rabbitmqClusterContext.xml";
// RabbitMQのメッセージキューベース名称
String baseQueueName = "MessageQueue";
// RabbitMqSpoutの並列度
int mqSpoutPara = 3;
// MessageKeyExtractorの設定(IPアドレスを抽出するExtractorを設定)
MessageKeyExtractor extractor = new IpAddressExtractor();
// Springのコンテキスト情報を定義するHelperオブジェクトの生成
SpringContextHelper helper = new SpringContextHelper(contextPath);
// RabbitMqSpoutコンポーネントの生成
RabbitMqSpout rabbitMqSpout = new RabbitMqSpout();
rabbitMqSpout.setContextHelper(helper);
rabbitMqSpout.setQueueName(baseQueueName);
rabbitMqSpout.setMessageKeyExtractor(extractor);
// RabbitMqSpoutをTopologyに登録
getBuilder().setSpout("RabbitMqSpout", rabbitMqSpout, mqSpoutPara);
// ~~以後、BoltをTopologyに設定~~
設定ファイル記述例(rabbitmqClusterContext.xml をベースに下記の個所の修正を行う)
<!-- RabbitMQCluster0が保持するキュー一覧 -->
<util:list id="queueList0">
<value>Message0</value> <!-- ★RabbitMQが保持するキューを設定★ -->
<value>Message1</value> <!-- ★RabbitMQが保持するキューを設定★ -->
</util:list>
~~~~
<!-- RabbitMQプロセス一覧 -->
<property name="mqProcessList">
<util:list list-class="java.util.LinkedList">
<value>rabbitmqserver:5672</value> <!-- ★RabbitMQの待ち受けホスト/ポートを設定★ -->
</util:list>
</property>
~~~~
<!-- 呼出元別、接続先RabbitMQプロセス定義 -->
<property name="connectionProcessMap">
<util:map>
<entry key="rabbitmqserver_Message0"> <!-- ★RabbitMQへのアクセス元ホスト_キュー名称 を設定★ -->
<value>rabbitmqserver:5672</value> <!-- ★RabbitMQの待ち受けホスト/ポートを設定★ -->
</entry>
<entry key="rabbitmqserver_Message1"> <!-- ★RabbitMQへのアクセス元ホスト_キュー名称 を設定★ -->
<value>rabbitmqserver:5672</value> <!-- ★RabbitMQの待ち受けホスト/ポートを設定★ -->
</entry>
</util:map>
</property>
~~~~
<!-- 使用するConnectionFactory (ユーザ名、パスワードを変更) -->
<bean id="connectionFactory0" class="acromusashi.stream.component.rabbitmq.CachingConnectionFactory">
<property name="username" value="guest" /> <!-- ★RabbitMQのユーザ名を設定★ -->
<property name="password" value="guest" /> <!-- ★RabbitMQのパスワードを設定★ -->
<property name="channelCacheSize" value="10" />
</bean>
Hadoopに対してデータを投入するためにはHdfsStoreBoltを使用します。
HDFSに対して一定時間ごとにファイルを切り替えながらデータを投入できるようになります。
実装例は[Hadoop連携]を確認してください。
HdfsStoreBoltを使用するTopologyでは読み込むYAMLファイルに以下の設定項目を設定してください。
## 投入先のHDFSパス
hdfsstorebolt.outputuri : 'hdfs://__NAMENODE_HOST__/HDFS/'
## 投入されるファイル名のヘッダ
hdfsstorebolt.filenameheader : HDFSStoreBolt
## ファイルを切り替えるインターバル
hdfsstorebolt.interval : 10
HBaseに対してデータを投入するためにはCamelHbaseStoreBoltを使用します。
HBaseに対してBoltが受信したデータを投入できるようになります。
実装例は[HBase連携]を確認してください。
Cassandraに対してデータを投入するためにはCassandraStoreBoltを使用します。
Cassandraに対してBoltが受信したデータを投入できるようになります。
あらかじめCassandraをインストールしておく必要がありますので、[Cassandraの利用方法]を確認してインストールして使用してください。
CassandraStoreBoltを使用する際にはコンストラクタの引数に以下の設定項目を設定してください。
TupleMapperはDefaultTupleMapperを参考に作成してください。
- 第1引数(Cassandra投入設定Mapを保持するキー名称)
以下の例のようにYAMLファイル中に指定したCassandra投入設定Mapのキー値を設定
(例)cassandrastore.setting
- 第2引数(Cassandraへの投入内容を生成するTupleMapperオブジェクト)
Cassandraへの投入内容を生成するオブジェクトをTupleMapperインタフェースを継承して作成し、設定してください。
CassandraStoreBoltを使用するTopologyでは読み込むYAMLファイルに以下の設定項目を設定してください。
## CassandraStoreBolt Setting
cassandrastore.setting : ## Cassandra設定グループを示すキー項目、CassandraStoreBoltコンストラクタの第1引数に指定した値とあわせる
cassandra.host : "cassandrahost1:9160,cassandrahost2:9160,cassandrahost3:9160" ## CassandraHost Setting
cassandra.clusterName : 'Test Cluster' ## Cassandra投入先クラスタ名
cassandra.connection.timeout : 5000 ## Cassandra接続タイムアウト
cassandra.keyspace : ## CassandraKeyspace
- keyspace
// ~~SpoutをTopologyに設定~~
// Cassandra投入設定Mapを保持するキー名称
String configKey = "cassandrastore.setting";
// Cassandra投入先Keyspace
String keyspace = "keyspace";
// Cassandra投入先ColumunFamily
String columnFamily = "columnFamily";
// CassandraStoreBoltの並列度
int cassandraPara = 3;
// TupleMapperの生成(投入用のデータを生成するMapperオブジェクト)
TupleMapper<String, String, String> storeMapper = new StoreMapper(keyspace, columnFamily);
// CassandraStoreBoltコンポーネントの生成
CassandraStoreBolt<String, String, String> cassandraStoreBolt = new CassandraStoreBolt<String, String, String>(configKey, storeMapper);
// CassandraStoreBoltをTopologyに登録
getBuilder().setBolt("CassandraStoreBolt", cassandraStoreBolt, cassandraPara).fieldsGrouping("JsonConvertBolt", new Fields(FieldName.MESSAGE_KEY));
Elasticsearchに対してデータを投入するためにはElasticSearchBoltを使用します。
Elasticsearchに対してBoltが受信したデータを投入できるようになります。
実装例は[Elasticsearch連携]を確認してください。
ElasticSearchBoltを使用する際にはコンストラクタの引数に以下の設定項目を設定してください。
EsTupleConverterはJsonConverterを参考に作成してください。
- 第1引数(TupleをIndex Requestに変換するコンバータオブジェクト)
Elasticsearchへの投入内容を生成するオブジェクトをEsTupleConverterインタフェースを継承して作成し、設定してください。
ElasticSearchBoltにはTopology生成時に以下のフィールドを設定してください。
- クラスタ名称(clusterName)
Elasticsearchのクラスタ名称を設定してください。
(例)TestCluster
- ElasticSearchの投入先(servers)
ElasticSearchの投入先を設定してください。
「host1:port1;host2:port2;host3:port3...」という形式で定義してください。
(例)192.168.0.1:9300;192.168.0.2:9300;192.168.0.3:9300
Stormで使用しているYAML形式の設定ファイルを読み込むにはStormConfigGeneratorを使用します。
YAML形式の設定ファイルをStormのConfigオブジェクトとして読み込むことができます。
StormのConfigオブジェクトとして読みこんだオブジェクトからはStormConfigUtilを用いることで値の取得が可能です。
// 指定したパスから設定情報をStormの設定オブジェクト形式で読み込む。
Config conf = StormConfigGenerator.loadStormConfig("/opt/storm/config/TargetTopology.yaml");
// Stormの設定オブジェクトからキー"target.config"を持つ文字列形式の設定項目をデフォルト値""で取得する
String configValue = StormConfigUtil.getStringValue(conf, "target.config", "");
This software is released under the MIT License, see LICENSE.txt.