The Kinesis consumer plugin reads from a Kinesis data stream and creates metrics using one of the supported input data formats.
[[inputs.kinesis_consumer]]
## Amazon REGION of kinesis endpoint.
region = "ap-southeast-2"
## Amazon Credentials
## Credentials are loaded in the following order
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
## 2) Assumed credentials via STS if role_arn is specified
## 3) explicit credentials from 'access_key' and 'secret_key'
## 4) shared profile from 'profile'
## 5) environment variables
## 6) shared credentials file
## 7) EC2 Instance Profile
# access_key = ""
# secret_key = ""
# token = ""
# role_arn = ""
# web_identity_token_file = ""
# role_session_name = ""
# profile = ""
# shared_credential_file = ""
## Endpoint to make request against, the correct endpoint is automatically
## determined and this option should only be set if you wish to override the
## default.
## ex: endpoint_url = "http://localhost:8000"
# endpoint_url = ""
## Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
# shard_iterator_type = "TRIM_HORIZON"
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
##
## The content encoding of the data from kinesis
## If you are processing a cloudwatch logs kinesis stream then set this to "gzip"
## as AWS compresses cloudwatch log data before it is sent to kinesis (aws
## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding
## is done automatically by the golang sdk, as data is read from kinesis)
##
# content_encoding = "identity"
## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
## unique name for this consumer
app_name = "default"
table_name = "default"
Kinesis:
- DescribeStream
- GetRecords
- GetShardIterator
DynamoDB:
- GetItem
- PutItem
The DynamoDB checkpoint stores the last processed record in a DynamoDB. To leverage this functionality, create a table with the following string type keys:
Partition key: namespace
Sort key: shard_id