Infra-dev/cloudwatch-logs-subscription-consumer

 
 


CloudWatch Logs Subscription Consumer

The CloudWatch Logs Subscription Consumer is a specialized Amazon Kinesis stream reader (based on the Amazon Kinesis Connector Library) that can help you deliver data from Amazon CloudWatch Logs to any other system in near real-time using a CloudWatch Logs Subscription Filter.

The current version of the CloudWatch Logs Subscription Consumer comes with built-in connectors for Elasticsearch and Amazon S3, but it can easily be extended to support other destinations using the Amazon Kinesis Connector Library framework.

One-Click Setup: CloudWatch Logs + Elasticsearch + Kibana

This project includes a sample CloudFormation template that can quickly bring up an Elasticsearch cluster on Amazon EC2 fed with real-time data from any CloudWatch Logs log group. The CloudFormation template also installs Kibana 3 and Kibana 4.1, and it comes bundled with a few sample Kibana 3 dashboards for the following sources of AWS log data:

  • Amazon VPC Flow Logs
  • AWS Lambda
  • AWS CloudTrail

You can also connect your Elasticsearch cluster to any other custom CloudWatch Logs log group and then use Kibana to interactively analyze your log data with ad-hoc visualizations and custom dashboards.

If you already have an active CloudWatch Logs log group, you can launch a CloudWatch Logs + Elasticsearch + Kibana stack right now with this launch button:

Launch your Elasticsearch stack fed by CloudWatch Logs data

You can find the CloudFormation template in: configuration/cloudformation/cwl-elasticsearch.template

NOTE: This template creates one or more Amazon EC2 instances, an Amazon Kinesis stream and an Elastic Load Balancer. You will be billed for the AWS resources used if you create a stack from this template.

Your CloudFormation stack may take about 10 minutes to create. Once its status transitions to CREATE_COMPLETE you can navigate to the Outputs tab to get the important URLs for your stack.

CloudFormation Output

Sample Kibana 3 Dashboards (Click to Expand)

The following are snapshots of the sample Kibana 3 dashboards that come built-in with the provided CloudFormation stack. Click on any of the screenshots below to expand to a full view.

Amazon VPC Flow Logs

VPC Flow Logs Sample Dashboard

AWS Lambda

Lambda Sample Dashboard

AWS CloudTrail

CloudTrail Sample Dashboard

Setting up Kibana 4 for CloudWatch Logs

The CloudFormation template sets up Kibana 3 with the correct Elasticsearch index patterns for this application, but Kibana 4 needs to be configured manually. When you visit the Kibana 4 URL for the first time you will be prompted to configure an index pattern where you have to:

  • Turn on "Index contains time-based events"
  • Turn on "Use event times to create index names"
  • Pick "Daily" for the "Index pattern interval" field
  • Enter [cwl-]YYYY.MM.DD for the "Index name or pattern" field
  • Choose @timestamp for the "Time-field name"

Then you can go ahead and create the index pattern and start using Kibana 4 with data from CloudWatch Logs. Once the index pattern is configured, you can use the Discover, Visualize and Dashboards sections to interact with your CloudWatch Logs data.
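The [cwl-]YYYY.MM.DD pattern tells Kibana 4 to look for one index per day, named with the literal prefix cwl- followed by the date. The following is a minimal sketch of how such a name can be derived from an event time. The dailyIndexName helper is hypothetical, and the sketch assumes that index dates are computed in UTC:

```javascript
// Hypothetical helper: derive the daily index name that the
// [cwl-]YYYY.MM.DD pattern resolves to for a given event time.
// Assumption: index dates are computed in UTC.
function dailyIndexName(timestampMillis) {
    var d = new Date(timestampMillis);
    var pad = function (n) { return String(n).padStart(2, '0'); };
    return 'cwl-' + d.getUTCFullYear() + '.' + pad(d.getUTCMonth() + 1) + '.' + pad(d.getUTCDate());
}

console.log(dailyIndexName(Date.parse('2015-07-08T01:42:25.679Z')));
```

If Kibana 4 shows no data, checking the index names listed in your cluster against this naming scheme is a reasonable first step.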

Kibana 4 Discover section with VPC Flow Logs

Kibana 4 Discover

Kibana 4 Dashboard section with VPC Flow Logs

Kibana 4 Dashboard

Elasticsearch Administration

The CloudFormation template also installs the kopf plugin which allows you to monitor and manage your Elasticsearch cluster from a web interface.

Kopf Web Interface

Getting CloudWatch Logs data indexed in Elasticsearch

JSON Data

The CloudWatch Logs Subscription Consumer will automatically put log event messages that are valid JSON as Object fields in Elasticsearch. Elasticsearch is able to understand these Object types and their inner hierarchies, providing query support for all the inner fields. You do not have to specify anything beyond the source log group in the CloudFormation input parameters to have JSON data indexed in Elasticsearch.
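To illustrate the idea, the following sketch decides whether a log message becomes an Object field or stays a plain string. It is not the consumer's actual code (the consumer is written in Java). The toFieldValue helper is hypothetical, and treating only JSON objects (not arrays or bare numbers) as structured data is an assumption of this sketch:

```javascript
// Illustrative only: return a parsed object when the message is a JSON
// object, otherwise return the message unchanged as a string.
function toFieldValue(message) {
    try {
        var parsed = JSON.parse(message);
        if (parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed)) {
            return parsed;
        }
    } catch (e) {
        // Not valid JSON: fall through and keep the raw text.
    }
    return message;
}

var structured = toFieldValue('{"eventName": "RunInstances", "userIdentity": {"type": "Root"}}');
console.log(structured.userIdentity.type);
console.log(toFieldValue('Decoded payload: Hello World'));
```

With the parsed form, inner fields such as userIdentity.type can be queried directly in Elasticsearch.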

Fixed-Column Data

Other log events that have a fixed-column format (such as traditional web server access logs) can get indexed easily in Elasticsearch by defining the field names in the CloudWatch Logs subscription filter pattern using the Filter Pattern Syntax. For example, if you had log data in this format:

127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif" 200 2326

... then you can use the following subscription filter pattern:

[ip, user_identifier, user, timestamp, request, status_code, response_size]

... and its fields would get automatically indexed in Elasticsearch using the specified column names.
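CloudWatch Logs performs the field extraction itself, so you do not need to write any parsing code. The following sketch only approximates that behavior to show which value ends up under which name. The splitFields and toDocument helpers are hypothetical, and the sketch assumes that fields are separated by whitespace and that text inside square brackets or double quotes counts as a single field:

```javascript
// Illustrative only: split a space-delimited log line into values.
// A value is [bracketed text], "quoted text", or a run of non-space characters.
function splitFields(line) {
    var values = [];
    var re = /\[([^\]]*)\]|"([^"]*)"|(\S+)/g;
    var m;
    while ((m = re.exec(line)) !== null) {
        values.push(m[1] !== undefined ? m[1] : (m[2] !== undefined ? m[2] : m[3]));
    }
    return values;
}

// Pair each value with the column name at the same position.
function toDocument(names, line) {
    var values = splitFields(line);
    var doc = {};
    names.forEach(function (name, i) { doc[name] = values[i]; });
    return doc;
}

var names = ['ip', 'user_identifier', 'user', 'timestamp', 'request', 'status_code', 'response_size'];
var line = '127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif" 200 2326';
console.log(toDocument(names, line));
```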

Filter patterns can also be used to restrict what flows from CloudWatch Logs to Elasticsearch. You can set conditions on any of the fields. Here are a few examples that would match the sample log event from above:

  1. Equals condition on one field:

[ip, user_identifier, user, timestamp, request, status_code = 200, response_size]

  2. Prefix match on one field:

[ip, user_identifier, user, timestamp, request, status_code = 2*, response_size]

  3. OR condition on one field:

[ip, user_identifier, user, timestamp, request, status_code = 200 || status_code = 400, response_size]

  4. BETWEEN condition on one field:

[ip, user_identifier, user, timestamp, request, status_code >= 200 && status_code <= 204, response_size]

  5. Conditions on multiple fields:

[ip != 10.0.0.1, user_identifier, user, timestamp, request, status_code = 200, response_size]

  6. Compound conditions:

[(ip != 10.* && ip != 192.*) || ip = 127.*, user_identifier, user, timestamp, request, status_code, response_size]
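As a rough way to reason about these conditions, the following sketch restates each of them as a JavaScript expression and evaluates it against the sample log event. It is an illustration, not how CloudWatch Logs evaluates filters. The wildcardMatch helper is hypothetical, and comparing status_code as a number is an assumption of this sketch:

```javascript
// Hypothetical helper: '*' in a pattern matches any run of characters.
function wildcardMatch(value, pattern) {
    var escaped = pattern.split('*').map(function (part) {
        return part.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
    });
    return new RegExp('^' + escaped.join('.*') + '$').test(value);
}

// Fields of the sample log event that the conditions refer to.
var event = { ip: '127.0.0.1', status_code: '200' };
var code = Number(event.status_code);

var checks = {
    equals: code === 200,
    prefix: wildcardMatch(event.status_code, '2*'),
    or: code === 200 || code === 400,
    between: code >= 200 && code <= 204,
    multipleFields: event.ip !== '10.0.0.1' && code === 200,
    compound: (!wildcardMatch(event.ip, '10.*') && !wildcardMatch(event.ip, '192.*')) ||
        wildcardMatch(event.ip, '127.*')
};

console.log(checks);
```

Every entry evaluates to true for the sample event, which is consistent with all six filter patterns matching it.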

Other Less-Structured Data

The last field in a subscription filter pattern is always greedy, and in case the log event message has more fields than what is expressed in the filter, all additional data would get assigned to the last field. For example, the following filter pattern:

[timestamp, request_id, event]

... would match this log event message:

2015-07-08T01:42:25.679Z 8bd492bcaede Decoded payload: Hello World

... and the indexed fields in Elasticsearch would be:

{
  "timestamp": "2015-07-08T01:42:25.679Z",
  "request_id": "8bd492bcaede",
  "event": "Decoded payload: Hello World"
}
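The greedy behavior of the last field can be sketched as follows. The splitGreedy helper is hypothetical and only approximates what CloudWatch Logs does. It assumes whitespace-separated values and returns null when the message has fewer values than the pattern has fields:

```javascript
// Illustrative only: assign whitespace-separated values to field names,
// letting the last field absorb everything that is left over.
function splitGreedy(names, message) {
    var doc = {};
    var rest = message.trim();
    for (var i = 0; i < names.length - 1; i++) {
        var m = /^(\S+)\s*/.exec(rest);
        if (m === null) {
            return null; // ran out of values before the last field
        }
        doc[names[i]] = m[1];
        rest = rest.slice(m[0].length);
    }
    if (rest === '') {
        return null; // nothing left for the last field
    }
    doc[names[names.length - 1]] = rest; // the last field is greedy
    return doc;
}

console.log(splitGreedy(
    ['timestamp', 'request_id', 'event'],
    '2015-07-08T01:42:25.679Z 8bd492bcaede Decoded payload: Hello World'
));
```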

If one of the fields were to contain a valid JSON string, it would get put as an Object field in Elasticsearch rather than as an escaped JSON string. For example, using the [timestamp, request_id, event] filter pattern against the following log event:

2015-07-08T01:42:25.679Z 8bd492bcaede { "payloadSize": 100, "responseCode": "HTTP 200 OK" }

... would result in the following Elasticsearch document:

{
  "timestamp": "2015-07-08T01:42:25.679Z",
  "request_id": "8bd492bcaede",
  "event": {
    "payloadSize": 100,
    "responseCode": "HTTP 200 OK"
  }
}

Indexing Amazon VPC Flow Logs

The sample Kibana dashboard for Amazon VPC Flow Logs that comes built-in with the provided CloudFormation stack expects a CloudWatch Logs subscription with the following filter pattern:

[version, account_id, interface_id, srcaddr, dstaddr, srcport, dstport, protocol, packets, bytes, start, end, action, log_status]

If you choose "Amazon VPC Flow Logs" in the LogFormat parameter of the CloudFormation template, the subscription filter will get created with the above filter pattern automatically.

If you prefer to only have a subset of the VPC Flow Logs going to Elasticsearch, you can choose "Custom" for the LogFormat parameter, and then specify the above filter pattern with conditions on some of the fields. For example, if you are only interested in analyzing rejected traffic, you can add the condition action = REJECT on the action field.
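If you want to generate such a custom pattern rather than type it by hand, the following is a minimal sketch. The buildFilterPattern helper and the shape of its conditions argument are hypothetical. The field names are the ones from the filter pattern above:

```javascript
// Field names expected by the sample VPC Flow Logs dashboard.
var VPC_FLOW_LOG_FIELDS = [
    'version', 'account_id', 'interface_id', 'srcaddr', 'dstaddr', 'srcport', 'dstport',
    'protocol', 'packets', 'bytes', 'start', 'end', 'action', 'log_status'
];

// Hypothetical helper: conditions maps a field name to a condition
// suffix such as '= REJECT'. Fields without a condition are emitted as-is.
function buildFilterPattern(fields, conditions) {
    var parts = fields.map(function (name) {
        var hasCondition = Object.prototype.hasOwnProperty.call(conditions, name);
        return hasCondition ? name + ' ' + conditions[name] : name;
    });
    return '[' + parts.join(', ') + ']';
}

// Pattern for rejected traffic only; paste the output into the filter pattern input.
console.log(buildFilterPattern(VPC_FLOW_LOG_FIELDS, { action: '= REJECT' }));
```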

Indexing AWS Lambda Logs

The sample Kibana dashboard for AWS Lambda that comes built-in with the provided CloudFormation stack expects a CloudWatch Logs subscription with the following filter pattern:

[timestamp=*Z, request_id="*-*", event]

If you choose "AWS Lambda" in the LogFormat parameter of the CloudFormation template, the subscription filter will get created with the above filter pattern automatically.
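The two conditions in this pattern require the first field to end in Z and the second field to contain a hyphen. The following sketch shows their likely effect. It assumes that lines written by a function's own log statements start with a timestamp and a request ID separated by whitespace, and that platform lines such as START do not. The matchLambdaLine helper and the request ID shown are made up for illustration:

```javascript
// Illustrative check of [timestamp=*Z, request_id="*-*", event].
// Returns the extracted fields, or null when the line would not pass.
function matchLambdaLine(line) {
    var m = /^(\S+)\s+(\S+)\s+([\s\S]+)$/.exec(line.trim());
    if (m === null) {
        return null;
    }
    var timestamp = m[1], requestId = m[2], event = m[3];
    if (!timestamp.endsWith('Z')) {
        return null; // timestamp=*Z
    }
    if (requestId.indexOf('-') === -1) {
        return null; // request_id="*-*"
    }
    return { timestamp: timestamp, request_id: requestId, event: event };
}

var id = '6b4d1ac9-2f3c-11e5-8a4e-0f3d2c1b9a77'; // made-up request ID
console.log(matchLambdaLine('2015-07-08T01:42:25.679Z\t' + id + '\tDecoded payload: Hello World'));
console.log(matchLambdaLine('START RequestId: ' + id + ' Version: $LATEST'));
```

Under these assumptions the first line passes and the START line is dropped, so only your function's own log output reaches Elasticsearch.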

In your JavaScript Lambda functions, you can log structured data with JSON.stringify, and it will be indexed in Elasticsearch automatically. The following is a slightly modified version of the Kinesis Process Record Lambda template. Its first console.log call logs each Kinesis record in full, and its second logs some statistics on the function's activity and performance:

exports.handler = function(event, context) {
    var start = new Date().getTime();
    var bytesRead = 0;

    event.Records.forEach(function(record) {
        // Kinesis data is base64 encoded, so decode it here
        // (Buffer.from replaces the deprecated new Buffer constructor)
        var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
        bytesRead += payload.length;

        // log each record as structured JSON
        console.log(JSON.stringify(record, null, 2));
    });

    // log statistics on the function's activity and performance
    console.log(JSON.stringify({
        "recordsProcessed": event.Records.length,
        "processTime": new Date().getTime() - start,
        "bytesRead": bytesRead
    }, null, 2));

    context.succeed("Successfully processed " + event.Records.length + " records.");
};

The following snapshot from Kibana shows how the recordsProcessed and bytesRead got indexed in Elasticsearch and rendered as a graph in Kibana. Click on the image to expand to a full view of the sample dashboard for AWS Lambda that comes built-in with the provided CloudFormation stack:

Lambda Sample Dashboard Detail

Indexing AWS CloudTrail Logs

The sample Kibana dashboard for AWS CloudTrail that comes built-in with the provided CloudFormation stack does not require a subscription filter pattern because CloudTrail data is always valid JSON. If you prefer to only have a subset of the CloudTrail logs going to Elasticsearch, you can choose "Custom" for the LogFormat parameter of the CloudFormation template, and then specify a JSON filter using the Filter Pattern Syntax. Here's an example of a valid JSON filter pattern that restricts the subscription feed to events made by Root accounts against the Auto Scaling service:

{$.userIdentity.type = Root && $.eventSource = autoscaling*}
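To make the semantics of this filter concrete, the following sketch expresses the same two conditions as a JavaScript predicate over a CloudTrail event object. It is an illustration rather than the way CloudWatch Logs evaluates the filter. The select and isRootAutoScalingEvent helpers are hypothetical, and the sample event contains only the two fields the filter refers to:

```javascript
// Hypothetical helper: resolve a selector such as '$.userIdentity.type'
// against an event object, returning undefined when a key is missing.
function select(event, selector) {
    return selector.replace(/^\$\./, '').split('.').reduce(function (node, key) {
        return node === undefined || node === null ? undefined : node[key];
    }, event);
}

// Equivalent of {$.userIdentity.type = Root && $.eventSource = autoscaling*}
function isRootAutoScalingEvent(event) {
    var type = select(event, '$.userIdentity.type');
    var source = select(event, '$.eventSource');
    return type === 'Root' && typeof source === 'string' && source.startsWith('autoscaling');
}

console.log(isRootAutoScalingEvent({
    userIdentity: { type: 'Root' },
    eventSource: 'autoscaling.amazonaws.com'
}));
```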

Elasticsearch Access Control

The current version of the CloudFormation template allows you to configure two basic methods for controlling access to the Elasticsearch API and Kibana UI:

This is generally considered an insufficient level of access control for clusters holding confidential data. It is highly recommended that you put additional security measures and access control mechanisms in place before you use this stack with production data.

The nginx setup can be easily modified to enable other security and access control features, such as:

  • Adding HTTPS support to authenticate the endpoint and protect the Basic Auth credentials.
  • Adding HTTPS Client Authentication to restrict access to authorized users only.

You can find the nginx configuration used by the CloudFormation template in: configuration/nginx/nginx.conf

Building from source

Once you check out the code from GitHub, you can build it using Maven. To disable the GPG-signing in the build, use:

mvn clean install -Dgpg.skip=true

Running locally

After building from source, you can run the application locally using any of these three Maven profiles: Stdout, Elasticsearch or S3. For example:

mvn exec:java -P Elasticsearch

The Maven profile defines which connector destination you would use.

You can configure your application by updating the relevant properties file for the Maven profile you choose. You can also override any setting in the properties file using JVM system properties as in the following example:

mvn exec:java -P Stdout -DkinesisInputStream=application-log-stream -DregionName=us-west-2

Related Resources
