Skip to content

Spin up a Hadoop, Kafka, Zookeeper, Druid Cluster on Ubuntu 14.04 LTS with Ansible and Vagrant

Notifications You must be signed in to change notification settings


Folders and files

Last commit message
Last commit date

Latest commit



13 Commits

Repository files navigation

##Hadoop, Kafka, Druid Data Ingestion Lab The purpose of this lab is to test data ingestion with a variety of open source tools. We will focus on these:

  • Zookeeper Ensemble
  • Hadoop HDFS and Map Reduce Cluster
  • Kafka cluster
  • Druid ingestion

We will build a network of Virtualbox VMs with Vagrant and Ansible, to imitate bare metal infrastructure.

Default hosts created for the lab:

  • core1.lan - bind9 DNS server
  • hadoop[0-2].lan - Hadoop cluster
  • zookeeper[0:2].lan - Zookeeper ensemble
  • kafka[0:2].lan - Kafka cluster
  • druid0.lan - Database for data ingestion


To test who is the Quorum leader...this should not return standalone, if it does you must restart each node quickly:

echo srvr | nc zookeeper0.lan 2181 | grep Mode

Also you can run to find out if the node is running smoothly:

echo ruok | nc zookeeper0.lan 2181


###IMPORTANT Host resolution in Hadoop clustering is critical. This repo uses a bind9 server to resolve hostnames, forward and reverse records. While writing to the /etc/hosts file is a simple solution, this probably gets messy. I have a hybrid solution here. On each host, if you only write it's own entry, and use DNS for other hostname resolution, it works. For instance: hadoop0.lan's /etc/hosts file: localhost hadoop0.lan hadoop0

####Additional note: I fought missing jars getting the following hdfs namenode -format command to work

cp /opt/hadoop-2.7.2/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/slf4j-api-1.7.10.jar /opt/hadoop-2.7.2/share/hadoop/common/lib/
cp /opt/hadoop-2.7.2/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/slf4j-log4j12-1.7.10.jar /opt/hadoop-2.7.2/share/hadoop/common/lib/
cp /opt/hadoop-2.7.2/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/commons-configuration-1.6.jar /opt/hadoop-2.7.2/share/hadoop/common/lib/

###format HDFS by changing user and running the commands (On the master namenode):

sudo su - hadoop
hdfs namenode -format

###Start Namenode:


###Start Mapreduce v2 Daemon:


####To verify daemons on each host, as the hadoop user:

This will display the different daemons running on the host where the command was issued.


####To verify the cluster:

Navigate to the Name node Web UI:


Additionally, navigate to Hadoop UI:


You should see all cluster nodes with storage capacity stats. If not, you may have a networking issue/DNS resolution issue.

Troubleshoot this by checking connection availability with the telnet and netstat tool. Netstat will show you interface/process bindings. All of these should be bound to or all interfaces Additionally, ipv6 should be disabled

netstat -ntple

Proto Recv-Q Send-Q Local Address           Foreign Address         State       User       Inode       PID/Program name
tcp        0      0   *               LISTEN      0          7784        -               
tcp        0      0  *               LISTEN      999        47462       19258/java      
tcp        0      0 *               LISTEN      999        45671       18871/java      
tcp        0      0    *               LISTEN      0          28282       -               
tcp        0      0  *               LISTEN      999        47466       19258/java      
tcp        0      0  *               LISTEN      999        47448       19258/java      
tcp        0      0  *               LISTEN      999        47472       19258/java      
tcp        0      0  *               LISTEN      999        47456       19258/java      
tcp        0      0 *               LISTEN      107        7854        -               
tcp        0      0  *               LISTEN      999        45679       18871/java      
tcp        0      0 *               LISTEN      999        46676       19108/java      
tcp6       0      0 :::111                  :::*                    LISTEN      0          7787        -               
tcp6       0      0 :::22                   :::*                    LISTEN      0          28284       -               
tcp6       0      0 :::34334                :::*                    LISTEN      107        7860        -  

Begin trying to connect to services:

telnet <server> <port>
telnet hadoop0.lan 9000

If you get connection refused errors, check the processes and logs. Below is an example of the namenode log:

cat /opt/hadoop-2.7.2/logs/hadoop-hadoop-namenode-hadoop0.lan.log

###Test by running jobs:

####Use Map Reduce v2:

hadoop jar /opt/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar pi 30 100

####Use YARN:

yarn jar /opt/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar pi 30 100

####Verify the mapreduce job:



Create a hdfs directory for druid deep storage:

hdfs dfs -mkdir /user
hdfs dfs -mkdir /user/druid
hdfs dfs -mkdir /user/druid/segments
hdfs dfs -mkdir /user/druid/indexing-logs
hdfs dfs -chmod 0777 /user/druid
hdfs dfs -chmod 0777 /user/druid/segments
hdfs dfs -chmod 0777 /user/druid/indexing-logs


Starting a coordinator node as druid (See below for upstart scripts):

java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/coordinator:lib/* io.druid.cli.Main server coordinator

Had to append hadoop jars to the classpath (got the classes by: executing $ hadoop classpath and appending):

java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/coordinator:lib/*:/opt/hadoop-2.7.2/etc/hadoop:/opt/hadoop-2.7.2/share/hadoop/common/lib/*:/opt/hadoop-2.7.2/share/hadoop/common/*:/opt/hadoop-2.7.2/share/hadoop/hdfs:/opt/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/opt/hadoop-2.7.2/share/hadoop/hdfs/*:/opt/hadoop-2.7.2/share/hadoop/yarn/lib/*:/opt/hadoop-2.7.2/share/hadoop/yarn/*:/opt/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.7.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar io.druid.cli.Main server coordinator

Starting a historical node as druid:

java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/historical:lib/* io.druid.cli.Main server historical

Had to append hadoop jars to the classpath (got the classes by: executing $ hadoop classpath and appending):

java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/historical:lib/*:/opt/hadoop-2.7.2/etc/hadoop:/opt/hadoop-2.7.2/share/hadoop/common/lib/*:/opt/hadoop-2.7.2/share/hadoop/common/*:/opt/hadoop-2.7.2/share/hadoop/hdfs:/opt/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/opt/hadoop-2.7.2/share/hadoop/hdfs/*:/opt/hadoop-2.7.2/share/hadoop/yarn/lib/*:/opt/hadoop-2.7.2/share/hadoop/yarn/*:/opt/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.7.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar io.druid.cli.Main server historical

Starting a broker node as druid:

java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/broker:lib/* io.druid.cli.Main server broker

Had to append hadoop jars to the classpath (got the classes by: executing $ hadoop classpath and appending):

java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/broker:lib/*:/opt/hadoop-2.7.2/etc/hadoop:/opt/hadoop-2.7.2/share/hadoop/common/lib/*:/opt/hadoop-2.7.2/share/hadoop/common/*:/opt/hadoop-2.7.2/share/hadoop/hdfs:/opt/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/opt/hadoop-2.7.2/share/hadoop/hdfs/*:/opt/hadoop-2.7.2/share/hadoop/yarn/lib/*:/opt/hadoop-2.7.2/share/hadoop/yarn/*:/opt/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.7.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar io.druid.cli.Main server broker

Starting realtime node as druid:

java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/wikipedia/wikipedia_realtime.spec -classpath config/_common:config/realtime:lib/* io.druid.cli.Main server realtime

Had to append hadoop jars to the classpath (got the classes by: executing $ hadoop classpath and appending):

java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/wikipedia/wikipedia_realtime.spec -classpath config/_common:config/realtime:lib/*:/opt/hadoop-2.7.2/etc/hadoop:/opt/hadoop-2.7.2/share/hadoop/common/lib/*:/opt/hadoop-2.7.2/share/hadoop/common/*:/opt/hadoop-2.7.2/share/hadoop/hdfs:/opt/hadoop-2.7.2/share/hadoop/hdfs/lib/*:/opt/hadoop-2.7.2/share/hadoop/hdfs/*:/opt/hadoop-2.7.2/share/hadoop/yarn/lib/*:/opt/hadoop-2.7.2/share/hadoop/yarn/*:/opt/hadoop-2.7.2/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.7.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar io.druid.cli.Main server realtime

Upstart Files:

  • start|stop|restart druid-coordinator
  • start|stop|restart druid-historical
  • start|stop|restart druid-broker
  • start|stop|restart druid-middlemanager
  • start|stop|restart druid-overlord

runtime logs at /var/log/upstart/*

start druid-overlord
start druid-coordinator 
start druid-historical
start druid-broker
start druid-middlemanager

One by one, check for errors as the servers start up:

tail -n 1000 -f /var/log/upstart/druid-overlord.log
tail -n 1000 -f /var/log/upstart/druid-coordinator.log
tail -n 1000 -f /var/log/upstart/druid-historical.log
tail -n 1000 -f /var/log/upstart/druid-broker.log
tail -n 1000 -f /var/log/upstart/druid-middlemanager.log

Cluster Batch Data Loading

(Druid Documentation)[]

In my experience to get around the capacity errors, you need 2 historical nodes.

###Druid Batch Hadoop Indexer tasks


  • On Druid, extract the json example data. /opt/druid<version>/bin/init
  • Copy the example druid json data (druid0.lan:/opt/druid/quickstart/wikiticker-2015-09-12-sampled.json) to Hadoop
hdfs dfs -mkdir /user/druid/quickstart
hdfs dfs -chmod -R 0777 /user/druid/quickstart
hdfs dfs -put ./wikiticker-2015-09-12-sampled.json /user/druid/quickstart/
hdfs dfs -chmod 0777 /user/druid/quickstart/wikiticker-2015-09-12-sampled.json

Update the wikiticker-indexer.json spec file to point the path to the data in Hadoop:

"inputSpec" : {
  "type" : "static",
  "paths" : "hdfs://hadoop0.lan:9000/user/druid/quickstart/wikiticker-2015-09-12-sampled.json"

Post index task to overlord:

root@druid0:/opt/druid- curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/wikiticker-index.json localhost:8090/druid/indexer/v1/task

###Tranquility Kafka Server The tranquility role installs a kafka consumer server and http server to communicate with druid in realtime.

start tranquility_kafka

####Then on kafka, use the bin tools:

./bin/ --create --zookeeper zookeeper0.lan:2181,zookeeper1.lan:2181,zookeeper2.lan:2181 --replication-factor 3 --partitions 1 --topic metrics
./bin/ --create --zookeeper zookeeper0.lan:2181 --replication-factor 1 --partitions 1 --topic metrics

####Start a Kafka Producer:

./bin/ --broker-list kafka0.lan:9092,kafka1.lan:9092,kafka2.lan:9092 --topic metrics
./bin/ --broker-list zookeeper0.lan:9092 --topic metrics

Generate Metrics with a python script, on

###Tranquility HTTP server

start tranquility_http

####In the same folder, generate and POST to http server - Generate metrics Task:

/opt/druid- | curl -XPOST -H'Content-Type: application/json' --data-binary @- http://druid0.lan:8200/v1/post/metrics

####Use the Metabase web app to see realtime druid data

####Coordinator console:


####Druid Console: http://druid0.lan:8081

HDFS Segment: /user/druid/segments/wikipedia/20130831T000000.000Z_20130901T000000.000Z/2016-01-17T03_23_16.720Z/0

###Additional Queries

To query for ALL data (across realtime and historical nodes) issue the query to the broker node:

curl -X POST 'http://localhost:8082/druid/v2/?pretty' -H 'content-type: application/json' -d@examples/wikipedia/query.body


may have to update /var/log/kafka/ is the broker id has changed

Create a topic for ingestion:

./bin/ --create --zookeeper zookeeper0.lan:2181,zookeeper1.lan:2181,zookeeper2.lan:2181 --replication-factor 3 --partitions 1 --topic pageviews
./bin/ --create --zookeeper zookeeper0.lan:2181 --replication-factor 1 --partitions 1 --topic pageviews

Check the replication and partition:

bin/ --describe --zookeeper zookeeper0.lan:2181,zookeeper1.lan:2181,zookeeper2.lan:2181 --topic pageviews

Create a producer:

./bin/ --broker-list kafka0.lan:9092,kafka1.lan:9092,kafka2.lan:9092 --topic pageviews
./bin/ --broker-list zookeeper0.lan:9092 --topic pageviews

###Tranquility Kafka Consumer root@druid0:/opt/druid- cat /opt/druid-

  "dataSources" : {
    "pageviews-kafka" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "pageviews-kafka",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "time",
                "format" : "auto"
              "dimensionsSpec" : {
                "dimensions" : ["url","user"]
              "format" : "json"
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "minute",
            "queryGranularity" : "none"
          "metricsSpec" : [
              "type" : "count",
              "name" : "views"
              "name" : "latencyMs",
              "type" : "doubleSum",
              "fieldName" : "latencyMs"
        "ioConfig" : {
          "type" : "realtime"
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "pageviews"
  "properties" : {
    "zookeeper.connect" : "zookeeper0.lan:2181,zookeeper1.lan:2181,zookeeper2.lan:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "zookeeper0.lan:2181,zookeeper1.lan:2181,zookeeper2.lan:2181",
    "" : "tranquility-kafka"

###Metabase UI

java -jar metabase.jar

Browse to:


###Generating Metrics root@druid0:/opt/druid- cat genmetrics

#!/usr/bin/env python

import argparse
import json
import random
import sys
from datetime import datetime

def main():
  parser = argparse.ArgumentParser(description='Generate example page request latency metrics.')
  parser.add_argument('--count', '-c', type=int, default=25, help='Number of events to generate (negative for unlimited)')
  args = parser.parse_args()

  count = 0
  while args.count < 0 or count < args.count:
    timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

    r = random.randint(1, 4)
    if r == 1 or r == 2:
      user = 'bill'
    elif r == 3:
      user = 'tom'
      user = 'user' + str(random.randint(1, 99))

    u = random.randint(1, 4)
    if u == 1 or u == 2:
      url = '/'
    elif u == 3:
      url = '/login'
      url = '/page' + str(random.randint(1, 99))

    views = random.randint(1, 200)

    latencyMs = random.randint(80, 400)

      'time': timestamp,
      'latencyMs': int(latencyMs),
      'url': url,
      'views': int(views)

    count += 1

except KeyboardInterrupt:

Paste output into the Kafka Producer


Spin up a Hadoop, Kafka, Zookeeper, Druid Cluster on Ubuntu 14.04 LTS with Ansible and Vagrant






No releases published


No packages published