diff --git a/pkg/testworkflows/testworkflowcontroller/channel.go b/pkg/testworkflows/testworkflowcontroller/channel.go index e9da67db0f9..00702e80c00 100644 --- a/pkg/testworkflows/testworkflowcontroller/channel.go +++ b/pkg/testworkflows/testworkflowcontroller/channel.go @@ -20,6 +20,7 @@ type Channel[T any] interface { Channel() <-chan ChannelMessage[T] Close() Done() <-chan struct{} + CtxErr() error } type channel[T any] struct { @@ -168,3 +169,7 @@ func (c *channel[T]) Close() { func (c *channel[T]) Done() <-chan struct{} { return c.ctx.Done() } + +func (c *channel[T]) CtxErr() error { + return c.ctx.Err() +} diff --git a/pkg/testworkflows/testworkflowcontroller/notifier.go b/pkg/testworkflows/testworkflowcontroller/notifier.go index f5b6f145669..477a33d39bc 100644 --- a/pkg/testworkflows/testworkflowcontroller/notifier.go +++ b/pkg/testworkflows/testworkflowcontroller/notifier.go @@ -3,6 +3,7 @@ package testworkflowcontroller import ( "context" "fmt" + "sync" "time" "github.com/kubeshop/testkube/cmd/testworkflow-init/data" @@ -12,12 +13,21 @@ import ( "github.com/kubeshop/testkube/pkg/ui" ) +const ( + FlushResultTime = 50 * time.Millisecond + FlushResultMaxTime = 100 * time.Millisecond +) + type notifier struct { watcher *channel[Notification] result testkube.TestWorkflowResult sig []testkube.TestWorkflowSignature scheduledAt time.Time lastTs map[string]time.Time + + resultMu sync.Mutex + resultCh chan struct{} + resultScheduled bool } func (n *notifier) GetLastTimestamp(ref string) time.Time { @@ -40,12 +50,68 @@ func (n *notifier) RegisterTimestamp(ref string, t time.Time) { } } +func (n *notifier) Flush() { + n.resultMu.Lock() + defer n.resultMu.Unlock() + if !n.resultScheduled { + return + } + n.watcher.Send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()}) + n.resultScheduled = false +} + +func (n *notifier) scheduleFlush() { + n.resultMu.Lock() + defer n.resultMu.Unlock() + + // Inform existing scheduler about the next result + if n.resultScheduled { + select { + case n.resultCh <- struct{}{}: + default: + } + return + } + + // Run the scheduler + n.resultScheduled = true + go func() { + flushTimer := time.NewTimer(FlushResultMaxTime) + flushTimerEnabled := false + + for { + if n.watcher.CtxErr() != nil { + return + } + + select { + case <-n.watcher.Done(): + n.Flush() + return + case <-flushTimer.C: + n.Flush() + flushTimerEnabled = false + case <-time.After(FlushResultTime): + n.Flush() + flushTimerEnabled = false + case <-n.resultCh: + if !flushTimerEnabled { + flushTimerEnabled = true + flushTimer.Reset(FlushResultMaxTime) + } + continue + } + } + }() +} + func (n *notifier) Raw(ref string, ts time.Time, message string, temporary bool) { if message != "" { if ref == InitContainerName { ref = "" } // TODO: use timestamp from the message too for lastTs? + n.Flush() n.watcher.Send(Notification{ Timestamp: ts.UTC(), Log: message, @@ -92,7 +158,7 @@ func (n *notifier) recompute() { func (n *notifier) emit() { n.recompute() - n.watcher.Send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()}) + n.scheduleFlush() } func (n *notifier) queue(ts time.Time) { @@ -184,6 +250,7 @@ func (n *notifier) Output(ref string, ts time.Time, output *data.Instruction) { return } n.RegisterTimestamp(ref, ts) + n.Flush() n.watcher.Send(Notification{Timestamp: ts.UTC(), Ref: ref, Output: output}) } @@ -276,5 +343,7 @@ func newNotifier(ctx context.Context, signature []testworkflowprocessor.Signatur scheduledAt: scheduledAt, result: result, lastTs: make(map[string]time.Time), + + resultCh: make(chan struct{}, 1), } } diff --git a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go index 7e7feafeda7..521485ef29a 100644 --- a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go +++ b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go @@ -42,7 +42,10 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // Start watching go func() { - defer ctxCancel() + defer func() { + s.Flush() + ctxCancel() + }() // Watch for the basic initialization warnings for v := range state.PreStart("") {