From 14437375c69d328bfbc37bb5791cd688e09c5868 Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Wed, 10 Jul 2024 15:24:20 +0200 Subject: [PATCH] feat(performance): improve Log Processing performance (#5647) * feat(performance): buffer the logs sent from the container, to avoid sending message for each line * feat(performance): batch Test Workflow's result updates * fix(testworkflows): handle getting long container logs after the log rotation happens @see {@link https://stackoverflow.com/a/68673451} * feat(testworkflows): optimize reading timestamp from Kubernetes logs * feat(testworkflows): optimize buffering logs * feat(testworkflows): use native channel instead of heavier Channels for WatchInstrumentedPod * feat(testworkflows): increase buffer size for logs buffering --- .../testworkflowcontroller/channel.go | 5 + .../testworkflowcontroller/controller.go | 8 +- .../testworkflowcontroller/logs.go | 361 +++++++++++++++--- .../testworkflowcontroller/logs_test.go | 48 ++- .../testworkflowcontroller/notifier.go | 106 ++++- .../testworkflowcontroller/utils.go | 2 +- .../watchinstrumentedpod.go | 11 +- .../testworkflowexecutor/executor.go | 2 +- 8 files changed, 463 insertions(+), 80 deletions(-) diff --git a/pkg/testworkflows/testworkflowcontroller/channel.go b/pkg/testworkflows/testworkflowcontroller/channel.go index e9da67db0f9..00702e80c00 100644 --- a/pkg/testworkflows/testworkflowcontroller/channel.go +++ b/pkg/testworkflows/testworkflowcontroller/channel.go @@ -20,6 +20,7 @@ type Channel[T any] interface { Channel() <-chan ChannelMessage[T] Close() Done() <-chan struct{} + CtxErr() error } type channel[T any] struct { @@ -168,3 +169,7 @@ func (c *channel[T]) Close() { func (c *channel[T]) Done() <-chan struct{} { return c.ctx.Done() } + +func (c *channel[T]) CtxErr() error { + return c.ctx.Err() +} diff --git a/pkg/testworkflows/testworkflowcontroller/controller.go b/pkg/testworkflows/testworkflowcontroller/controller.go index bc074e216e6..ef7b2cbc52c 100644 --- a/pkg/testworkflows/testworkflowcontroller/controller.go +++ b/pkg/testworkflows/testworkflowcontroller/controller.go @@ -212,7 +212,7 @@ func (c *controller) StopController() { } func (c *controller) Watch(parentCtx context.Context) <-chan ChannelMessage[Notification] { - w, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{ + ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{ JobEvents: c.jobEvents, Job: c.job, }) @@ -222,7 +222,7 @@ func (c *controller) Watch(parentCtx context.Context) <-chan ChannelMessage[Noti v.Close() return v.Channel() } - return w.Channel() + return ch } // TODO: Make it actually light @@ -281,7 +281,7 @@ func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader { case <-c.podEvents.Peek(parentCtx): case <-alignTimeoutCh: } - w, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{ + ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{ JobEvents: c.jobEvents, Job: c.job, Follow: common.Ptr(follow), @@ -289,7 +289,7 @@ func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader { if err != nil { return } - for v := range w.Channel() { + for v := range ch { if v.Error == nil && v.Value.Log != "" && !v.Value.Temporary { if ref != v.Value.Ref && v.Value.Ref != "" { 
ref = v.Value.Ref diff --git a/pkg/testworkflows/testworkflowcontroller/logs.go b/pkg/testworkflows/testworkflowcontroller/logs.go index 15d9072d191..7fd97ef4bdc 100644 --- a/pkg/testworkflows/testworkflowcontroller/logs.go +++ b/pkg/testworkflows/testworkflowcontroller/logs.go @@ -2,20 +2,31 @@ package testworkflowcontroller import ( "bufio" + "bytes" "context" - "errors" "io" "strings" + "sync" "time" + "unsafe" - errors2 "github.com/pkg/errors" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "github.com/kubeshop/testkube/cmd/testworkflow-init/data" + "github.com/kubeshop/testkube/internal/common" + "github.com/kubeshop/testkube/pkg/log" "github.com/kubeshop/testkube/pkg/utils" ) +const ( + FlushLogMaxSize = 100_000 + FlushBufferSize = 65_536 + FlushLogTime = 100 * time.Millisecond +) + type Comment struct { Time time.Time Hint *data.Instruction @@ -29,63 +40,220 @@ type ContainerLog struct { Output *data.Instruction } -func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, follow bool, pod Channel[*corev1.Pod]) Channel[ContainerLog] { +// getContainerLogsStream is getting logs stream, and tries to reinitialize the stream on EOF. +// EOF may happen not only on the actual container end, but also in case of the log rotation. +// @see {@link https://stackoverflow.com/a/68673451} +func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, pod Channel[*corev1.Pod], since *time.Time) (io.Reader, error) { + // Fail immediately if the context is finished + if ctx.Err() != nil { + return nil, ctx.Err() + } + + // Build Kubernetes structure for time + var sinceTime *metav1.Time + if since != nil { + sinceTime = &metav1.Time{Time: *since} + } + + // Create logs stream request + req := clientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ + Container: containerName, + Follow: true, + Timestamps: true, + SinceTime: sinceTime, + }) + var err error + var stream io.ReadCloser + for { + stream, err = req.Stream(ctx) + if err != nil { + // The container is not necessarily already started when Started event is received + if !strings.Contains(err.Error(), "is waiting to start") { + return nil, err + } + p := <-pod.Peek(ctx) + if p == nil { + return bytes.NewReader(nil), io.EOF + } + containerDone := IsPodDone(p) + for i := range p.Status.InitContainerStatuses { + if p.Status.InitContainerStatuses[i].Name == containerName { + if p.Status.InitContainerStatuses[i].State.Terminated != nil { + containerDone = true + break + } + } + } + for i := range p.Status.ContainerStatuses { + if p.Status.ContainerStatuses[i].Name == containerName { + if p.Status.ContainerStatuses[i].State.Terminated != nil { + containerDone = true + break + } + } + } + + if containerDone { + return bytes.NewReader(nil), io.EOF + } + continue + } + break + } + return stream, nil +} + +func WatchContainerLogs(parentCtx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, pod Channel[*corev1.Pod]) Channel[ContainerLog] { + ctx, ctxCancel := context.WithCancel(parentCtx) w := newChannel[ContainerLog](ctx, bufferSize) go func() { - defer w.Close() + <-w.Done() + ctxCancel() + }() + + go func() { + defer ctxCancel() var err error + var since *time.Time + // Create logs stream request - req := clientSet.CoreV1().Pods(namespace).GetLogs(podName, 
&corev1.PodLogOptions{ - Follow: follow, - Timestamps: true, - Container: containerName, - }) - var stream io.ReadCloser - for { - stream, err = req.Stream(ctx) + stream, err := getContainerLogsStream(ctx, clientSet, namespace, podName, containerName, pod, since) + hadAnyContent := false + if err == io.EOF { + return + } else if err != nil { + w.Error(err) + return + } + + // Build a buffer for logs to avoid scheduling Log notification for each write + var logBufferLog bytes.Buffer + var logBufferTs time.Time + var logBufferMu sync.Mutex + var logBufferCh = make(chan struct{}, 1) + unsafeFlushLogBuffer := func() { + if logBufferLog.Len() == 0 || w.CtxErr() != nil { + return + } + message := make([]byte, logBufferLog.Len()) + _, err := logBufferLog.Read(message) if err != nil { - // The container is not necessarily already started when Started event is received - if !strings.Contains(err.Error(), "is waiting to start") { - w.Error(err) - return - } - p := <-pod.Peek(ctx) - if p != nil && IsPodDone(p) { - w.Error(errors.New("pod is finished and there are no logs for this container")) + log.DefaultLogger.Errorf("failed to read log buffer: %s/%s", podName, containerName) + return + } + w.Send(ContainerLog{Time: logBufferTs, Log: message}) + } + flushLogBuffer := func() { + logBufferMu.Lock() + defer logBufferMu.Unlock() + unsafeFlushLogBuffer() + } + appendLog := func(ts time.Time, log ...[]byte) { + logBufferMu.Lock() + defer logBufferMu.Unlock() + + initialLogLen := logBufferLog.Len() + if initialLogLen == 0 { + logBufferTs = ts + } + for i := range log { + logBufferLog.Write(log[i]) + } + + finalLogLen := logBufferLog.Len() + flushable := finalLogLen > FlushLogMaxSize + if flushable { + unsafeFlushLogBuffer() + } + + // Inform the flushing worker about a new log to flush. 
+ // Do it only when it's not scheduled + if initialLogLen == 0 || flushable { + select { + case logBufferCh <- struct{}{}: + default: } - continue } - break } + // Flush the log automatically after 100ms + bufferCtx, bufferCtxCancel := context.WithCancel(ctx) + defer bufferCtxCancel() go func() { - <-w.Done() - _ = stream.Close() + t := time.NewTimer(FlushLogTime) + for { + t.Stop() + + if bufferCtx.Err() != nil { + return + } + + logLen := logBufferLog.Len() + if logLen == 0 { + select { + case <-bufferCtx.Done(): + return + case <-logBufferCh: + continue + } + } + + t.Reset(FlushLogTime) + select { + case <-bufferCtx.Done(): + if !t.Stop() { + <-t.C + } + return + case <-t.C: + flushLogBuffer() + case <-logBufferCh: + continue + } + } }() + // Flush the rest of logs if it is closed + defer flushLogBuffer() + // Parse and return the logs - reader := bufio.NewReader(stream) - var tsPrefix, tmpTsPrefix []byte + reader := bufio.NewReaderSize(stream, FlushBufferSize) + tsReader := newTimestampReader() isNewLine := false isStarted := false - var ts, tmpTs time.Time for { var prepend []byte // Read next timestamp - tmpTs, tmpTsPrefix, err = ReadTimestamp(reader) + err = tsReader.Read(reader) if err == nil { - ts = tmpTs - tsPrefix = tmpTsPrefix + // Strip older logs - SinceTime in Kubernetes logs is ignoring milliseconds precision + if since != nil && since.After(tsReader.ts) { + _, _ = utils.ReadLongLine(reader) + continue + } + hadAnyContent = true } else if err == io.EOF { - return + if !hadAnyContent { + return + } + // Reinitialize logs stream + since = common.Ptr(tsReader.ts.Add(1)) + stream, err = getContainerLogsStream(ctx, clientSet, namespace, podName, containerName, pod, since) + if err != nil { + return + } + reader.Reset(stream) + hadAnyContent = false + continue } else { // Edge case: Kubernetes may send critical errors without timestamp (like ionotify) - if len(tmpTsPrefix) > 0 { - prepend = tmpTsPrefix + if len(tsReader.Prefix()) > 0 { + prepend = bytes.Clone(tsReader.Prefix()) } + flushLogBuffer() w.Error(err) } @@ -104,33 +272,34 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam if err == nil && instruction != nil { isNewLine = false hadComment = true - log := ContainerLog{Time: ts} + log := ContainerLog{Time: tsReader.ts} if isHint { log.Hint = instruction } else { log.Output = instruction } + flushLogBuffer() w.Send(log) } // Append as regular log if expected if !hadComment { if !isStarted { - line = append(tsPrefix, line...) + appendLog(tsReader.ts, tsReader.Prefix(), line) isStarted = true } else if isNewLine { - line = append(append([]byte("\n"), tsPrefix...), line...) 
+ appendLog(tsReader.ts, []byte("\n"), tsReader.Prefix(), line) } - w.Send(ContainerLog{Time: ts, Log: line}) isNewLine = true } } else if isStarted { - w.Send(ContainerLog{Time: ts, Log: append([]byte("\n"), tsPrefix...)}) + appendLog(tsReader.ts, []byte("\n"), tsReader.Prefix()) } // Handle the error if err != nil { if err != io.EOF { + flushLogBuffer() w.Error(err) } return @@ -141,31 +310,111 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam return w } -func ReadTimestamp(reader *bufio.Reader) (time.Time, []byte, error) { - tsPrefix := make([]byte, 31, 35) // 30 bytes for timestamp + 1 byte for space + 4 additional bytes for non-UTC timezone - count, err := io.ReadFull(reader, tsPrefix) +var ( + ErrInvalidTimestamp = errors.New("invalid timestamp") +) + +type timestampReader struct { + buffer []byte + bytes int + ts time.Time + utc *bool +} + +func newTimestampReader() *timestampReader { + return ×tampReader{ + buffer: make([]byte, 31, 36), // 30 bytes for timestamp + 1 byte for space + 5 additional bytes for non-UTC timezone + } +} + +func (t *timestampReader) Prefix() []byte { + return t.buffer[:t.bytes] +} + +// read is initial operation for reading the timestamp, +// that is the slowest one, but also detects the timestamp format. +// It's meant to be executed just once, for performance reasons. +func (t *timestampReader) read(reader *bufio.Reader) error { + // Read the possible timestamp slice + read, err := io.ReadFull(reader, t.buffer[:31]) + t.bytes = read if err != nil { - return time.Time{}, nil, err + return err } - if count < 31 { - return time.Time{}, nil, io.EOF + + // Detect the timezone format and adjust the reader if needed + utc := t.buffer[29] == 'Z' + t.utc = &utc + if !utc && len(t.buffer) < 35 { + // Increase capacity to store the +00:00 time + t.buffer = append(t.buffer, make([]byte, 5)...) + + // Read the missing part + read, err = io.ReadFull(reader, t.buffer[31:]) + t.bytes += read + if err != nil { + return err + } } - var ts time.Time - // Handle non-UTC timezones - if tsPrefix[29] == '+' { - tsSuffix := make([]byte, 5) - count, err = io.ReadFull(reader, tsSuffix) + + // Compute the timestamp + if utc { + ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&t.buffer[0], 30)) if err != nil { - return time.Time{}, nil, err + return ErrInvalidTimestamp } - if count < 5 { - return time.Time{}, nil, io.EOF + t.ts = ts + } else { + ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&t.buffer[0], 35)) + if err != nil { + return ErrInvalidTimestamp } - tsPrefix = append(tsPrefix, tsSuffix...) + t.ts = ts.UTC() } - ts, err = time.Parse(KubernetesTimezoneLogTimeFormat, string(tsPrefix[0:len(tsPrefix)-1])) + return nil +} + +// readUTC is optimized operation for reading the UTC timestamp (Z). +func (t *timestampReader) readUTC(reader *bufio.Reader) error { + // Read the possible timestamp slice + read, err := io.ReadFull(reader, t.buffer) + t.bytes = read if err != nil { - return time.Time{}, tsPrefix, errors2.Wrap(err, "parsing timestamp") + return err + } + + // Compute the timestamp + ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&t.buffer[0], 30)) + if err != nil { + return ErrInvalidTimestamp + } + t.ts = ts + return nil +} + +// readNonUTC is optimized operation for reading the non-UTC timestamp (+00:00). 
+func (t *timestampReader) readNonUTC(reader *bufio.Reader) error { + // Read the possible timestamp slice + read, err := io.ReadFull(reader, t.buffer) + t.bytes = read + if err != nil { + return err + } + + // Compute the timestamp + ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&t.buffer[0], 35)) + if err != nil { + return ErrInvalidTimestamp + } + t.ts = ts.UTC() + return nil +} + +func (t *timestampReader) Read(reader *bufio.Reader) error { + if t.utc == nil { + return t.read(reader) + } else if *t.utc { + return t.readUTC(reader) } - return ts.UTC(), tsPrefix, nil + return t.readNonUTC(reader) } diff --git a/pkg/testworkflows/testworkflowcontroller/logs_test.go b/pkg/testworkflows/testworkflowcontroller/logs_test.go index f181034773b..f9e4f6c64e9 100644 --- a/pkg/testworkflows/testworkflowcontroller/logs_test.go +++ b/pkg/testworkflows/testworkflowcontroller/logs_test.go @@ -10,26 +10,58 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_ReadTimestamp_UTC(t *testing.T) { +func Test_ReadTimestamp_UTC_Initial(t *testing.T) { + reader := newTimestampReader() prefix := "2024-06-07T12:41:49.037275300Z " message := "some-message" buf := bufio.NewReader(bytes.NewBufferString(prefix + message)) - ts, byt, err := ReadTimestamp(buf) + err := reader.Read(buf) rest, _ := io.ReadAll(buf) assert.NoError(t, err) - assert.Equal(t, []byte(prefix), byt) + assert.Equal(t, []byte(prefix), reader.Prefix()) assert.Equal(t, []byte(message), rest) - assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), ts) + assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), reader.ts) } -func Test_ReadTimestamp_NonUTC(t *testing.T) { +func Test_ReadTimestamp_NonUTC_Initial(t *testing.T) { + reader := newTimestampReader() prefix := "2024-06-07T15:41:49.037275300+03:00 " message := "some-message" buf := bufio.NewReader(bytes.NewBufferString(prefix + message)) - ts, byt, err := ReadTimestamp(buf) + err := reader.Read(buf) rest, _ := io.ReadAll(buf) assert.NoError(t, err) - assert.Equal(t, []byte(prefix), byt) + assert.Equal(t, []byte(prefix), reader.Prefix()) assert.Equal(t, []byte(message), rest) - assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), ts) + assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), reader.ts) +} + +func Test_ReadTimestamp_UTC_Recurring(t *testing.T) { + reader := newTimestampReader() + prefix := "2024-06-07T12:41:49.037275300Z " + message := "some-message" + buf := bufio.NewReader(bytes.NewBufferString(prefix + prefix + message)) + err1 := reader.Read(buf) + err2 := reader.Read(buf) + rest, _ := io.ReadAll(buf) + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.Equal(t, []byte(prefix), reader.Prefix()) + assert.Equal(t, []byte(message), rest) + assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), reader.ts) +} + +func Test_ReadTimestamp_NonUTC_Recurring(t *testing.T) { + reader := newTimestampReader() + prefix := "2024-06-07T15:41:49.037275300+03:00 " + message := "some-message" + buf := bufio.NewReader(bytes.NewBufferString(prefix + prefix + message)) + err1 := reader.Read(buf) + err2 := reader.Read(buf) + rest, _ := io.ReadAll(buf) + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.Equal(t, []byte(prefix), reader.Prefix()) + assert.Equal(t, []byte(message), rest) + assert.Equal(t, time.Date(2024, 6, 7, 12, 41, 49, 37275300, time.UTC), reader.ts) } diff --git a/pkg/testworkflows/testworkflowcontroller/notifier.go b/pkg/testworkflows/testworkflowcontroller/notifier.go 
index f5b6f145669..bb86d9cdb23 100644 --- a/pkg/testworkflows/testworkflowcontroller/notifier.go +++ b/pkg/testworkflows/testworkflowcontroller/notifier.go @@ -3,6 +3,7 @@ package testworkflowcontroller import ( "context" "fmt" + "sync" "time" "github.com/kubeshop/testkube/cmd/testworkflow-init/data" @@ -12,12 +13,38 @@ import ( "github.com/kubeshop/testkube/pkg/ui" ) +const ( + FlushResultTime = 50 * time.Millisecond + FlushResultMaxTime = 100 * time.Millisecond +) + type notifier struct { - watcher *channel[Notification] + ctx context.Context + ch chan ChannelMessage[Notification] result testkube.TestWorkflowResult sig []testkube.TestWorkflowSignature scheduledAt time.Time lastTs map[string]time.Time + + resultMu sync.Mutex + resultCh chan struct{} + resultScheduled bool +} + +func (n *notifier) send(value Notification) { + // Ignore when the channel is already closed + defer func() { + recover() + }() + n.ch <- ChannelMessage[Notification]{Value: value} +} + +func (n *notifier) error(err error) { + // Ignore when the channel is already closed + defer func() { + recover() + }() + n.ch <- ChannelMessage[Notification]{Error: err} } func (n *notifier) GetLastTimestamp(ref string) time.Time { @@ -40,13 +67,69 @@ func (n *notifier) RegisterTimestamp(ref string, t time.Time) { } } +func (n *notifier) Flush() { + n.resultMu.Lock() + defer n.resultMu.Unlock() + if !n.resultScheduled { + return + } + n.send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()}) + n.resultScheduled = false +} + +func (n *notifier) scheduleFlush() { + n.resultMu.Lock() + defer n.resultMu.Unlock() + + // Inform existing scheduler about the next result + if n.resultScheduled { + select { + case n.resultCh <- struct{}{}: + default: + } + return + } + + // Run the scheduler + n.resultScheduled = true + go func() { + flushTimer := time.NewTimer(FlushResultMaxTime) + flushTimerEnabled := false + + for { + if n.ctx.Err() != nil { + return + } + + select { + case <-n.ctx.Done(): + n.Flush() + return + case <-flushTimer.C: + n.Flush() + flushTimerEnabled = false + case <-time.After(FlushResultTime): + n.Flush() + flushTimerEnabled = false + case <-n.resultCh: + if !flushTimerEnabled { + flushTimerEnabled = true + flushTimer.Reset(FlushResultMaxTime) + } + continue + } + } + }() +} + func (n *notifier) Raw(ref string, ts time.Time, message string, temporary bool) { if message != "" { if ref == InitContainerName { ref = "" } // TODO: use timestamp from the message too for lastTs? 
- n.watcher.Send(Notification{ + n.Flush() + n.send(Notification{ Timestamp: ts.UTC(), Log: message, Ref: ref, @@ -63,7 +146,7 @@ func (n *notifier) Log(ref string, ts time.Time, message string) { } func (n *notifier) Error(err error) { - n.watcher.Error(err) + n.error(err) } func (n *notifier) Event(ref string, ts time.Time, level, reason, message string) { @@ -92,7 +175,7 @@ func (n *notifier) recompute() { func (n *notifier) emit() { n.recompute() - n.watcher.Send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()}) + n.scheduleFlush() } func (n *notifier) queue(ts time.Time) { @@ -184,7 +267,8 @@ func (n *notifier) Output(ref string, ts time.Time, output *data.Instruction) { return } n.RegisterTimestamp(ref, ts) - n.watcher.Send(Notification{Timestamp: ts.UTC(), Ref: ref, Output: output}) + n.Flush() + n.send(Notification{Timestamp: ts.UTC(), Ref: ref, Output: output}) } func (n *notifier) Finish(ts time.Time) { @@ -270,11 +354,21 @@ func newNotifier(ctx context.Context, signature []testworkflowprocessor.Signatur } result.Recompute(sig, scheduledAt) + ch := make(chan ChannelMessage[Notification]) + + go func() { + <-ctx.Done() + close(ch) + }() + return ¬ifier{ - watcher: newChannel[Notification](ctx, 0), + ch: ch, + ctx: ctx, sig: sig, scheduledAt: scheduledAt, result: result, lastTs: make(map[string]time.Time), + + resultCh: make(chan struct{}, 1), } } diff --git a/pkg/testworkflows/testworkflowcontroller/utils.go b/pkg/testworkflows/testworkflowcontroller/utils.go index 77bb22bc4ef..ad54220b0ca 100644 --- a/pkg/testworkflows/testworkflowcontroller/utils.go +++ b/pkg/testworkflows/testworkflowcontroller/utils.go @@ -12,7 +12,7 @@ import ( ) const ( - KubernetesLogTimeFormat = "2006-01-02T15:04:05.000000000Z" + KubernetesLogTimeFormat = "2006-01-02T15:04:05.999999999Z" KubernetesTimezoneLogTimeFormat = KubernetesLogTimeFormat + "07:00" ) diff --git a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go index 7e7feafeda7..c98dd7b0678 100644 --- a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go +++ b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go @@ -27,7 +27,7 @@ type WatchInstrumentedPodOptions struct { Follow *bool } -func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interface, signature []testworkflowprocessor.Signature, scheduledAt time.Time, pod Channel[*corev1.Pod], podEvents Channel[*corev1.Event], opts WatchInstrumentedPodOptions) (Channel[Notification], error) { +func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interface, signature []testworkflowprocessor.Signature, scheduledAt time.Time, pod Channel[*corev1.Pod], podEvents Channel[*corev1.Event], opts WatchInstrumentedPodOptions) (<-chan ChannelMessage[Notification], error) { // Avoid missing data if pod == nil { return nil, errors.New("pod watcher is required") @@ -42,7 +42,10 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // Start watching go func() { - defer ctxCancel() + defer func() { + s.Flush() + ctxCancel() + }() // Watch for the basic initialization warnings for v := range state.PreStart("") { @@ -99,7 +102,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // Watch the container logs follow := common.ResolvePtr(opts.Follow, true) && !state.IsFinished(ref) - for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, ref, 10, follow, 
pod).Channel() { + for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, ref, 10, pod).Channel() { if v.Error != nil { s.Error(v.Error) } else if v.Value.Output != nil { @@ -177,7 +180,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } }() - return s.watcher, nil + return s.ch, nil } func maxTime(times ...time.Time) time.Time { diff --git a/pkg/testworkflows/testworkflowexecutor/executor.go b/pkg/testworkflows/testworkflowexecutor/executor.go index d3f00502c13..f7c02dff686 100644 --- a/pkg/testworkflows/testworkflowexecutor/executor.go +++ b/pkg/testworkflows/testworkflowexecutor/executor.go @@ -313,7 +313,7 @@ func (e *executor) Control(ctx context.Context, testWorkflow *testworkflowsv1.Te e.metrics.IncAndObserveExecuteTestWorkflow(*execution, e.dashboardURI) - e.updateStatus(testWorkflow, execution, testWorkflowExecution) + e.updateStatus(testWorkflow, execution, testWorkflowExecution) // TODO: Consider if it is needed err = testworkflowcontroller.Cleanup(ctx, e.clientSet, execution.GetNamespace(e.namespace), execution.Id) if err != nil { log.DefaultLogger.Errorw("failed to cleanup TestWorkflow resources", "id", execution.Id, "error", err)
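
For reference, the core idea behind the new log buffering in logs.go can be reduced to a small standalone pattern: append incoming lines to an in-memory buffer and emit them as one message once the buffer grows past a size threshold or a short timer fires, instead of sending a channel message per line. The sketch below is a minimal illustration of that pattern, not code from the patch; names such as logBuffer, flushInterval and maxBufferedSize are made up, and the constants only mirror FlushLogTime and FlushLogMaxSize.

package main

import (
	"bytes"
	"context"
	"fmt"
	"sync"
	"time"
)

const (
	flushInterval   = 100 * time.Millisecond // mirrors FlushLogTime
	maxBufferedSize = 100_000                // mirrors FlushLogMaxSize
)

type logBuffer struct {
	mu   sync.Mutex
	buf  bytes.Buffer
	out  chan<- []byte
	wake chan struct{}
}

func newLogBuffer(ctx context.Context, out chan<- []byte) *logBuffer {
	b := &logBuffer{out: out, wake: make(chan struct{}, 1)}
	go b.worker(ctx)
	return b
}

// Append adds a line; it flushes immediately when the buffer is large,
// otherwise it only wakes the worker, which flushes after flushInterval.
func (b *logBuffer) Append(line []byte) {
	b.mu.Lock()
	b.buf.Write(line)
	if b.buf.Len() > maxBufferedSize {
		b.flushLocked()
	}
	b.mu.Unlock()

	// Single-slot channel keeps at most one pending wake-up signal.
	select {
	case b.wake <- struct{}{}:
	default:
	}
}

func (b *logBuffer) Flush() {
	b.mu.Lock()
	b.flushLocked()
	b.mu.Unlock()
}

func (b *logBuffer) flushLocked() {
	if b.buf.Len() == 0 {
		return
	}
	msg := make([]byte, b.buf.Len())
	copy(msg, b.buf.Bytes())
	b.buf.Reset()
	b.out <- msg
}

func (b *logBuffer) worker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			b.Flush() // flush whatever is left before shutting down
			return
		case <-b.wake:
			// Give more lines a chance to arrive, then emit them as one message.
			time.Sleep(flushInterval)
			b.Flush()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out := make(chan []byte, 16)
	b := newLogBuffer(ctx, out)
	for i := 0; i < 5; i++ {
		b.Append([]byte(fmt.Sprintf("line %d\n", i)))
	}
	fmt.Printf("%s", <-out) // all five lines arrive as a single batched message
}

The single-slot wake channel means at most one flush is pending at a time, which is the same role logBufferCh plays in the patch.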
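
The timestampReader relies on the fact that Kubernetes log timestamps have a fixed width: RFC3339Nano with padded nanoseconds is 30 bytes when it ends in Z and 35 bytes with a numeric offset such as +03:00, followed by a single space. A minimal, easy-to-read sketch of that parsing is below; readTimestampPrefix is a made-up helper, not part of the patch.

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"time"
)

// readTimestampPrefix consumes the timestamp prefix (timestamp + trailing space)
// and returns the parsed time normalized to UTC.
func readTimestampPrefix(r *bufio.Reader) (time.Time, error) {
	prefix := make([]byte, 31) // 30-byte UTC timestamp + 1 byte for the space
	if _, err := io.ReadFull(r, prefix); err != nil {
		return time.Time{}, err
	}
	if prefix[29] != 'Z' {
		// Non-UTC form is 5 bytes longer: "+03:00" instead of "Z".
		rest := make([]byte, 5)
		if _, err := io.ReadFull(r, rest); err != nil {
			return time.Time{}, err
		}
		prefix = append(prefix, rest...)
	}
	ts, err := time.Parse(time.RFC3339Nano, string(prefix[:len(prefix)-1]))
	if err != nil {
		return time.Time{}, err
	}
	return ts.UTC(), nil
}

func main() {
	r := bufio.NewReader(strings.NewReader("2024-06-07T15:41:49.037275300+03:00 some log line\n"))
	ts, err := readTimestampPrefix(r)
	if err != nil {
		panic(err)
	}
	rest, _ := io.ReadAll(r)
	fmt.Println(ts, string(rest)) // timestamp in UTC, rest of the line untouched
}

The patch goes further: it reuses one buffer, remembers which of the two widths it saw first so later reads skip the format check, and parses via unsafe.String to avoid copying the buffer for time.Parse.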
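
The notifier changes batch result updates around two constants, FlushResultTime and FlushResultMaxTime, so a burst of recomputed results turns into a handful of notifications rather than one per change. A generic way to think about that behaviour is a debounce with a latency cap: emit the latest pending value once updates go quiet for a short period, but never hold it longer than the cap. The sketch below shows only that generic pattern; it is not the notifier's scheduler, which flushes on a fixed cadence while the longer timer bounds the delay of newly scheduled results.

package main

import (
	"context"
	"fmt"
	"time"
)

const (
	quietPeriod = 50 * time.Millisecond  // mirrors FlushResultTime
	maxLatency  = 100 * time.Millisecond // mirrors FlushResultMaxTime
)

// debounce emits the most recent value from in, no sooner than quietPeriod
// after the last update and no later than maxLatency after the first
// pending update.
func debounce(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		var pending int
		var hasPending bool
		quiet := time.NewTimer(quietPeriod)
		deadline := time.NewTimer(maxLatency)
		quiet.Stop()
		deadline.Stop()
		flush := func() {
			if hasPending {
				out <- pending
				hasPending = false
			}
		}
		for {
			select {
			case <-ctx.Done():
				flush()
				return
			case v, ok := <-in:
				if !ok {
					flush()
					return
				}
				if !hasPending {
					deadline.Reset(maxLatency) // start the cap for this batch
				}
				pending, hasPending = v, true
				quiet.Reset(quietPeriod) // restart the quiet period
			case <-quiet.C:
				flush()
			case <-deadline.C:
				flush()
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in := make(chan int)
	out := debounce(ctx, in)
	go func() {
		for i := 1; i <= 20; i++ { // 20 rapid updates...
			in <- i
			time.Sleep(10 * time.Millisecond)
		}
		close(in)
	}()
	for v := range out {
		fmt.Println("flushed result version:", v) // ...arrive as a few batched flushes
	}
}

Timer Reset is used loosely here; because flush is idempotent and only ever emits the latest pending value, a stale timer firing at worst causes an early, harmless flush.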