From 559e15faeafb69ca65967878b4560e374eb561bf Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 19 Nov 2024 18:16:09 +0100 Subject: [PATCH 1/3] Test transforms --- go.mod | 7 +- go.sum | 17 +- internal/elasticsearch/client.go | 4 +- internal/testrunner/runners/system/tester.go | 161 +++++++++++++++++++ 4 files changed, 184 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index e8aed8c97..08b59894a 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/creack/pty v1.1.19 github.com/dustin/go-humanize v1.0.1 github.com/elastic/elastic-integration-corpus-generator-tool v0.10.0 - github.com/elastic/go-elasticsearch/v7 v7.17.10 + github.com/elastic/go-elasticsearch/v8 v8.16.0 github.com/elastic/go-licenser v0.4.2 github.com/elastic/go-resource v0.2.0 github.com/elastic/go-ucfg v0.8.8 @@ -72,6 +72,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dnephin/pflag v1.0.7 // indirect github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect + github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/elastic/gojsonschema v1.2.1 // indirect github.com/elastic/kbncontent v0.1.4 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect @@ -84,6 +85,7 @@ require ( github.com/go-errors/errors v1.4.2 // indirect github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-openapi/errors v0.20.3 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect @@ -161,6 +163,9 @@ require ( github.com/xlab/treeprint v1.2.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.mongodb.org/mongo-driver v1.11.1 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect golang.org/x/crypto v0.29.0 // indirect golang.org/x/exp/typeparams v0.0.0-20231108232855-2478ac86f678 // indirect diff --git a/go.sum b/go.sum index 6eb62f1fd..97c2f48bf 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,10 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elastic/elastic-integration-corpus-generator-tool v0.10.0 h1:sx1lpZuTG5suJuvgix4FWQFCLFFbzkoOmPoHWYOPLCY= github.com/elastic/elastic-integration-corpus-generator-tool v0.10.0/go.mod h1:2/30n+2QRzRzus4TPVUV1T3U/j8g2ItUgvP0pcpjLGk= -github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= -github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= +github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-elasticsearch/v8 v8.16.0 h1:f7bR+iBz8GTAVhwyFO3hm4ixsz2eMaEy0QroYnXV3jE= +github.com/elastic/go-elasticsearch/v8 v8.16.0/go.mod h1:lGMlgKIbYoRvay3xWBeKahAiJOgmFDsjZC39nmO3H64= github.com/elastic/go-licenser v0.4.2 h1:bPbGm8bUd8rxzSswFOqvQh1dAkKGkgAmrPxbUi+Y9+A= github.com/elastic/go-licenser v0.4.2/go.mod h1:W8eH6FaZDR8fQGm+7FnVa7MxI1b/6dAqxz+zPB8nm5c= github.com/elastic/go-resource v0.2.0 h1:T92tw+THqISnCKaZBijlZMpEpCYkFkwsOgLQxKX6pqA= @@ -138,8 +140,11 @@ github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMj github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII= github.com/go-git/go-git/v5 v5.12.0 h1:7Md+ndsjrzZxbddRDZjF14qK+NN56sy6wkqaVrjZtys= github.com/go-git/go-git/v5 v5.12.0/go.mod h1:FTM9VKtnI2m65hNI/TenDDDnUf2Q9FHnXYjuz9i5OEY= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-openapi/errors v0.20.2/go.mod h1:cM//ZKUKyO06HSwqAelJ5NsEMMcpa6VpXe8DOa1Mi1M= @@ -427,6 +432,14 @@ github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8= go.mongodb.org/mongo-driver v1.11.1 h1:QP0znIRTuL0jf1oBQoAoM0C6ZJfBK4kx0Uumtv1A7w8= go.mongodb.org/mongo-driver v1.11.1/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.starlark.net v0.0.0-20230525235612-a134d8f9ddca h1:VdD38733bfYv5tUZwEIskMM93VanwNIi5bIKnDrJdEY= go.starlark.net v0.0.0-20230525235612-a134d8f9ddca/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= diff --git a/internal/elasticsearch/client.go b/internal/elasticsearch/client.go index 20dfff16b..71a61f517 100644 --- a/internal/elasticsearch/client.go +++ b/internal/elasticsearch/client.go @@ -14,8 +14,8 @@ import ( "net/http" "strings" - "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/elastic/elastic-package/internal/certs" ) diff --git a/internal/testrunner/runners/system/tester.go b/internal/testrunner/runners/system/tester.go index 309636007..0248115fd 100644 --- a/internal/testrunner/runners/system/tester.go +++ b/internal/testrunner/runners/system/tester.go @@ -1896,6 +1896,12 @@ func (r *tester) checkTransforms(ctx context.Context, config *testConfig, pkgMan return fmt.Errorf("no documents found in preview for transform %q", transformId) } + // Check that there is no problem running the actual transform. + err = r.checkRunningTransformHealth(ctx, transformId) + if err != nil { + return fmt.Errorf("there are issues with installed transform %q: %w", transformId, err) + } + transformRootPath := filepath.Dir(transform.Path) fieldsValidator, err := fields.CreateValidatorForDirectory(transformRootPath, fields.WithSpecVersion(pkgManifest.SpecVersion), @@ -1978,6 +1984,161 @@ func (r *tester) previewTransform(ctx context.Context, transformId string) ([]co return preview.Documents, nil } +func (r *tester) scheduleTransform(ctx context.Context, transformId string) error { + resp, err := r.esAPI.TransformScheduleNowTransform(transformId, + r.esAPI.TransformScheduleNowTransform.WithContext(ctx), + ) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.IsError() { + return fmt.Errorf("failed to schedule transform %q: %s", transformId, resp.String()) + } + + return nil +} + +type transformStats struct { + Checkpointing struct { + Last struct { + Checkpoint int `json:"checkpoint"` + } `json:"last"` + Next struct { + Checkpoint int `json:"checkpoint"` + } `json:"next"` + LastSearchTime int `json:"last_search_time"` + } `json:"checkpointing"` + Health transformHealth `json:"health"` + Reason string `json:"reason"` + State string `json:"state"` +} + +type transformHealth struct { + Status string `json:"status"` + Issues []struct { + Issue string `json:"issue"` + Details string `json:"details"` + Count int `json:"count"` + FirstOccurrence time.Time `json:"first_occurrence"` + } `json:"issues"` +} + +func (r *tester) getTransformStats(ctx context.Context, transformId string) (*transformStats, error) { + resp, err := r.esAPI.TransformGetTransformStats(transformId, + r.esAPI.TransformGetTransformStats.WithContext(ctx), + r.esAPI.TransformGetTransformStats.WithAllowNoMatch(false), + ) + + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.IsError() { + return nil, fmt.Errorf("failed to get transform stats for %q: %s", transformId, resp.String()) + } + + var response struct { + Transforms []transformStats `json:"transforms"` + } + err = json.NewDecoder(resp.Body).Decode(&response) + if err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + if len(response.Transforms) != 1 { + return nil, fmt.Errorf("stats for %d transforms received when requesting only for %s", len(response.Transforms), transformId) + } + return &response.Transforms[0], nil +} + +// checkRunningTransformHealth checks the following for a given transform: +// - That it is started. +// - That it can execute at least once during the check. +// - That it is healthy after executing at least once. +func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId string) error { + const ( + period = 1 * time.Second + timeout = 60 * time.Second + ) + lastSearchTime := 0 + last := -1 + running := false + ok, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) { + stats, err := r.getTransformStats(ctx, transformId) + if err != nil { + return false, err + } + if last < 0 { + last = stats.Checkpointing.Last.Checkpoint + lastSearchTime = stats.Checkpointing.LastSearchTime + } + logger.Debugf("transform %s state: %s, health: %s, checkpoint %d, last search %d", + transformId, + stats.State, stats.Health.Status, + stats.Checkpointing.Last.Checkpoint, stats.Checkpointing.LastSearchTime) + switch stats.State { + case "failed": + return false, fmt.Errorf("transform in failed state: %s", stats.Reason) + case "aborting", "stopping", "stopped": + return false, fmt.Errorf("transform unexpectedly %s", stats.State) + case "indexing": + // It is already running, wait till indexing finishes. + running = true + return false, nil + case "started": + if !running { + logger.Debugf("scheduling transform %s now", transformId) + err := r.scheduleTransform(ctx, transformId) + if err != nil { + return false, fmt.Errorf("failed to schedule transform: %w", err) + } + running = true + return false, nil + } + default: + return false, fmt.Errorf("unexpected transform state %q", stats.State) + } + + if stats.Checkpointing.Last.Checkpoint <= last && stats.Checkpointing.LastSearchTime <= lastSearchTime { + // There hasn't been any update yet, try again. + return false, nil + } + + err = healthError(stats.Health) + return err == nil, err + }, period, timeout) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("could not confirm successful executions of transform %s", transformId) + } + return nil +} + +func healthError(health transformHealth) error { + if health.Status == "green" { + return nil + } + + var msg strings.Builder + msg.WriteString("unexpected transform health status (" + health.Status + ")") + + if len(health.Issues) > 0 { + msg.WriteString(": ") + for i, issue := range health.Issues { + msg.WriteString(issue.Issue + "(" + issue.Details + ")") + if i+1 < len(health.Issues) { + msg.WriteString(", ") + } + } + } + + return errors.New(msg.String()) +} + func filterAgents(allAgents []kibana.Agent, svcInfo servicedeployer.ServiceInfo) []kibana.Agent { if svcInfo.Agent.Host.NamePrefix != "" { logger.Debugf("filter agents using criteria: NamePrefix=%s", svcInfo.Agent.Host.NamePrefix) From 899a800ed26e41ee1bbcad75b6022f33abe847dc Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 22 Nov 2024 12:04:13 +0100 Subject: [PATCH 2/3] Reset transform before each check --- internal/testrunner/runners/system/tester.go | 88 ++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/internal/testrunner/runners/system/tester.go b/internal/testrunner/runners/system/tester.go index 0248115fd..2837f7a68 100644 --- a/internal/testrunner/runners/system/tester.go +++ b/internal/testrunner/runners/system/tester.go @@ -12,6 +12,7 @@ import ( "fmt" "net/http" "os" + "path" "path/filepath" "regexp" "slices" @@ -1984,6 +1985,39 @@ func (r *tester) previewTransform(ctx context.Context, transformId string) ([]co return preview.Documents, nil } +func (r *tester) resetTransform(ctx context.Context, transformId string) error { + resp, err := r.esAPI.TransformResetTransform(transformId, + r.esAPI.TransformResetTransform.WithContext(ctx), + r.esAPI.TransformResetTransform.WithForce(true), + ) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.IsError() { + return fmt.Errorf("failed to reset transform %q: %s", transformId, resp.String()) + } + + return nil +} + +func (r *tester) startTransform(ctx context.Context, transformId string) error { + resp, err := r.esAPI.TransformStartTransform(transformId, + r.esAPI.TransformStartTransform.WithContext(ctx), + ) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.IsError() { + return fmt.Errorf("failed to start transform %q: %s", transformId, resp.String()) + } + + return nil +} + func (r *tester) scheduleTransform(ctx context.Context, transformId string) error { resp, err := r.esAPI.TransformScheduleNowTransform(transformId, r.esAPI.TransformScheduleNowTransform.WithContext(ctx), @@ -2053,9 +2087,45 @@ func (r *tester) getTransformStats(ctx context.Context, transformId string) (*tr return &response.Transforms[0], nil } +func (r *tester) checkTransformAuditMessages(ctx context.Context, transformId string) error { + // XXX: This is an internal API, are these audit messages available somewhere else? + const internalTransformsPath = "/internal/transform/transforms" + messagesPath := path.Join(internalTransformsPath, transformId, "messages") + query := "?sortField=timestamp&sortDirection=desc" // Required + statusCode, body, err := r.kibanaClient.SendRequest(ctx, http.MethodGet, messagesPath+query, nil) + if err != nil { + return fmt.Errorf("could not get transform audit messages: %w", err) + } + if statusCode >= 400 { + return fmt.Errorf("could not get transform audit messages: status code %d, body: %s", statusCode, body) + } + + var resp struct { + Messages []struct { + TransformID string `json:"transform_id"` + Message string `json:"message"` + Level string `json:"level"` + Timestamp int `json:"timestamp"` + NodeName string `json:"node_name"` + } `json:"messages"` + } + err = json.Unmarshal(body, &resp) + if err != nil { + return fmt.Errorf("could not decode response: %w", err) + } + + for _, message := range resp.Messages { + if message.Level == "error" { + return fmt.Errorf("failure found in transform: %s", message.Message) + } + } + return nil +} + // checkRunningTransformHealth checks the following for a given transform: // - That it is started. // - That it can execute at least once during the check. +// - That it hasn't generated any error message. // - That it is healthy after executing at least once. func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId string) error { const ( @@ -2065,6 +2135,18 @@ func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId st lastSearchTime := 0 last := -1 running := false + + // Reset transform to clean any previous state. + /* XXX: It fails to create the index after reset :? + err := r.resetTransform(ctx, transformId) + if err != nil { + return fmt.Errorf("failed to reset transform: %w", err) + } + err = r.startTransform(ctx, transformId) + if err != nil { + return fmt.Errorf("failed to start transform after reset: %w", err) + } + */ ok, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) { stats, err := r.getTransformStats(ctx, transformId) if err != nil { @@ -2106,6 +2188,12 @@ func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId st return false, nil } + // We need to check the audit messages in case a document is removed but caused issues. + err = r.checkTransformAuditMessages(ctx, transformId) + if err != nil { + return false, err + } + err = healthError(stats.Health) return err == nil, err }, period, timeout) From dc741acbeab1238959f55a676bf9049391eb4a67 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 22 Nov 2024 13:14:12 +0100 Subject: [PATCH 3/3] Linting --- internal/testrunner/runners/system/tester.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/testrunner/runners/system/tester.go b/internal/testrunner/runners/system/tester.go index 2837f7a68..4e0140cde 100644 --- a/internal/testrunner/runners/system/tester.go +++ b/internal/testrunner/runners/system/tester.go @@ -1985,6 +1985,8 @@ func (r *tester) previewTransform(ctx context.Context, transformId string) ([]co return preview.Documents, nil } +/* XXX: unused code now, commented out for the linter till we investigate the issues with reset + func (r *tester) resetTransform(ctx context.Context, transformId string) error { resp, err := r.esAPI.TransformResetTransform(transformId, r.esAPI.TransformResetTransform.WithContext(ctx), @@ -2018,6 +2020,8 @@ func (r *tester) startTransform(ctx context.Context, transformId string) error { return nil } +*/ + func (r *tester) scheduleTransform(ctx context.Context, transformId string) error { resp, err := r.esAPI.TransformScheduleNowTransform(transformId, r.esAPI.TransformScheduleNowTransform.WithContext(ctx),