Skip to content

Commit

Permalink
internal/civisibility: intelligent test runner support (#2943)
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyredondo authored Oct 29, 2024
1 parent 6ee2ccc commit fcda5e2
Show file tree
Hide file tree
Showing 32 changed files with 2,880 additions and 66 deletions.
21 changes: 21 additions & 0 deletions internal/civisibility/constants/test_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,27 @@ const (

// TestEarlyFlakeDetectionRetryAborted indicates a retry abort reason by the early flake detection feature
TestEarlyFlakeDetectionRetryAborted = "test.early_flake.abort_reason"

// TestSkippedByITR indicates a test skipped by the ITR feature
TestSkippedByITR = "test.skipped_by_itr"

// SkippedByITRReason indicates the reason why the test was skipped by the ITR feature
SkippedByITRReason = "Skipped by Datadog Intelligent Test Runner"

// ITRTestsSkipped indicates that tests were skipped by the ITR feature
ITRTestsSkipped = "_dd.ci.itr.tests_skipped"

// ITRTestsSkippingEnabled indicates that the ITR test skipping feature is enabled
ITRTestsSkippingEnabled = "test.itr.tests_skipping.enabled"

// ITRTestsSkippingType indicates the type of ITR test skipping
ITRTestsSkippingType = "test.itr.tests_skipping.type"

// ITRTestsSkippingCount indicates the number of tests skipped by the ITR feature
ITRTestsSkippingCount = "test.itr.tests_skipping.count"

// CodeCoverageEnabled indicates that code coverage is enabled
CodeCoverageEnabled = "test.code_coverage.enabled"
)

// Define valid test status types.
Expand Down
48 changes: 42 additions & 6 deletions internal/civisibility/integrations/civisibility_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type (
)

var (
// settingsInitializationOnce ensures we do the settings initialization just once
settingsInitializationOnce sync.Once

// additionalFeaturesInitializationOnce ensures we do the additional features initialization just once
additionalFeaturesInitializationOnce sync.Once

Expand All @@ -53,12 +56,14 @@ var (

// ciVisibilityFlakyRetriesSettings contains the CI Visibility Flaky Retries settings for this session
ciVisibilityFlakyRetriesSettings FlakyRetriesSetting

// ciVisibilitySkippables contains the CI Visibility skippable tests for this session
ciVisibilitySkippables map[string]map[string][]net.SkippableResponseDataAttributes
)

// ensureAdditionalFeaturesInitialization initialize all the additional features
func ensureAdditionalFeaturesInitialization(serviceName string) {
additionalFeaturesInitializationOnce.Do(func() {
log.Debug("civisibility: initializing additional features")
func ensureSettingsInitialization(serviceName string) {
settingsInitializationOnce.Do(func() {
log.Debug("civisibility: initializing settings")

// Create the CI Visibility client
ciVisibilityClient = net.NewClientWithServiceName(serviceName)
Expand Down Expand Up @@ -104,6 +109,17 @@ func ensureAdditionalFeaturesInitialization(serviceName string) {
<-uploadChannel
})
}
})
}

// ensureAdditionalFeaturesInitialization initialize all the additional features
func ensureAdditionalFeaturesInitialization(serviceName string) {
additionalFeaturesInitializationOnce.Do(func() {
log.Debug("civisibility: initializing additional features")
ensureSettingsInitialization(serviceName)
if ciVisibilityClient == nil {
return
}

// if early flake detection is enabled then we run the early flake detection request
if ciVisibilitySettings.EarlyFlakeDetection.Enabled {
Expand Down Expand Up @@ -133,13 +149,26 @@ func ensureAdditionalFeaturesInitialization(serviceName string) {
ciVisibilitySettings.FlakyTestRetriesEnabled = false
}
}

// if ITR is enabled then we do the skippable tests request
if ciVisibilitySettings.TestsSkipping {
// get the skippable tests
correlationId, skippableTests, err := ciVisibilityClient.GetSkippableTests()
if err != nil {
log.Error("civisibility: error getting CI visibility skippable tests: %v", err)
} else if skippableTests != nil {
log.Debug("civisibility: skippable tests loaded: %d suites", len(skippableTests))
utils.AddCITags(constants.ItrCorrelationIDTag, correlationId)
ciVisibilitySkippables = skippableTests
}
}
})
}

// GetSettings gets the settings from the backend settings endpoint
func GetSettings() *net.SettingsResponseData {
// call to ensure the additional features initialization is completed (service name can be null here)
ensureAdditionalFeaturesInitialization("")
// call to ensure the settings features initialization is completed (service name can be null here)
ensureSettingsInitialization("")
return &ciVisibilitySettings
}

Expand All @@ -157,6 +186,13 @@ func GetFlakyRetriesSettings() *FlakyRetriesSetting {
return &ciVisibilityFlakyRetriesSettings
}

// GetSkippableTests gets the skippable tests from the backend
func GetSkippableTests() map[string]map[string][]net.SkippableResponseDataAttributes {
// call to ensure the additional features initialization is completed (service name can be null here)
ensureAdditionalFeaturesInitialization("")
return ciVisibilitySkippables
}

func uploadRepositoryChanges() (bytes int64, err error) {
// get the search commits response
initialCommitData, err := getSearchCommits()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package coverage

import (
"bytes"
"encoding/binary"
"io"
"sync/atomic"

"github.com/tinylib/msgp/msgp"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

// coveragePayload is a slim copy of the payload struct from the tracer package.
type coveragePayload struct {
// header specifies the first few bytes in the msgpack stream
// indicating the type of array (fixarray, array16 or array32)
// and the number of items contained in the stream.
header []byte

// off specifies the current read position on the header.
off int

// count specifies the number of items in the stream.
count uint32

// buf holds the sequence of msgpack-encoded items.
buf bytes.Buffer

// reader is used for reading the contents of buf.
reader *bytes.Reader
}

var _ io.Reader = (*coveragePayload)(nil)

// newCoveragePayload returns a ready to use coverage payload.
func newCoveragePayload() *coveragePayload {
p := &coveragePayload{
header: make([]byte, 8),
off: 8,
}
return p
}

// push pushes a new item into the stream.
func (p *coveragePayload) push(testCoverageData *ciTestCoverageData) error {
p.buf.Grow(testCoverageData.Msgsize())
if err := msgp.Encode(&p.buf, testCoverageData); err != nil {
return err
}
atomic.AddUint32(&p.count, 1)
p.updateHeader()
return nil
}

// itemCount returns the number of items available in the srteam.
func (p *coveragePayload) itemCount() int {
return int(atomic.LoadUint32(&p.count))
}

// size returns the payload size in bytes. After the first read the value becomes
// inaccurate by up to 8 bytes.
func (p *coveragePayload) size() int {
return p.buf.Len() + len(p.header) - p.off
}

// reset sets up the payload to be read a second time. It maintains the
// underlying byte contents of the buffer. reset should not be used in order to
// reuse the payload for another set of traces.
func (p *coveragePayload) reset() {
p.updateHeader()
if p.reader != nil {
p.reader.Seek(0, 0)
}
}

// clear empties the payload buffers.
func (p *coveragePayload) clear() {
p.buf = bytes.Buffer{}
p.reader = nil
}

// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
const (
msgpackArrayFix byte = 144 // up to 15 items
msgpackArray16 = 0xdc // up to 2^16-1 items, followed by size in 2 bytes
msgpackArray32 = 0xdd // up to 2^32-1 items, followed by size in 4 bytes
)

// updateHeader updates the payload header based on the number of items currently
// present in the stream.
func (p *coveragePayload) updateHeader() {
n := uint64(atomic.LoadUint32(&p.count))
switch {
case n <= 15:
p.header[7] = msgpackArrayFix + byte(n)
p.off = 7
case n <= 1<<16-1:
binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes
p.header[5] = msgpackArray16
p.off = 5
default: // n <= 1<<32-1
binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes
p.header[3] = msgpackArray32
p.off = 3
}
}

// Close implements io.Closer
func (p *coveragePayload) Close() error {
return nil
}

// Read implements io.Reader. It reads from the msgpack-encoded stream.
func (p *coveragePayload) Read(b []byte) (n int, err error) {
if p.off < len(p.header) {
// reading header
n = copy(b, p.header[p.off:])
p.off += n
return n, nil
}
if p.reader == nil {
p.reader = bytes.NewReader(p.buf.Bytes())
}
return p.reader.Read(b)
}

// getBuffer retrieves the complete body of the CI Visibility coverage payload, including the header.
// It reads the current payload buffer, adds the header, and encodes the entire payload in MessagePack format.
//
// Returns:
//
// A pointer to a bytes.Buffer containing the encoded CI Visibility coverage payload.
// An error if reading from the buffer or encoding the payload fails.
func (p *coveragePayload) getBuffer() (*bytes.Buffer, error) {
log.Debug("coveragePayload: .getBuffer (count: %v)", p.itemCount())

// Create a buffer to read the current payload
payloadBuf := new(bytes.Buffer)
if _, err := payloadBuf.ReadFrom(p); err != nil {
return nil, err
}

// Create the final coverage payload
finalPayload := &ciTestCovPayload{
Version: 2,
Coverages: payloadBuf.Bytes(),
}

// Create a new buffer to encode the coverage payload in MessagePack format
encodedBuf := new(bytes.Buffer)
if err := msgp.Encode(encodedBuf, finalPayload); err != nil {
return nil, err
}

return encodedBuf, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

//go:generate msgp -unexported -marshal=false -o=test_coverage_msgp.go -tests=false

package coverage

import "github.com/tinylib/msgp/msgp"

type (
// ciTestCoveragePayloads represents a list of test code coverage payloads.
ciTestCoveragePayloads []*ciTestCovPayload

// ciTestCoverages represents a list of test code coverage data.
ciTestCoverages []*ciTestCoverageData

// ciTestCovPayload represents a test code coverage payload specifically designed for CI Visibility events.
ciTestCovPayload struct {
Version int32 `msg:"version"` // Version of the payload
Coverages msgp.Raw `msg:"coverages"` // list of coverages
}

// ciTestCoverageData represents the coverage data for a single test.
ciTestCoverageData struct {
SessionID uint64 `msg:"test_session_id"` // identifier of this session
SuiteID uint64 `msg:"test_suite_id"` // identifier of the suite
SpanID uint64 `msg:"span_id"` // identifier of this test
Files []*ciTestCoverageFile `msg:"files"` // list of files covered
}

// ciTestCoverageFile represents the coverage data for a single file.
ciTestCoverageFile struct {
FileName string `msg:"filename"` // name of the file
}
)

var (
_ msgp.Encodable = (*ciTestCoverageData)(nil)
_ msgp.Decodable = (*ciTestCoverageData)(nil)

_ msgp.Encodable = (*ciTestCoverages)(nil)
_ msgp.Decodable = (*ciTestCoverages)(nil)

_ msgp.Encodable = (*ciTestCovPayload)(nil)
_ msgp.Decodable = (*ciTestCoveragePayloads)(nil)
)

// newCiTestCovPayload creates a new instance of ciTestCovPayload.
func newCiTestCoverageData(tCove *testCoverage) *ciTestCoverageData {
return &ciTestCoverageData{
SessionID: tCove.sessionID,
SuiteID: tCove.suiteID,
SpanID: tCove.testID,
Files: newCiTestCoverageFiles(tCove.filesCovered),
}
}

// newCiTestCoverageFiles creates a new instance of ciTestCoverageFile array.
func newCiTestCoverageFiles(files []string) []*ciTestCoverageFile {
ciFiles := make([]*ciTestCoverageFile, 0, len(files))
for _, file := range files {
ciFiles = append(ciFiles, &ciTestCoverageFile{FileName: file})
}
return ciFiles
}
Loading

0 comments on commit fcda5e2

Please sign in to comment.