From 4567a1a591b7196c2530f966e4cedf6fdf05ece4 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Tue, 24 Sep 2024 10:53:35 +0300 Subject: [PATCH] fix: [TKC-2583] tags OR operator for twe (#5861) * fix: tags OR operator Signed-off-by: Vladislav Sukhin * fix: add streaming logs Signed-off-by: Vladislav Sukhin * fix: add id to logs Signed-off-by: Vladislav Sukhin * fix: or for same tags, and for different tags Signed-off-by: Vladislav Sukhin * fix: unit tests Signed-off-by: Vladislav Sukhin * fix: unit test Signed-off-by: Vladislav Sukhin * fix: integration test Signed-off-by: Vladislav Sukhin --------- Signed-off-by: Vladislav Sukhin --- internal/app/api/v1/testworkflowexecutions.go | 23 +++- pkg/repository/testworkflow/mongo.go | 31 ++++- .../testworkflow/mongo_integration_test.go | 112 ++++++++++++++++++ 3 files changed, 160 insertions(+), 6 deletions(-) diff --git a/internal/app/api/v1/testworkflowexecutions.go b/internal/app/api/v1/testworkflowexecutions.go index b008778f42..feea1100ca 100644 --- a/internal/app/api/v1/testworkflowexecutions.go +++ b/internal/app/api/v1/testworkflowexecutions.go @@ -47,14 +47,29 @@ func (s *TestkubeAPI) StreamTestWorkflowExecutionNotificationsHandler() fiber.Ha // Stream the notifications ctx.SetBodyStreamWriter(func(w *bufio.Writer) { - _ = w.Flush() + err := w.Flush() + if err != nil { + s.Log.Errorw("could not flush stream body", "error", err, "id", id) + } + enc := json.NewEncoder(w) for n := range ctrl.Watch(ctx) { if n.Error == nil { - _ = enc.Encode(n.Value) - _, _ = fmt.Fprintf(w, "\n") - _ = w.Flush() + err := enc.Encode(n.Value) + if err != nil { + s.Log.Errorw("could not encode value", "error", err, "id", id) + } + + _, err = fmt.Fprintf(w, "\n") + if err != nil { + s.Log.Errorw("could not print new line", "error", err, "id", id) + } + + err = w.Flush() + if err != nil { + s.Log.Errorw("could not flush stream body", "error", err, "id", id) + } } } }) diff --git a/pkg/repository/testworkflow/mongo.go b/pkg/repository/testworkflow/mongo.go index e7c7ff704a..1697364eda 100644 --- a/pkg/repository/testworkflow/mongo.go +++ b/pkg/repository/testworkflow/mongo.go @@ -363,14 +363,41 @@ func composeQueryAndOpts(filter Filter) (bson.M, *options.FindOptions) { if filter.TagSelector() != "" { items := strings.Split(filter.TagSelector(), ",") + inValues := make(map[string][]string) + existsValues := make(map[string]struct{}) for _, item := range items { elements := strings.Split(item, "=") if len(elements) == 2 { - query["tags."+elements[0]] = elements[1] + inValues["tags."+utils.EscapeDots(elements[0])] = append(inValues["tags."+utils.EscapeDots(elements[0])], elements[1]) } else if len(elements) == 1 { - query["tags."+elements[0]] = bson.M{"$exists": true} + existsValues["tags."+utils.EscapeDots(elements[0])] = struct{}{} } } + subquery := bson.A{} + for tag, values := range inValues { + if _, ok := existsValues[tag]; ok { + subquery = append(subquery, bson.M{tag: bson.M{"$exists": true}}) + delete(existsValues, tag) + continue + } + + tagValues := bson.A{} + for _, value := range values { + tagValues = append(tagValues, value) + } + + if len(tagValues) > 0 { + subquery = append(subquery, bson.M{tag: bson.M{"$in": tagValues}}) + } + } + + for tag := range existsValues { + subquery = append(subquery, bson.M{tag: bson.M{"$exists": true}}) + } + + if len(subquery) > 0 { + query["$and"] = subquery + } } if filter.LabelSelector() != nil && len(filter.LabelSelector().Or) > 0 { diff --git a/pkg/repository/testworkflow/mongo_integration_test.go b/pkg/repository/testworkflow/mongo_integration_test.go index e5023d1258..784c68069e 100644 --- a/pkg/repository/testworkflow/mongo_integration_test.go +++ b/pkg/repository/testworkflow/mongo_integration_test.go @@ -213,3 +213,115 @@ func strPtr(s string) *string { func boolPtr(b bool) *bool { return &b } + +func TestNewMongoRepository_GetExecutions_Tags_Integration(t *testing.T) { + test.IntegrationTest(t) + + ctx := context.Background() + + client, err := mongo.Connect(ctx, options.Client().ApplyURI(cfg.APIMongoDSN)) + if err != nil { + t.Fatalf("error connecting to mongo: %v", err) + } + db := client.Database("testworkflow-executions-tags-mongo-repository-test") + t.Cleanup(func() { + db.Drop(ctx) + }) + + repo := NewMongoRepository(db, false) + + execution := testkube.TestWorkflowExecution{ + Id: "test-id-1", + Name: "test-name-1", + Workflow: &testkube.TestWorkflow{ + Name: "test-name-1", + Spec: &testkube.TestWorkflowSpec{}, + }, + Tags: map[string]string{ + "my.key1": "value1", + }, + } + if err := repo.Insert(ctx, execution); err != nil { + t.Fatalf("error inserting execution: %v", err) + } + + execution = testkube.TestWorkflowExecution{ + Id: "test-id-2", + Name: "test-name-2", + Workflow: &testkube.TestWorkflow{ + Name: "test-name-2", + Spec: &testkube.TestWorkflowSpec{}, + }, + Tags: map[string]string{ + "key2": "value2", + }, + } + if err := repo.Insert(ctx, execution); err != nil { + t.Fatalf("error inserting execution: %v", err) + } + + execution = testkube.TestWorkflowExecution{ + Id: "test-id-3", + Name: "test-name-3", + Workflow: &testkube.TestWorkflow{ + Name: "test-name-3", + Spec: &testkube.TestWorkflowSpec{}, + }, + Tags: map[string]string{ + "my.key1": "value3", + "key2": "", + }, + } + if err := repo.Insert(ctx, execution); err != nil { + t.Fatalf("error inserting execution: %v", err) + } + + res, err := repo.GetExecutions(ctx, NewExecutionsFilter()) + if err != nil { + t.Fatalf("error getting executions: %v", err) + } + + assert.Len(t, res, 3) + + tagSelector := "my.key1=value1" + res, err = repo.GetExecutions(ctx, NewExecutionsFilter().WithTagSelector(tagSelector)) + if err != nil { + t.Fatalf("error getting executions: %v", err) + } + + assert.Len(t, res, 1) + assert.Equal(t, "test-name-1", res[0].Name) + + tagSelector = "my.key1" + res, err = repo.GetExecutions(ctx, NewExecutionsFilter().WithTagSelector(tagSelector)) + if err != nil { + t.Fatalf("error getting executions: %v", err) + } + + assert.Len(t, res, 2) + + tagSelector = "my.key1=value3,key2" + res, err = repo.GetExecutions(ctx, NewExecutionsFilter().WithTagSelector(tagSelector)) + if err != nil { + t.Fatalf("error getting executions: %v", err) + } + + assert.Len(t, res, 1) + assert.Equal(t, "test-name-3", res[0].Name) + + tagSelector = "my.key1=value1,key2=value2" + res, err = repo.GetExecutions(ctx, NewExecutionsFilter().WithTagSelector(tagSelector)) + if err != nil { + t.Fatalf("error getting executions: %v", err) + } + + assert.Len(t, res, 0) + + tagSelector = "my.key1=value1,my.key1=value3" + res, err = repo.GetExecutions(ctx, NewExecutionsFilter().WithTagSelector(tagSelector)) + if err != nil { + t.Fatalf("error getting executions: %v", err) + } + + assert.Len(t, res, 2) +}