Skip to content

Commit

Permalink
fix: improve abortion check to use nats events (#4345)
Browse files Browse the repository at this point in the history
* fix: improve abortion check to use nats events

* fix: remove global bus var

* fix: use interfaces instead of bus object

* fix: synchronize using channels instead of atomic and move logic from event handler
  • Loading branch information
nicufk authored Sep 8, 2023
1 parent f71967a commit a74028e
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 103 deletions.
2 changes: 2 additions & 0 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ func main() {
configMapConfig,
configMapClient,
testsuiteExecutionsClient,
eventBus,
)

slackLoader, err := newSlackLoader(cfg, envs)
Expand Down Expand Up @@ -431,6 +432,7 @@ func main() {
cfg.TestkubeDashboardURI,
cfg.TestkubeHelmchartVersion,
mode,
eventBus,
)

if mode == common.ModeAgent {
Expand Down
4 changes: 4 additions & 0 deletions internal/app/api/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
testkubeclientset "github.com/kubeshop/testkube-operator/pkg/clientset/versioned"
"github.com/kubeshop/testkube/internal/app/api/metrics"
"github.com/kubeshop/testkube/pkg/event"
"github.com/kubeshop/testkube/pkg/event/bus"
"github.com/kubeshop/testkube/pkg/event/kind/cdevent"
"github.com/kubeshop/testkube/pkg/event/kind/slack"
"github.com/kubeshop/testkube/pkg/event/kind/webhook"
Expand Down Expand Up @@ -85,6 +86,7 @@ func NewTestkubeAPI(
dashboardURI string,
helmchartVersion string,
mode string,
eventsBus bus.Bus,
) TestkubeAPI {

var httpConfig server.Config
Expand Down Expand Up @@ -127,6 +129,7 @@ func NewTestkubeAPI(
TemplatesClient: templatesClient,
helmchartVersion: helmchartVersion,
mode: mode,
eventsBus: eventsBus,
}

// will be reused in websockets handler
Expand Down Expand Up @@ -181,6 +184,7 @@ type TestkubeAPI struct {
TemplatesClient *templatesclientv1.TemplatesClient
helmchartVersion string
mode string
eventsBus bus.Bus
}

type storageParams struct {
Expand Down
14 changes: 10 additions & 4 deletions internal/app/api/v1/testsuites.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/crd"
"github.com/kubeshop/testkube/pkg/datefilter"
"github.com/kubeshop/testkube/pkg/event/bus"
testsmapper "github.com/kubeshop/testkube/pkg/mapper/tests"
testsuiteexecutionsmapper "github.com/kubeshop/testkube/pkg/mapper/testsuiteexecutions"
testsuitesmapper "github.com/kubeshop/testkube/pkg/mapper/testsuites"
Expand Down Expand Up @@ -803,11 +804,14 @@ func (s TestkubeAPI) AbortTestSuiteHandler() fiber.Handler {

for _, execution := range executions {
execution.Status = testkube.TestSuiteExecutionStatusAborting
err = s.TestExecutionResults.Update(c.Context(), execution)
s.Log.Infow("aborting test suite execution", "executionID", execution.Id)
err := s.eventsBus.PublishTopic(bus.InternalPublishTopic, testkube.NewEventEndTestSuiteAborted(&execution))

if err != nil {
return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: could not update test suite execution: %w", errPrefix, err))
return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: could not sent test suite abortion event: %w", errPrefix, err))
}

s.Log.Infow("test suite execution aborted, event sent", "executionID", c.Params("executionID"))
}

return c.Status(http.StatusNoContent).SendString("")
Expand All @@ -828,11 +832,13 @@ func (s TestkubeAPI) AbortTestSuiteExecutionHandler() fiber.Handler {
}

execution.Status = testkube.TestSuiteExecutionStatusAborting
err = s.TestExecutionResults.Update(c.Context(), execution)

err = s.eventsBus.PublishTopic(bus.InternalPublishTopic, testkube.NewEventEndTestSuiteAborted(&execution))

if err != nil {
return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: could not update test suite execution: %w", errPrefix, err))
return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: could not sent test suite abortion event: %w", errPrefix, err))
}
s.Log.Infow("test suite execution aborted, event sent", "executionID", c.Params("executionID"))

return c.Status(http.StatusNoContent).SendString("")
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/event/bus/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ import (
"github.com/kubeshop/testkube/pkg/log"
)

var _ Bus = (*NATSBus)(nil)
var (
_ Bus = (*NATSBus)(nil)
)

const (
SubscribeBuffer = 1
SubscriptionName = "events"
SubscribeBuffer = 1
SubscriptionName = "events"
InternalPublishTopic = "internal.all"
InternalSubscribeTopic = "internal.>"
)

func NewNATSConnection(uri string) (*nats.EncodedConn, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduler
import (
"go.uber.org/zap"

"github.com/kubeshop/testkube/pkg/event/bus"
"github.com/kubeshop/testkube/pkg/repository/config"

executorsv1 "github.com/kubeshop/testkube-operator/client/executors/v1"
Expand Down Expand Up @@ -35,6 +36,7 @@ type Scheduler struct {
configMap config.Repository
configMapClient configmap.Interface
testSuiteExecutionsClient testsuiteexecutionsclientv1.Interface
eventsBus bus.Bus
}

func NewScheduler(
Expand All @@ -53,6 +55,7 @@ func NewScheduler(
configMap config.Repository,
configMapClient configmap.Interface,
testSuiteExecutionsClient testsuiteexecutionsclientv1.Interface,
eventsBus bus.Bus,
) *Scheduler {
return &Scheduler{
metrics: metrics,
Expand All @@ -70,5 +73,6 @@ func NewScheduler(
configMap: configMap,
configMapClient: configMapClient,
testSuiteExecutionsClient: testSuiteExecutionsClient,
eventsBus: eventsBus,
}
}
Loading

0 comments on commit a74028e

Please sign in to comment.