From 92a7acf2c2b8a690c659aae1bc1fedc1e98d741e Mon Sep 17 00:00:00 2001 From: Dylan Ratcliffe Date: Fri, 22 Sep 2023 14:18:03 +0000 Subject: [PATCH 1/2] Increase map parallelism & improve wait tracking --- sources/iam/policy.go | 29 ++++++++--------------------- sources/iam/role.go | 30 +++++++++++++++--------------- sources/limit_bucket.go | 23 +++++++++++++++++++---- 3 files changed, 42 insertions(+), 40 deletions(-) diff --git a/sources/iam/policy.go b/sources/iam/policy.go index b67f0d38..c9dc9b16 100644 --- a/sources/iam/policy.go +++ b/sources/iam/policy.go @@ -37,7 +37,8 @@ func policyGetFunc(ctx context.Context, client IAMClient, scope, query string, l }, } - <-limit.C + limit.Wait(ctx) + out, err := client.GetPolicy(ctx, &iam.GetPolicyInput{ PolicyArn: sources.PtrString(a.String()), }) @@ -83,11 +84,10 @@ func addTags(ctx context.Context, client IAMClient, details *PolicyDetails, limi // Only create new spans on trace level logging ctx, span = tracer.Start(ctx, "addTags") defer span.End() - } else { - span = trace.SpanFromContext(ctx) } - wait := limit.TimeWait() + limit.Wait(ctx) + out, err := client.ListPolicyTags(ctx, &iam.ListPolicyTagsInput{ PolicyArn: details.Policy.Arn, }) @@ -96,10 +96,6 @@ func addTags(ctx context.Context, client IAMClient, details *PolicyDetails, limi return err } - span.SetAttributes( - attribute.Int64("om.aws.rateLimit.waitTimeMilliseconds", wait.Milliseconds()), - ) - details.Policy.Tags = out.Tags return nil @@ -111,8 +107,6 @@ func addPolicyEntities(ctx context.Context, client IAMClient, details *PolicyDet // Only create new spans on trace level logging ctx, span = tracer.Start(ctx, "addPolicyEntities") defer span.End() - } else { - span = trace.SpanFromContext(ctx) } if details == nil { @@ -127,10 +121,9 @@ func addPolicyEntities(ctx context.Context, client IAMClient, details *PolicyDet PolicyArn: details.Policy.Arn, }) - var waitTime time.Duration - for paginator.HasMorePages() { - waitTime += limit.TimeWait() + limit.Wait(ctx) + out, err := paginator.NextPage(ctx) if err != nil { @@ -142,10 +135,6 @@ func addPolicyEntities(ctx context.Context, client IAMClient, details *PolicyDet details.PolicyUsers = append(details.PolicyUsers, out.PolicyUsers...) } - span.SetAttributes( - attribute.Int64("om.aws.rateLimit.waitTimeMilliseconds", waitTime.Milliseconds()), - ) - return nil } @@ -177,10 +166,9 @@ func policyListFunc(ctx context.Context, client IAMClient, scope string, limit * Scope: iamScope, }) - var waitTime time.Duration - for paginator.HasMorePages() { - waitTime += limit.TimeWait() + limit.Wait(ctx) + out, err := paginator.NextPage(ctx) if err != nil { @@ -192,7 +180,6 @@ func policyListFunc(ctx context.Context, client IAMClient, scope string, limit * span.SetAttributes( attribute.Int("om.aws.numPolicies", len(policies)), - attribute.Int64("om.aws.rateLimit.waitTimeMilliseconds", waitTime.Milliseconds()), ) policyDetails, err := iter.MapErr[types.Policy, *PolicyDetails](policies, func(p *types.Policy) (*PolicyDetails, error) { diff --git a/sources/iam/role.go b/sources/iam/role.go index 8f0cdf82..727723e2 100644 --- a/sources/iam/role.go +++ b/sources/iam/role.go @@ -24,7 +24,8 @@ type RoleDetails struct { } func roleGetFunc(ctx context.Context, client IAMClient, scope, query string, limit *sources.LimitBucket) (*RoleDetails, error) { - <-limit.C + limit.Wait(ctx) + out, err := client.GetRole(ctx, &iam.GetRoleInput{ RoleName: &query, }) @@ -85,10 +86,10 @@ func getEmbeddedPolicies(ctx context.Context, client IAMClient, roleName string, defer span.End() policies := make([]embeddedPolicy, 0) - var waitTime time.Duration for policiesPaginator.HasMorePages() { - waitTime += limit.TimeWait() + limit.Wait(ctx) + out, err := policiesPaginator.NextPage(ctx) if err != nil { @@ -114,11 +115,7 @@ func getRolePolicyDetails(ctx context.Context, client IAMClient, roleName string ctx, span := tracer.Start(ctx, "getRolePolicyDetails") defer span.End() - wait := limit.TimeWait() - - span.SetAttributes( - attribute.Int64("om.aws.rateLimit.waitTimeMilliseconds", wait.Milliseconds()), - ) + limit.Wait(ctx) policy, err := client.GetRolePolicy(ctx, &iam.GetRolePolicyInput{ RoleName: &roleName, @@ -165,7 +162,8 @@ func getAttachedPolicies(ctx context.Context, client IAMClient, roleName string, attachedPolicies := make([]types.AttachedPolicy, 0) for paginator.HasMorePages() { - <-limit.C + limit.Wait(ctx) + out, err := paginator.NextPage(ctx) if err != nil { @@ -179,7 +177,8 @@ func getAttachedPolicies(ctx context.Context, client IAMClient, roleName string, } func getRoleTags(ctx context.Context, client IAMClient, roleName string, limit *sources.LimitBucket) ([]types.Tag, error) { - <-limit.C + limit.Wait(ctx) + out, err := client.ListRoleTags(ctx, &iam.ListRoleTagsInput{ RoleName: &roleName, }) @@ -197,17 +196,20 @@ func roleListFunc(ctx context.Context, client IAMClient, scope string, limit *so ctx, span := tracer.Start(ctx, "roleListFunc") defer span.End() - var waitTime time.Duration + mapper := iter.Mapper[types.Role, *RoleDetails]{ + MaxGoroutines: 100, + } for paginator.HasMorePages() { - wait := limit.TimeWait() + limit.Wait(ctx) + out, err := paginator.NextPage(ctx) if err != nil { return nil, err } - newRoles, err := iter.MapErr(out.Roles, func(role *types.Role) (*RoleDetails, error) { + newRoles, err := mapper.MapErr(out.Roles, func(role *types.Role) (*RoleDetails, error) { details := RoleDetails{ Role: role, } @@ -226,12 +228,10 @@ func roleListFunc(ctx context.Context, client IAMClient, scope string, limit *so } roles = append(roles, newRoles...) - waitTime += wait } span.SetAttributes( attribute.Int("om.aws.numRoles", len(roles)), - attribute.Int64("om.aws.rateLimit.waitTimeMilliseconds", waitTime.Milliseconds()), ) return roles, nil diff --git a/sources/limit_bucket.go b/sources/limit_bucket.go index c2688ff8..7d2082cd 100644 --- a/sources/limit_bucket.go +++ b/sources/limit_bucket.go @@ -5,6 +5,8 @@ import ( "time" "github.com/getsentry/sentry-go" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // DefaultRefillDuration How often LimitBuckets are refilled by default @@ -58,13 +60,26 @@ func (b *LimitBucket) Start(ctx context.Context) { }(ctx, b) } -// Waits for a token and returns the duration waited -func (b *LimitBucket) TimeWait() time.Duration { +// Waits for a token. Passing a context allows allows this to add a span event +// if there is a long wait, and also allows this to be cancelled +func (b *LimitBucket) Wait(ctx context.Context) { start := time.Now() - <-b.C + select { + case <-ctx.Done(): + return + case <-b.C: + waitTime := time.Since(start) + + if waitTime > 300*time.Millisecond { + span := trace.SpanFromContext(ctx) + span.AddEvent("waited for late limit", trace.WithAttributes( + attribute.Int64("om.aws.rateLimit.waitTimeMilliseconds", waitTime.Milliseconds()), + )) + } - return time.Since(start) + return + } } // refill refills the bucket the specified amount From 525dba991c163a523470ee9a2d1d9317f54e5561 Mon Sep 17 00:00:00 2001 From: Dylan Ratcliffe Date: Fri, 22 Sep 2023 14:22:54 +0000 Subject: [PATCH 2/2] Standardise rate limiting --- sources/autoscaling/auto_scaling_group.go | 2 +- sources/ec2/address.go | 2 +- sources/ec2/az.go | 2 +- sources/ec2/capacity_reservation.go | 2 +- sources/ec2/capacity_reservation_fleet.go | 2 +- sources/ec2/egress_internet_gateway.go | 2 +- sources/ec2/iam_instance_profile_association.go | 2 +- sources/ec2/image.go | 2 +- sources/ec2/instance.go | 2 +- sources/ec2/instance_event_window.go | 2 +- sources/ec2/instance_status.go | 2 +- sources/ec2/internet_gateway.go | 2 +- sources/ec2/key_pair.go | 2 +- sources/ec2/launch_template.go | 2 +- sources/ec2/launch_template_version.go | 2 +- sources/ec2/nat_gateway.go | 2 +- sources/ec2/network_acl.go | 2 +- sources/ec2/network_interface.go | 2 +- sources/ec2/network_interface_permissions.go | 2 +- sources/ec2/placement_group.go | 2 +- sources/ec2/region.go | 2 +- sources/ec2/reserved_instance.go | 2 +- sources/ec2/route_table.go | 2 +- sources/ec2/sg.go | 2 +- sources/ec2/sg_rule.go | 2 +- sources/ec2/snapshot.go | 2 +- sources/ec2/subnet.go | 2 +- sources/ec2/volume.go | 2 +- sources/ec2/volume_status.go | 2 +- sources/ec2/vpc.go | 2 +- sources/ec2/vpc_peering_connection.go | 2 +- sources/efs/access_point.go | 2 +- sources/efs/backup_policy.go | 2 +- sources/efs/file_system.go | 2 +- sources/efs/mount_target.go | 2 +- sources/efs/replication_configuration.go | 2 +- sources/iam/group.go | 4 ++-- sources/iam/instance_profile.go | 4 ++-- sources/iam/user.go | 8 ++++---- sources/limit_bucket.go | 6 ++---- sources/limit_bucket_test.go | 2 +- 41 files changed, 47 insertions(+), 49 deletions(-) diff --git a/sources/autoscaling/auto_scaling_group.go b/sources/autoscaling/auto_scaling_group.go index b334f654..6f5aa3f3 100644 --- a/sources/autoscaling/auto_scaling_group.go +++ b/sources/autoscaling/auto_scaling_group.go @@ -246,7 +246,7 @@ func NewAutoScalingGroupSource(config aws.Config, accountID string, limit *sourc return autoscaling.NewDescribeAutoScalingGroupsPaginator(client, params) }, DescribeFunc: func(ctx context.Context, client *autoscaling.Client, input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error) { - <-limit.C // Wait for rate limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for rate limiting return client.DescribeAutoScalingGroups(ctx, input) }, OutputMapper: autoScalingGroupOutputMapper, diff --git a/sources/ec2/address.go b/sources/ec2/address.go index 1754f78e..bb75ff03 100644 --- a/sources/ec2/address.go +++ b/sources/ec2/address.go @@ -156,7 +156,7 @@ func NewAddressSource(config aws.Config, accountID string, limit *sources.LimitB AccountID: accountID, ItemType: "ec2-address", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeAddressesInput) (*ec2.DescribeAddressesOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeAddresses(ctx, input) }, InputMapperGet: addressInputMapperGet, diff --git a/sources/ec2/az.go b/sources/ec2/az.go index 09ae0048..c6fda0d9 100644 --- a/sources/ec2/az.go +++ b/sources/ec2/az.go @@ -87,7 +87,7 @@ func NewAvailabilityZoneSource(config aws.Config, accountID string, limit *sourc AccountID: accountID, ItemType: "ec2-availability-zone", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeAvailabilityZonesInput) (*ec2.DescribeAvailabilityZonesOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeAvailabilityZones(ctx, input) }, InputMapperGet: availabilityZoneInputMapperGet, diff --git a/sources/ec2/capacity_reservation.go b/sources/ec2/capacity_reservation.go index 379e636d..1fc103bb 100644 --- a/sources/ec2/capacity_reservation.go +++ b/sources/ec2/capacity_reservation.go @@ -124,7 +124,7 @@ func NewCapacityReservationSource(config aws.Config, accountID string, limit *so AccountID: accountID, ItemType: "ec2-capacity-reservation", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeCapacityReservationsInput) (*ec2.DescribeCapacityReservationsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeCapacityReservations(ctx, input) }, InputMapperGet: func(scope, query string) (*ec2.DescribeCapacityReservationsInput, error) { diff --git a/sources/ec2/capacity_reservation_fleet.go b/sources/ec2/capacity_reservation_fleet.go index f9bb485b..dc8cb04d 100644 --- a/sources/ec2/capacity_reservation_fleet.go +++ b/sources/ec2/capacity_reservation_fleet.go @@ -107,7 +107,7 @@ func NewCapacityReservationFleetSource(config aws.Config, accountID string, limi AccountID: accountID, ItemType: "ec2-capacity-reservation-fleet", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeCapacityReservationFleetsInput) (*ec2.DescribeCapacityReservationFleetsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeCapacityReservationFleets(ctx, input) }, InputMapperGet: func(scope, query string) (*ec2.DescribeCapacityReservationFleetsInput, error) { diff --git a/sources/ec2/egress_internet_gateway.go b/sources/ec2/egress_internet_gateway.go index 85ad21c0..38571140 100644 --- a/sources/ec2/egress_internet_gateway.go +++ b/sources/ec2/egress_internet_gateway.go @@ -86,7 +86,7 @@ func NewEgressOnlyInternetGatewaySource(config aws.Config, accountID string, lim AccountID: accountID, ItemType: "ec2-egress-only-internet-gateway", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeEgressOnlyInternetGatewaysInput) (*ec2.DescribeEgressOnlyInternetGatewaysOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeEgressOnlyInternetGateways(ctx, input) }, InputMapperGet: egressOnlyInternetGatewayInputMapperGet, diff --git a/sources/ec2/iam_instance_profile_association.go b/sources/ec2/iam_instance_profile_association.go index f61fac10..1488f216 100644 --- a/sources/ec2/iam_instance_profile_association.go +++ b/sources/ec2/iam_instance_profile_association.go @@ -86,7 +86,7 @@ func NewIamInstanceProfileAssociationSource(config aws.Config, accountID string, AccountID: accountID, ItemType: "ec2-iam-instance-profile-association", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeIamInstanceProfileAssociationsInput) (*ec2.DescribeIamInstanceProfileAssociationsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeIamInstanceProfileAssociations(ctx, input) }, InputMapperGet: func(scope, query string) (*ec2.DescribeIamInstanceProfileAssociationsInput, error) { diff --git a/sources/ec2/image.go b/sources/ec2/image.go index c2ea47fb..a78124e6 100644 --- a/sources/ec2/image.go +++ b/sources/ec2/image.go @@ -77,7 +77,7 @@ func NewImageSource(config aws.Config, accountID string, limit *sources.LimitBuc AccountID: accountID, ItemType: "ec2-image", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeImagesInput) (*ec2.DescribeImagesOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeImages(ctx, input) }, InputMapperGet: imageInputMapperGet, diff --git a/sources/ec2/instance.go b/sources/ec2/instance.go index e1bb8258..7667f22b 100644 --- a/sources/ec2/instance.go +++ b/sources/ec2/instance.go @@ -489,7 +489,7 @@ func NewInstanceSource(config aws.Config, accountID string, limit *sources.Limit AccountID: accountID, ItemType: "ec2-instance", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeInstances(ctx, input) }, InputMapperGet: instanceInputMapperGet, diff --git a/sources/ec2/instance_event_window.go b/sources/ec2/instance_event_window.go index f533f687..388fbd40 100644 --- a/sources/ec2/instance_event_window.go +++ b/sources/ec2/instance_event_window.go @@ -101,7 +101,7 @@ func NewInstanceEventWindowSource(config aws.Config, accountID string, limit *so AccountID: accountID, ItemType: "ec2-instance-event-window", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeInstanceEventWindowsInput) (*ec2.DescribeInstanceEventWindowsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeInstanceEventWindows(ctx, input) }, InputMapperGet: instanceEventWindowInputMapperGet, diff --git a/sources/ec2/instance_status.go b/sources/ec2/instance_status.go index 60d65e5f..6ca706a8 100644 --- a/sources/ec2/instance_status.go +++ b/sources/ec2/instance_status.go @@ -103,7 +103,7 @@ func NewInstanceStatusSource(config aws.Config, accountID string, limit *sources AccountID: accountID, ItemType: "ec2-instance-status", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeInstanceStatusInput) (*ec2.DescribeInstanceStatusOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeInstanceStatus(ctx, input) }, InputMapperGet: instanceStatusInputMapperGet, diff --git a/sources/ec2/internet_gateway.go b/sources/ec2/internet_gateway.go index 9b0f9118..708af8e9 100644 --- a/sources/ec2/internet_gateway.go +++ b/sources/ec2/internet_gateway.go @@ -87,7 +87,7 @@ func NewInternetGatewaySource(config aws.Config, accountID string, limit *source AccountID: accountID, ItemType: "ec2-internet-gateway", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeInternetGatewaysInput) (*ec2.DescribeInternetGatewaysOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeInternetGateways(ctx, input) }, InputMapperGet: internetGatewayInputMapperGet, diff --git a/sources/ec2/key_pair.go b/sources/ec2/key_pair.go index 618b5d72..824d205b 100644 --- a/sources/ec2/key_pair.go +++ b/sources/ec2/key_pair.go @@ -66,7 +66,7 @@ func NewKeyPairSource(config aws.Config, accountID string, limit *sources.LimitB AccountID: accountID, ItemType: "ec2-key-pair", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeKeyPairsInput) (*ec2.DescribeKeyPairsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeKeyPairs(ctx, input) }, InputMapperGet: keyPairInputMapperGet, diff --git a/sources/ec2/launch_template.go b/sources/ec2/launch_template.go index 70938118..a4e00355 100644 --- a/sources/ec2/launch_template.go +++ b/sources/ec2/launch_template.go @@ -66,7 +66,7 @@ func NewLaunchTemplateSource(config aws.Config, accountID string, limit *sources AccountID: accountID, ItemType: "ec2-launch-template", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeLaunchTemplatesInput) (*ec2.DescribeLaunchTemplatesOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeLaunchTemplates(ctx, input) }, InputMapperGet: launchTemplateInputMapperGet, diff --git a/sources/ec2/launch_template_version.go b/sources/ec2/launch_template_version.go index e44552e1..d7392774 100644 --- a/sources/ec2/launch_template_version.go +++ b/sources/ec2/launch_template_version.go @@ -343,7 +343,7 @@ func NewLaunchTemplateVersionSource(config aws.Config, accountID string, limit * AccountID: accountID, ItemType: "ec2-launch-template-version", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeLaunchTemplateVersionsInput) (*ec2.DescribeLaunchTemplateVersionsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeLaunchTemplateVersions(ctx, input) }, InputMapperGet: launchTemplateVersionInputMapperGet, diff --git a/sources/ec2/nat_gateway.go b/sources/ec2/nat_gateway.go index 6fa99419..9ec0e3e2 100644 --- a/sources/ec2/nat_gateway.go +++ b/sources/ec2/nat_gateway.go @@ -157,7 +157,7 @@ func NewNatGatewaySource(config aws.Config, accountID string, limit *sources.Lim AccountID: accountID, ItemType: "ec2-nat-gateway", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeNatGatewaysInput) (*ec2.DescribeNatGatewaysOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeNatGateways(ctx, input) }, InputMapperGet: natGatewayInputMapperGet, diff --git a/sources/ec2/network_acl.go b/sources/ec2/network_acl.go index 1265f0ab..f89f52e1 100644 --- a/sources/ec2/network_acl.go +++ b/sources/ec2/network_acl.go @@ -104,7 +104,7 @@ func NewNetworkAclSource(config aws.Config, accountID string, limit *sources.Lim AccountID: accountID, ItemType: "ec2-network-acl", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeNetworkAclsInput) (*ec2.DescribeNetworkAclsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeNetworkAcls(ctx, input) }, InputMapperGet: networkAclInputMapperGet, diff --git a/sources/ec2/network_interface.go b/sources/ec2/network_interface.go index 34766db2..8f98f9b5 100644 --- a/sources/ec2/network_interface.go +++ b/sources/ec2/network_interface.go @@ -285,7 +285,7 @@ func NewNetworkInterfaceSource(config aws.Config, accountID string, limit *sourc AccountID: accountID, ItemType: "ec2-network-interface", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeNetworkInterfacesInput) (*ec2.DescribeNetworkInterfacesOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeNetworkInterfaces(ctx, input) }, InputMapperGet: networkInterfaceInputMapperGet, diff --git a/sources/ec2/network_interface_permissions.go b/sources/ec2/network_interface_permissions.go index ea255ddf..e73973d9 100644 --- a/sources/ec2/network_interface_permissions.go +++ b/sources/ec2/network_interface_permissions.go @@ -82,7 +82,7 @@ func NewNetworkInterfacePermissionSource(config aws.Config, accountID string, li AccountID: accountID, ItemType: "ec2-network-interface-permission", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeNetworkInterfacePermissionsInput) (*ec2.DescribeNetworkInterfacePermissionsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeNetworkInterfacePermissions(ctx, input) }, InputMapperGet: networkInterfacePermissionInputMapperGet, diff --git a/sources/ec2/placement_group.go b/sources/ec2/placement_group.go index d086e8d2..616b7254 100644 --- a/sources/ec2/placement_group.go +++ b/sources/ec2/placement_group.go @@ -66,7 +66,7 @@ func NewPlacementGroupSource(config aws.Config, accountID string, limit *sources AccountID: accountID, ItemType: "ec2-placement-group", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribePlacementGroupsInput) (*ec2.DescribePlacementGroupsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribePlacementGroups(ctx, input) }, InputMapperGet: placementGroupInputMapperGet, diff --git a/sources/ec2/region.go b/sources/ec2/region.go index 2f2563c7..983a7057 100644 --- a/sources/ec2/region.go +++ b/sources/ec2/region.go @@ -64,7 +64,7 @@ func NewRegionSource(config aws.Config, accountID string, limit *sources.LimitBu AccountID: accountID, ItemType: "ec2-region", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeRegionsInput) (*ec2.DescribeRegionsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeRegions(ctx, input) }, InputMapperGet: regionInputMapperGet, diff --git a/sources/ec2/reserved_instance.go b/sources/ec2/reserved_instance.go index 834fb0aa..faa2cc41 100644 --- a/sources/ec2/reserved_instance.go +++ b/sources/ec2/reserved_instance.go @@ -80,7 +80,7 @@ func NewReservedInstanceSource(config aws.Config, accountID string, limit *sourc AccountID: accountID, ItemType: "ec2-reserved-instance", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeReservedInstancesInput) (*ec2.DescribeReservedInstancesOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeReservedInstances(ctx, input) }, InputMapperGet: reservedInstanceInputMapperGet, diff --git a/sources/ec2/route_table.go b/sources/ec2/route_table.go index b733a188..70ecc7a2 100644 --- a/sources/ec2/route_table.go +++ b/sources/ec2/route_table.go @@ -279,7 +279,7 @@ func NewRouteTableSource(config aws.Config, accountID string, limit *sources.Lim AccountID: accountID, ItemType: "ec2-route-table", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeRouteTablesInput) (*ec2.DescribeRouteTablesOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeRouteTables(ctx, input) }, InputMapperGet: routeTableInputMapperGet, diff --git a/sources/ec2/sg.go b/sources/ec2/sg.go index 0087c1e2..dea5d837 100644 --- a/sources/ec2/sg.go +++ b/sources/ec2/sg.go @@ -90,7 +90,7 @@ func NewSecurityGroupSource(config aws.Config, accountID string, limit *sources. AccountID: accountID, ItemType: "ec2-security-group", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeSecurityGroupsInput) (*ec2.DescribeSecurityGroupsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeSecurityGroups(ctx, input) }, InputMapperGet: securityGroupInputMapperGet, diff --git a/sources/ec2/sg_rule.go b/sources/ec2/sg_rule.go index eebecbb5..dd2bba30 100644 --- a/sources/ec2/sg_rule.go +++ b/sources/ec2/sg_rule.go @@ -102,7 +102,7 @@ func NewSecurityGroupRuleSource(config aws.Config, accountID string, limit *sour AccountID: accountID, ItemType: "ec2-security-group-rule", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeSecurityGroupRulesInput) (*ec2.DescribeSecurityGroupRulesOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeSecurityGroupRules(ctx, input) }, InputMapperGet: securityGroupRuleInputMapperGet, diff --git a/sources/ec2/snapshot.go b/sources/ec2/snapshot.go index 726174c7..0eeb6722 100644 --- a/sources/ec2/snapshot.go +++ b/sources/ec2/snapshot.go @@ -94,7 +94,7 @@ func NewSnapshotSource(config aws.Config, accountID string, limit *sources.Limit AccountID: accountID, ItemType: "ec2-snapshot", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeSnapshotsInput) (*ec2.DescribeSnapshotsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeSnapshots(ctx, input) }, InputMapperGet: snapshotInputMapperGet, diff --git a/sources/ec2/subnet.go b/sources/ec2/subnet.go index 003298ec..bb1a355d 100644 --- a/sources/ec2/subnet.go +++ b/sources/ec2/subnet.go @@ -102,7 +102,7 @@ func NewSubnetSource(config aws.Config, accountID string, limit *sources.LimitBu AccountID: accountID, ItemType: "ec2-subnet", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeSubnets(ctx, input) }, InputMapperGet: subnetInputMapperGet, diff --git a/sources/ec2/volume.go b/sources/ec2/volume.go index c7047647..4e9e2555 100644 --- a/sources/ec2/volume.go +++ b/sources/ec2/volume.go @@ -100,7 +100,7 @@ func NewVolumeSource(config aws.Config, accountID string, limit *sources.LimitBu AccountID: accountID, ItemType: "ec2-volume", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeVolumesInput) (*ec2.DescribeVolumesOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeVolumes(ctx, input) }, InputMapperGet: volumeInputMapperGet, diff --git a/sources/ec2/volume_status.go b/sources/ec2/volume_status.go index 8303e3a3..06747d74 100644 --- a/sources/ec2/volume_status.go +++ b/sources/ec2/volume_status.go @@ -112,7 +112,7 @@ func NewVolumeStatusSource(config aws.Config, accountID string, limit *sources.L AccountID: accountID, ItemType: "ec2-volume-status", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeVolumeStatusInput) (*ec2.DescribeVolumeStatusOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeVolumeStatus(ctx, input) }, InputMapperGet: volumeStatusInputMapperGet, diff --git a/sources/ec2/vpc.go b/sources/ec2/vpc.go index 6b730980..8b78669e 100644 --- a/sources/ec2/vpc.go +++ b/sources/ec2/vpc.go @@ -65,7 +65,7 @@ func NewVpcSource(config aws.Config, accountID string, limit *sources.LimitBucke AccountID: accountID, ItemType: "ec2-vpc", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeVpcs(ctx, input) }, InputMapperGet: vpcInputMapperGet, diff --git a/sources/ec2/vpc_peering_connection.go b/sources/ec2/vpc_peering_connection.go index c806641c..3ac87a43 100644 --- a/sources/ec2/vpc_peering_connection.go +++ b/sources/ec2/vpc_peering_connection.go @@ -155,7 +155,7 @@ func NewVpcPeeringConnectionSource(config aws.Config, accountID string, limit *s AccountID: accountID, ItemType: "ec2-vpc-peering-connection", DescribeFunc: func(ctx context.Context, client *ec2.Client, input *ec2.DescribeVpcPeeringConnectionsInput) (*ec2.DescribeVpcPeeringConnectionsOutput, error) { - <-limit.C // Wait for late limiting + limit.Wait(ctx) // Wait for rate limiting // Wait for late limiting return client.DescribeVpcPeeringConnections(ctx, input) }, InputMapperGet: func(scope, query string) (*ec2.DescribeVpcPeeringConnectionsInput, error) { diff --git a/sources/efs/access_point.go b/sources/efs/access_point.go index 282625a1..d16d764c 100644 --- a/sources/efs/access_point.go +++ b/sources/efs/access_point.go @@ -72,7 +72,7 @@ func NewAccessPointSource(config aws.Config, accountID string, limit *sources.Li AccountID: accountID, DescribeFunc: func(ctx context.Context, client *efs.Client, input *efs.DescribeAccessPointsInput) (*efs.DescribeAccessPointsOutput, error) { // Wait for rate limiting - <-limit.C + limit.Wait(ctx) // Wait for rate limiting return client.DescribeAccessPoints(ctx, input) }, PaginatorBuilder: func(client *efs.Client, params *efs.DescribeAccessPointsInput) sources.Paginator[*efs.DescribeAccessPointsOutput, *efs.Options] { diff --git a/sources/efs/backup_policy.go b/sources/efs/backup_policy.go index e9f55597..2b568f14 100644 --- a/sources/efs/backup_policy.go +++ b/sources/efs/backup_policy.go @@ -67,7 +67,7 @@ func NewBackupPolicySource(config aws.Config, accountID string, limit *sources.L AccountID: accountID, DescribeFunc: func(ctx context.Context, client *efs.Client, input *efs.DescribeBackupPolicyInput) (*efs.DescribeBackupPolicyOutput, error) { // Wait for rate limiting - <-limit.C + limit.Wait(ctx) // Wait for rate limiting return client.DescribeBackupPolicy(ctx, input) }, InputMapperGet: func(scope, query string) (*efs.DescribeBackupPolicyInput, error) { diff --git a/sources/efs/file_system.go b/sources/efs/file_system.go index 80778a27..d36f0ea2 100644 --- a/sources/efs/file_system.go +++ b/sources/efs/file_system.go @@ -126,7 +126,7 @@ func NewFileSystemSource(config aws.Config, accountID string, limit *sources.Lim AccountID: accountID, DescribeFunc: func(ctx context.Context, client *efs.Client, input *efs.DescribeFileSystemsInput) (*efs.DescribeFileSystemsOutput, error) { // Wait for rate limiting - <-limit.C + limit.Wait(ctx) // Wait for rate limiting return client.DescribeFileSystems(ctx, input) }, PaginatorBuilder: func(client *efs.Client, params *efs.DescribeFileSystemsInput) sources.Paginator[*efs.DescribeFileSystemsOutput, *efs.Options] { diff --git a/sources/efs/mount_target.go b/sources/efs/mount_target.go index 155d8ede..6e66bd8c 100644 --- a/sources/efs/mount_target.go +++ b/sources/efs/mount_target.go @@ -155,7 +155,7 @@ func NewMountTargetSource(config aws.Config, accountID string, limit *sources.Li AccountID: accountID, DescribeFunc: func(ctx context.Context, client *efs.Client, input *efs.DescribeMountTargetsInput) (*efs.DescribeMountTargetsOutput, error) { // Wait for rate limiting - <-limit.C + limit.Wait(ctx) // Wait for rate limiting return client.DescribeMountTargets(ctx, input) }, InputMapperGet: func(scope, query string) (*efs.DescribeMountTargetsInput, error) { diff --git a/sources/efs/replication_configuration.go b/sources/efs/replication_configuration.go index 93796f07..c28ffc72 100644 --- a/sources/efs/replication_configuration.go +++ b/sources/efs/replication_configuration.go @@ -140,7 +140,7 @@ func NewReplicationConfigurationSource(config aws.Config, accountID string, limi AccountID: accountID, DescribeFunc: func(ctx context.Context, client *efs.Client, input *efs.DescribeReplicationConfigurationsInput) (*efs.DescribeReplicationConfigurationsOutput, error) { // Wait for rate limiting - <-limit.C + limit.Wait(ctx) // Wait for rate limiting return client.DescribeReplicationConfigurations(ctx, input) }, InputMapperGet: func(scope, query string) (*efs.DescribeReplicationConfigurationsInput, error) { diff --git a/sources/iam/group.go b/sources/iam/group.go index c3b6a2ab..f8a84067 100644 --- a/sources/iam/group.go +++ b/sources/iam/group.go @@ -74,11 +74,11 @@ func NewGroupSource(config aws.Config, accountID string, region string, limit *s CacheDuration: 1 * time.Hour, // IAM has very low rate limits, we need to cache for a long time AccountID: accountID, GetFunc: func(ctx context.Context, client *iam.Client, scope, query string) (*types.Group, error) { - <-limit.C + limit.Wait(ctx) // Wait for rate limiting return groupGetFunc(ctx, client, scope, query) }, ListFunc: func(ctx context.Context, client *iam.Client, scope string) ([]*types.Group, error) { - <-limit.C + limit.Wait(ctx) // Wait for rate limiting return groupListFunc(ctx, client, scope) }, ItemMapper: groupItemMapper, diff --git a/sources/iam/instance_profile.go b/sources/iam/instance_profile.go index 92edb02a..6867a894 100644 --- a/sources/iam/instance_profile.go +++ b/sources/iam/instance_profile.go @@ -114,11 +114,11 @@ func NewInstanceProfileSource(config aws.Config, accountID string, region string CacheDuration: 1 * time.Hour, // IAM has very low rate limits, we need to cache for a long time AccountID: accountID, GetFunc: func(ctx context.Context, client *iam.Client, scope, query string) (*types.InstanceProfile, error) { - <-limit.C + limit.Wait(ctx) // Wait for rate limiting return instanceProfileGetFunc(ctx, client, scope, query) }, ListFunc: func(ctx context.Context, client *iam.Client, scope string) ([]*types.InstanceProfile, error) { - <-limit.C + limit.Wait(ctx) // Wait for rate limiting return instanceProfileListFunc(ctx, client, scope) }, ItemMapper: instanceProfileItemMapper, diff --git a/sources/iam/user.go b/sources/iam/user.go index c5b34156..e052fafe 100644 --- a/sources/iam/user.go +++ b/sources/iam/user.go @@ -18,7 +18,7 @@ type UserDetails struct { } func userGetFunc(ctx context.Context, client IAMClient, scope, query string, limit *sources.LimitBucket) (*UserDetails, error) { - <-limit.C + limit.Wait(ctx) // Wait for rate limiting out, err := client.GetUser(ctx, &iam.GetUserInput{ UserName: &query, }) @@ -68,7 +68,7 @@ func getUserGroups(ctx context.Context, client IAMClient, userName *string, limi }) for paginator.HasMorePages() { - <-limit.C + limit.Wait(ctx) // Wait for rate limiting out, err = paginator.NextPage(ctx) if err != nil { @@ -96,7 +96,7 @@ func getUserTags(ctx context.Context, client IAMClient, userName *string, limit tags := make([]types.Tag, 0) for paginator.HasMorePages() { - <-limit.C + limit.Wait(ctx) // Wait for rate limiting out, err = paginator.NextPage(ctx) if err != nil { @@ -117,7 +117,7 @@ func userListFunc(ctx context.Context, client IAMClient, scope string, limit *so paginator := iam.NewListUsersPaginator(client, &iam.ListUsersInput{}) for paginator.HasMorePages() { - <-limit.C + limit.Wait(ctx) // Wait for rate limiting out, err = paginator.NextPage(ctx) if err != nil { diff --git a/sources/limit_bucket.go b/sources/limit_bucket.go index 7d2082cd..c1b5e4a8 100644 --- a/sources/limit_bucket.go +++ b/sources/limit_bucket.go @@ -25,8 +25,7 @@ type LimitBucket struct { RefillDuration time.Duration // Channel tokens are stored in - C <-chan struct{} - c chan struct{} // Internal version of `C` + c chan struct{} // Channel that sends whether or not the bucket is full each time the // bucket is refilled @@ -40,7 +39,6 @@ func (b *LimitBucket) Start(ctx context.Context) { tokenChan := make(chan struct{}, b.MaxCapacity) b.c = tokenChan - b.C = tokenChan go func(ctx context.Context, bucket *LimitBucket) { defer sentry.Recover() @@ -68,7 +66,7 @@ func (b *LimitBucket) Wait(ctx context.Context) { select { case <-ctx.Done(): return - case <-b.C: + case <-b.c: waitTime := time.Since(start) if waitTime > 300*time.Millisecond { diff --git a/sources/limit_bucket_test.go b/sources/limit_bucket_test.go index 47482c1e..ff3b00e8 100644 --- a/sources/limit_bucket_test.go +++ b/sources/limit_bucket_test.go @@ -58,7 +58,7 @@ func TestWaiting(t *testing.T) { // Execute 100 actions for i := 0; i < 100; i++ { // Get permission - <-b.C + <-b.c } timeTaken := time.Since(start)