Refactoring streams and storage #25

Merged 84 commits on Aug 23, 2019
6794c8a
chore: update to kstreams 2.3
jeqo Aug 5, 2019
1b9e4fe
refactor: draft aggregation and indexing without lucene
jeqo Aug 5, 2019
546171a
refactor: dependency link aggregation
jeqo Aug 6, 2019
420f706
refactor: compilable set of changes;
jeqo Aug 7, 2019
f4f4fc1
chore: fix formatting
jeqo Aug 7, 2019
af02905
fix: failing tests
jeqo Aug 7, 2019
2c3e799
chore: deprecate and format
jeqo Aug 7, 2019
21214e2
chore: update docker image for it
jeqo Aug 7, 2019
c7f1d64
fix: remote service call and store names
jeqo Aug 7, 2019
0690142
feat: testing suppress on trace aggregation
jeqo Aug 7, 2019
5d42256
feat: unit testing aggregation
jeqo Aug 8, 2019
de706f6
feat: testing traces retention
jeqo Aug 8, 2019
a83fa22
feat: complete streams unit testing
jeqo Aug 9, 2019
c28fd60
feat: it testing trace queries;
jeqo Aug 9, 2019
c0ef7d6
feat: it test for dependencies
jeqo Aug 9, 2019
5d07318
refactor: join stream apps into 2
jeqo Aug 9, 2019
2e55020
chore: remove old comment
jeqo Aug 9, 2019
8627fa5
chore: remove unused plugin
jeqo Aug 9, 2019
f298585
docs: add methods description
jeqo Aug 9, 2019
506fc6d
chore: update versions
jeqo Aug 9, 2019
3832e66
feat: autocomplete tags support
jeqo Aug 10, 2019
44dd4a8
feat: moving to zipkin/distroless images
jeqo Aug 10, 2019
9439dd7
feat: docker image renewed;
jeqo Aug 10, 2019
1a75c0d
fix: failing test
jeqo Aug 10, 2019
a4d732b
chore: clean properties
jeqo Aug 10, 2019
0bc72f9
fix: execution handling and code reuse
jeqo Aug 11, 2019
19d2764
chore: add todo for async call
jeqo Aug 11, 2019
1ef4c4e
fix: issue with dependency query
jeqo Aug 11, 2019
6968287
chore: close producer
jeqo Aug 11, 2019
fb055f2
chore: increase ti
jeqo Aug 11, 2019
e3302f7
chore: trying to chase issue with travis test
jeqo Aug 11, 2019
3a92195
chore: trying to chase issue with travis test
jeqo Aug 11, 2019
1fd43e6
docs: update stream images
jeqo Aug 11, 2019
c2ce59c
fix: ensure topics are created before running tests
jeqo Aug 11, 2019
53e8a7a
chore: remove non needed print exception stack
jeqo Aug 11, 2019
70aa7e7
chore: update configs and docs;
jeqo Aug 11, 2019
2668613
chore: var names and logging
jeqo Aug 12, 2019
9c21aae
chore: var names and logging
jeqo Aug 12, 2019
eb28ea8
chore: var names and logging
jeqo Aug 12, 2019
e69fc21
chore: var names and logging
jeqo Aug 12, 2019
10ab62e
chore: var names and logging
jeqo Aug 12, 2019
4687d83
feat: aligning configs and docker
jeqo Aug 12, 2019
2795943
fix: topic name in tests
jeqo Aug 12, 2019
551b252
chore: update docs and configs
jeqo Aug 12, 2019
f9562f7
chore: doc mounting custom libs
jeqo Aug 12, 2019
67627c2
fix: dependency-links to dependencies
jeqo Aug 12, 2019
e7920e7
docs: add steps to test
jeqo Aug 12, 2019
0c5b9a2
fix: issue with inactivity gap variable
jeqo Aug 13, 2019
9be6a84
chore: simplify link mapping and clean imports
jeqo Aug 13, 2019
e2667c6
docs: add ack to ksteram viz app
jeqo Aug 13, 2019
49bac94
fix: topic name
jeqo Aug 16, 2019
7db654d
chore: add container names
jeqo Aug 16, 2019
428d1d3
chore: update versions
jeqo Aug 16, 2019
1428589
feat: remove topics details and rename vars
jeqo Aug 16, 2019
cca270a
docs: more specific naming
jeqo Aug 16, 2019
a06b8d5
docs: remove too specific variables and simplify config
jeqo Aug 16, 2019
59d29a0
fix: store dir test
jeqo Aug 16, 2019
5175d97
feat: add clients overrides
jeqo Aug 17, 2019
f39db2a
fix: overrides
jeqo Aug 17, 2019
4e889d0
chore: remove 'all' filter
jeqo Aug 18, 2019
95d7c62
chore: rename kafka topic env variables
jeqo Aug 18, 2019
1876dae
docs: update env list
jeqo Aug 18, 2019
ccdd485
feat: back to separated stores to support dependency only use-case
jeqo Aug 19, 2019
f0dada6
docs: updating environments and doc of new approach
jeqo Aug 19, 2019
fdf3960
chore: simplifying testing
jeqo Aug 19, 2019
6aa3b23
chore: rename vars
jeqo Aug 19, 2019
dc54c59
chore: add suppress warning for ignored future
jeqo Aug 19, 2019
be2ded5
feat: rename profile and storage to kafka
jeqo Aug 19, 2019
671125e
docs: component javadocs
jeqo Aug 21, 2019
6a4ad2d
docs: add graph details
jeqo Aug 21, 2019
d4bc461
docs: add graph details
jeqo Aug 21, 2019
fe5f8b0
chore: fix acks
jeqo Aug 21, 2019
a22ee71
update base zipkin version
jeqo Aug 22, 2019
6e4056b
chore: rename from span to spans topic
jeqo Aug 22, 2019
120ae46
Merge branch 'develop' of github.com:jeqo/zipkin-storage-kafka into d…
jeqo Aug 22, 2019
139379d
refactor: simplify config names
jeqo Aug 22, 2019
76c997a
docs: add topics list;
jeqo Aug 22, 2019
d46afc0
fix: joining spans
jeqo Aug 22, 2019
0cba428
chore: update javadocs, config passing, and minimum traces stored.
jeqo Aug 22, 2019
cbba4c8
chore: small fixes
jeqo Aug 22, 2019
6365b87
chore: rename vars
jeqo Aug 23, 2019
a74f862
refactor: rename to flush interval
jeqo Aug 23, 2019
48e87a9
docs: add details about inactivity gap
jeqo Aug 23, 2019
7206365
refactor: more renames
jeqo Aug 23, 2019
3 changes: 0 additions & 3 deletions .mvn/wrapper/MavenWrapperDownloader.java
@@ -17,9 +17,6 @@ Licensed to the Apache Software Foundation (ASF) under one
under the License.
*/

import java.net.*;
import java.io.*;
import java.nio.channels.*;
import java.util.Properties;

public class MavenWrapperDownloader {
155 changes: 99 additions & 56 deletions DESIGN.md
@@ -2,95 +2,138 @@

## Goals

* Provide a fast and reliable storage that enables extensibility via Kafka Topics.
* Provide a fast and reliable storage that enables extensibility via Kafka topics.
* Provide full storage functionality via streaming aggregations (e.g., dependency graph).
* Create a processing space where additional enrichment can be plugged into the processing
pipeline.
* Remove need for additional storage when Kafka is available.
* Focus on supporting processing more than storage: traces and dependency links are emitted
downstream to support metrics aggregation. Storage is currently supported, but only on a single node.

### Zipkin Storage Component
## Kafka Zipkin Storage

A Zipkin Storage component has the following internal parts:
Storage is composed of three main components:

* `Builder`: which configures if
- `strictTraceId(boolean strictTraceId)`
- `searchEnabled(boolean searchEnabled)`
- `autocompleteKeys(List<String> keys)`
- `autocompleteTtl(int autocompleteTtl)`
- `autocompleteCardinality(int autocompleteCardinality)`
* `SpanStore`: main component
- `Call<List<List<Span>>> getTraces(QueryRequest request);`
- `Call<List<Span>> getTrace(String traceId);`
- `Call<List<String>> getServiceNames();`
- `Call<List<String>> getSpanNames(String serviceName);`
- `Call<List<DependencyLink>> getDependencies(long endTs, long lookback);`
* `SpanConsumer`: which ingest spans
- `Call<Void> accept(List<Span> spans)`
* `QueryRequest`: which includes
- `String serviceName, spanName;`
- `Map<String, String> annotationQuery;`
- `Long minDuration, maxDuration;`
- `long endTs, lookback;`
- `int limit;`
- Span Consumer: repartitions collected span batches into individual spans keyed by `traceId`.
- Span Aggregation: processes the stream of spans into aggregated traces, and then into dependency links.
- Span Store: builds local state stores to support the search and query APIs.

### Kafka Zipkin Storage
It is supported by three main Kafka topics:

#### `KafkaSpanStore`
- `zipkin-spans`: topic where lists of spans keyed by trace ID are stored.
- `zipkin-trace`: topic where aggregated traces are stored.
- `zipkin-dependency`: topic where dependency links per trace are stored.

Span Store is expecting Spans to be stored in topics partitioned by `TraceId`.
### Kafka Span Consumer

> These can be created by Span Consumer, or can be **enriched** by other Stream Processors, outside of
Zipkin Server.
This component processes collected span batches (received via HTTP, Kafka, ActiveMQ, etc.),
takes each span, and re-keys it by `traceId` on the "spans" topic.

Kafka Span Store will need to support different kind of queries:
This component currently compensates for how `KafkaSender` (part of [Zipkin-Reporter](https://github.com/openzipkin/zipkin-reporter-java))
reports spans to Kafka: it groups spans into batches and sends them to an un-keyed
Kafka topic.
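
The re-keying step can be pictured with a small, dependency-free sketch. It is not the code in `KafkaSpanConsumer`; `SketchSpan` and `SpanRepartitionSketch` are hypothetical names, and an in-memory map stands in for producing one record per span with `traceId` as the record key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a Zipkin span; only the fields this sketch needs. */
final class SketchSpan {
  final String traceId;
  final String id;

  SketchSpan(String traceId, String id) {
    this.traceId = traceId;
    this.id = id;
  }
}

final class SpanRepartitionSketch {
  /**
   * Flattens un-keyed batches into traceId -> spans groups. On a real topic the
   * same effect comes from sending each span with traceId as the record key.
   */
  static Map<String, List<SketchSpan>> keyByTraceId(List<List<SketchSpan>> batches) {
    Map<String, List<SketchSpan>> keyed = new LinkedHashMap<>();
    for (List<SketchSpan> batch : batches) {
      for (SketchSpan span : batch) {
        keyed.computeIfAbsent(span.traceId, k -> new ArrayList<>()).add(span);
      }
    }
    return keyed;
  }
}
```

Because every span of a trace ends up under the same key, all spans of that trace land on the same partition, which is what the downstream aggregation relies on.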

Component source code: [KafkaSpanConsumer.java](storage/src/main/java/zipkin2/storage/kafka/KafkaSpanConsumer.java)

##### Get Service Names/Get Span Names
### Stream Processing

Service name to Span names pairs are indexed by aggregating spans.
#### Span Aggregation

##### Get Trace/Find Traces
"Partitioned" Spans are processed to produce two aggregated streams: `Traces` and `Dependencies`.

When search requests are received, span index is used to search for trace ids. After a list is
retrieved, trace DAG is retrieved from trace state store.
**Traces**:

##### Get Dependencies
Spans are grouped by trace ID and stored in a local
[session window](https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/SessionWindows.html),
where the `traceId` is the session key and `trace-timeout` (default: 1 minute) defines whether a
trace is still active. The `trace-timeout` is the period of time without receiving a span for the
same session, known as the session inactivity gap in Kafka Streams. It is evaluated when the next
span is received on the stream, regardless of the incoming `traceId`. If the session window is
closed, a trace message is emitted to the traces topic.
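
A rough model of this behavior, written without Kafka Streams, may help when reasoning about the timeout. `TraceSessionSketch` is a hypothetical class; it assumes spans arrive roughly in timestamp order and ignores details that Kafka Streams handles (grace periods, suppression, session merging).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TraceSessionSketch {
  private final long inactivityGapMs;
  // traceId -> span IDs collected in the currently open session
  private final Map<String, List<String>> openSessions = new LinkedHashMap<>();
  // traceId -> timestamp of the latest span seen for that session
  private final Map<String, Long> lastSeenMs = new LinkedHashMap<>();

  TraceSessionSketch(long inactivityGapMs) {
    this.inactivityGapMs = inactivityGapMs;
  }

  /** Adds a span and returns the traces whose sessions this record's timestamp closed. */
  List<List<String>> onSpan(String traceId, String spanId, long timestampMs) {
    List<List<String>> closed = new ArrayList<>();
    // Any incoming span advances time, so every open session is checked,
    // not only the one that belongs to the incoming traceId.
    Iterator<Map.Entry<String, Long>> it = lastSeenMs.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> session = it.next();
      if (timestampMs - session.getValue() > inactivityGapMs) {
        closed.add(openSessions.remove(session.getKey()));
        it.remove();
      }
    }
    openSessions.computeIfAbsent(traceId, k -> new ArrayList<>()).add(spanId);
    lastSeenMs.merge(traceId, timestampMs, Math::max);
    return closed;
  }
}
```

With a 60-second gap, a trace whose last span arrived at 30s is emitted once any span with a timestamp later than 90s shows up, whichever trace that span belongs to.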

After `spans` are aggregated into traces, traces are processed to collect dependencies.
The dependencies changelog is stored in a Kafka topic to be stored as a materialized view on
Zipkin instances.
![Session Windows](https://kafka.apache.org/20/images/streams-session-windows-02.png)

### Stream processors
> Each color represents a trace. The longer the `trace-timeout`, the longer we wait
to close a window and to emit traces downstream for dependency links and additional
aggregations, but the more consistent the trace aggregation is.
With a smaller gap, traces are emitted faster, at the risk of breaking traces into
smaller chunks and potentially affecting counters downstream.

#### Trace Aggregation Stream Processor
**Dependencies**

This is the main processors that take incoming spans and aggregate them into:
Once `traces` are emitted downstream as part of the initial processing, dependency links are evaluated
on each trace and emitted to the dependencies topic for further metric aggregation.
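
To illustrate what evaluating dependency links on a trace means, here is a deliberately simplified sketch. It is not Zipkin's own linker (`zipkin2.internal.DependencyLinker` also considers span kinds and remote endpoints); `LinkSpan` and `DependencyLinkSketch` are hypothetical, and a link is counted only when a child span's service differs from its parent's.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, minimal span shape for this sketch (not Zipkin's Span class). */
final class LinkSpan {
  final String id;
  final String parentId; // null for the root span
  final String serviceName;
  final boolean error;

  LinkSpan(String id, String parentId, String serviceName, boolean error) {
    this.id = id;
    this.parentId = parentId;
    this.serviceName = serviceName;
    this.error = error;
  }
}

final class DependencyLinkSketch {
  /** Returns "parent->child" mapped to {callCount, errorCount} for one trace. */
  static Map<String, long[]> linksOf(List<LinkSpan> trace) {
    Map<String, LinkSpan> byId = new HashMap<>();
    for (LinkSpan span : trace) {
      byId.put(span.id, span);
    }
    Map<String, long[]> links = new LinkedHashMap<>();
    for (LinkSpan child : trace) {
      LinkSpan parent = child.parentId == null ? null : byId.get(child.parentId);
      // Skip roots, orphans, and calls that stay within one service.
      if (parent == null || parent.serviceName.equals(child.serviceName)) {
        continue;
      }
      String key = parent.serviceName + "->" + child.serviceName;
      long[] counters = links.computeIfAbsent(key, k -> new long[2]);
      counters[0]++;
      if (child.error) {
        counters[1]++;
      }
    }
    return links;
  }
}
```

Since links are derived per emitted trace, a trace split by a short timeout can lose links whose parent and child spans fall into different chunks, which is one way the counters downstream get affected.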

- Traces
- Dependencies
Kafka Streams topology:

![service aggregation](docs/service-aggregation-stream.png)
![trace aggregation](docs/trace-aggregation-topology.png)

![dependency aggregation](docs/dependency-aggregation-stream.png)
#### Trace Store Stream

#### Store Stream Processors
This component builds local stores for traces, service names, and autocomplete tags
from state received on the `spans` Kafka topic.

Global tables for traces, service names and dependencies to be available on local state.
Kafka Streams source code: [TraceStoreTopologySupplier](storage/src/main/java/zipkin2/storage/kafka/streams/TraceStoreTopologySupplier.java)

![trace store](docs/trace-store-stream.png)
Kafka Streams topology:

![service store](docs/service-store-stream.png)
![trace store](docs/trace-store-topology.png)

![dependency store](docs/dependency-store-stream.png)
#### Dependency Store

#### Index Stream Processor
This component builds a local store from state received on the `dependency` Kafka topic.

Custom processor to full-text indexing of traces using Lucene as back-end.
It builds 1-minute time windows in which calls and errors are counted.

![span index](docs/span-index-stream.png)
Kafka Streams source code: [DependencyStoreTopologySupplier](storage/src/main/java/zipkin2/storage/kafka/streams/DependencyStoreTopologySupplier.java)

#### Retention Stream Processor
Kafka Streams topology:

This is the processor that keeps track of trace timestamps for cleanup.
![dependency store](docs/dependency-store-topology.png)

![trace retention](docs/trace-retention-stream.png)
### Kafka Span Store

This component supports the search and query APIs on top of the local state stores built by the
store Kafka Streams components.

Component source code: [KafkaSpanStore.java](storage/src/main/java/zipkin2/storage/kafka/KafkaSpanStore.java)

#### Get Service Names/Get Span Names/Get Remote Service Names

These queries are supported by stores indexed by service name, built from the `spans` Kafka topic.

Store names:

- `zipkin-service-names`: key/value store with service name as key and value.
- `zipkin-span-names`: key/value store with service name as key and span names list as value.
- `zipkin-remote-service-names`: key/value store with service name as key and remote service names as value.

#### Get Trace/Find Traces

These queries are supported by two key value stores:

- `zipkin-traces`: indexed by `traceId`, contains the current span list of each trace as received from the `spans` Kafka topic.
- `zipkin-traces-by-timestamp`: list of trace IDs indexed by `timestamp`.

The `GetTrace` query is supported by the `zipkin-traces` store.
The `FindTraces` query is supported by both stores: when a query request is received, its time range
is used to get trace IDs, and then the request is tested against each trace to build a response.
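
The two-step lookup can be sketched with in-memory maps standing in for the two stores. `FindTracesSketch` is hypothetical; the query is reduced to a service-name match, and returning the newest traces first is a choice made for this sketch, not something stated above. `endTs`, `lookback`, and `limit` follow the `QueryRequest` field names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

final class FindTracesSketch {
  // Stand-in for `zipkin-traces`: traceId -> service names seen in that trace.
  private final Map<String, Set<String>> traces = new HashMap<>();
  // Stand-in for `zipkin-traces-by-timestamp`: timestamp -> trace IDs.
  private final NavigableMap<Long, Set<String>> traceIdsByTimestamp = new TreeMap<>();

  void index(String traceId, long timestamp, String serviceName) {
    traces.computeIfAbsent(traceId, k -> new LinkedHashSet<>()).add(serviceName);
    traceIdsByTimestamp.computeIfAbsent(timestamp, k -> new LinkedHashSet<>()).add(traceId);
  }

  /** Assumes lookback >= 0 and limit > 0. */
  List<String> findTraces(long endTs, long lookback, String serviceName, int limit) {
    List<String> result = new ArrayList<>();
    // Step 1: the time range selects candidate trace IDs, newest first.
    NavigableMap<Long, Set<String>> inRange =
        traceIdsByTimestamp.subMap(endTs - lookback, true, endTs, true).descendingMap();
    for (Set<String> traceIds : inRange.values()) {
      for (String traceId : traceIds) {
        // Step 2: the query is tested against each candidate trace.
        if (traces.get(traceId).contains(serviceName) && !result.contains(traceId)) {
          result.add(traceId);
          if (result.size() == limit) {
            return result;
          }
        }
      }
    }
    return result;
  }
}
```

The cost of a query in this model grows with the number of traces in the time range rather than with the number of matches, since every candidate is tested.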

#### Get Dependencies

This query is supported by a 1-minute windowed store from `DependencyStoreStream`.

When a request is received, the time range is used to pick valid windows and merge their counters.
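
As a sketch of the window selection and counter merge, the class below keeps call and error counters per 1-minute window and sums the windows that fall in the requested range. `DependencyWindowSketch` is hypothetical, and treating a window as valid when its start lies within `[endTs - lookback, endTs]` is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class DependencyWindowSketch {
  static final long WINDOW_MS = 60_000L;
  // window start -> ("parent->child" -> {callCount, errorCount})
  private final NavigableMap<Long, Map<String, long[]>> windows = new TreeMap<>();

  void add(long timestampMs, String link, long calls, long errors) {
    long windowStart = timestampMs - (timestampMs % WINDOW_MS);
    long[] counters = windows
        .computeIfAbsent(windowStart, k -> new HashMap<>())
        .computeIfAbsent(link, k -> new long[2]);
    counters[0] += calls;
    counters[1] += errors;
  }

  /** Sums counters of every window starting within [endTs - lookback, endTs]. */
  Map<String, long[]> getDependencies(long endTs, long lookback) {
    Map<String, long[]> merged = new TreeMap<>();
    for (Map<String, long[]> window
        : windows.subMap(endTs - lookback, true, endTs, true).values()) {
      for (Map.Entry<String, long[]> entry : window.entrySet()) {
        long[] total = merged.computeIfAbsent(entry.getKey(), k -> new long[2]);
        total[0] += entry.getValue()[0];
        total[1] += entry.getValue()[1];
      }
    }
    return merged;
  }
}
```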

Windowed store:

- `zipkin-dependencies`.

### Kafka Autocomplete Tags

#### Get Keys/Get Values

Supported by a key-value store containing the list of values valid for each of the `autocompleteKeys`.

- `zipkin-autocomplete-tags`: key-value store.
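
A minimal sketch of such a store, assuming tag values are collected only for the configured keys as spans pass through; `AutocompleteTagsSketch` is a hypothetical class, and TTL and cardinality limits are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class AutocompleteTagsSketch {
  private final Set<String> autocompleteKeys;
  // Stand-in for `zipkin-autocomplete-tags`: tag key -> distinct values seen.
  private final Map<String, Set<String>> valuesByKey = new TreeMap<>();

  AutocompleteTagsSketch(Set<String> autocompleteKeys) {
    this.autocompleteKeys = new TreeSet<>(autocompleteKeys);
  }

  /** Records the values of configured keys; tags with other keys are ignored. */
  void accept(Map<String, String> spanTags) {
    for (Map.Entry<String, String> tag : spanTags.entrySet()) {
      if (autocompleteKeys.contains(tag.getKey())) {
        valuesByKey.computeIfAbsent(tag.getKey(), k -> new TreeSet<>()).add(tag.getValue());
      }
    }
  }

  List<String> getKeys() {
    return new ArrayList<>(autocompleteKeys);
  }

  List<String> getValues(String key) {
    return new ArrayList<>(valuesByKey.getOrDefault(key, new TreeSet<>()));
  }
}
```
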
53 changes: 34 additions & 19 deletions Dockerfile
@@ -12,34 +12,49 @@
# the License.
#

FROM openjdk:8
FROM alpine

ARG KAFKA_STORAGE_VERSION=0.1.1
ENV ZIPKIN_REPO https://repo1.maven.org/maven2
ENV ZIPKIN_VERSION 2.16.1
ENV KAFKA_STORAGE_VERSION 0.4.1-SNAPSHOT

ENV ZIPKIN_REPO https://jcenter.bintray.com
ENV ZIPKIN_VERSION 2.12.6
ENV ZIPKIN_LOGGING_LEVEL INFO
WORKDIR /zipkin

RUN apk add unzip curl --no-cache && \
curl -SL $ZIPKIN_REPO/io/zipkin/zipkin-server/$ZIPKIN_VERSION/zipkin-server-$ZIPKIN_VERSION-exec.jar > zipkin-server.jar && \
# don't break when unzip finds an extra header https://github.com/openzipkin/zipkin/issues/1932
unzip zipkin-server.jar ; \
rm zipkin-server.jar

COPY autoconfigure/target/zipkin-autoconfigure-storage-kafka-${KAFKA_STORAGE_VERSION}-module.jar BOOT-INF/lib/kafka-module.jar
RUN unzip -o BOOT-INF/lib/kafka-module.jar lib/* -d BOOT-INF

FROM gcr.io/distroless/java:11-debug

# Use to set heap, trust store or other system properties.
ENV JAVA_OPTS -Djava.security.egd=file:/dev/./urandom

RUN ["/busybox/sh", "-c", "adduser -g '' -D zipkin"]

# Add environment settings for supported storage types
ENV STORAGE_TYPE kafka

COPY --from=0 /zipkin/ /zipkin/
WORKDIR /zipkin

RUN curl -SL $ZIPKIN_REPO/io/zipkin/java/zipkin-server/$ZIPKIN_VERSION/zipkin-server-${ZIPKIN_VERSION}-exec.jar > zipkin.jar
# TODO haven't found a better way to mount libs from custom storage. issue #28
#COPY autoconfigure/target/zipkin-autoconfigure-storage-kafka-${KAFKA_STORAGE_VERSION}-module.jar kafka-module.jar
#ENV MODULE_OPTS -Dloader.path='BOOT-INF/lib/kafka-module.jar,BOOT-INF/lib/kafka-module.jar!/lib' -Dspring.profiles.active=kafka
ENV MODULE_OPTS -Dspring.profiles.active=kafka

RUN ["/busybox/sh", "-c", "ln -s /busybox/* /bin"]

ADD storage/target/zipkin-storage-kafka-${KAFKA_STORAGE_VERSION}.jar zipkin-storage-kafka.jar
ADD autoconfigure/target/zipkin-autoconfigure-storage-kafka-${KAFKA_STORAGE_VERSION}-module.jar zipkin-autoconfigure-storage-kafka.jar
ENV KAFKA_STORAGE_DIR /data
RUN mkdir /data && chown zipkin /data
VOLUME /data

ENV STORAGE_TYPE=kafkastore
USER zipkin

EXPOSE 9410 9411
EXPOSE 9411

CMD exec java \
${JAVA_OPTS} \
-Dloader.path='zipkin-storage-kafka.jar,zipkin-autoconfigure-storage-kafka.jar' \
-Dspring.profiles.active=kafkastore \
-Dcom.linecorp.armeria.annotatedServiceExceptionVerbosity=all \
-Dcom.linecorp.armeria.verboseExceptions=true \
-cp zipkin.jar \
org.springframework.boot.loader.PropertiesLauncher \
--logging.level.zipkin2=${ZIPKIN_LOGGING_LEVEL}
ENTRYPOINT ["/busybox/sh", "-c", "exec java ${MODULE_OPTS} ${JAVA_OPTS} -cp . org.springframework.boot.loader.PropertiesLauncher"]
43 changes: 33 additions & 10 deletions Makefile
@@ -3,23 +3,33 @@ all: build

OPEN := 'xdg-open'
MAVEN := './mvnw'
VERSION := '0.3.3-SNAPSHOT'
VERSION := '0.4.1-SNAPSHOT'
IMAGE_NAME := 'jeqo/zipkin-kafka'

.PHONY: run
run: build zipkin-local

.PHONY: run-docker
run-docker: build docker-build docker-up

.PHONY: kafka-topics
kafka-topics:
docker-compose exec kafka-zookeeper /busybox/sh /kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand \
--zookeeper localhost:2181 --create --topic zipkin-spans --partitions 1 --replication-factor 1 --if-not-exists
docker-compose exec kafka-zookeeper /busybox/sh /kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand \
--zookeeper localhost:2181 --create --topic zipkin-trace --partitions 1 --replication-factor 1 --if-not-exists
docker-compose exec kafka-zookeeper /busybox/sh /kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand \
--zookeeper localhost:2181 --create --topic zipkin-dependency --partitions 1 --replication-factor 1 --if-not-exists

.PHONY: docker-build
docker-build:
TAG=${VERSION} \
docker-compose build
docker build -t ${IMAGE_NAME}:latest .
docker build -t ${IMAGE_NAME}:${VERSION} .

.PHONY: docker-push
docker-push: docker-build
TAG=${VERSION} \
docker-compose push
docker push ${IMAGE_NAME}:latest
docker push ${IMAGE_NAME}:${VERSION}

.PHONY: docker-up
docker-up:
@@ -33,7 +43,7 @@ docker-down:

.PHONY: docker-kafka-up
docker-kafka-up:
docker-compose up -d kafka zookeeper
docker-compose up -d kafka-zookeeper

.PHONY: license-header
license-header:
@@ -49,22 +59,35 @@ test: build

.PHONY: zipkin-local
zipkin-local:
STORAGE_TYPE=kafkastore \
STORAGE_TYPE=kafka \
KAFKA_BOOTSTRAP_SERVERS=localhost:19092 \
java \
-Dloader.path='storage/target/zipkin-storage-kafka-${VERSION}.jar,autoconfigure/target/zipkin-autoconfigure-storage-kafka-${VERSION}-module.jar' \
-Dspring.profiles.active=kafkastore \
-Dloader.path='autoconfigure/target/zipkin-autoconfigure-storage-kafka-${VERSION}-module.jar,autoconfigure/target/zipkin-autoconfigure-storage-kafka-${VERSION}-module.jar!/lib' \
-Dspring.profiles.active=kafka \
-cp zipkin.jar \
org.springframework.boot.loader.PropertiesLauncher

.PHONY: get-zipkin
get-zipkin:
curl -sSL https://zipkin.io/quickstart.sh | bash -s

.PHONY: zipkin-test-multi
zipkin-test-multi:
curl -s https://raw.githubusercontent.com/openzipkin/zipkin/master/zipkin-lens/testdata/netflix.json | \
curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d @- ; \
${OPEN} 'http://localhost:9412/zipkin/?lookback=custom&startTs=1'
sleep 61
curl -s https://raw.githubusercontent.com/openzipkin/zipkin/master/zipkin-lens/testdata/messaging.json | \
curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d @-

.PHONY: zipkin-test
zipkin-test:
curl -s https://raw.githubusercontent.com/openzipkin/zipkin/master/zipkin-ui/testdata/netflix.json | \
curl -s https://raw.githubusercontent.com/openzipkin/zipkin/master/zipkin-lens/testdata/netflix.json | \
curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d @- ; \
${OPEN} 'http://localhost:9411/zipkin/?lookback=custom&startTs=1'
sleep 61
curl -s https://raw.githubusercontent.com/openzipkin/zipkin/master/zipkin-lens/testdata/messaging.json | \
curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d @-

.PHONY: release
release: