This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Apache Flume Plugin

James Taylor edited this page Dec 10, 2013 · 9 revisions

The plugin enables us to reliably and efficiently stream large amounts of data/logs onto HBase using the Phoenix API. The custom Phoenix sink and the event serializer are configured in the Flume configuration file for the agent. Currently, the only supported event serializer is RegexEventSerializer, which splits the Flume event body according to the regex specified in the configuration file.

Prerequisites:

 1) Phoenix v 3.0.0 SNAPSHOT or later
 2) Flume 1.4.0 or later

Installation & Setup:

 1) Download Phoenix v 3.0.0 SNAPSHOT from the [repo](https://github.com/forcedotcom/phoenix).
 2) Follow the instructions at https://github.com/forcedotcom/phoenix/wiki/Building-Project to build the project, as the Flume plugin is still in beta.
 3) Create a directory plugins.d within the $FLUME_HOME directory. Within that, create the subdirectories phoenix-sink/lib.
 4) Copy the generated phoenix-3.0.0-SNAPSHOT.jar to $FLUME_HOME/plugins.d/phoenix-sink/lib.

Configuration:

| Property Name | Default | Description |
| --- | --- | --- |
| type | | com.salesforce.phoenix.flume.sink.PhoenixSink |
| batchSize | 100 | Default number of events per transaction. |
| zookeeperQuorum | | ZooKeeper quorum of the HBase cluster. |
| table | | The name of the table in HBase to write to. |
| ddl | | The CREATE TABLE query for the HBase table into which the events will be upserted. If specified, the query will be executed. Including the IF NOT EXISTS clause in the ddl is recommended. |
| serializer | regex | Event serializer for processing the Flume event. Currently, only regex is supported. |
| serializer.regex | (.*) | The regular expression for parsing the event. |
| serializer.columns | | The columns that will be extracted from the Flume event for inserting into HBase. |
| serializer.headers | | Headers of the Flume events that go as part of the UPSERT query. The data type for these columns is VARCHAR by default. |
| serializer.rowkeyType | | A custom row key generator. Can be one of timestamp, date, uuid, random and nanotimestamp. Configure this when a custom row key value needs to be auto-generated and set for the primary key column. |
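
To see how `serializer.columns`, `serializer.headers`, and the generated row key fit together, the Python sketch below builds the kind of parameterized UPSERT statement such a sink could issue. It is an illustration only: the function `build_upsert` and its `rowkey_column` argument are hypothetical names, and the actual statement and column order produced by PhoenixSink are not documented on this page.

```python
def build_upsert(table, columns, headers, rowkey_column=None):
    """Build a parameterized Phoenix UPSERT statement (illustrative only).

    columns       -- names from serializer.columns (filled from regex groups)
    headers       -- names from serializer.headers (filled from event headers)
    rowkey_column -- hypothetical: the primary key column that receives the
                     value generated according to serializer.rowkeyType
    """
    names = list(columns) + list(headers)
    if rowkey_column:
        names.append(rowkey_column)
    column_list = ", ".join(names)
    placeholders = ", ".join("?" for _ in names)
    return "UPSERT INTO {} ({}) VALUES ({})".format(table, column_list, placeholders)


sql = build_upsert("APACHE_LOGS", ["host", "status"], ["f_host"], rowkey_column="uid")
print(sql)
# prints: UPSERT INTO APACHE_LOGS (host, status, f_host, uid) VALUES (?, ?, ?, ?)
```

Each `?` would be bound to one regex capture group, one event header value, or the generated key, which is why the number of capture groups in `serializer.regex` needs to line up with the number of names in `serializer.columns`.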

Example configuration for ingesting Apache access logs onto Phoenix. Here we are using UUID as a row key generator for the primary key.

    # configuration for agent
    agent.sources = spooling-source
    agent.sinks = phoenix-sink
    agent.channels = memoryChannel

    # configuration for channel
    agent.channels.memoryChannel.type=memory
    agent.channels.memoryChannel.transactionCapacity=100
    agent.channels.memoryChannel.byteCapacityBufferPercentage=20

    # configuration for source
    agent.sources.spooling-source.type=spooldir
    agent.sources.spooling-source.channels = memoryChannel
    agent.sources.spooling-source.spoolDir = /opt/logs

    # configuration for interceptor
    agent.sources.spooling-source.interceptors = i1
    agent.sources.spooling-source.interceptors.i1.type = host
    agent.sources.spooling-source.interceptors.i1.hostHeader = f_host

    # configuration for sink
    agent.sinks.phoenix-sink.type=com.salesforce.phoenix.flume.sink.PhoenixSink
    agent.sinks.phoenix-sink.channel = memoryChannel
    agent.sinks.phoenix-sink.batchSize = 100
    agent.sinks.phoenix-sink.table = APACHE_LOGS
    agent.sinks.phoenix-sink.ddl = CREATE TABLE IF NOT EXISTS APACHE_LOGS(uid VARCHAR NOT NULL, host   VARCHAR ,identity  VARCHAR  ,user  VARCHAR  ,time VARCHAR ,method VARCHAR ,request VARCHAR,protocol  VARCHAR,status INTEGER ,size INTEGER ,referer  VARCHAR ,agent VARCHAR,f_host VARCHAR  CONSTRAINT pk PRIMARY  KEY (uid))
    agent.sinks.phoenix-sink.zookeeperQuorum = localhost
    agent.sinks.phoenix-sink.serializer = REGEX
  
    agent.sinks.phoenix-sink.serializer.rowkeyType = uuid
    agent.sinks.phoenix-sink.serializer.regex = ([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) \"([^ ]+) ([^ ]+) ([^\"]+)\" (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?
    agent.sinks.phoenix-sink.serializer.columns = host,identity,user,time,method,request,protocol,status,size,referer,agent
    agent.sinks.phoenix-sink.serializer.headers = f_host
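
Before deploying, it can help to check the regular expression against a sample log line. The Python sketch below applies the same pattern as it reads once the properties-file escaping is removed (`\\[` becomes `\[` and `\"` becomes `"`) and pairs each capture group with the configured column names. The sample line and the helper `parse_line` are made up for illustration, and the use of a whole-line match is an assumption about how the serializer applies the pattern; the plugin itself uses Java's regex engine, not Python's.

```python
import re

# The serializer.regex value after properties-file unescaping.
PATTERN = re.compile(
    r'([^ ]*) ([^ ]*) ([^ ]*) (-|\[[^\]]*\]) "([^ ]+) ([^ ]+) ([^"]+)" '
    r'(-|[0-9]*) (-|[0-9]*)(?: ([^ "]*|"[^"]*") ([^ "]*|"[^"]*"))?'
)

# The serializer.columns value, in the same order as the capture groups.
COLUMNS = "host,identity,user,time,method,request,protocol,status,size,referer,agent".split(",")


def parse_line(line):
    """Return a column -> value dict, or None if the line does not match.

    Assumption: the pattern must cover the whole line (fullmatch).
    """
    match = PATTERN.fullmatch(line)
    if match is None:
        return None
    return dict(zip(COLUMNS, match.groups()))


# Hypothetical access-log line in combined log format.
SAMPLE = (
    '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] '
    '"GET /index.html HTTP/1.0" 200 2326 '
    '"http://example.com/start.html" "Mozilla/4.08"'
)

row = parse_line(SAMPLE)
for name in COLUMNS:
    print(name, "=", row[name])
```

Note that with this pattern the `time` value keeps its square brackets and the `referer` and `agent` values keep their surrounding double quotes, since those characters sit inside the capture groups.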

Starting the agent:

    $ bin/flume-ng agent -f conf/flume-conf.properties -c ./conf -n agent

Monitoring:

To monitor the agent and the sink process, enable JMX via the flume-env.sh script ($FLUME_HOME/conf/flume-env.sh). Ensure the following line is uncommented.

    JAVA_OPTS="-Xms1g -Xmx1g -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=3141 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
