Skip to content

Commit

Permalink
rewrite flaky broker_deleted_recreated
Browse files Browse the repository at this point in the history
  • Loading branch information
maschmid committed Nov 27, 2024
1 parent 6235e66 commit 06873f3
Show file tree
Hide file tree
Showing 2 changed files with 314 additions and 2 deletions.
86 changes: 84 additions & 2 deletions test/rekt/features/broker_deleted_recreated.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
package features

import (
"context"
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"

Check failure on line 24 in test/rekt/features/broker_deleted_recreated.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

SA1019: "k8s.io/utils/pointer" is deprecated: Use functions in k8s.io/utils/ptr instead: ptr.To to obtain a pointer, ptr.Deref to dereference a pointer, ptr.Equal to compare dereferenced pointers. (staticcheck)
kafkabroker "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker"
"knative.dev/eventing-kafka-broker/test/e2e_new/bogus_config"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"math"
"strconv"

"knative.dev/pkg/system"

Expand All @@ -36,15 +45,88 @@ import (
brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker"
)

func compose(steps ...feature.StepFn) feature.StepFn {
return func(ctx context.Context, t feature.T) {
for _, s := range steps {
s(ctx, t)
}
}
}

// BrokerDeletedRecreated tests that when a broker and trigger is deleted and re-created, the original sink will eventually stop receiving events
func BrokerDeletedRecreated() *feature.Feature {
f := feature.NewFeatureNamed("broker deleted and recreated")

brokerName := feature.MakeRandomK8sName("broker")
triggerName := feature.MakeRandomK8sName("trigger")

f.Setup("test broker", featuressteps.BrokerSmokeTest(brokerName, triggerName))
sink1 := feature.MakeRandomK8sName("asink")
sink2 := feature.MakeRandomK8sName("bsink")

event := cetest.FullEvent()
event.SetID(uuid.New().String())

eventMatchers := []cetest.EventMatcher{
cetest.HasId(event.ID()),
cetest.HasSource(event.Source()),
cetest.HasType(event.Type()),
cetest.HasSubject(event.Subject()),
}

backoffPolicy := eventingduck.BackoffPolicyLinear

f.Setup("test broker", compose(
eventshub.Install(sink1, eventshub.StartReceiver),
broker.Install(brokerName, broker.WithEnvConfig()...),
broker.IsReady(brokerName),
trigger.Install(
triggerName,
trigger.WithBrokerName(brokerName),
trigger.WithRetry(3, &backoffPolicy, pointer.String("PT1S")),
trigger.WithSubscriber(service.AsKReference(sink1), ""),
),
trigger.IsReady(triggerName),
eventshub.Install(
feature.MakeRandomK8sName("source"),
eventshub.StartSenderToResource(broker.GVR(), brokerName),
eventshub.AddSequence,
eventshub.InputEvent(event),
// We want to send 1 event/s until the timeout
func(ctx context.Context, envs map[string]string) error {
_, timeout := environment.PollTimingsFromContext(ctx)
envs["PERIOD"] = "1" // in seconds
envs["MAX_MESSAGES"] = strconv.Itoa(int(math.Ceil(timeout.Seconds())))
return nil
},
),
assert.OnStore(sink1).MatchEvent(eventMatchers...).AtLeast(1),
))

f.Requirement("delete broker", featuressteps.DeleteBroker(brokerName))
f.Assert("test broker after deletion", featuressteps.BrokerSmokeTest(brokerName, triggerName))
f.Assert("test broker after deletion", compose(
eventshub.Install(sink2, eventshub.StartReceiver),
broker.Install(brokerName, broker.WithEnvConfig()...),
broker.IsReady(brokerName),
trigger.Install(
triggerName,
trigger.WithBrokerName(brokerName),
trigger.WithRetry(3, &backoffPolicy, pointer.String("PT1S")),
trigger.WithSubscriber(service.AsKReference(sink2), ""),
),
trigger.IsReady(triggerName),
// We need to check both that
// 1. sink1 eventually stops receiving new events
// 2. sink2 eventually starts receiving all events
// therefore, we check that eventually, the last few events sent (16 for no particular reason) are all received by the sink2 only
// and contain an uninterrupted (without any missing sequence numbers) source sequence as sent by the source with eventshub.AddSequence
EventSequenceOnStores(sink1, sink2).
MatchingReceived(eventMatchers...). // ... when ...
OrderedBySourceSequence(). // ..., and taken the ...
LastN(16). // ... events, the sequence...
ContainsOnlyEventsObservedBy(sink2). // ...and...
IsAnUninterruptedSourceSequence().
Eventually(),
))

return f
}
Expand Down
230 changes: 230 additions & 0 deletions test/rekt/features/sequence_assertions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package features

Check failure on line 1 in test/rekt/features/sequence_assertions.go

View workflow job for this annotation

GitHub Actions / style / Golang / Auto-format and Check

Please run goimports. diff --git a/test/rekt/features/sequence_assertions.go b/test/rekt/features/sequence_assertions.go index 519f3e7..21a1e7c 100644 --- a/test/rekt/features/sequence_assertions.go +++ b/test/rekt/features/sequence_assertions.go @@ -4,15 +4,17 @@ import ( "cmp" "context" "fmt" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "slices" + cetest "github.com/cloudevents/sdk-go/v2/test" types2 "github.com/cloudevents/sdk-go/v2/types" "k8s.io/apimachinery/pkg/util/wait" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/feature" - "slices" ) type sequenceTransformationOrAssertion func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error)

Check failure on line 1 in test/rekt/features/sequence_assertions.go

View workflow job for this annotation

GitHub Actions / style / Golang / Boilerplate Check (go)

[Go headers] reported by reviewdog 🐶 missing boilerplate: Raw Output: test/rekt/features/sequence_assertions.go:1: missing boilerplate: /* * Copyright 2024 The Knative Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */

import (
"cmp"
"context"
"fmt"
"knative.dev/reconciler-test/pkg/eventshub/assert"

cetest "github.com/cloudevents/sdk-go/v2/test"
types2 "github.com/cloudevents/sdk-go/v2/types"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/feature"
"slices"
)

type sequenceTransformationOrAssertion func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error)

type SequenceAssertionBuilder struct {
storeNames []string
matchers []eventshub.EventInfoMatcherCtx
transformsOrAssertions []sequenceTransformationOrAssertion
}

func getEventInfoSourceSequenceNumber(eventInfo eventshub.EventInfo) (int32, error) {
sequenceExtension, ok := eventInfo.Event.Extensions()["sequence"]
if !ok {
return 0, fmt.Errorf("event does not contain a sequence extension: %s", eventInfo.String())
}

sequenceNumber, err := types2.ToInteger(sequenceExtension)
if err != nil {
return 0, fmt.Errorf("event \"sequence\" extension value %q is not a number: %s", sequenceExtension, eventInfo.String())
}

return sequenceNumber, nil
}

func orderBySourceSequence(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
sorted := slices.Clone(events)

var conversionError error

slices.SortFunc(sorted, func(a, b eventshub.EventInfo) int {
var err error
var an, bn int32
an, err = getEventInfoSourceSequenceNumber(a)
if err != nil {
conversionError = err
return 0
}

bn, err = getEventInfoSourceSequenceNumber(b)
if err != nil {
conversionError = err
return 0
}

return cmp.Compare(an, bn)
})

if conversionError != nil {
return nil, conversionError
}

return sorted, nil
}

func reverse(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
reversed := slices.Clone(events)
slices.Reverse(reversed)
return reversed, nil
}

func firstN(n int, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
if len(events) < n {
return nil, fmt.Errorf("expected at least %d events, got %d", n, len(events))
}

return events[:n], nil
}

func lastN(n int, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
if len(events) < n {
return nil, fmt.Errorf("expected at least %d events, got %d", n, len(events))
}

return events[len(events)-n:], nil
}

// EventSequenceOnStores starts an assertion about sequence of events received by the given named stores
// The assertions are specially designed for checking sequences as generated by sources with eventshub.AddSequence
func EventSequenceOnStores(names ...string) SequenceAssertionBuilder {
return SequenceAssertionBuilder{
storeNames: names,
}
}

func (b SequenceAssertionBuilder) MatchingReceived(matchers ...cetest.EventMatcher) SequenceAssertionBuilder {
b.matchers = append(b.matchers, assert.MatchKind(eventshub.EventReceived).WithContext())
b.matchers = append(b.matchers, assert.MatchEvent(matchers...).WithContext())
return b
}

func (b SequenceAssertionBuilder) Reversed() SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, reverse)
return b
}

func (b SequenceAssertionBuilder) OrderedBySourceSequence() SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, orderBySourceSequence)
return b
}

func (b SequenceAssertionBuilder) FirstN(n int) SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
return firstN(n, events)
})
return b
}

func (b SequenceAssertionBuilder) LastN(n int) SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
return lastN(n, events)
})
return b
}

func (b SequenceAssertionBuilder) ContainsOnly(matcher eventshub.EventInfoMatcher) SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
for _, event := range events {
err := matcher(event)
if err != nil {
return nil, err
}
}

return events, nil
})
return b
}

func (b SequenceAssertionBuilder) ContainsOnlyEventsObservedBy(observerName string) SequenceAssertionBuilder {
return b.ContainsOnly(func(info eventshub.EventInfo) error {
if info.Observer != observerName {
return fmt.Errorf("expected observer to be %s, got %s", observerName, info.Observer)
}
return nil
})
}

func (b SequenceAssertionBuilder) IsAnUninterruptedSourceSequence() SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
if len(events) == 0 {
return nil, fmt.Errorf("no events received")
}

firstSequenceNumber, err := getEventInfoSourceSequenceNumber(events[0])
if err != nil {
return nil, err
}

expectedSequenceNumber := firstSequenceNumber - 1
for _, event := range events {
expectedSequenceNumber++

sequenceNumber, err := getEventInfoSourceSequenceNumber(event)
if err != nil {
return nil, err
}

if sequenceNumber != expectedSequenceNumber {
return nil, fmt.Errorf("expected sequence number %d, got %d", expectedSequenceNumber, sequenceNumber)
}
}

return events, nil
})
return b
}

func (b SequenceAssertionBuilder) Eventually() feature.StepFn {
return func(ctx context.Context, t feature.T) {
retryInterval, retryTimeout := environment.PollTimingsFromContext(ctx)

var internalErr error

wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {

Check failure on line 189 in test/rekt/features/sequence_assertions.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

SA1019: wait.PollImmediate is deprecated: This method does not return errors from context, use PollUntilContextTimeout. Note that the new method will no longer return ErrWaitTimeout and instead return errors defined by the context package. Will be removed in a future release. (staticcheck)

events := make([]eventshub.EventInfo, 0)
for _, storeName := range b.storeNames {
store := eventshub.StoreFromContext(ctx, storeName)

storeEvents, _, _, err := store.Find(func(info eventshub.EventInfo) error {
for _, matcher := range b.matchers {
err := matcher.WithContext(ctx)(info)
if err != nil {
return err
}
}
return nil
})

if err != nil {
internalErr = err
return false, nil
}

events = append(events, storeEvents...)
}

for _, transformOrAssertion := range b.transformsOrAssertions {
var err error
events, err = transformOrAssertion(ctx, events)
if err != nil {
internalErr = err
return false, nil
}
}

internalErr = nil
return true, nil
})

if internalErr != nil {
t.Fatal(internalErr)
}
}
}

0 comments on commit 06873f3

Please sign in to comment.