Skip to content

Commit

Permalink
TT-10675 add SQS Pump Backend support (#774)
Browse files Browse the repository at this point in the history
* add SQS Pump Backend support

* add conditions for groupID

* add conditions for delay seconds

* add deduplication for FIFO Queues

* add sqs backend to readme

* add comments for sqs configs

* remove pretty print and use log.debug

* add sqs tests

* linting

---------

Co-authored-by: Masoud Haghbin <[email protected]>
Co-authored-by: Masoud Haghbin <[email protected]>
  • Loading branch information
3 people authored Jan 24, 2024
1 parent 19e3047 commit bf9e7e7
Show file tree
Hide file tree
Showing 6 changed files with 415 additions and 12 deletions.
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ The table below provides details on the fields within each `tyk_analytics` recor
- [Kafka](#kafka-config)
- [Stdout](#stdout) (i.e. for use by Datadog logging agent in Kubernetes)
- [Timestream](#timestream-config)
- [AWS SQS](#SQS-config)

# Configuration:

Expand Down Expand Up @@ -1268,6 +1269,76 @@ TYK_PMP_PUMPS_CSV_TYPE=csv
TYK_PMP_PUMPS_CSV_META_CSVDIR=./
```

## SQS Config

#### Authentication & Prerequisite

We must authenticate ourselves by providing credentials to AWS. This pump uses the official AWS GO SDK, so instructions on how to authenticate can be found on [their documentation here](https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials).

#### Config Fields

`aws_queue_name` - Specifies the name of the AWS Simple Queue Service (SQS) queue where messages will be sent

`aws_message_group_id` - Specifies the name of the AWS Simple Queue Service (SQS) queue where messages will be sent

`aws_sqs_batch_limit` - Sets the maximum number of messages to include in a single batch when sending messages to the SQS queue

`aws_message_id_deduplication_enabled` - Enables or disables the deduplication of messages based on unique message IDs to prevent unintended duplicates in the queu

`aws_delay_seconds` - Configures the delay (in seconds) before messages sent to the SQS queue become available for processing.

When you initialize a Timestream Pump, the SDK uses its default credential chain to find AWS credentials. This default credential chain looks for credentials in the following order:

- Environment variables.
- Static Credentials (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`)
- Web Identity Token (`AWS_WEB_IDENTITY_TOKEN_FILE`)
- Shared configuration files.
- SDK defaults to credentials file under `.aws` folder that is placed in the home folder on your computer.
- If your application uses an ECS task definition or RunTask API operation, IAM role for tasks.
- If your application is running on an Amazon EC2 instance, IAM role for Amazon EC2.

If no credentials are provided, SQS Pump won't be able to connect.

###### JSON / Conf File

```json
"sqs": {
"type": "sqs",
"meta": {
"log_field_name": "tyk-analytics-record",
"format": "json",
"aws_queue_name": "access-logs-queue.fifo",
"aws_region": "us-east-1",
"aws_key": "key",
"aws_secret": "secret",
"aws_endpoint": "http://aws-endpoint:4566",
"aws_message_group_id": "message_group_id",
"aws_sqs_batch_limit": 10,
"aws_message_id_deduplication_enabled": true,
"aws_delay_seconds": 0
}
},
```

###### Env Variables

```
# SQS Pump Configuration
TYK_PMP_PUMPS_SQS_TYPE=sqs
TYK_PMP_PUMPS_SQS_META_LOGFIELDNAME=tyk-analytics-record
TYK_PMP_PUMPS_SQS_META_FORMAT=json
TYK_PMP_PUMPS_SQS_META_AWSQUEUENAME=access-logs-queue.fifo
TYK_PMP_PUMPS_SQS_META_AWSREGION=us-east-1
TYK_PMP_PUMPS_SQS_META_AWSKEY=key
TYK_PMP_PUMPS_SQS_META_AWSSECRET=secret
TYK_PMP_PUMPS_SQS_META_AWSENDPOINT=http://aws-endpoint:4566
TYK_PMP_PUMPS_SQS_META_AWSMESSAGEGROUPID=message_group_id
TYK_PMP_PUMPS_SQS_META_AWSSQSBATCHLIMIT=10
TYK_PMP_PUMPS_SQS_META_AWSMESSAGEIDDEDUPLICATIONENABLED=true
TYK_PMP_PUMPS_SQS_META_AWSDELAYSECONDS=0

```

# Base Pump Configurations

The following configurations can be added to any Pump. Keep reading for an example.
Expand Down
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ require (
github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632
github.com/TykTechnologies/storage v1.0.8
github.com/aws/aws-sdk-go-v2 v1.16.14
github.com/aws/aws-sdk-go-v2 v1.22.1
github.com/aws/aws-sdk-go-v2/config v1.9.0
github.com/aws/aws-sdk-go-v2/credentials v1.5.0
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0
github.com/cenkalti/backoff/v4 v4.0.2
github.com/fatih/structs v1.1.0
Expand All @@ -24,6 +26,7 @@ require (
github.com/logzio/logzio-go v0.0.0-20200316143903-ac8fc0e2910e
github.com/mitchellh/mapstructure v1.3.1
github.com/moesif/moesifapi-go v1.0.6
github.com/oklog/ulid/v2 v2.1.0
github.com/olivere/elastic/v7 v7.0.28
github.com/oschwald/maxminddb-golang v1.11.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -54,16 +57,15 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.8.0 // indirect
github.com/aws/smithy-go v1.13.2 // indirect
github.com/aws/smithy-go v1.16.0 // indirect
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
Expand Down
20 changes: 13 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,28 @@ github.com/aws/aws-sdk-go v1.29.11/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTg
github.com/aws/aws-sdk-go v1.40.32/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go-v2 v1.10.0/go.mod h1:U/EyyVvKtzmFeQQcca7eBotKdlpcP2zzU6bXBYcf7CE=
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
github.com/aws/aws-sdk-go-v2 v1.16.14 h1:db6GvO4Z2UqHt5gvT0lr6J5x5P+oQ7bdRzczVaRekMU=
github.com/aws/aws-sdk-go-v2 v1.16.14/go.mod h1:s/G+UV29dECbF5rf+RNj1xhlmvoNurGSr+McVSRj59w=
github.com/aws/aws-sdk-go-v2 v1.22.1 h1:sjnni/AuoTXxHitsIdT0FwmqUuNUuHtufcVDErVFT9U=
github.com/aws/aws-sdk-go-v2 v1.22.1/go.mod h1:Kd0OJtkW3Q0M0lUWGszapWjEvrXDzRW+D21JNsroB+c=
github.com/aws/aws-sdk-go-v2/config v1.9.0 h1:SkREVSwi+J8MSdjhJ96jijZm5ZDNleI0E4hHCNivh7s=
github.com/aws/aws-sdk-go-v2/config v1.9.0/go.mod h1:qhK5NNSgo9/nOSMu3HyE60WHXZTWTHTgd5qtIF44vOQ=
github.com/aws/aws-sdk-go-v2/credentials v1.5.0 h1:r6470olsn2qyOe2aLzK6q+wfO3dzNcMujRT3gqBgBB8=
github.com/aws/aws-sdk-go-v2/credentials v1.5.0/go.mod h1:kvqTkpzQmzri9PbsiTY+LvwFzM0gY19emlAWwBOJMb0=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 h1:FKaqk7geL3oIqSwGJt5SWUKj8uJ+qLZNqlBuqq6sFyA=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0/go.mod h1:KqEkRkxm/+1Pd/rENRNbQpfblDBYeg5HDSqjB6ks8hA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 h1:fi1ga6WysOyYb5PAf3Exd6B5GiSNpnZim4h1rhlBqx0=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1/go.mod h1:V5CY8wNurvPUibTi9mwqUqpiFZ5LnioKWIFUDtIzdI8=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 h1:ZpaV/j48RlPc4AmOZuPv22pJliXjXq8/reL63YzyFnw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1/go.mod h1:R8aXraabD2e3qv1csxM14/X9WF4wFMIY0kH4YEtYD5M=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 h1:zPxLGWALExNepElO0gYgoqsbqTlt4ZCrhZ7XlfJ+Qlw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5/go.mod h1:6ZBTuDmvpCOD4Sf1i2/I3PgftlEcDGgvi8ocq64oQEg=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0 h1:/T5wKsw/po118HEDvnSE8YU7TESxvZbYM2rnn+Oi7Kk=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0/go.mod h1:X5/JuOxPLU/ogICgDTtnpfaQzdQJO0yKDcpoxWLLJ8Y=
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 h1:21QmEZkOnaJ4SPRFhhN+8MV5ewb0j1lxTg+RPp0mUeE=
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0/go.mod h1:E02a07/HTyJEHFpp+WMRh33xuNVdsd8WCbLlODeT4lU=
github.com/aws/aws-sdk-go-v2/service/sso v1.5.0 h1:VnrCAJTp1bDxU79UuW/D4z7bwZ7xOc7JjDKpqXL/m04=
github.com/aws/aws-sdk-go-v2/service/sso v1.5.0/go.mod h1:GsqaJOJeOfeYD88/2vHWKXegvDRofDqWwC5i48A2kgs=
github.com/aws/aws-sdk-go-v2/service/sts v1.8.0 h1:7N7RsEVvUcvEg7jrWKU5AnSi4/6b6eY9+wG1g6W4ExE=
Expand All @@ -84,8 +88,8 @@ github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 h1:/4djuASUYOns1ZhCO
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0/go.mod h1:VN4yDJwgYOO6AzHPE8+QeBwK6wUMOFkSCogZFWifdVc=
github.com/aws/smithy-go v1.8.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.13.2 h1:TBLKyeJfXTrTXRHmsv4qWt9IQGYyWThLYaJWSahTOGE=
github.com/aws/smithy-go v1.13.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/aws/smithy-go v1.16.0 h1:gJZEH/Fqh+RsvlJ1Zt4tVAtV6bKkp3cC+R6FCZMNzik=
github.com/aws/smithy-go v1.16.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 h1:ZgW7EEoTQvz27wleAVF3XVBqc6eBFqB4BNw4Awg4BN8=
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/benbjohnson/tmpl v1.1.0/go.mod h1:N7W0NUGWuG26caFrID5sE4tvyLaKVp1fbV3Vr+MCul8=
Expand Down Expand Up @@ -216,7 +220,6 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand Down Expand Up @@ -396,6 +399,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/olivere/elastic v6.2.31+incompatible h1:zwJIIsgfiDBuDS3sb6MCbm/e03BPEJoGZvqevZXM254=
github.com/olivere/elastic v6.2.31+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
github.com/olivere/elastic/v7 v7.0.12/go.mod h1:14rWX28Pnh3qCKYRVnSGXWLf9MbLonYS/4FDCY3LAPo=
Expand All @@ -420,6 +425,7 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr
github.com/oschwald/maxminddb-golang v1.11.0 h1:aSXMqYR/EPNjGE8epgqwDay+P30hCBZIveY0WZbAWh0=
github.com/oschwald/maxminddb-golang v1.11.0/go.mod h1:YmVI+H0zh3ySFR3w+oz8PCfglAFj3PuCmui13+P9zDg=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
Expand Down
1 change: 1 addition & 0 deletions pumps/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ func init() {
AvailablePumps["sql-graph"] = &GraphSQLPump{}
AvailablePumps["sql-graph-aggregate"] = &GraphSQLAggregatePump{}
AvailablePumps["resurfaceio"] = &ResurfacePump{}
AvailablePumps["sqs"] = &SQSPump{}
}
Loading

0 comments on commit bf9e7e7

Please sign in to comment.