- Repository for metadata
- Configuration properties
- Configure repository
- Connecting to kafka properties
- Kafka version compatibility
- Configure metadata polling/scraping
- Enabling/disabling specific clusters
- Customizing web app
- Filter options
- Prometheus metrics
- Security / Users
- High availability setup
- Customizing topic creation wizard
- Reading/consuming records utility
- Oldest record age sampling
- Record structure analysis
- Scraping broker's disk capacity
- KStream detection
- Topic inspection
- ACLs inspection
- Cluster inspection
- SQL querying - SQLite
- Autopilot
- Slack integration
- Jira integration
- Miscellaneous UI Settings
- Miscellaneous Backend Settings
- Logging
- Writing custom plugins
The repository represents the source of truth for expected/wanted topics, ACLs and entity quotas. It contains the following entity types:
Entity | Breakdown | Description |
---|---|---|
Kafka clusters | File per kafka cluster | Kafkistry will use those to know which clusters to connect to |
Topics | One file per topic name | Configuration of topic with possible overrides per specific clusters |
ACLs | One file per principal | List of expected ACLs for principal |
Quota entities | One file per user/client | Amount of operational quotas for clients and users connecting to kafka |
The default repository storage implementation is a GIT repository containing *.yaml files.
By default, when an update is made through the UI, Kafkistry will create a new branch, commit the changes and push it to upstream. That branch can then be merged through a pull request on the remote side, and Kafkistry will fetch it on the next poll.
Kafkistry can also work with a GIT repo without a remote/origin specified. In that case, commits will be made but not pushed to upstream.
There is an application-defaults.yaml which defines some default property values and alias/alternative property names to be used.
Kafkistry is a Spring Boot application and uses its configuration property processing, which allows properties to be passed in multiple ways (see the Spring Boot externalized configuration docs for more info).
Each configuration property has its full name, for example app.record-analyzer.executor.concurrency. This specific example is defined in application-defaults.yaml as:
app.record-analyzer.executor.concurrency: ${RECORD_ANALYZER_CONCURRENCY:1}
which basically adds an alternative/alias property name as well as a default value of 1.
The Spring framework allows this property to be defined in the following ways (not an exhaustive list):
Mechanism | Examples, from lowest to highest precedence |
---|---|
in application.yaml | RECORD_ANALYZER_CONCURRENCY: 5 |
as environment variable | export RECORD_ANALYZER_CONCURRENCY=5 |
as JVM option | -DRECORD_ANALYZER_CONCURRENCY=5 |
as command line argument | --RECORD_ANALYZER_CONCURRENCY=5 |
in application.yaml | app.record-analyzer.executor.concurrency: 5 |
as environment variable | export APP_RECORDANALYZER_EXECUTOR_CONCURRENCY=5 |
as JVM option | -Dapp.record-analyzer.executor.concurrency=5 |
as command line argument | --app.record-analyzer.executor.concurrency=5 |
The same applies to all other properties Kafkistry is configurable by.
NOTE: Only some properties have a shorter alias defined in application-defaults.yaml.
There are two modes/implementations of repository:
- Regular directory: dir storage, which simply reads/writes *.yaml files in a local directory.
- Git directory: git storage, which uses the exact same format as the simple dir implementation except that it's backed by GIT.
To use the dir implementation:
- activate the dir spring profile with environment variable SPRING_PROFILES_ACTIVE=dir
- specify the desired path with env variable APP_REPOSITORY_DIR_PATH=/desired/path (default value is CURRENT_WORK_DIR/kafkistry/repository)
The dir implementation is basic and does not support showing history or remote persistence (in contrast to git).
The git implementation is the default and is preferred over the simple dir.
Kafkistry treats the contents of the main/master branch as the source of truth.
Git allows writing changes and committing them to a separate branch, which can then be reviewed through a PR (pull request) on the remote side (GitHub/Bitbucket/GitLab/...).
Example use case:
- Somebody within your company wants to create a new topic
- He/she creates the new topic's description through Kafkistry's UI and submits it
- Kafkistry makes a commit in a new branch and pushes it to the remote
- This person then creates a PR for an administrator to review
- Why does the review matter? For example:
  - you may want to check that the topic name conforms to your naming conventions;
  - the number of partitions this person specified might be huge;
  - the specified retention.bytes might be beyond the available disk space on kafka brokers;
  - ...the list could go on
- The PR gets approved/merged
- Kafkistry fetches the newly merged changes and shows that this new topic is expected to exist but is missing
- The administrator then simply creates this missing topic
Properties:
Property | Description |
---|---|
GIT_REMOTE_SSH_URI |
URI of remote git dir to clone/use. For example ssh://kr-user@git-host:1234/my-kafkistry-repo.git . It can be omitted, in this case Kafkistry will simply initialize empty repository without remote, and commits won't be pushed anywhere. |
LOCAL_GIT_DIR |
Path to directory where Kafkistry will clone/init the git repository |
GIT_MAIN_BRANCH |
Default value: master |
GIT_SSH_PASSWORD |
Password to use to authenticate kr-user as an ssh client. Can be omitted if a private key is used. |
SSH_PRIVATE_KEY |
Literal value of ssh private key. Example how to generate keypair: ssh-keygen -t rsa -C "my kafkistry key" -m PEM , add public key to your git repo server |
SSH_PRIVATE_KEY_PATH |
Path to private key file, alternative to SSH_PRIVATE_KEY |
SSH_PRIVATE_KEY_PASS |
Passphrase for the given private key; can be omitted if the private key has no passphrase |
GIT_TIMEOUT_SEC |
Timeout for git remote calls (push/pull). Default 30 sec |
GIT_REFRESH_INTERVAL_SEC |
Git periodic fetch polling interval. Default 30 sec |
app.repository.git.strict-ssh-host-key-checking |
Default false. (When set to true, ssh will fail if the current host has never connected to the remote and has no saved fingerprint of the remote host.) |
GIT_BROWSE_BRANCH_BASE_URL |
Optional. Used for UI generating links for external git repository browsing for showing branch. Example value: https://my-bitbucket/projects/my-project/repos/my-repo/compare/commits?sourceBranch= |
GIT_BROWSE_COMMIT_BASE_URL |
Optional. Used for UI generating links for external git repository browsing for showing commit. Example value: https://my-bitbucket/projects/my-project/repos/my-repo/commits/ |
GIT_COMMIT_TO_MASTER_BY_DEFAULT |
Should Kafkistry commit directly to main/master branch. Default false |
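For orientation, a minimal git-backed repository setup could look like the following fragment; this is a hedged sketch where all values are placeholders rather than defaults:
GIT_REMOTE_SSH_URI: "ssh://kr-user@git-host:1234/my-kafkistry-repo.git"   # remote to push change branches to
LOCAL_GIT_DIR: "/var/kafkistry/git-repo"                                  # where the local clone will live
SSH_PRIVATE_KEY_PATH: "/etc/kafkistry/ssh/id_rsa"                         # key whose public part is registered on the git server
GIT_MAIN_BRANCH: "master"
GIT_COMMIT_TO_MASTER_BY_DEFAULT: false                                    # keep the PR-based review flow
With this in place, every UI edit ends up as a branch pushed to the remote, ready to be reviewed via a pull request.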
By default, there is no need to set up any special properties for Kafkistry to use when connecting to kafka via AdminClient, unless your kafka cluster(s) require non-PLAINTEXT connections.
Kafkistry stores the list of kafka clusters it needs to connect to in its repository. For security reasons, it is by design that no additional properties are stored there except bootstrap.servers and which security protocol to use.
Any additional property for Kafkistry to use when connecting to kafka can be passed by environment variables with names prefixed by APP_KAFKA_PROPERTIES_*.
Few examples:
AdminClient property | Environment variable name to pass by |
---|---|
ssl.truststore.location |
APP_KAFKA_PROPERTIES_SSL_TRUSTSTORE_LOCATION |
sasl.mechanism |
APP_KAFKA_PROPERTIES_SASL_MECHANISM |
sasl.jaas.config |
APP_KAFKA_PROPERTIES_SASL_JAAS_CONFIG |
Any property defined like that will be used by Kafkistry to connect to all kafka clusters.
When there is a need to connect to different kafka clusters with different properties, "profiles" come into play.
You can specify different properties per profile by specifying environment variables with names following the pattern: APP_KAFKA_PROFILES_<PROFILE_NAME>_PROPERTIES_<KAFKA_PROPERTY>
NOTE: when passing properties by environment variable, the PROFILE_NAME can't contain _
Few examples:
Profile | AdminClient property | Environment variable name to pass by |
---|---|---|
foo |
sasl.mechanism |
APP_KAFKA_PROFILES_FOO_PROPERTIES_SASL_MECHANISM |
foo |
sasl.jaas.config |
APP_KAFKA_PROFILES_FOO_PROPERTIES_SASL_JAAS_CONFIG |
bar |
sasl.mechanism |
APP_KAFKA_PROFILES_BAR_PROPERTIES_SASL_MECHANISM |
bar |
sasl.jaas.config |
APP_KAFKA_PROFILES_BAR_PROPERTIES_SASL_JAAS_CONFIG |
NOTE: those kafka properties profiles have nothing to do with Spring framework's profiles
NOTE: if you plan to pass sasl.jaas.config via docker's environment variables, make sure it's a one-line value because docker's -e and --env-file do not handle multi-line values properly
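To make the pattern concrete, here is a hedged sketch of passing SASL properties globally plus an override for a hypothetical foo profile; the usernames and passwords are placeholders, and the JAAS values must stay on one line when passed via docker:
APP_KAFKA_PROPERTIES_SASL_MECHANISM: "SCRAM-SHA-512"
APP_KAFKA_PROPERTIES_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkistry\" password=\"secret\";"
APP_KAFKA_PROFILES_FOO_PROPERTIES_SASL_MECHANISM: "PLAIN"
APP_KAFKA_PROFILES_FOO_PROPERTIES_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafkistry-foo\" password=\"other-secret\";"
Clusters added with the foo profile selected would then use the overridden mechanism and credentials, while all other clusters use the global defaults.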
In order for Kafkistry to know which kafka clusters to connect to, each kafka cluster needs to be added to Kafkistry via the UI (or by manually adding a new file in Kafkistry's repository).
When adding a cluster, the security protocol needs to be chosen:
SSL | SASL | Corresponding security.protocol |
---|---|---|
no | no | PLAINTEXT |
yes | no | SSL |
no | yes | SASL_PLAINTEXT |
yes | yes | SASL_SSL |
When adding a new cluster or editing the metadata of an existing one, specific profile(s) can be selected to include the properties defined for that particular profile.
After adding a cluster to Kafkistry's repository (and merging the addition to master in case of a GIT repository), Kafkistry will start to periodically scrape metadata from it.
Every interaction with kafka can be treated as either a read or a write operation. You can define a different timeout for each of those.
Property | Default | How long Kafkistry will wait for the AdminClient's Future to complete |
---|---|---|
KAFKA_READ_TIMEOUT_MS |
15000 (15sec) |
Applies to API calls which have read semantics, i.e. don't actively change state (mostly DESCRIBE calls) |
KAFKA_WRITE_TIMEOUT_MS |
120000 (2min) |
Applies to API calls which have write semantics, i.e. CREATING/DELETING/ALTERING/... operations |
NOTE: If your kafka cluster(s) do not use security restrictions, the following section is not important.
Even though Kafkistry is an administrative tool, it's just yet another client connecting to kafka, and kafka's Authorizer may deny some operations Kafkistry wants to perform.
The simplest setup is to specify Kafkistry in the super.users configuration property of all kafka brokers.
This will allow Kafkistry to always "see" all topics/groups/etc. Being a super user means that kafka will permit all requests coming from Kafkistry.
Things to know if adding Kafkistry to super.users sounds too scary:
- Kafkistry periodically performs only READ-like operations such as describing topics, groups and configs, fetching topic offsets...
- active WRITE-like operations are performed only by manual action of an ADMIN user through the UI (or an API call, to be more precise)
- WRITE-like operations would be creations/deletions/altering configs/etc.
NOTES on the approach where Kafkistry is not a super-user:
- functionality Kafkistry doesn't have permission (ACL) to perform simply won't work
- some statuses might be wrong, for example: a topic might be declared as MISSING even though it exists, because Kafkistry does not have permission to describe it
There are tests for all interactions that Kafkistry does with kafka cluster.
Kafka Version | Compatibility | Needs connecting to ZK for some operations |
---|---|---|
v > 3.3 |
full? (not tested yet) | no |
2.6 ≤ v ≤ 3.3 |
full | no |
2.1 ≤ v < 2.6 |
full | yes |
1.0 ≤ v < 2.1 |
partial (not tested) | yes |
Kafkistry determines the major/minor version of kafka by looking at the controller's inter.broker.protocol.version config property.
This may not reflect the actual build version of the broker, but it does mean the actual version is greater or equal.
It's done this way to avoid connecting to zookeeper when it isn't needed.
Based on the detected version, Kafkistry will perform interactions/operations either by leveraging API calls of AdminClient or by going to Zookeeper if needed.
NOTE: secure connection setup to Zookeeper is not tested!
Kafkistry will periodically poll/scrape metadata from all kafka clusters that it connects to. There are several properties that can be configured. Scraping is broken into several job types, for example:
- cluster_state - to describe cluster, topics, acls
- consumer_groups - to describe consumer groups and their assignments and offsets
- topic_offsets - to get begin and end offset of each topic-partition
- client_quotas - to describe client quotas
- ...
Property | Default | Description |
---|---|---|
CLUSTER_POOL_INTERVAL |
10000 (ms) |
How often to scrape metadata (topics, consumer groups, ACLs, ...) |
CLUSTER_POOL_RECORD_SAMPLING_INTERVAL |
120000 (ms) (2min) |
How often to read oldest and/or newest messages from every topic-partition |
CLUSTER_POOL_CONCURRENCY |
6 |
Limit for one type of job (e.g. topic_offsets fetching) for how many clusters to process concurrently |
CLUSTER_POOL_CONCURRENCY_PER_CLUSTER |
30 |
How many clients (admin + consumer) to have open toward one cluster |
There is an option to completely disable or enable connecting to specific kafka clusters added to the Kafkistry repository.
Disabling specific clusters is useful when you have more than one environment, for example a production environment and a staging environment, which are unreachable from each other on the network level. Suppose you have a common GIT repository for managing topic configurations on clusters from both environments. A deployment of Kafkistry in the staging environment could not reach kafka clusters in the production environment and vice versa.
The example below shows that KR-PROD can reach KFK-P1 and KFK-P2 but can't reach KFK-S1 and KFK-S2, while KR-STAG can reach KFK-S1 and KFK-S2 but can't reach KFK-P1 and KFK-P2:
|---------->[GIT]<----------|
| |
| |
|---[KR-PROD]---| |---[KR-STAG]---|
| | | |
| | | |
v v v v
[KFK-P1] [KFK-P2] [KFK-S1] [KFK-S2]
So, it's better to simply disable particular clusters when you know they are unreachable.
Property | Default | Description |
---|---|---|
ENABLED_CLUSTERS |
any | Comma separated list of cluster identifiers to allow attempt to connect to. Can be literal * |
DISABLED_CLUSTERS |
none | Comma separated list of cluster identifiers to prevent attempt to connect to. Can be literal * |
ENABLED_TAGS |
any | Comma separated list of cluster tags to allow attempt to connect to. Matches when any of cluster tags matches tag from this list. Can be literal * |
DISABLED_TAGS |
none | Comma separated list of cluster tags to prevent attempt to connect to. Blocks when any of cluster tags matches tag from this list. Can be literal * |
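As a hedged illustration of the staging deployment from the diagram above, connecting to the production clusters could be prevented like this (the identifiers are the placeholder names used in the diagram):
ENABLED_CLUSTERS: "*"                 # allow connecting to everything by default
DISABLED_CLUSTERS: "KFK-P1,KFK-P2"    # ...except the unreachable production clusters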
Property | Default | Description |
---|---|---|
HTTP_PORT |
8080 |
Which port should web server listen on. No need to change if running in docker with port mapping. |
HTTP_ROOT_PATH |
/kafkistry |
This is basically prefix for all http urls paths. Useful if Kafkistry is deployed behind http proxy which will be using url path for routing. NOTE: must not end with / but can be empty. |
HTTP_REQUESTS_LOGGING_ENABLED |
true |
Should each HTTP request to Kafkistry be logged, see filter |
Various features in Kafkistry can be enabled/disabled for a specific cluster/topic/consumer/etc. This section describes such options and is referenced by other sections that use filtering.
Property | Default | Description |
---|---|---|
<prefix>.included |
(all) | Which values to include |
<prefix>.excluded |
(none) | Which values to exclude |
Each of those properties can be:
- * - matches all
- undefined
  - inclusion matches all
  - exclusion matches none
- CSV list
  - example: my-cluster-1,my-cluster-2
- List of strings
  - example:
    - "my-topic-1"
    - "my-topic-2"
Property | Default | Description |
---|---|---|
<prefix>.identifiers.included |
(all) | Which cluster identifiers to include. Can be omitted, * , CSV string or list of strings. |
<prefix>.identifiers.excluded |
(none) | Which cluster identifiers to exclude. Can be omitted, * , CSV string or list of strings. |
<prefix>.tags.included |
(all) | Clusters having which tags to include. Can be omitted, * , CSV string or list of strings. |
<prefix>.tags.excluded |
(none) | Clusters having which tags to exclude. Can be omitted, * , CSV string or list of strings. |
For example:
<prefix-for-clusters>:
  included: "cluster-1,cluster-2"
<prefix-for-topics>:
  excluded:
    - "topic-2"
will result in the following acceptance matrix:
Filter matches | topic-1 |
topic-2 |
topic-3 |
---|---|---|---|
cluster-1 |
yes | no | yes |
cluster-2 |
yes | no | yes |
cluster-3 |
no | no | no |
Kafkistry is capable of exporting metrics for prometheus. It exports some metrics about itself and metrics about consumers' lag.
By default, metrics can be scraped on the /kafkistry/prometheus/metrics HTTP endpoint.
The path is configurable by: HTTP_ROOT_PATH (default=/kafkistry) and PROMETHEUS_PATH (default=/prometheus/metrics),
where the HTTP endpoint will be: HTTP_ROOT_PATH + PROMETHEUS_PATH (default=/kafkistry/prometheus/metrics).
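For example, a hedged Prometheus scrape configuration for this endpoint could look like the following (host and port are placeholders):
scrape_configs:
  - job_name: "kafkistry"
    metrics_path: "/kafkistry/prometheus/metrics"   # HTTP_ROOT_PATH + PROMETHEUS_PATH
    static_configs:
      - targets: ["kafkistry-host:8080"]            # placeholder host:port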
Registration of default JVM metrics is done using io.prometheus.client.hotspot.DefaultExports
There are numerous metrics showing how Kafkistry is operating (UI request latencies, API request latencies, time since the last kafka metadata scrape, etc.).
All Kafkistry-specific metrics have default naming starting with kafkistry_
*.
This prefix can be configured via APP_METRICS_PREFIX
property.
Property | Default | Description |
---|---|---|
app.metrics.enabled |
true |
Enable/disable all metrics / no endpoint available when disabled. |
app.metrics.prefix or APP_METRICS_PREFIX |
kafkistry_ |
See Kafkistry operating metrics |
app.metrics.default-metrics |
true |
See JVM metrics |
app.metrics.api-calls |
true |
Whether or not to track metrics around API beans' method calls |
app.metrics.http-calls |
true |
Whether or not to track metrics around incoming HTTP calls |
app.metrics.endpoint-enabled |
true |
Whether or not the http endpoint for prometheus is exposed. |
app.metrics.http-path or PROMETHEUS_PATH |
/prometheus/metrics |
See Prometheus' metrics |
app.metrics.pre-cached |
false |
Option to enable precomputation of all samples. Useful when dealing with huge number of samples when each scrape takes long time to compute. With this enabled, computation is performed in another thread, and scrape always gets last computed samples. |
app.metrics.pre-cache-refresh-sec |
10 (10sec) |
Interval how often to pre-compute samples. |
Monitoring consumer lag is an important aspect of the whole system.
Consumer lag is defined as the difference between the latest offset in a topic-partition and the last offset committed by the consumer.
Lag represents how many messages the consumer is lagging behind the producer(s).
Topic-partition:
----+---+---+---+---+---+-----+---+---+---+
... | | | C | | | ... | | | E |
----+---+---+---+---+---+-----+---+---+---+
| |
|<--------- LAG --------->|
C - committed offset by consumer
E - end offset (latest produced)
Ideally, you want the lag to be 0 or to oscillate slightly above zero.
A steadily growing lag means that the consumer can't keep up with the producer's rate.
Available configuration properties
Property | Default | Description |
---|---|---|
app.metrics.consumer-lag.enabled |
true |
Enable/disable exporting lag metric at all |
app.metrics.consumer-lag.enabled-on.clusters.<...filter options...> |
all | For which clusters to include consumer lag metric. See filtering options |
app.metrics.consumer-lag.enabled-on.topics.<...filter options...> |
all | For which topics to include consumer lag metric. See filtering options |
app.metrics.consumer-lag.enabled-on.consumer-groups.<...filter options...> |
all | For which consumer groups to include consumer lag metric. See filtering options |
There are several drawbacks to the approach where every consumer exports a metric about its own lag:
- It's ok if some members of a consumer group go down, the group will simply rebalance and you'll still have the lag metric from other members which are up and running. But what if all consumer members go down? Then you won't have a lag metric at all.
- Every consumer member needs to export its own lag metric. It also needs to be implemented individually for each language used by services consuming from kafka.
- Kafkistry, in contrast, offers a unified and centralized place for collecting and exporting lag for all consumers on all clusters, regardless of the language consumers are written in, and it keeps the lag metric available even when consumer members are down.
Downsides:
- Kafkistry introduces a slight time delay for the lag metric, depending on the rate of polling/scraping topic offsets and the latest consumer commit offsets
- Kafkistry will "see" a consumer as having lag until it performs a commit; the lag might be shown as greater than it actually is when the consumer is configured to perform periodic (time- or count-based) commits rather than committing after each processed record/batch
- If some service/consumer group is decommissioned or stops being used, Kafkistry will continue exporting its lag until the consumer group is explicitly deleted or until retention on the __consumer_offsets topic deletes the newest commits of that particular group
Kafkistry exports lag as a GAUGE metric named kafkistry_consumer_lag broken down by the following labels:
- cluster - on which kafka cluster it is (one of the clusters added into Kafkistry). This can be customized by implementing ClusterLabelMetricProvider, see more about writing custom plugins
- consumer_group - consumer group ID
- topic - topic name
- partition - which partition of the topic
- consumer_host - hostname/IP of the consumer group's member instance, or literal unassigned
The value of the metric is lag = end_offset - committed_offset
Example of consumer lag metric sample:
kafkistry_consumer_lag{cluster="my-kafka",consumer_group="my-consumer",topic="my-topic",partition="0",consumer_host="/127.0.0.1",} 123.0
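Building on that sample, a hedged Prometheus alerting rule on the lag gauge might look like this; the threshold and duration are arbitrary placeholders:
groups:
  - name: kafkistry-consumer-lag
    rules:
      - alert: ConsumerGroupLagging
        # sum partitions per group; 10000 is an arbitrary example threshold
        expr: sum by (cluster, consumer_group) (kafkistry_consumer_lag) > 10000
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "Consumer group {{ $labels.consumer_group }} on {{ $labels.cluster }} is lagging"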
Kafkistry can export topic partition end (latest) and begin (oldest) offsets as GAUGE metrics:
- kafkistry_topic_begin_offset
- kafkistry_topic_end_offset
Both of those have the following labels:
- cluster - on which kafka cluster it is (one of the clusters added into Kafkistry). This can be customized by implementing ClusterLabelMetricProvider, see more about writing custom plugins
- topic - topic name
- partition - which partition of the topic
Available configuration properties
Property | Default | Description |
---|---|---|
app.metrics.topic-offsets.enabled |
true |
Enable/disable exporting topic partition offsets metric at all |
app.metrics.topic-offsets.enabled-on.clusters.<...cluster filter options...> |
all | For which clusters to include topic offsets metric. See cluster filtering options |
app.metrics.topic-offsets.enabled-on.topics.<...filter options...> |
all | For which topics to include topic offsets metric. See filtering options |
Use cases (see the recording-rule sketch below):
- rate of increase of kafkistry_topic_end_offset directly corresponds to the produce rate
- difference between the end offset and begin offset of a particular topic's partition roughly corresponds to the number of messages in that partition
  - NOTE 1: this number will be completely wrong for topics with cleanup.policy including compact
  - NOTE 2: this number might be slightly bigger than actual in case of transaction usage due to transaction markers "using" some offsets
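As a hedged sketch of those use cases, the offset gauges can be turned into produce-rate and approximate-count series with Prometheus recording rules like these (the rule names are arbitrary):
groups:
  - name: kafkistry-topic-offsets
    rules:
      - record: topic:produce_rate_msg_per_sec:5m
        # rate of end-offset growth roughly equals the produce rate
        expr: sum by (cluster, topic) (rate(kafkistry_topic_end_offset[5m]))
      - record: topic:approx_message_count
        # rough count only; see the compaction/transaction caveats above
        expr: sum by (cluster, topic) (kafkistry_topic_end_offset - kafkistry_topic_begin_offset)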
Retention metrics answer the question:
- is the topic over-provisioned or under-provisioned in regard to the topic's configured retention.bytes and retention.ms?
Motivation:
- you may find yourself creating a topic with an appropriate configuration for the load you expect initially
- after some time, traffic increases, and partitions of your topic are no longer truncated by retention.ms, but by retention.bytes instead
- this leaves your topic without the time retention that you wanted initially
There are 3 metrics:
- kafkistry_topic_effective_retention_ms
  - GAUGE which shows the age of the oldest record in a topic partition
  - NOTE: works only if oldest record age sampling is enabled
  - value represents the difference between Time.now and oldestRecord.time in milliseconds
  - NOTE: if records are produced with a custom supplied timestamp which is way lower than the current time at produce, then this metric may be completely unusable
- kafkistry_topic_size_retention_usage
  - GAUGE of the ratio between the actual size of a partition and the configured retention.bytes
  - value 0.0 means that the partition is empty
  - value 0.5 means that the partition uses 50% of the configured retention.bytes
  - value can be greater than 1.0 due to how kafka rolls partition segments
- kafkistry_topic_time_retention_usage
  - GAUGE of the ratio between the oldest record age (effective retention) of a partition and the configured retention.ms
  - value 0.5 means that the partition uses 50% of the configured retention.ms
  - value can be greater than 1.0 due to how kafka rolls partition segments
Different regime cases:
- low time_retention_usage and high size_retention_usage
  - topic is truncated by size retention (you might want to alert on this, see the example below)
- high time_retention_usage and low size_retention_usage
  - topic is truncated by time retention
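A hedged example of alerting on the first regime, assuming the retention gauges carry the same cluster/topic labels as the other topic metrics (the thresholds are arbitrary):
groups:
  - name: kafkistry-topic-retention
    rules:
      - alert: TopicTruncatedBySizeRetention
        # high size usage together with low time usage => truncated by retention.bytes
        expr: kafkistry_topic_size_retention_usage > 0.9 and kafkistry_topic_time_retention_usage < 0.5
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "{{ $labels.topic }} on {{ $labels.cluster }} appears truncated by retention.bytes"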
Available configuration properties
Property | Default | Description |
---|---|---|
app.metrics.topic-retention.enabled |
true |
Enable/disable exporting topic partition retention metric at all |
app.metrics.topic-retention.enabled-on.clusters.<...filter options...> |
all | For which clusters to include retention metric. See filtering options |
app.metrics.topic-retention.enabled-on.topics.<...filter options...> |
all | For which topics to include retention metric. See filtering options |
All statuses of each topic per cluster are exposed using the metric name kafkistry_topic_status.
Used labels:
- cluster - on which kafka cluster it is (one of the clusters added into Kafkistry). This can be customized by implementing ClusterLabelMetricProvider, see more about writing custom plugins
- topic - topic name
- status - name of the topic's inspection status
  - Example statuses: OK, MISSING, WRONG_REPLICATION_FACTOR, ... See all statuses in the class TopicInspectionResultType. Note that custom statuses can be implemented via TopicExternalInspector
- valid - true/false
- category - one of the enum values of IssueCategory
- level - one of the enum values of StatusLevel
- owners - declared owner of the topic in its TopicDescription, or unknown for topics which are not in Kafkistry but exist on the cluster
Available configuration properties:
Property | Default | Description |
---|---|---|
app.metrics.topic-status.enabled |
true |
Enable/disable exporting topic statuses state metric at all |
app.metrics.topic-status.omit-status-names |
empty | List of status names not to be included in metrics samples with goal to reduce number of exported metric samples. |
app.metrics.topic-status.include-disabled-clusters |
false |
When false, statuses for clusters that are disabled will be omitted; when true, a CLUSTER_DISABLED metric status will be included for all disabled clusters. |
app.metrics.topic-status.group-without-topic-name |
false |
When true, the metric won't have the topic label and the metric value will correspond to the number of topics with equal other labels. |
app.metrics.topic-status.enabled-on.clusters.<...filter options...> |
all | For which clusters to include status metric. See filtering options |
app.metrics.topic-status.enabled-on.topics.<...filter options...> |
all | For which topics to include status metric. See filtering options |
All statuses of each cluster are exposed using the metric name kafkistry_cluster_status.
Used labels:
- cluster - on which kafka cluster it is (one of the clusters added into Kafkistry). This can be customized by implementing ClusterLabelMetricProvider, see more about writing custom plugins
- status - name of the cluster status
  - Example statuses: VISIBLE, UNREACHABLE, DISK_USAGE_DISBALANCE, OVER_PROMISED_RETENTION, ... See all statuses in the enum StateType. Note that custom statuses can be implemented via ClusterIssueChecker
- valid - true/false
- level - one of the enum values of StatusLevel
Available configuration properties:
Property | Default | Description |
---|---|---|
app.metrics.cluster-status.enabled |
true |
Enable/disable exporting cluster statuses state metric at all |
app.metrics.cluster-status.omit-status-names |
empty | List of status names not to be included in metrics samples with goal to reduce number of exported metric samples. |
app.metrics.cluster-status.include-disabled-clusters |
false |
When false, statuses for clusters that are disabled will be omitted; when true, a CLUSTER_DISABLED metric status will be included for all disabled clusters. |
app.metrics.cluster-status.group-without-topic-name |
false |
When true, the metric won't have the topic label and the metric value will correspond to the number of topics with equal other labels. |
app.metrics.cluster-status.enabled-on.clusters.<...filter options...> |
all | For which clusters to include status metric. See filtering options |
Kafkistry has security/login for accessing the UI, and it's enabled by default. Security is all about authenticating the User accessing Kafkistry and resolving their authorities.
Any unauthenticated user accessing the UI will be redirected to the /login page view.
Kafkistry uses a session cookie named KRSESSIONID which is passed with each HTTP request so that the back-end knows which user is performing each request.
There are several options for how authentication can be accomplished.
This is the simplest way: give Kafkistry a list of users to authenticate against. It is static in the sense that Kafkistry reads the list on startup, and any change (for example, adding a new user) requires a restart of Kafkistry.
Options to pass in the users list:
- USERS_PASSWORDS
  - one user format: <username>|<password>|<name>|<surname>|<email>|<role>|<token>
    - username - username to login with
    - password - plaintext password to login with
    - name, surname, email - used for displaying in the UI; when using the GIT repository, this user metadata will be used as the author of commits
    - role - used for resolving which authorities the user will have
      - available: ADMIN, USER, READ_SERVICE; see more about which authorities each role has in UserRole
    - token - optional token which can be used to authenticate HTTP requests by being passed via X-Auth-Token instead of the standard session-based approach with authentication via the /login page form
  - multiple users can be separated either by a new line \n or by ;
  - example with 2 users separated by ;: USERS_PASSWORDS=u1|p1|Foo|Bar|u1@kr|ADMIN|atoken;u2|p2|Fizz|Buzz|u2@kr|USER|
  - this option is great for a quick startup, but it is not recommended for production usage if it's being passed as a Kafkistry argument option or docker environment variable because of the plaintext passwords
- USERS_PASSWORDS_YAML - value holding yaml
  - holds the same information as raw USERS_PASSWORDS but in yaml format and with the password encrypted/hashed using the BCrypt algorithm
    - password can be encrypted with BCrypt or can be in plaintext
    - attributes is a String-to-Any map which is not used by core Kafkistry but is there to allow custom plugins to store additional information within the User object
  - example:
- user:
username: "admin"
firstName: "Admy"
lastName: "Adminsky"
email: "[email protected]"
role:
name: "ADMIN"
authorities:
- name: "VIEW_DATA"
- name: "REQUEST_CLUSTER_UPDATES"
- name: "REQUEST_TOPIC_UPDATES"
- name: "REQUEST_ACL_UPDATES"
- name: "REQUEST_QUOTA_UPDATES"
- name: "MANAGE_GIT"
- name: "MANAGE_KAFKA"
- name: "MANAGE_CONSUMERS"
- name: "READ_TOPIC"
attributes: {}
token: null
password: "$2a$10$xL07zCVsDMgK2FlB0SUiz..GwkmDVf2pnB2kO1G49/zJhWsxo09cm"
passwordEncrypted: true
# more users...
- USERS_PASSWORDS_YAML_FILE - path to a file holding yaml
  - same format as USERS_PASSWORDS_YAML
NOTE/TIP: any change in the users list requires a restart of Kafkistry.
NOTE/TIP: If you specify both USERS_PASSWORDS and USERS_PASSWORDS_YAML_FILE pointing to a non-existing or empty file, Kafkistry will write the users from USERS_PASSWORDS into USERS_PASSWORDS_YAML_FILE in yaml format with encrypted passwords.
Custom authentication can be achieved by implementing KafkistryUsernamePasswordAuthProvider. This is a mechanism which implements classic username+password authentication. The implementation could perform whatever remote call is needed to validate credentials and retrieve metadata for the User.
The second option is to implement PreAuthUserResolver. This might be useful in a situation where Kafkistry is behind some proxy which is responsible for login and which sets some cookie in the browser. The custom PreAuthUserResolver could then read this cookie, make a remote lookup to validate it, and actually log the user into Kafkistry without the user ever seeing the login page.
See more about writing custom plugins
Entities like Topic, Principal and Quota entity have a declared property owner which represents a team / user group which owns it. It is useful to have it in bigger organizations with a larger number of teams, topics, principals/applications, etc.
Kafkistry uses the UserOwnerVerifier abstraction to check whether a particular User (the logged-in person in the UI) is a member of a particular user group (i.e. whether the user is a particular owner).
One example where Kafkistry uses it is when a request to reset consumer group offsets is made. Kafkistry wants to check that the person doing it is allowed to do so, by indirectly checking whether that user is a member of the declared owner of the Principal whose ACL permits joining that particular consumer group.
[consumer-group] <- [ACL for READ GROUP] <- [Principal] <-(is owner)- [User]
Similarly to the static users list, you can pass in a list of which user is a member of which owner group.
Options to pass in the owner groups:
- OWNER_GROUPS
  - one owner group format: <owner/group>|<username1>,<username2>,...
    - owner/group - owner name
    - usernameN - username who is a member of the owner group
  - multiple owner groups can be separated either by a new line \n or by ;
  - example with 2 owner groups separated by ;: OWNER_GROUPS=g1|u1,u2;g1|u1,u3
- OWNER_GROUPS_YAML - value holding yaml
  - holds the same information as raw OWNER_GROUPS but in yaml format
  - example:
- owner: "my-owner-group-1"
usernames:
- "user1"
- "user2"
- owner: "my-owner-group-2"
usernames:
- "user3"
# more owner-groups...
- OWNER_GROUPS_YAML_FILE - path to a file holding yaml
  - same format as OWNER_GROUPS_YAML
NOTE/TIP: any change in the owner groups list requires a restart of Kafkistry
In larger organizations you may want to restrict who can manage which consumer group.
Property | Default | Description |
---|---|---|
app.security.consumer-groups.only-admin |
false |
When true, only ADMIN users are able to delete consumer groups or reset offsets, regardless of whether a non-admin user is actually the owner of the particular consumer group |
ALLOW_ACCESS_TO_CONSUMER_GROUPS_NO_OWNERS |
false |
When there are no ACLs affecting a particular consumer group, Kafkistry can't resolve which principal, and therefore which owner, owns access to that consumer group. When this property is false, non-admin users won't be permitted to make changes on such consumer groups. |
The implementation class for the above listed properties is ConsumerAlterPermissionAuthorizer.
Property | Default | Description |
---|---|---|
app.security.enabled |
true |
Whether or not any security access is enforced; not recommended to disable for production deployments. |
app.security.csrf-enabled |
true |
Whether or not CSRF protection is enabled; not recommended to disable for production deployments. CSRF works by injecting a csrf token into every UI view; all ajax calls to the backend are then decorated with a csrf http header obtained from the served html. |
app.security.explain-denied-access |
true |
By default, on unauthorized access Spring security throws AccessDeniedException without any details of the deny cause. When explanation is enabled, more details about the deny decision are included in the denying exception message. Use with caution as it might allow an attacker to better understand authorization requirements. |
app.security.denied-access-help-message |
"Please contact your Kafkistry administrator!" |
Addition to deny explanation message to give users help directions. |
Kafkistry can run as a multi-node cluster in order to achieve high availability.
Kafkistry, as a tool for managing and inspecting kafka, is not a critical part of the whole system. However, if the Kafkistry consumer lag metric is used for monitoring, then it's useful to have HA because you don't want unavailability of Kafkistry to make you blind to the lag of all consumers.
High availability comes from having more than one instance/node of Kafkistry deployed, so that if one goes down you still have others to handle user requests / export lag.
In a multi-instance setup, each Kafkistry instance operates very similarly to a standalone instance, except for a few things:
- user sessions are shared across instances, so that if a user is logged in on one Kafkistry instance then a subsequent HTTP request to another Kafkistry instance will work under the same KRSESSIONID
- Kafkistry instances send events to each other about changes, so that if a topic is created by one Kafkistry instance and a subsequent UI HTTP request goes to another Kafkistry instance, the sent event prevents the second instance from being unaware of the topic created moments ago by the first instance
A typical setup would be to place an HTTP proxy in front of multiple Kafkistry nodes so that the proxy balances requests across all available Kafkistry nodes. It could use a simple round-robin strategy; there is no need for sticky sessions on the proxy because of session sharing between Kafkistry (KR) nodes.
. . . . . . . Hazelcast
. +----+ . /--- cluster
/---->| KR | . /
/ . +----+ ./
+---------+ +--------+ / . .
| User | request | HTTP |/ . +----+ .
| browser |-------->| proxy |-------->| KR | .
+---------+ +--------+\ . +----+ .
\ . .
\ . +----+ .
\---->| KR | .
. +----+ .
. . . . . . .
How are user sessions shared and events sent between Kafkistry nodes?
- answer: hazelcast in-memory-data-grid (IMDG)
Kafkistry uses Hazelcast IMDG for sharing users' sessions and for sending/receiving events between Kafkistry instances.
In order for this to work, Kafkistry nodes must somehow know about each other so that they can connect together and form a hazelcast cluster.
Options to configure hazelcast:
Property | Default | Description |
---|---|---|
HAZELCAST_PORT |
5701 |
Port on which Kafkistry embedded hazelcast will listen on. Make sure to add it in port mapping if running inside docker. |
HAZELCAST_ADVERTIZED_IP |
none | This is an IP address which will be advertised, it's important that each Kafkistry node can reach each other Kafkistry node by this IP and port |
HAZELCAST_DISCOVERY_TYPE |
MULTICAST |
Configure how should Kafkistry nodes discover each other. Available options: NONE , CUSTOM_IMPLEMENTATION , LOCALHOST_IP , MULTICAST , STATIC_IPS |
HAZELCAST_PUBLISH_ACK_TIMEOUT |
6000 (6sec) |
How long one Kafkistry instance should wait for the others to acknowledge that they received and handled a published event |
Discovery types:
- NONE - don't join a hazelcast cluster
- MULTICAST - let hazelcast try to discover other Kafkistry nodes by sending multicast to the network
- LOCALHOST_IP - only discover nodes running on localhost (used mainly for tests)
- STATIC_IPS - discover nodes from a configured list of IP addresses
  - when using this option, HAZELCAST_MEMBER_IPS should be defined, containing the list of IPs to look for (see the example below)
- CUSTOM_IMPLEMENTATION - an option which allows a custom implementation to supply seed IPs at runtime
  - this option might be useful in situations where MULTICAST does not work because of various network configuration blockers (such as firewalls, or a big network causing slow discovery, ...), and you don't want to hard-code HAZELCAST_MEMBER_IPS
  - to provide an implementation, implement the interface CustomDiscoveryIpsProvider
  - see more about writing custom plugins
The topic wizard proposes a partition count based on the expected message rate. It is possible to define thresholds mapping specific expected message rates onto desired partition counts.
Options for the default partition count for new topics:
Property | Default | Description |
---|---|---|
WIZARD_DEFAULT_PARTITIONS |
1 |
Default number of partitions to propose when rate is lower than lowest threshold defined in WIZARD_PARTITION_THRESHOLDS |
WIZARD_PARTITION_THRESHOLDS |
{"default":{"100":10,"2000":30,"5000":60}} |
This is JSON representation of data class ThresholdsConfig |
Example: having WIZARD_DEFAULT_PARTITIONS = 1 and WIZARD_PARTITION_THRESHOLDS =
{
"default": {
"100": 10,
"2000": 30,
"5000": 60
},
"overrides": {
"my-kafka-1": {
"0": 2,
"1000": 4
},
"my-kafka-3": {
"200": 5
}
},
"tagOverrides": {
"some-tag": {
"0": 20
}
}
}
The table below shows what the proposed partition count would be, given some expected message rate, for different clusters:
Expected rate | my-kafka-1 | my-kafka-2 | my-kafka-3 | my-kafka-4 [tag=some-tag] |
---|---|---|---|---|
10 msg/sec | 2 partitions | 1 partition | 1 partition | 20 partitions |
200 msg/sec | 2 partitions | 10 partitions | 5 partitions | 20 partitions |
3000 msg/sec | 4 partitions | 30 partitions | 5 partitions | 20 partitions |
The default implementation and UI directly ask the user to enter the desired name of a topic.
If a specific topic naming convention is required, then a custom implementation of TopicNameGenerator can be exposed as a spring @Bean, alongside a custom UI form to enter the custom attributes needed.
Options for customizing topic name UI inputs:
Property | Default | Description | Requirement |
---|---|---|---|
app.topic-wizard.topic-name.template-name |
defaultWizardTopicName (implementation) |
UI component (html) for input attributes for topic name generator (default implementation only asks for name ) |
Must be classpath resource located in ui/topics/ |
app.topic-wizard.topic-name.js-name |
defaultWizardTopicName (implementation) |
UI component (js) for extracting needed custom attributes from UI to be used for TopicNameGenerator |
Must be classpath resource located in ui/static/topic/ |
app.topic-wizard.topic-name.name-label |
Check topic name |
Not important for logic, but makes more sense to define something like Generated topic name when custom generator is used |
none |
The topic wizard proposes a value for the topic property max.message.bytes based on the expected average message size.
It is possible to define the lowest possible value the wizard will generate for this property, as well as the highest possible value.
Property | Default | Description |
---|---|---|
MIN_MAX_MESSAGE_BYTES |
0 |
Lowest possible value wizard can suggest for this property, by default there is no limit |
MAX_MAX_MESSAGE_BYTES |
10485760 (10MB) |
By default, the most the wizard will suggest, regardless of average message size, is 10 MB |
Kafkistry has a utility to consume records from a topic and display them in the UI. This feature is mainly intended for developers debugging the content of topics.
Since the result of consuming is returned as a single HTTP request's response, there are a few limits to set up to prevent memory exhaustion.
Property | Default | Description |
---|---|---|
CONSUME_ENABLED |
true |
Enable/disable ability to consume records |
CONSUME_MAX_RECORDS |
5000 |
Maximum number of records to return in UI |
CONSUME_MAX_WAIT_MS |
120000 (2min) |
Maximum wait time before responding to the user that there are no records |
CONSUME_POOL_BATCH_SIZE |
2000 |
Will be used for consumer max.poll.records property |
CONSUME_POOL_INTERVAL |
1000 (1sec) |
Will be used as the argument for KafkaConsumer.poll(timeout) |
When reading from a topic, a new KafkaConsumer will be created which will consume with a randomly generated consumer group ID, and it will not perform commits while consuming.
Topic records usually contain sensitive user data which shouldn't be visible even to developers.
This feature allows specifying which fields in which topics on which kafka clusters are sensitive.
When a topic is consumed through Kafkistry, all sensitive values will be replaced with a dummy ***MASKED*** value.
Rules for masking can be defined via following configuration properties:
Property | Default | Description |
---|---|---|
app.masking.rules.<my-rule-name>.target.clusters.<...cluster filter options...> |
all | From which clusters to apply masking of topic records. See cluster filtering options |
app.masking.rules.<my-rule-name>.target.topics.<...filter options...> |
all | From which topics to apply masking of topic records. See filtering options |
app.masking.rules.<my-rule-name>.value-json-paths |
none | Comma-separated list of json path field in record's deserialized value to apply masking replacement |
app.masking.rules.<my-rule-name>.key-json-paths |
none | Comma-separated list of json path field in record's deserialized key to apply masking replacement |
app.masking.rules.<my-rule-name>.headers-json-paths.<header-name> |
none | Comma-separated list of json path field in record's deserialized header to apply masking replacement |
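A hedged example of a single masking rule; the rule name, topic name and json paths are hypothetical:
app.masking.rules.mask-user-data:
  target:
    topics:
      included: "user-events"                          # apply only to this topic
  value-json-paths: "user.email,user.phoneNumber"      # fields in the record value to replace with ***MASKED***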
Masking rules can also be supplied by a custom plugin implementation exposed as a spring bean (see writing custom plugins).
It is useful to know how old the oldest record in a topic partition is.
This information gives insight into what the "effective time retention" is in cases when a topic has low retention.bytes and high throughput, which might cause the effective retention to be much less than retention.ms.
Property | Default | Description |
---|---|---|
OLDEST_RECORD_AGE_ENABLED |
true |
Enable/disable sampling of oldest (lowest offset) record timestamp |
app.oldest-record-age.enabled-on.clusters.<...cluster filter options...> |
all | Nested properties for which clusters to enable sampling. See cluster filtering options |
app.oldest-record-age.enabled-on.topics.<...filter options...> |
all | Nested properties for which topics to enable sampling. See filtering options |
The default implementation for extracting the timestamp from a ConsumerRecord simply uses the ConsumerRecord.timestamp() method.
This behaviour can be changed by supplying a custom implementation of RecordTimestampExtractor as a spring bean.
This feature tries to detect the structure of messages in topics.
The current implementation supports analysis only for JSON-serialized messages.
It works by periodically taking samples of messages. Each message goes through analysis to extract structural metadata, and this metadata is stored in a time-windowed tree index.
The result of the analysis is non-deterministic because of the non-deterministic selection of records to be sampled. However, the analysis gives pretty good results because of the assumption that all messages from one topic should have the same or a similar structure.
Options to configure analyzer.
Property | Default | Description |
---|---|---|
RECORD_ANALYZER_ENABLED |
true |
Enable/disable the record analyzer feature |
RECORD_ANALYZER_STORAGE_DIR |
kafkistry/record-analyzer-data |
Path to directory where Kafkistry will persist analyzed data on disk |
RECORD_ANALYZER_TRIM_TIME_WINDOW |
259200000 (3days) |
Width of time window. (If some field in JSON hadn't been seen for more than this time it will be removed from index) |
RECORD_ANALYZER_CONCURRENCY |
1 |
How many threads should handle analysis |
RECORD_ANALYZER_MAX_QUEUE_SIZE |
20000 |
Actual sampling is performed by different thread than analysing thread. This is max size of in-memory queue for records being sampled and waiting for analysis |
RECORD_ANALYZER_TRIM_AND_DUMP_RATE |
120000 (2min) |
How often to evict old fields from tree structure and to persist whole index on disk |
RECORD_ANALYZER_VALUE_SAMPLING_ENABLED |
true |
Should values of json field be analysed, false means that only value type is indexed |
RECORD_ANALYZER_VALUES_MAX_CARDINALITY |
25 |
Threshold for how many different values are considered to be high-cardinality |
app.record-analyzer.enabled-on.clusters.<...cluster filter options...> |
all | From which clusters to perform analysis of topic records. See cluster filtering options |
app.record-analyzer.enabled-on.topics.<...filter options...> |
all | From which topics to perform analysis of topic records. See filtering options |
app.record-analyzer.value-sampling.enabled |
true |
Whether or not to sample values of analyzed records. |
app.record-analyzer.value-sampling.max-number-abs |
1000000 |
A numeric field with magnitude bigger than this value will be marked as too-big |
app.record-analyzer.value-sampling.max-string-length |
50 |
A string field with length greater than this value will be marked as too-big |
app.record-analyzer.value-sampling.max-cardinality |
25 |
If sampling sees more distinct values than this, the field will be marked as a high-cardinality field |
app.record-analyzer.value-sampling.enabled-on.clusters.<...cluster filter options...> |
all | From which clusters to sample values of topic records. See cluster filtering options |
app.record-analyzer.value-sampling.enabled-on.topics.<...filter options...> |
all | From which topics to sample values of topic records. See filtering options |
app.record-analyzer.value-sampling.included-fields |
none | Comma-separated list of json path field, sample values only from json fields of this whitelist |
app.record-analyzer.value-sampling.excluded-fields |
none | Comma-separated list of json path field, do not sample values from json fields of this blacklist |
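For instance, a hedged configuration that keeps the analyzer on but skips a non-JSON topic and never samples the values of one sensitive field (the topic and field names are placeholders):
RECORD_ANALYZER_ENABLED: true
app.record-analyzer.enabled-on.topics:
  excluded: "binary-measurements"                                    # skip a topic that isn't JSON
app.record-analyzer.value-sampling.excluded-fields: "user.email"     # index the field's type but never its values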
Having information about brokers' disk size is an important part of capacity planning for topic configuration (number of partitions, replication factor and retention).
Unfortunately, brokers themselves do not expose disk metrics (size/free) for the volumes where message logs are persisted.
Kafkistry allows for alternative ways to obtain information about disks.
This option for giving Kafkistry information about the disk capacity of brokers is a poor man's solution. The way it works is to tell Kafkistry about disk capacity via Kafkistry configuration properties. It is a naive approach, but it may suffice because disk size is constant most of the time.
Properties needed to be set:
- app.kafka.metrics.static.enabled: true
- If all brokers of one kafka cluster have disks with the same total capacity:
  app.kafka.metrics.static.clusters.<CLUSTER_IDENTIFIER>.total-default: <TOTAL_CAPACITY_BYTES>
- If brokers of one kafka cluster don't all have disks with the same total capacity:
  app.kafka.metrics.static.clusters.<CLUSTER_IDENTIFIER>.broker-total.<BROKER_ID>: <TOTAL_CAPACITY_BYTES>
- Both options can be used at the same time, for example if 4 of 6 brokers have the same disk and 2 of 6 have a different one
- Example:
app.kafka.metrics.static:
enabled: true
clusters:
my-kafka-1:
total-default: 322122547200 # 300GB
broker-total:
5: 289910292480 # 270GB - broker 5 does not have 300GB
6: 289910292480 # 270GB - broker 6 does not have 300GB
my-kafka-2:
total-default: 1099511627776 # all brokers have 1TB
This option queries a prometheus server to get values for brokers' disk capacity and/or disk free space. This approach is suitable only if you already use prometheus and have those metrics collected (e.g., by node_exporter).
Depending on the organization of your metric labels, there are two different ways of fetching metrics:
- one request to Prometheus for each broker
- one bulk request to Prometheus for all brokers of one cluster
Properties to configure (a combined example follows this list):
- app.kafka.metrics.prometheus.enabled
  - Is prometheus scraping enabled; needs to be true in order to work
  - default: false
- app.kafka.metrics.prometheus.bulk
  - Whether to use bulk mode or one-by-one broker mode
  - default: false
- app.kafka.metrics.prometheus.prometheus-base-url
  - Needed to generate the URL for making requests to prometheus
  - Example: https://my-prom.server
- app.kafka.metrics.prometheus.time-offset
  - Time offset from now into the past
  - Default: 60 (1min)
- app.kafka.metrics.prometheus.http-headers.<HTTP_HEADER>
  - Add any additional http header(s) to the requests Kafkistry will make towards Prometheus
  - It might be useful if your Prometheus is sitting behind a proxy which requires some form of authentication header
  - Example: app.kafka.metrics.prometheus.http-headers.X-AUTH-PROM-TOKEN: abcdef0123456789
- app.kafka.metrics.prometheus.total-prom-query
  - PromQL template for fetching total disk capacity
  - Example of query for metrics exported by node_exporter:
    - single query: node_filesystem_size{mountpoint='/my/used/mountpoint', instance=~'(?i){brokerHost}.*'}
    - bulk query: node_filesystem_size{mountpoint='/my/used/mountpoint', instance=~'(?i)({brokerHosts}).*'}
  - NOTE: {brokerHost} is a template variable which will be injected into the query (it's the broker's advertised hostname). {brokerHosts} is a template variable of all broker hostnames separated by |
- app.kafka.metrics.prometheus.free-prom-query
  - PromQL template for fetching free disk capacity
  - Example of query for metrics exported by node_exporter:
    - single query: node_filesystem_free{mountpoint='/my/used/mountpoint', instance=~'(?i){brokerHost}.*'}
    - bulk query: node_filesystem_free{mountpoint='/my/used/mountpoint', instance=~'(?i)({brokerHosts}).*'}
- app.kafka.metrics.prometheus.broker-label-name
  - Needed if bulk: true
  - Name of the label which holds the broker host name (needed to match a metric sample with a specific broker host)
  - Example (corresponding to our PromQL example): instance
- app.kafka.metrics.prometheus.broker-label-host-extract-pattern
  - Needed if bulk: true
  - Pattern to extract the broker's host from the label value (needed to match a metric sample with a specific broker host)
  - Default value: (.*)
  - Example: with node_exporter, the instance label will likely contain node_exporter's endpoint including the port where it exports metrics (for example, a value might look like my-broker-1:9100). Here we are interested only in the my-broker-1 part, without port 9100
    - example pattern: ^(.*):[0-9]+
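Putting those pieces together, a hedged bulk-mode configuration against node_exporter disk metrics might look like this (the base URL and mountpoint are placeholders):
app.kafka.metrics.prometheus:
  enabled: true
  bulk: true
  prometheus-base-url: "https://my-prom.server"
  total-prom-query: "node_filesystem_size{mountpoint='/my/used/mountpoint', instance=~'(?i)({brokerHosts}).*'}"
  free-prom-query: "node_filesystem_free{mountpoint='/my/used/mountpoint', instance=~'(?i)({brokerHosts}).*'}"
  broker-label-name: "instance"
  broker-label-host-extract-pattern: "^(.*):[0-9]+"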
In case the above listed mechanisms can't fulfill your needs, there is an option to implement your own mechanism for obtaining a broker's total disk capacity and free disk space.
Implement the interface NodeDiskMetricsProvider, which can obtain the needed metrics using your mechanism of choice.
See more about writing custom plugins
Kafkistry supports simple detection of deployed KStreams applications. It works by:
- finding consumer groups using the partition assignor streams
- topics consumed by such a consumer group are marked as inputTopics of the KStream app
- topics having the name <consumerGroup/kStreamAppId>-KSTREAM-* are marked as internalTopics of the KStream app
Property | Default | Description |
---|---|---|
app.kstream.detection.enabled-on.clusters.<...cluster filter options...> |
all | On which clusters to detect KStreams. See cluster filtering options |
app.kstream.detection.enabled-on.consumer-groups.<...filter options...> |
all | For which consumer groups to detect KStreams. See filtering options |
app.kstream.detection.enabled-on.topics.<...filter options...> |
all | Which topics to include in KStreams detection. See filtering options |
Kafkistry inspects various aspects of a topic on a specific cluster. The main aspects are existence of the topic where it's expected to exist, whether it has the expected configuration, etc.
Topic inspection can be enriched with custom implementations of TopicExternalInspector.
All built-in extra inspectors are enabled by default. Particular inspectors can be enabled or disabled using the properties:
- app.topic-inspection.enabled-inspectors
- app.topic-inspection.disabled-inspectors
where the value is a list of fully qualified class names to enable/disable.
Example:
app.topic-inspection:
enabled-inspectors:
- com.infobip.kafkistry.service.topic.inspectors.TopicEmptyInspector
- com.infobip.kafkistry.service.topic.inspectors.TopicUnderprovisionedRetentionInspector
- `TopicUnderprovisionedRetentionInspector`
  - Checks if the topic has partitions whose configured `retention.bytes` is too low, so that the oldest messages are deleted before reaching `retention.ms`
  - When this happens, it usually means the traffic rate is so high that the topic doesn't have the time-based retention of `retention.ms` one might expect
  - Property | Default | Description
    ---|---|---
    `app.topic-inspection.underprovisioned-retention.min-oldness-ratio-to-retention-ms` | `0.8` | Trigger this status when the oldest record in a partition is younger than 80% of topic's `retention.ms`
    `app.topic-inspection.underprovisioned-retention.required-ratio-to-retention-bytes` | `0.8` | Prevent this status from triggering unless the partition is "loaded" with data; i.e. partition size is at least 80% of topic's `retention.bytes`
    `app.topic-inspection.underprovisioned-retention.drastic-oldness-ratio-to-retention-ms` | `0.1` | Trigger the drastic variant of this status when the oldest record in a partition is younger than 10% of topic's `retention.ms`
- `TopicOverprovisionedRetentionInspector`
  - Checks if the topic has partitions whose configured `retention.bytes` is much bigger than the actual partition size because messages are being deleted earlier by `retention.ms`
  - When this happens, it usually means the traffic rate is low
  - Property | Default | Description
    ---|---|---
    `app.topic-inspection.overprovisioned-retention.min-used-ratio-to-retention-bytes` | `0.2` | Trigger this status when partition size is 20% or less of topic's `retention.bytes`
    `app.topic-inspection.overprovisioned-retention.required-ratio-to-retention-ms` | `0.8` | Prevent this status from triggering unless messages are being deleted by time retention; i.e. the oldest record is at least as old as 80% of topic's `retention.ms`
    `app.topic-inspection.overprovisioned-retention.ignore-below-retention-bytes` | `10485760` (10MB) | Don't trigger for small topics; i.e. having `retention.bytes` less than 10MB
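For reference, a minimal `application.yaml` sketch that tunes some of the thresholds listed above (the values here are illustrative, not recommendations):

```yaml
app.topic-inspection:
  underprovisioned-retention:
    # flag partitions whose oldest record is younger than 90% of retention.ms
    min-oldness-ratio-to-retention-ms: 0.9
    drastic-oldness-ratio-to-retention-ms: 0.05
  overprovisioned-retention:
    # only trigger when a partition uses 10% or less of its retention.bytes
    min-used-ratio-to-retention-bytes: 0.1
    ignore-below-retention-bytes: 104857600  # 100MB, illustrative
```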
Secondary aspects of topic inspection are various (and custom pluggable) implementations of ValidationRule.
Any rule that raises a violation will be shown as `CONFIG_RULE_VIOLATION`.
All built-in validation rules are enabled by default. Particular rules can be enabled or disabled using properties:
app.topic-validation.enabled-rules
app.topic-validation.disabled-rules
where the value is a list of fully qualified class names to enable/disable.
Example:
    app.topic-validation:
      disabled-rules:
        - com.infobip.kafkistry.service.validation.rules.MinInSyncReplicasTooBigRule
        - com.infobip.kafkistry.service.validation.rules.SizeRetentionEnoughToCoverRequiredRule
- `SegmentSizeToRetentionBytesRatioRule`
  - Verifies that the topic's segment size is neither too big nor too small compared to the retention size
  - Property | Default | Description
    ---|---|---
    `app.topic-validation.segment-size-rule.max-ratio-to-retention-bytes` | `0.5` | Raises a violation if topic's `segment.bytes` > `0.5 * retention.bytes`
    `app.topic-validation.segment-size-rule.min-ratio-to-retention-bytes` | `0.02` | Raises a violation if topic's `segment.bytes` < `0.02 * retention.bytes`
- In most cases, when using KStreams, there is a requirement that joining topics have the same partition count, so-called co-partitioning.
  - This rule exists to warn before proceeding to change the partition count of one of the topics involved in a specific KStream application
  - This rule might raise false positive violations in cases when co-partitioning is not a requirement. In such cases, the rule can be disabled for a particular topic, or the whole rule can be disabled.
  - Property | Default | Description
    ---|---|---
    `app.topic-validation.kstream-partition-count.enabled-on.clusters.<...cluster filter options..>` | all | On which clusters to check KStream's topics partition count for co-partition match, see cluster options
    `app.topic-validation.kstream-partition-count.enabled-on.topics.<...filter options...>` | all | For which topics to check KStream's topics partition count for co-partition match, see options
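As with the inspectors, rule-specific thresholds can be tuned in `application.yaml`; a small sketch with illustrative values:

```yaml
app.topic-validation:
  segment-size-rule:
    # raise a violation when segment.bytes exceeds 40% of retention.bytes...
    max-ratio-to-retention-bytes: 0.4
    # ...or is below 5% of retention.bytes
    min-ratio-to-retention-bytes: 0.05
```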
Kafkistry inspects the status of each ACL rule, for example whether it is OK, MISSING, UNKNOWN, etc.
Besides that basic inspection, there is checking for ACL rule conflicts.
Currently, two types of conflict detection are implemented:
- Consumer group conflict
  - Triggered when there are at least two different principals which have READ permission on some `group.id`, which would allow all of them to join the same consumer group at the same time
- Transactional ID conflict
  - Triggered when there are at least two different principals which have WRITE permission on some `transactional.id`, which would allow all of them to use the same transactional id at the same time
Those checks can be disabled globally or per specific cluster:
Property | Default | Description
---|---|---
`ACL_GROUP_CONFLICT_ENABLED` | `true` | Flag to globally enable/disable consumer group conflict checking.
`app.acl.conflict-checking.consumer-groups.enabled-on-clusters.<...cluster filter options...>` | all | Selective options to filter for which clusters to perform consumer group ACL conflict checking. See cluster filter options
`ACL_TRANSACTIONAL_ID_CONFLICT_ENABLED` | `true` | Flag to globally enable/disable transactional id conflict checking.
`app.acl.conflict-checking.transactional-ids.enabled-on-clusters.<...cluster filter options...>` | all | Selective options to filter for which clusters to perform transactional id ACL conflict checking. See cluster filter options
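For example, a minimal sketch (in `application.yaml`, using the env-style aliases) that keeps consumer group conflict checking on but disables transactional id conflict checking globally:

```yaml
# keep consumer-group conflict checking enabled (the default)
ACL_GROUP_CONFLICT_ENABLED: true
# turn off transactional-id conflict checking on all clusters
ACL_TRANSACTIONAL_ID_CONFLICT_ENABLED: false
```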
An ACL rule is considered "detached" when it references a resource (topic/consumer group) which doesn't exist. DETACHED status is a warning type which is not problematic on its own but could indicate that the corresponding ACL is no longer needed.
Configuration properties:
Property | Default | Description
---|---|---
`app.acl-inspect.detached.allow-global-check` | `false` | When `false`, a detached issue will be raised if the ACL does not reference any resource (topic/group), looking at resources on the same kafka cluster. When `true`, an ACL on cluster c1 referencing resource r1 won't raise an issue even though r1 is not found on c1 but is found on some other cluster (say c2), but it will still raise an issue if r1 is not found on any cluster.
`app.acl-inspect.detached.ignore-any-user` | `false` | When `true`, inspection will ignore possible detach issues for principal `User:*`.
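A small sketch relaxing the detached-ACL check (illustrative values only):

```yaml
app.acl-inspect:
  detached:
    # tolerate ACLs whose referenced resource exists on some other cluster
    allow-global-check: true
    # don't flag detached ACLs for the wildcard principal User:*
    ignore-any-user: true
```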
There are inspectors on the per-cluster level looking for issues. Those can be enabled/disabled/configured.
Existing checkers are enabled by default, but can be disabled with the following property:
- `app.clusters-inspect.excluded-checker-classes` where the value is a list of fully qualified class names to disable/exclude from evaluation.
- `ClusterDisbalanceIssuesChecker`
  - It reports issues when there is uneven disk usage / replica count / leader count on brokers within the same cluster (a combined configuration sketch is shown after this list)
  - Property | Default | Description
    ---|---|---
    `app.clusters-inspect.disbalance.disk-usage.enabled` | `true` | Is disk usage disbalance checked.
    `app.clusters-inspect.disbalance.disk-usage.max-acceptable-percent` | `20` (%) | Trigger the issue if the difference between max disk usage and min disk usage in the cluster is more than this percent of the average disk usage.
    `app.clusters-inspect.disbalance.disk-usage.min-usage-threshold-percent` | `10` (%) | Won't trigger the issue when disk usage on all brokers is less than this threshold percentage (to avoid a noisy issue for low-utilized clusters). This issue-suppressing feature depends on Kafkistry's ability to get disk capacity of brokers.
    `app.clusters-inspect.disbalance.replica-count.enabled` | `true` | Is broker's replica count disbalance checked.
    `app.clusters-inspect.disbalance.replica-count.max-acceptable-percent` | `10` (%) | Trigger the issue if the difference between the max replica count broker and the min replica count broker is more than this percent of the average replica count per broker in the cluster.
    `app.clusters-inspect.disbalance.leader-count.enabled` | `true` | Is broker's leader count disbalance checked.
    `app.clusters-inspect.disbalance.leader-count.max-acceptable-percent` | `10` (%) | Trigger the issue if the difference between the max leader count broker and the min leader count broker is more than this percent of the average leader count per broker in the cluster.
- `ClusterReplicationThrottleChecker`
  - It reports issues when there is a replication throttle being set
- It reports issues when disk usage is "significant"
  - It reports an issue if all topic replicas' `retention.bytes` hosted on each broker add up to more than the broker's disk capacity
  - Depends on Kafkistry's ability to get disk capacity of brokers
  - What counts as "significant" can be defined via the following properties:
  - Property | Default | Description
    ---|---|---
    `app.resources.thresholds.low` | `65.0` (%) | Upper bound for usage percentage to be classified as low.
    `app.resources.thresholds.medium` | `90.0` (%) | Upper bound for usage percentage to be classified as medium. (above is high)
- It reports issues when there is an uneven number of brokers having the same `broker-rack` defined. This check is there to ensure that brokers are evenly spread onto different racks/availability-zones.
  - By default, when all brokers have `broker-rack=null` or some `broker-rack=<CUSTOM_UNDEFINED_VALUE>`, the situation is considered OK.
  - Property | Default | Description
    ---|---|---
    `app.clusters-inspect.rack.undefined-rack-value` | `""` (empty) | Custom specific value that is considered as "undefined"
    `app.clusters-inspect.rack.tolerate-undefined` | `true` | When `true` then an issue will be raised if any of the brokers has an undefined `broker.rack`
    `app.clusters-inspect.rack.tolerate-all-qual` | `false` | When `false` then an issue will be raised if all brokers have the same non-undefined `broker.rack`
    `app.clusters-inspect.rack.strict-balance` | `false` | When `true` then strict balance is required, meaning that there are exactly equal numbers of brokers with the same `broker.rack`. Note, strict balance is not possible in situations where the number of brokers is not divisible by the number of different `broker.rack` values
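A combined `application.yaml` sketch tuning a few of the checker properties listed above (values are illustrative):

```yaml
app.clusters-inspect:
  disbalance:
    disk-usage:
      # tolerate up to 30% spread between the most and least loaded broker
      max-acceptable-percent: 30
    leader-count:
      # skip leader-count disbalance checking entirely
      enabled: false
  rack:
    # require an exactly equal number of brokers per rack
    strict-balance: true
```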
A custom checker can be implemented via the interface ClusterIssueChecker and will be picked up by Spring Framework as a bean. (see more on Writing custom plugins)
Kafkistry has a variety of metadata about clusters, topics, consumer groups, etc. All this metadata (as well as some inspection statuses) is periodically stored into an embedded SQLite database.
The ability to make SQL queries with arbitrary joins and filtering makes it a very useful tool for getting insights about anything that's available in the data. SQL is simply more expressive than many custom UIs.
You can check schema help and query examples in the UI.
The SQL feature uses SQLite, which requires local disk storage.
Properties to configure:
Property | Default | Description
---|---|---
`SQL_ENABLED` | `true` | Enable/disable the SQL querying feature
`SQLITE_DB_DIR` | `kafkistry/sqlite` | Path to directory where SQLite should store its DB. Can be an absolute path or relative to the working dir
`SQL_AUTO_CREATE_DIR` | `true` | If true and the specified directory `SQLITE_DB_DIR` doesn't exist, Kafkistry will attempt to create it before using
`SQL_CLICKHOUSE_OPEN_SECURITY` | `false` | Is the endpoint for accessing the SQL rest API (ClickhouseDB-like API) accessible anonymously (without session and without authentication)
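A minimal sketch of these settings in `application.yaml` (the directory path is illustrative):

```yaml
SQL_ENABLED: true
# store the SQLite DB on an absolute path instead of the default relative one
SQLITE_DB_DIR: "/var/lib/kafkistry/sqlite"
SQL_AUTO_CREATE_DIR: true
```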
Kafkistry is capable of exposing its SQL DB access through an API similar to ClickhouseDB's API.
The HTTP endpoint which exposes it is:
HTTP GET $HTTP_ROOT_PATH/api/sql/click-house
For example (depending on `HTTP_ROOT_PATH`, see web configuration):
HTTP GET /api/sql/click-house
HTTP GET /kafkistry/api/sql/click-house
HTTP GET /your-http-root-path/api/sql/click-house
This endpoint serves as an integration adapter for other tools such as Grafana or Redash.
- Grafana
  - It is possible to query Kafkistry SQL directly from Grafana
  - setup steps:
    - install the plugin `ClickHouse by Vertamedia` into your Grafana
    - add a new data source in Grafana for the Clickhouse plugin
      - URL: `http://<your-kr-host>:8080/<your-http-root-path>/api/sql/click-house`
      - Whitelisted cookies: `KRSESSIONID`
      - Auth: `Basic auth` (enter username/password into Basic Auth Details)
        - consider adding a dedicated user for Grafana with role `READ_SERVICE` into the static users list
- Redash
  - It is possible to query Kafkistry's SQL directly from Redash
  - Setup is almost the same as for Grafana, except that Redash doesn't offer to whitelist a session cookie; for that reason, it's needed to set `SQL_CLICKHOUSE_OPEN_SECURITY: true`
    - add a `ClickHouse` data source
    - Url: `http://<your-kr-host>:8080/<your-http-root-path>/api/sql/click-house`
    - Username and password can be blank when Kafkistry is deployed with `SQL_CLICKHOUSE_OPEN_SECURITY: true`
    - Database: `kr`
Autopilot is functionality where Kafkistry performs some actions on Kafka clusters on its own, without being externally triggered by a user through the UI or an API call.
There are various autopilot actions implemented, for example:
- `CreateMissingTopicAction` - will be triggered immediately when autopilot detects that a topic should exist but it doesn't exist on kafka
- `DeleteUnwantdACLAction` - will be triggered immediately when autopilot detects that some existing ACL rule shouldn't exist
- ...
Autopilot is a beta feature, and it's recommended to keep it disabled. The reason is that we need to gain confidence in the correctness of the action-blocking checkers. Even though there are multiple checkers in place, there is still a possibility that something is overlooked and that autopilot will perform some action even though it should wait. (for example: a new topic shouldn't be created if some kafka broker is in maintenance)
If you want to enable autopilot, it's recommended to do so only if the state on your clusters exactly matches whatever is defined in Kafkistry's repository. Otherwise, autopilot might start to create/delete/alter many things.
Property | Default | Description
---|---|---
`AUTOPILOT_ENABLED` | `true` | Completely enable/disable creation of autopilot components. When disabled, no periodic checking for actions will occur.
`AUTOPILOT_ACTIONS_ENABLED` | `false` | Enable/disable all autopilot actions. When false no actions will be executed, when true other filters may apply.
`app.autopilot.actions.action-type.enabled-classes` | all | List of enabled implementations of AutopilotAction class names. When empty, any action is enabled.
`app.autopilot.actions.action-type.disabled-classes` | none | List of disabled implementations of AutopilotAction class names. When empty, any action is enabled.
`app.autopilot.actions.target-type.enabled-values` | all | List of enabled target types for a particular action. Known target types are: [`TOPIC`, `ACL`]. When empty, any target type is enabled.
`app.autopilot.actions.target-type.disabled-values` | none | List of disabled target types for a particular action. Known target types are: [`TOPIC`, `ACL`]. When empty, any target type is enabled.
`app.autopilot.actions.binding-type.enabled-classes` | all | List of enabled implementations of AutopilotBinding class names. When empty, actions coming from any binding are enabled.
`app.autopilot.actions.binding-type.disabled-classes` | none | List of disabled implementations of AutopilotBinding class names. When empty, actions coming from any binding are enabled.
`app.autopilot.actions.cluster.<...filter options...>` | all | Actions targeting which clusters to enable. See filtering options
`app.autopilot.actions.attribute.<...action attribute...>.enabled-values` | all | List of enabled attribute values for actions that have an attribute named `<...action attribute>`.
`app.autopilot.actions.attribute.<...action attribute...>.disabled-values` | all | List of disabled attribute values for actions that have an attribute named `<...action attribute>`.
`AUTOPILOT_STORAGE_DIR` | `kafkistry/autopilot` | Path to directory where Kafkistry will store information about Autopilot actions and their outcomes.
`app.autopilot.repository.limits.max-count` | `1000` | Maximal number of different action executions to keep in storage.
`app.autopilot.repository.limits.retention-ms` | `604800000` (7 days) | Delete action outcomes older than the specified retention.
`app.autopilot.repository.limits.max-per-action` | `50` | Maximal number of stored state changes per each different action.
`app.autopilot.cycle.repeat-delay-ms` | `10000` (10 sec) | How often to execute one cycle/round of autopilot's discovery/checking/execution of actions.
`app.autopilot.cycle.after-startup-delay-ms` | `60000` (1 min) | How long after application startup autopilot should start periodic executions.
`app.autopilot.attempt-delay-ms` | `60000` (1 min) | Once an action is attempted, how long to wait before a re-attempt in case of failure.
`app.autopilot.pending-delay-ms` | `10000` (10 sec) | Once an action is discovered and without any blockers, how long to wait before execution. (This is a workaround for the issue that a kafka node can return an empty list of Topics/ACLs/Quotas shortly after restart, causing erroneous actions. Value should be >= CLUSTER_POOL_INTERVAL)
`app.autopilot.cluster-requirements.stable-for-last-ms` | `600000` (10 min) | In order for an action to be considered for execution, require cluster scraping to be constantly stable for this period of time.
`app.autopilot.cluster-requirements.used-state-providers.<...filter options...>` | all except: disk metrics, record sampling | Which kafka state providers to use for indication of whether the cluster is stable or not. See available names as implementations of AbstractKafkaStateProvider; to change defaults/include/exclude specific ones, see filtering options
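A minimal `application.yaml` sketch that turns autopilot on but restricts it to topic-related actions only, as an assumed conservative starting point (values are illustrative):

```yaml
AUTOPILOT_ENABLED: true
AUTOPILOT_ACTIONS_ENABLED: true
app:
  autopilot:
    actions:
      target-type:
        # only allow actions targeting topics; ACL-targeting actions stay off
        enabled-values:
          - TOPIC
```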
Custom actions can be implemented via the interface AutopilotBinding. Such an implementation needs to be exposed as a Spring bean to be picked up by Autopilot, and it will be handled in the same way as built-in implementations. (see more on Writing custom plugins)
It is possible to set up Kafkistry to send notifications to a Slack channel on each update in the Kafkistry repository or on some action on a kafka cluster.
Properties to set:
Property | Default | Description
---|---|---
`SLACK_ENABLED` | `false` | Enable/disable the Slack integration
`SLACK_CHANNEL_ID` | none | ID of the slack channel to send notifications into
`SLACK_SECRET_TOKEN` | none | Secret token to allow Kafkistry to authenticate on Slack's API; you'll need to create a slack bot application for your Slack workspace
`SLACK_PROXY_HOST` | none | Optional property. Use it if your Kafkistry deployment does not have access to the public internet and must go through a SOCKS proxy to be able to access Slack's API
`SLACK_PROXY_PORT` | none | Optional property. Read the description for `SLACK_PROXY_HOST`
`SLACK_LINK_BASE_HOST` | none | User-accessible base url for the Kafkistry UI. Needed to generate message URL(s) properly. Example value: `https://my-kafka-proxy.local`
`SLACK_ENVIRONMENT_NAME` | `n/a` | Optional property. Useful if you have different Kafkistry(s) deployed in different environments, so that when a message appears in the slack channel you are able to distinguish where it came from.
`SLACK_INCLUDE_AUTOPILOT` | `true` | Optional property. Useful if you wish to exclude autopilot generated audit events in slack; then set this to `false`.
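A minimal sketch of the Slack-related properties (all values here are illustrative placeholders):

```yaml
SLACK_ENABLED: true
SLACK_CHANNEL_ID: "C0123456789"                   # your channel ID
SLACK_SECRET_TOKEN: "xoxb-your-bot-token"         # token of your slack bot application
SLACK_LINK_BASE_HOST: "https://kafkistry.mycompany.local"
SLACK_ENVIRONMENT_NAME: "production"
```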
Kafkistry supports recognizing JIRA's issue pattern (ABC-123) and generating links in the UI pointing directly to the jira that you use.
This is more UI sugar than a real integration where jira would be aware of Kafkistry.
Property | Example
---|---
`JIRA_BASE_URL` | `https://your-jira.com/browse/`
When `JIRA_BASE_URL` is set, the Kafkistry UI will render issue references as links to JIRA, for example: `<a target='_blank' href="https://your-jira.com/browse/ABC-123">ABC-123</a>`
Property | Default | Description
---|---|---
`CUSTOM_JS_SCRIPTS_CSV` | empty | Comma separated list of script files to be included into each UI view page (html)
`CUSTOM_USER_DETAILS_TEMPLATE` | empty | UI component (html) for displaying extra user details. It's a name of a `*.ftl` resource to be included in every UI page. This is shown in the main menu on user's hover for more details.
`app.ui.caching` | `true` | Enables caching of static UI resources to speed up page loads; useful to set to `false` for development purposes.
`app.ui.image.dir-path` | `default` | Classpath directory which contains image files. Must be on the classpath located in `ui/static/img/`
`app.ui.image.icon` | `icon.png` (resource) | Browser tab icon image filename to be used in the UI. Must be located in `ui/static/img/${app.ui.image.dir-path}/`
`app.ui.image.logo` | `logo.png` (resource) | Logo image filename to be used in the UI. Must be located in `ui/static/img/${app.ui.image.dir-path}/`
`app.ui.image.banner` | `banner.png` (resource) | Homepage banner image filename to be used in the UI. Must be located in `ui/static/img/${app.ui.image.dir-path}/`
`app.hostname.value` | (empty/none) | Name of hostname to be shown in UI as `Served by: <hostname>`
`app.hostname.property` | (empty/none) | Name of system property or environment variable to resolve hostname from, to be shown in UI as `Served by: <hostname>`
`FORCE_TAG_FOR_PRESENCE` | false | When set to `true` then only the Cluster Tag presence option for topics/acls/quotas will be enabled, while All/Only on/Not on will be disabled.
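For example, a sketch of branding-related overrides (directory and file names are illustrative and must exist on the classpath as described above):

```yaml
app:
  ui:
    image:
      # images are looked up under ui/static/img/mycompany/ on the classpath
      dir-path: mycompany
      logo: mycompany-logo.png
      icon: mycompany-icon.png
  hostname:
    # resolve the "Served by" hostname from the HOSTNAME environment variable
    property: HOSTNAME
```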
Property | Default | Description
---|---|---
`app.consumer-group-principal-resolver.refresh-ms` | `20000` (20 sec) | How often to refresh relations between consumer groups and principals that have ACL permission to join those groups.
`app.consumer-group-principal-resolver.cache-enabled` | `true` | Whether or not to persist cached relations between consumer groups and principals that have permission to join specific consumer groups.
`CONSUMER_GROUPS_PRINCIPALS_CACHE_DIR` | `kafkistry/consumer-group-principal-relations` | Directory where to persist the local cache of consumer group to principal and principal owner relations.
Kafkistry uses logback as its logging framework.
Various appenders are defined conditionally by `LOGGING_TARGETS` (environment variable) or `logging.targets` (config property).
Here are examples of defined `LOGGING_TARGETS` values and the corresponding outcome:
Example value | Console output | File(s) | Logstash
---|---|---|---
undefined | ENABLED | n/a | n/a
`""` (empty) | n/a | n/a | n/a
`console` | ENABLED | n/a | n/a
`file` | n/a | ENABLED | n/a
`logstash` | n/a | n/a | ENABLED
`file,logstash` | n/a | ENABLED | ENABLED
`file,console` | ENABLED | ENABLED | n/a
`console,file,logstash` | ENABLED | ENABLED | ENABLED
By default, Kafkistry will output all logging to console output (stdout), unless specified differently by `LOGGING_TARGETS` or `logging.targets`.
Logging to files is enabled if the `LOGGING_TARGETS` (or `logging.targets`) env/property contains `file`.
Each file logging appender is configured to roll a maximum of the 10 last files, each having at most 6000kB.
If the environment variable `LOGSTASH_HOST` is present and the `LOGGING_TARGETS` (or `logging.targets`) env/property contains `logstash`, Kafkistry will send logging to it.
Properties to configure for the logstash appender:
Property | Default | Description
---|---|---
`LOGSTASH_HOST` | n/a | REQUIRED: URL to the logstash host where the appender should send logging requests.
`LOGSTASH_FACILITY` | `kafkistry` | Which facility to declare with each logging message.
`LOGSTASH_LEVEL` | `DEBUG` | Max logging level to be sent to logstash. Available levels: `OFF`, `ERROR`, `WARN`, `INFO`, `DEBUG`, `TRACE`, `ALL`
Kafkistry is a JVM-based application written in Kotlin, using the Spring framework for the backend and jQuery with Bootstrap on the frontend.
You might need to extend Kafkistry with custom functionality/logic. This section describes how to write custom logic on top of the Kafkistry core implementation.
NOTE: the following sections describe one of the possible ways to do it.
Kafkistry uses Maven for dependency management.
The simplest way to start with minimal dependency issues is to inherit the Kafkistry parent pom.
Example of your pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.infobip.kafkistry</groupId>
<artifactId>kafkistry-parent</artifactId>
<version>{newest-release-version}</version>
</parent>
<groupId>org.mycompany</groupId>
<artifactId>my-company-kafkistry</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<docker.skip-build>false</docker.skip-build>
<docker.to-image>mycompany-kafkistry:${project.version}</docker.to-image>
<docker.main-class>org.mycompany.kafkistry.MyCompanyKafkistryKt</docker.main-class>
</properties>
<dependencies>
<dependency>
<groupId>com.infobip.kafkistry</groupId>
<artifactId>kafkistry-app</artifactId>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>com.infobip.kafkistry</groupId>
<artifactId>kafkistry-app</artifactId>
<version>${parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- might want to include spring-test and etc...-->
</dependencies>
</project>
Then write your main class as follows:
package org.mycompany.kafkistry
import com.infobip.kafkistry.Kafkistry
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Import
@SpringBootApplication
@Import(Kafkistry::class)
class MyCompanyKafkistry
fun main(args: Array<String>) {
runApplication<MyCompanyKafkistry>(*args) {
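        // activate the "defaults" profile so that Kafkistry's application-defaults.yaml (default values and property aliases) is loaded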
setAdditionalProfiles("defaults")
}
}
From there you can start writing custom implementations of your components;
just make sure that each of your custom beans is picked up by the Spring framework
(i.e. annotated with `@Component`, a `@Bean` function inside a `@Configuration`, `@Controller`, etc.)
NOTE: the given example is written in Kotlin, but you can write it in Java or even Scala.