Skip to content

Commit

Permalink
fix: conflicting subscription name with control plane (#6042)
Browse files Browse the repository at this point in the history
* fix: conflicting subscription name with control plane

* fix: events to agentevents refactor in whole codebase
  • Loading branch information
exu authored Nov 19, 2024
1 parent 54a3f3d commit 15ddfbe
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 16 deletions.
2 changes: 1 addition & 1 deletion internal/graphql/services/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *executorsService) List(selector string) ([]testkube.ExecutorDetails, er
}

func (s *executorsService) SubscribeList(ctx context.Context, selector string) (<-chan []testkube.ExecutorDetails, error) {
return HandleSubscription(ctx, "events.executor.>", s, func() ([]testkube.ExecutorDetails, error) {
return HandleSubscription(ctx, "agentevents.executor.>", s, func() ([]testkube.ExecutorDetails, error) {
return s.List(selector)
})
}
2 changes: 1 addition & 1 deletion internal/graphql/services/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestExecutorsService_SubscribeList(t *testing.T) {
assert.NoError(t, err)
<-ch
client.Client = getMockExecutorClient(k8sObjects2).Client
assert.NoError(t, srvMock.BusMock().PublishTopic("events.executor.create", testkube.Event{
assert.NoError(t, srvMock.BusMock().PublishTopic("agentevents.executor.create", testkube.Event{
Type_: testkube.EventCreated,
Resource: testkube.EventResourceExecutor,
}))
Expand Down
10 changes: 5 additions & 5 deletions pkg/api/v1/testkube/model_event_extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

const (
TestStartSubject = "events.test.start"
TestStopSubject = "events.test.stop"
TestStartSubject = "agentevents.test.start"
TestStopSubject = "agentevents.test.stop"
)

// check if Event implements model generic event type
Expand Down Expand Up @@ -270,14 +270,14 @@ func (e Event) Topic() string {
}

if e.Resource == nil {
return "events.all"
return "agentevents.all"
}

if e.ResourceId == "" {
return "events." + string(*e.Resource)
return "agentevents." + string(*e.Resource)
}

return "events." + string(*e.Resource) + "." + e.ResourceId
return "agentevents." + string(*e.Resource) + "." + e.ResourceId
}

// GetResourceId implmenents generic event trigger
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/v1/testkube/model_event_extended_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,20 +339,20 @@ func TestEvent_Topic(t *testing.T) {
t.Parallel()

evt := Event{Type_: EventStartTest, Resource: nil}
assert.Equal(t, "events.all", evt.Topic())
assert.Equal(t, "agentevents.all", evt.Topic())
})

t.Run("should return event topic with resource name and id if set", func(t *testing.T) {
t.Parallel()

evt := Event{Type_: EventStartTest, Resource: EventResourceExecutor, ResourceId: "a12"}
assert.Equal(t, "events.executor.a12", evt.Topic())
assert.Equal(t, "agentevents.executor.a12", evt.Topic())
})

t.Run("should return event topic with resource name when id not set", func(t *testing.T) {
t.Parallel()

evt := Event{Type_: EventStartTest, Resource: EventResourceExecutor}
assert.Equal(t, "events.executor", evt.Topic())
assert.Equal(t, "agentevents.executor", evt.Topic())
})
}
2 changes: 1 addition & 1 deletion pkg/event/bus/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (

const (
SubscribeBuffer = 1
SubscriptionName = "events"
SubscriptionName = "agentevents"
InternalPublishTopic = "internal.all"
InternalSubscribeTopic = "internal.>"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/event/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (e *Emitter) Listen(ctx context.Context) {
}

func (e *Emitter) startListener(l common.Listener) {
err := e.Bus.SubscribeTopic("events.>", l.Name(), e.notifyHandler(l))
err := e.Bus.SubscribeTopic("agentevents.>", l.Name(), e.notifyHandler(l))
if err != nil {
e.Log.Errorw("error while starting listener", "error", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/logs/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
const (
StreamPrefix = "log"

StartSubject = "events.logs.start"
StopSubject = "events.logs.stop"
StartSubject = "agentevents.logs.start"
StopSubject = "agentevents.logs.stop"
)

//go:generate mockgen -destination=./mock_stream.go -package=client "github.com/kubeshop/testkube/pkg/logs/client" Stream
Expand Down
4 changes: 2 additions & 2 deletions pkg/logs/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const (
StartQueue = "logsstart"
StopQueue = "logsstop"

LogStartSubject = "events.logs.start"
LogStopSubject = "events.logs.stop"
LogStartSubject = "agentevents.logs.start"
LogStopSubject = "agentevents.logs.stop"
)

var (
Expand Down

0 comments on commit 15ddfbe

Please sign in to comment.