Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

System tests for transforms #2242

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/creack/pty v1.1.19
github.com/dustin/go-humanize v1.0.1
github.com/elastic/elastic-integration-corpus-generator-tool v0.10.0
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/elastic/go-elasticsearch/v8 v8.16.0
github.com/elastic/go-licenser v0.4.2
github.com/elastic/go-resource v0.2.0
github.com/elastic/go-ucfg v0.8.8
Expand Down Expand Up @@ -72,6 +72,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/elastic/gojsonschema v1.2.1 // indirect
github.com/elastic/kbncontent v0.1.4 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
Expand All @@ -84,6 +85,7 @@ require (
github.com/go-errors/errors v1.4.2 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/errors v0.20.3 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
Expand Down Expand Up @@ -161,6 +163,9 @@ require (
github.com/xlab/treeprint v1.2.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.mongodb.org/mongo-driver v1.11.1 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/exp/typeparams v0.0.0-20231108232855-2478ac86f678 // indirect
Expand Down
17 changes: 15 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/elastic/elastic-integration-corpus-generator-tool v0.10.0 h1:sx1lpZuTG5suJuvgix4FWQFCLFFbzkoOmPoHWYOPLCY=
github.com/elastic/elastic-integration-corpus-generator-tool v0.10.0/go.mod h1:2/30n+2QRzRzus4TPVUV1T3U/j8g2ItUgvP0pcpjLGk=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v8 v8.16.0 h1:f7bR+iBz8GTAVhwyFO3hm4ixsz2eMaEy0QroYnXV3jE=
github.com/elastic/go-elasticsearch/v8 v8.16.0/go.mod h1:lGMlgKIbYoRvay3xWBeKahAiJOgmFDsjZC39nmO3H64=
github.com/elastic/go-licenser v0.4.2 h1:bPbGm8bUd8rxzSswFOqvQh1dAkKGkgAmrPxbUi+Y9+A=
github.com/elastic/go-licenser v0.4.2/go.mod h1:W8eH6FaZDR8fQGm+7FnVa7MxI1b/6dAqxz+zPB8nm5c=
github.com/elastic/go-resource v0.2.0 h1:T92tw+THqISnCKaZBijlZMpEpCYkFkwsOgLQxKX6pqA=
Expand Down Expand Up @@ -138,8 +140,11 @@ github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMj
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII=
github.com/go-git/go-git/v5 v5.12.0 h1:7Md+ndsjrzZxbddRDZjF14qK+NN56sy6wkqaVrjZtys=
github.com/go-git/go-git/v5 v5.12.0/go.mod h1:FTM9VKtnI2m65hNI/TenDDDnUf2Q9FHnXYjuz9i5OEY=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/errors v0.20.2/go.mod h1:cM//ZKUKyO06HSwqAelJ5NsEMMcpa6VpXe8DOa1Mi1M=
Expand Down Expand Up @@ -427,6 +432,14 @@ github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ
go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8=
go.mongodb.org/mongo-driver v1.11.1 h1:QP0znIRTuL0jf1oBQoAoM0C6ZJfBK4kx0Uumtv1A7w8=
go.mongodb.org/mongo-driver v1.11.1/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca h1:VdD38733bfYv5tUZwEIskMM93VanwNIi5bIKnDrJdEY=
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
4 changes: 2 additions & 2 deletions internal/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"net/http"
"strings"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"

"github.com/elastic/elastic-package/internal/certs"
)
Expand Down
253 changes: 253 additions & 0 deletions internal/testrunner/runners/system/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"slices"
Expand Down Expand Up @@ -1896,6 +1897,12 @@ func (r *tester) checkTransforms(ctx context.Context, config *testConfig, pkgMan
return fmt.Errorf("no documents found in preview for transform %q", transformId)
}

// Check that there is no problem running the actual transform.
err = r.checkRunningTransformHealth(ctx, transformId)
if err != nil {
return fmt.Errorf("there are issues with installed transform %q: %w", transformId, err)
}

transformRootPath := filepath.Dir(transform.Path)
fieldsValidator, err := fields.CreateValidatorForDirectory(transformRootPath,
fields.WithSpecVersion(pkgManifest.SpecVersion),
Expand Down Expand Up @@ -1978,6 +1985,252 @@ func (r *tester) previewTransform(ctx context.Context, transformId string) ([]co
return preview.Documents, nil
}

/* XXX: unused code now, commented out for the linter till we investigate the issues with reset

func (r *tester) resetTransform(ctx context.Context, transformId string) error {
resp, err := r.esAPI.TransformResetTransform(transformId,
r.esAPI.TransformResetTransform.WithContext(ctx),
r.esAPI.TransformResetTransform.WithForce(true),
)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.IsError() {
return fmt.Errorf("failed to reset transform %q: %s", transformId, resp.String())
}

return nil
}

func (r *tester) startTransform(ctx context.Context, transformId string) error {
resp, err := r.esAPI.TransformStartTransform(transformId,
r.esAPI.TransformStartTransform.WithContext(ctx),
)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.IsError() {
return fmt.Errorf("failed to start transform %q: %s", transformId, resp.String())
}

return nil
}

*/

// scheduleTransform calls the "schedule now" transform API for transformId.
// It returns the transport error as-is, or an error carrying the rendered
// response when Elasticsearch answers with an error status.
func (r *tester) scheduleTransform(ctx context.Context, transformId string) error {
	scheduleNow := r.esAPI.TransformScheduleNowTransform
	resp, err := scheduleNow(transformId, scheduleNow.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		return nil
	}
	return fmt.Errorf("failed to schedule transform %q: %s", transformId, resp.String())
}

// transformStats holds the subset of a transform's statistics that the system
// tester reads. It is decoded from one entry of the "transforms" list returned
// by the transform stats request.
type transformStats struct {
// Checkpointing carries the progress markers that are compared between polls
// to detect that the transform has executed again.
Checkpointing struct {
Last struct {
// Checkpoint is the value compared against the first observed one to
// decide whether a new execution happened.
// NOTE(review): presumably the sequence number of the last completed
// checkpoint - confirm against the Elasticsearch API reference.
Checkpoint int `json:"checkpoint"`
} `json:"last"`
Next struct {
// Checkpoint is decoded but not read by the code visible in this file.
Checkpoint int `json:"checkpoint"`
} `json:"next"`
// LastSearchTime is logged and compared between polls alongside the
// checkpoint. NOTE(review): presumably an epoch timestamp - confirm the unit.
LastSearchTime int `json:"last_search_time"`
} `json:"checkpointing"`
// Health is the health report that healthError turns into an error.
Health transformHealth `json:"health"`
// Reason is included in the error reported when State is "failed".
Reason string `json:"reason"`
// State is the transform state. The tester handles "started", "indexing",
// "failed", "aborting", "stopping" and "stopped"; any other value is
// reported as unexpected.
State string `json:"state"`
}

// transformHealth is the health section of a transform's statistics.
type transformHealth struct {
// Status is the overall health status. healthError treats "green" as healthy
// and reports any other value as an error.
Status string `json:"status"`
// Issues lists the problems reported for the transform. Only Issue and
// Details are used when building the error message; Count and
// FirstOccurrence are decoded but not read by the code visible in this file.
Issues []struct {
Issue string `json:"issue"`
Details string `json:"details"`
Count int `json:"count"`
FirstOccurrence time.Time `json:"first_occurrence"`
} `json:"issues"`
}

// getTransformStats requests the statistics of the transform identified by
// transformId and returns the single entry found in the response.
//
// The request is sent with allow_no_match disabled. An error is returned when
// the request fails, when Elasticsearch answers with an error status, when the
// body cannot be decoded, or when the response does not contain exactly one
// transform.
func (r *tester) getTransformStats(ctx context.Context, transformId string) (*transformStats, error) {
	getStats := r.esAPI.TransformGetTransformStats
	resp, err := getStats(transformId,
		getStats.WithContext(ctx),
		getStats.WithAllowNoMatch(false),
	)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.IsError() {
		return nil, fmt.Errorf("failed to get transform stats for %q: %s", transformId, resp.String())
	}

	var payload struct {
		Transforms []transformStats `json:"transforms"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return nil, fmt.Errorf("failed to decode response: %w", err)
	}

	// Exactly one entry is expected because a single transform was requested.
	if received := len(payload.Transforms); received != 1 {
		return nil, fmt.Errorf("stats for %d transforms received when requesting only for %s", received, transformId)
	}
	return &payload.Transforms[0], nil
}

// checkTransformAuditMessages fetches the audit messages of the given transform
// through the Kibana client and returns an error for the first message whose
// level is "error".
//
// It also returns an error when the request fails, when Kibana answers with a
// status code of 400 or higher, or when the response body cannot be decoded.
func (r *tester) checkTransformAuditMessages(ctx context.Context, transformId string) error {
// XXX: This is an internal API, are these audit messages available somewhere else?
const internalTransformsPath = "/internal/transform/transforms"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have found that some failures only appear in the audit messages, but these messages seem to be available only on internal APIs, that I guess don't have compatibility guarantees, nor will be available in all deployments.

We have to look for some other API.

@kpollich do you know how safe these internal APIs are to use? Or do you know if there are alternatives?

Though the errors found here also make the transform to fail with real-time data, so maybe this is an issue with test data that is discarded by being old. Then we'd have to review the test data too.

// Builds /internal/transform/transforms/<transformId>/messages.
messagesPath := path.Join(internalTransformsPath, transformId, "messages")
query := "?sortField=timestamp&sortDirection=desc" // Required
statusCode, body, err := r.kibanaClient.SendRequest(ctx, http.MethodGet, messagesPath+query, nil)
if err != nil {
return fmt.Errorf("could not get transform audit messages: %w", err)
}
if statusCode >= 400 {
return fmt.Errorf("could not get transform audit messages: status code %d, body: %s", statusCode, body)
}

// Only Level and Message are read below; the other fields are decoded but unused here.
var resp struct {
Messages []struct {
TransformID string `json:"transform_id"`
Message string `json:"message"`
Level string `json:"level"`
Timestamp int `json:"timestamp"`
NodeName string `json:"node_name"`
} `json:"messages"`
}
err = json.Unmarshal(body, &resp)
if err != nil {
return fmt.Errorf("could not decode response: %w", err)
}

// Stop at the first error-level message; messages with other levels are ignored.
for _, message := range resp.Messages {
if message.Level == "error" {
return fmt.Errorf("failure found in transform: %s", message.Message)
}
}
return nil
}

// checkRunningTransformHealth verifies that an installed transform is able to
// execute. It polls the transform stats every second, for up to a minute, and:
//   - fails if the transform is failed, aborting, stopping, stopped or in an
//     unknown state;
//   - requests an immediate execution the first time the transform is seen as
//     started without having been seen running;
//   - once the last checkpoint or the last search time move past the values
//     seen in the first poll, fails if the audit messages contain an error or
//     if the reported health is not green.
//
// An error is also returned if no execution could be confirmed before the
// timeout.
func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId string) error {
	const (
		pollPeriod  = 1 * time.Second
		pollTimeout = 60 * time.Second
	)

	// State kept between polls. baseCheckpoint stays negative until the first
	// successful stats request records the starting point.
	var (
		baseSearchTime = 0
		baseCheckpoint = -1
		triggered      = false
	)

	// Reset transform to clean any previous state.
	/* XXX: It fails to create the index after reset :?
	Copy link
	Member Author

	Choose a reason for hiding this comment

	The reason will be displayed to describe this comment to others. Learn more.

	This is failing now because it tries to create indexes with a name that is matched by a data stream template, and transforms need to use indexes, not data streams.

	⚠️ This may be an actual issue because this means that the reset API breaks these transforms. We may need to review how transform indexes are named in packages to avoid these conflicts.

	err := r.resetTransform(ctx, transformId)
	if err != nil {
	return fmt.Errorf("failed to reset transform: %w", err)
	}
	err = r.startTransform(ctx, transformId)
	if err != nil {
	return fmt.Errorf("failed to start transform after reset: %w", err)
	}
	*/
	poll := func(ctx context.Context) (bool, error) {
		stats, err := r.getTransformStats(ctx, transformId)
		if err != nil {
			return false, err
		}

		checkpoint := stats.Checkpointing.Last.Checkpoint
		searchTime := stats.Checkpointing.LastSearchTime
		if baseCheckpoint < 0 {
			baseCheckpoint = checkpoint
			baseSearchTime = searchTime
		}
		logger.Debugf("transform %s state: %s, health: %s, checkpoint %d, last search %d",
			transformId,
			stats.State, stats.Health.Status,
			checkpoint, searchTime)

		switch stats.State {
		case "started":
			if !triggered {
				logger.Debugf("scheduling transform %s now", transformId)
				if err := r.scheduleTransform(ctx, transformId); err != nil {
					return false, fmt.Errorf("failed to schedule transform: %w", err)
				}
				triggered = true
				return false, nil
			}
			// Already triggered or seen running: evaluate progress below.
		case "indexing":
			// It is already running, wait till indexing finishes.
			triggered = true
			return false, nil
		case "failed":
			return false, fmt.Errorf("transform in failed state: %s", stats.Reason)
		case "aborting", "stopping", "stopped":
			return false, fmt.Errorf("transform unexpectedly %s", stats.State)
		default:
			return false, fmt.Errorf("unexpected transform state %q", stats.State)
		}

		progressed := checkpoint > baseCheckpoint || searchTime > baseSearchTime
		if !progressed {
			// There hasn't been any update yet, try again.
			return false, nil
		}

		// We need to check the audit messages in case a document is removed but caused issues.
		if err := r.checkTransformAuditMessages(ctx, transformId); err != nil {
			return false, err
		}

		if err := healthError(stats.Health); err != nil {
			return false, err
		}
		return true, nil
	}

	confirmed, err := wait.UntilTrue(ctx, poll, pollPeriod, pollTimeout)
	switch {
	case err != nil:
		return err
	case !confirmed:
		return fmt.Errorf("could not confirm successful executions of transform %s", transformId)
	}
	return nil
}

// healthError converts a transform health report into an error. It returns nil
// when the status is "green". Otherwise the error message contains the status
// and, when issues are present, each issue rendered as "issue(details)" and
// separated by commas.
func healthError(health transformHealth) error {
	if health.Status == "green" {
		return nil
	}

	summary := "unexpected transform health status (" + health.Status + ")"
	if len(health.Issues) == 0 {
		return errors.New(summary)
	}

	described := make([]string, 0, len(health.Issues))
	for _, found := range health.Issues {
		described = append(described, found.Issue+"("+found.Details+")")
	}
	return errors.New(summary + ": " + strings.Join(described, ", "))
}

func filterAgents(allAgents []kibana.Agent, svcInfo servicedeployer.ServiceInfo) []kibana.Agent {
if svcInfo.Agent.Host.NamePrefix != "" {
logger.Debugf("filter agents using criteria: NamePrefix=%s", svcInfo.Agent.Host.NamePrefix)
Expand Down