Skip to content

Commit

Permalink
fix(konnect): handle conflicts when creating consumers (#753)
Browse files Browse the repository at this point in the history
* deal with conflicts when creating consumers

* add envtest for kongconsumers

* address comments
  • Loading branch information
randmonkey authored Oct 18, 2024
1 parent 8e6269f commit a41fdc1
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 8 deletions.
1 change: 1 addition & 0 deletions controller/konnect/ops/kongconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type ConsumersSDK interface {
CreateConsumer(ctx context.Context, controlPlaneID string, consumerInput sdkkonnectcomp.ConsumerInput, opts ...sdkkonnectops.Option) (*sdkkonnectops.CreateConsumerResponse, error)
UpsertConsumer(ctx context.Context, upsertConsumerRequest sdkkonnectops.UpsertConsumerRequest, opts ...sdkkonnectops.Option) (*sdkkonnectops.UpsertConsumerResponse, error)
DeleteConsumer(ctx context.Context, controlPlaneID string, consumerID string, opts ...sdkkonnectops.Option) (*sdkkonnectops.DeleteConsumerResponse, error)
ListConsumer(ctx context.Context, request sdkkonnectops.ListConsumerRequest, opts ...sdkkonnectops.Option) (*sdkkonnectops.ListConsumerResponse, error)
}
74 changes: 74 additions & 0 deletions controller/konnect/ops/kongconsumer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions controller/konnect/ops/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ func Create[
err = createControlPlane(ctx, sdk.GetControlPlaneSDK(), sdk.GetControlPlaneGroupSDK(), cl, ent)
case *configurationv1alpha1.KongService:
err = createService(ctx, sdk.GetServicesSDK(), ent)

// TODO: modify the create* operation wrappers to not set Programmed conditions and return
// a KonnectEntityCreatedButRelationsFailedError if the entity was created but its relations assignment failed.

case *configurationv1alpha1.KongRoute:
err = createRoute(ctx, sdk.GetRoutesSDK(), ent)

Expand Down Expand Up @@ -138,6 +134,8 @@ func Create[
id, err = getKongServiceForUID(ctx, sdk.GetServicesSDK(), ent)
case *configurationv1alpha1.KongSNI:
id, err = getSNIForUID(ctx, sdk.GetSNIsSDK(), ent)
case *configurationv1.KongConsumer:
id, err = getConsumerForUID(ctx, sdk.GetConsumersSDK(), ent)
// ---------------------------------------------------------------------
// TODO: add other Konnect types
default:
Expand Down
59 changes: 55 additions & 4 deletions controller/konnect/ops/ops_kongconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kong/gateway-operator/controller/pkg/log"
"github.com/kong/gateway-operator/pkg/consts"
k8sutils "github.com/kong/gateway-operator/pkg/utils/kubernetes"

configurationv1 "github.com/kong/kubernetes-configuration/api/configuration/v1"
Expand Down Expand Up @@ -47,11 +48,19 @@ func createConsumer(
return errWrap
}

if resp == nil || resp.Consumer == nil || resp.Consumer.ID == nil || *resp.Consumer.ID == "" {
return fmt.Errorf("failed creating %s: %w", consumer.GetTypeName(), ErrNilResponse)
}
// Set the Konnect ID in the status to keep it even if ConsumerGroup assignments fail.
consumer.Status.Konnect.SetKonnectID(*resp.Consumer.ID)
id := *resp.Consumer.ID
consumer.SetKonnectID(id)

if err = handleConsumerGroupAssignments(ctx, consumer, cl, cgSDK, cpID); err != nil {
return fmt.Errorf("failed to handle ConsumerGroup assignments: %w", err)
return KonnectEntityCreatedButRelationsFailedError{
KonnectID: id,
Reason: consts.FailedToAttachConsumerToConsumerGroupReason,
Err: err,
}
}

// The Consumer is considered Programmed if it was successfully created and all its _valid_ ConsumerGroup references
Expand All @@ -75,11 +84,12 @@ func updateConsumer(
if cpID == "" {
return fmt.Errorf("can't update %T without a ControlPlaneID", consumer)
}
consumerID := consumer.GetKonnectStatus().GetKonnectID()

_, err := sdk.UpsertConsumer(ctx,
sdkkonnectops.UpsertConsumerRequest{
ControlPlaneID: cpID,
ConsumerID: consumer.GetKonnectStatus().GetKonnectID(),
ConsumerID: consumerID,
Consumer: kongConsumerToSDKConsumerInput(consumer),
},
)
Expand All @@ -93,7 +103,11 @@ func updateConsumer(
}

if err = handleConsumerGroupAssignments(ctx, consumer, cl, cgSDK, cpID); err != nil {
return fmt.Errorf("failed to handle ConsumerGroup assignments: %w", err)
return KonnectEntityCreatedButRelationsFailedError{
KonnectID: consumerID,
Reason: consts.FailedToAttachConsumerToConsumerGroupReason,
Err: err,
}
}

// The Consumer is considered Programmed if it was successfully updated and all its _valid_ ConsumerGroup references
Expand Down Expand Up @@ -331,3 +345,40 @@ func kongConsumerToSDKConsumerInput(
Username: lo.ToPtr(consumer.Username),
}
}

// getConsumerForUID lists consumers in Konnect with given k8s uid as its tag.
func getConsumerForUID(
ctx context.Context,
sdk ConsumersSDK,
consumer *configurationv1.KongConsumer,
) (string, error) {
cpID := consumer.GetControlPlaneID()
reqList := sdkkonnectops.ListConsumerRequest{
// NOTE: only filter on object's UID.
// Other fields like name might have changed in the meantime but that's OK.
// Those will be enforced via subsequent updates.
ControlPlaneID: cpID,
Tags: lo.ToPtr(UIDLabelForObject(consumer)),
}

respList, err := sdk.ListConsumer(ctx, reqList)
if err != nil {
return "", err
}

if respList == nil || respList.Object == nil {
return "", fmt.Errorf("failed listing KongConsumers: %w", ErrNilResponse)
}

for _, entry := range respList.Object.Data {
if entry.ID != nil && *entry.ID != "" {
// return the ID if we found a non-empty one with the given k8s uid
return *entry.ID, nil
}
}
// return UIDNotFound error if no such entry found
return "", EntityWithMatchingUIDNotFoundError{
Entity: consumer,
}

}
3 changes: 3 additions & 0 deletions pkg/consts/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ const (
// KonnectEntitiesFailedToUpdateReason is the reason assigned to Konnect entities that failed to get updated.
// It must be used when Programmed condition is set to False.
KonnectEntitiesFailedToUpdateReason ConditionReason = "FailedToUpdate"
// FailedToAttachConsumerToConsumerGroupReason is the reason assigned to KonnConsumers when failed to attach it to any consumer group.
// It must be used when Programmed condition is set to False.
FailedToAttachConsumerToConsumerGroupReason ConditionReason = "FailedToAttachConsumerToConsumerGroup"
)
54 changes: 54 additions & 0 deletions test/envtest/konnect_entities_kongconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package envtest

import (
"context"
"strings"
"testing"

sdkkonnectcomp "github.com/Kong/sdk-konnect-go/models/components"
sdkkonnectops "github.com/Kong/sdk-konnect-go/models/operations"
sdkkonnecterrs "github.com/Kong/sdk-konnect-go/models/sdkerrors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -301,4 +303,56 @@ func TestKongConsumer(t *testing.T) {
assert.True(c, factory.SDK.ConsumerGroupSDK.AssertExpectations(t))
}, waitTime, tickTime)
})

t.Run("should handle conflict in creation correctly", func(t *testing.T) {
const (
consumerID = "consumer-id-conflict"
username = "user-3"
)
t.Log("Setup mock SDK for creating consumer and listing consumers by UID")
cpID := cp.GetKonnectStatus().GetKonnectID()
sdk.ConsumersSDK.EXPECT().CreateConsumer(
mock.Anything,
cpID,
mock.MatchedBy(func(input sdkkonnectcomp.ConsumerInput) bool {
return input.Username != nil && *input.Username == username
}),
).Return(&sdkkonnectops.CreateConsumerResponse{}, &sdkkonnecterrs.ConflictError{})

sdk.ConsumersSDK.EXPECT().ListConsumer(
mock.Anything,
mock.MatchedBy(func(req sdkkonnectops.ListConsumerRequest) bool {
return req.ControlPlaneID == cpID &&
req.Tags != nil && strings.HasPrefix(*req.Tags, "k8s-uid")
}),
).Return(&sdkkonnectops.ListConsumerResponse{
Object: &sdkkonnectops.ListConsumerResponseBody{
Data: []sdkkonnectcomp.Consumer{
{
ID: lo.ToPtr(consumerID),
},
},
},
}, nil)

t.Log("Creating a KongConsumer")
createdConsumer := deploy.KongConsumerAttachedToCP(t, ctx, clientNamespaced, username, cp)

t.Log("Watching for KongConsumers to verify the created KongConsumer programmed")
watchFor(t, ctx, cWatch, watch.Modified, func(c *configurationv1.KongConsumer) bool {
if c.GetName() != createdConsumer.GetName() {
return false
}
return c.GetKonnectID() == consumerID && lo.ContainsBy(c.Status.Conditions, func(condition metav1.Condition) bool {
return condition.Type == konnectv1alpha1.KonnectEntityProgrammedConditionType &&
condition.Status == metav1.ConditionTrue
})
}, "KongConsumer should be programmed and have ID in status after handling conflict")

t.Log("Ensuring that the SDK's create and list methods are called")
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, factory.SDK.ConsumersSDK.AssertExpectations(t))
}, waitTime, tickTime)

})
}

0 comments on commit a41fdc1

Please sign in to comment.