The example demonstrates how to create and send Apache Beam SparkRunner metrics to ELK and use the result for Kibana dashboards.
The main goal is to get metrics (number of records, failures, etc.) during the ingestion process and to collect and analyse them afterwards. Because our main distributed system is Apache Spark, we use ELK for logging and Spark supports several metrics sinks, we will use the Spark Slf4j sink to log all Apache Beam metrics to the ELK stack and create a visual dashboard in Kibana.
This example demonstrates how to set up a processing pipeline that reads a text file, filters only foo words, counts the number of foo and non-foo words as metrics, logs them to ELK and creates a visual chart.
- 1 Java project level:
- 2 Spark level:
- 3 Logstash level:
- 4 How to run:
- 4.1 Run ELK
- 4.2 Run Spark standalone
- 4.3 Run Spark cluster
- 5 Elasticsearch level:
- 6 Kibana level:
- Dependencies:
- Java classes:
- Resources:
In our case we run Spark as a standalone instance; please check pom.xml to find the dependencies and plugins. (If you use a Spark cluster, you must delete the Spark section in pom.xml, change the scope of the Hadoop dependencies from compile to provided, and add the GELF library to the Spark classpath.)
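For the cluster case, the scope change might look like the sketch below (hadoop-client and the version property are only illustrative here; the actual Hadoop artifacts are the ones already declared in the project pom.xml):

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <!-- provided: on a cluster the Hadoop jars are already on the Spark classpath -->
  <scope>provided</scope>
</dependency>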
You can find out more about the Beam Spark runner.
Create a simple Apache Beam pipeline class, where we:
- Create the pipeline from custom options
InterpretationPipelineOptions options =
    PipelinesOptionsFactory.create(InterpretationPipelineOptions.class, args);
Pipeline p = Pipeline.create(options);
- Read the source file and apply a custom ParDo function to filter foo words
p.apply("Reads file", TextIO.read().from(options.getInputPath()))
.apply("Filters words", ParDo.of(new FilterTextFn()));
}
- Run the pipeline
LOG.info("Running the pipeline");
p.run().waitUntilFinish();
LOG.info("Pipeline has been finished");
}
Add a custom DoFn with Counters (read more about Apache Beam metrics), where we count foo and non-foo words:
private static class FilterTextFn extends DoFn<String, String> {

  // Beam counters, reported by the SparkRunner through the configured metrics sink
  private final Counter fooCounter = Metrics.counter(MetricsPipeline.class, "foo");
  private final Counter nonfooCounter = Metrics.counter(MetricsPipeline.class, "nonfoo");

  @ProcessElement
  public void processElement(ProcessContext c) {
    String element = c.element();
    if (element.equals("foo")) {
      // count and emit only the "foo" lines
      fooCounter.inc();
      c.output(element);
    } else {
      // count everything else without emitting it
      nonfooCounter.inc();
    }
  }
}
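The counters declared above can also be read programmatically from the PipelineResult after the run, independently of the Spark sink. A minimal sketch using the Beam metrics API (LOG and p are the logger and pipeline from the snippets above):

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

PipelineResult result = p.run();
result.waitUntilFinish();

// Query the "foo" counter by its namespace (the class passed to Metrics.counter) and name
MetricQueryResults metrics =
    result
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named(MetricsPipeline.class.getName(), "foo"))
                .build());

for (MetricResult<Long> counter : metrics.getCounters()) {
  LOG.info("{}: attempted = {}", counter.getName(), counter.getAttempted());
}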
Call MDC.put at the beginning of the main method to add a unique id to each logger message:
MDC.put("uuid", UUID.randomUUID().toString());
Basically, Slf4jSink.java is an adapter for the Spark Slf4j sink, which is absent from the Apache Beam Spark runner (v2.7.0 was used).
package org.gbif.pipelines.common.beam;
import java.util.Properties;
import com.codahale.metrics.MetricRegistry;
import org.apache.beam.runners.spark.metrics.AggregatorMetric;
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
/**
* A Spark {@link org.apache.spark.metrics.sink.Sink} that is tailored to report {@link
* AggregatorMetric} metrics to Slf4j.
*/
public class Slf4jSink extends org.apache.spark.metrics.sink.Slf4jSink {
public Slf4jSink(
final Properties properties,
final MetricRegistry metricRegistry,
final org.apache.spark.SecurityManager securityMgr) {
super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
}
}
To turn on metrics support in Spark we must provide a metrics.properties file, and to send logs to ELK we must change the Spark logger properties.
Create a metrics.properties file, which is necessary for Spark monitoring (please read about Spark metrics):
executor.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink
driver.sink.slf4j.class=org.gbif.pipelines.common.beam.Slf4jSink
Find the final configuration - metrics.properties
Add the ELK appender part to the Spark log4j properties (please read about all Logstash/GELF logger settings):
# ELK appender
log4j.appender.gelf=biz.paluch.logging.gelf.log4j.GelfLogAppender
log4j.appender.gelf.Threshold=INFO
log4j.appender.gelf.Host=udp:127.0.0.1
log4j.appender.gelf.Port=12201
log4j.appender.gelf.Version=1.1
log4j.appender.gelf.Facility=examples-metrics
log4j.appender.gelf.ExtractStackTrace=true
log4j.appender.gelf.FilterStackTrace=true
log4j.appender.gelf.MdcProfiling=true
log4j.appender.gelf.TimestampPattern=yyyy-MM-dd HH:mm:ss,SSSS
log4j.appender.gelf.MaximumMessageSize=8192
log4j.appender.gelf.MdcFields=uuid
log4j.appender.gelf.IncludeFullMdc=true
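Note that defining the appender is not enough on its own: in log4j 1.x an appender only emits messages once it is attached to a logger. A minimal sketch of the root logger line (the console appender name is illustrative and depends on the existing log4j.properties):

# attach the GELF appender alongside the existing appenders
log4j.rootLogger=INFO, console, gelf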
Find the final configuration - log4j.properties
We must add/change the Logstash configuration: add a new listener for GELF input, add a filter for the message value, and output the result to Elasticsearch.
Create a simple Logstash configuration file and call it examples-metrics.config. For more detailed information please read the articles about the Logstash configuration file structure and more complex examples.
Add an input section to listen on the host and port for GELF (Graylog Extended Log Format) messages:
input {
gelf {
host => "127.0.0.1"
port => "12201"
}
}
In our case the necessary information about metrics is stored in the message field. We can add a filter section with kv; the kv filter automatically parses the message and converts it from a string to JSON.
Before kv filter:
"message": "type=GAUGE, name=local-1538473503470.driver.MetricsPipeline.Beam.Metrics.Counts_quantity_using_metrics_ParMultiDo_FilterText.org.gbif.pipelines.examples.MetricsPipeline.foo, value=41.0"
After kv filter:
"message": {
"type": "GAUGE",
"name": "local-1538473503470.driver.MetricsPipeline.Beam.Metrics.Counts_quantity_using_metrics_ParMultiDo_FilterText.org.gbif.pipelines.examples.MetricsPipeline.foo",
"value": "41.0"
}
The kv options used:
- source - the field to parse
- target - a new field for the parsed result
- field_split - the split character, in our case a comma
- trim_key - removes spaces in keys
filter {
kv {
source => "message"
target => "messageKv"
field_split => ","
trim_key => " "
}
}
To output logs to the console and to Elasticsearch, add an output section:
output {
stdout {
codec => "rubydebug"
}
elasticsearch {
hosts => "localhost:9200"
index => "examples-metrics"
}
}
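Before starting Logstash for real, you can check the configuration syntax with the built-in test flag:

logstash/bin/logstash -f examples-metrics.config --config.test_and_exit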
Find the final configuration - examples-metrics.config
If you don't have an ELK instance, you can:
- Download and run Elasticsearch:
elasticsearch/bin/elasticsearch
- Download and run Kibana:
kibana/bin/kibana
- Download and run Logstash using the examples-metrics.config configuration created in step 3:
logstash/bin/logstash -f examples-metrics.config
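A quick way to verify that Elasticsearch is up before running the pipeline (the default HTTP port is 9200):

curl http://localhost:9200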
For standalone Spark, build the project and run:
java -jar target/examples-metrics-BUILD_VERSION-shaded.jar src/main/resources/example.properties
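The shaded jar above is produced by the project build; with the Maven setup referenced earlier this is typically:

mvn clean package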
- Remove the Spark section in the project pom.xml
- Download the logstash-gelf-1.11.2.jar library
- Build the project
- Copy examples-metrics-BUILD_VERSION-shaded.jar, your configs and properties to your Spark gateway and run:
spark2-submit --conf spark.metrics.conf=metrics.properties --conf "spark.driver.extraClassPath=logstash-gelf-1.11.2.jar" --driver-java-options "-Dlog4j.configuration=file:log4j.properties" --class org.gbif.pipelines.examples.MetricsPipeline --master yarn examples-metrics-BUILD_VERSION-shaded.jar --runner=SparkRunner --inputPath=foobar.txt
After the first run, you will find the new index examples-metrics in Elasticsearch.
This request will return the first 10 records:
curl -XGET http://localhost:9200/examples-metrics/_search?pretty
If we check the index mapping:
curl -XGET http://localhost:9200/examples-metrics/_mapping?pretty
We will find that messageKv.value is a text value. If we want to build a visual chart, we must change the messageKv.value type from text to long; to do this we can create a predefined index mapping.
5.1.1 Get the current mapping and save it to a file:
curl -XGET http://localhost:9200/examples-metrics/_mapping?pretty > examples-metrics.json
5.1.2 In examples-metrics.json, find the value field mapping:
"value" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
And change it from text to long:
"value" : {
"type" : "long"
}
5.1.3 Create a new index template file new-examples-metrics.json with the root JSON elements and copy the mappings root element from examples-metrics.json:
{
"template": "examples-metrics",
"version": 50001,
"settings": {
"index.refresh_interval": "5s"
},
COPY YOUR MAPPINGS SECTION FROM examples-metrics.json
}
Find the final template - new-examples-metrics.json
5.1.4 Delete the existing index and add the new index template:
curl -XDELETE http://localhost:9200/examples-metrics?pretty
curl --header "Content-Type: application/json" -XPUT http://localhost:9200/_template/examples_metrics_template?pretty -d @new-examples-metrics.json
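You can confirm that the template has been stored, and will be applied when the index is recreated, with:

curl -XGET http://localhost:9200/_template/examples_metrics_template?pretty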
5.1.5 Run the pipeline again
Please read the article - Logstash mapping
Time to create a visual chart, but first we must update the Kibana index patterns to use the new predefined types.
Go to Kibana -> Management -> Index Patterns and click the "Refresh field list" button in the right corner.
Go to Kibana -> Discover, create and save the query:
{
"query": {
"bool": {
"must": [
{
"match": {
"LoggerName": "metrics"
}
},
{
"match": {
"Thread": "main"
}
},
{
"query_string": {
"query": "messageKv.name:*MetricsPipeline.foo"
}
}
]
}
},
"sort": [
{
"@timestamp": {
"order": "asc"
}
}
]
}
The query returns results where:
- Field LoggerName equals metrics
- Field Thread equals main
- Field messageKv.name contains the MetricsPipeline.foo line
- Sorted by @timestamp
Read more about Elasticsearch query language
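The same query can be checked outside Kibana by sending it to the index's _search endpoint, for example by saving the query above as query.json (a hypothetical file name) and running:

curl --header "Content-Type: application/json" -XGET http://localhost:9200/examples-metrics/_search?pretty -d @query.json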
Go to Kibana -> Visualize, click "Create a visualization", select "Line" and choose your query: