From 9c03fe4c1a54ede2d11bb573aa6b49ca567aea64 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Wed, 27 Nov 2024 20:56:29 +0300 Subject: [PATCH] feat: add cloud grpc method for server notifications Signed-off-by: Vladislav Sukhin --- cmd/api-server/commons/commons.go | 25 ++-- cmd/api-server/main.go | 17 +++ internal/config/config.go | 85 +++++++------- internal/config/procontext.go | 25 ++-- pkg/agent/agent.go | 28 ++++- pkg/agent/agent_test.go | 9 +- pkg/agent/events_test.go | 9 +- pkg/agent/logs_test.go | 8 +- pkg/agent/testworkflows.go | 187 ++++++++++++++++++++++++++++++ 9 files changed, 319 insertions(+), 74 deletions(-) diff --git a/cmd/api-server/commons/commons.go b/cmd/api-server/commons/commons.go index 2754634026b..b689d51935a 100644 --- a/cmd/api-server/commons/commons.go +++ b/cmd/api-server/commons/commons.go @@ -282,18 +282,19 @@ func ReadDefaultExecutors(cfg *config.Config) (executors []testkube.ExecutorDeta func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext { proContext := config.ProContext{ - APIKey: cfg.TestkubeProAPIKey, - URL: cfg.TestkubeProURL, - TLSInsecure: cfg.TestkubeProTLSInsecure, - WorkerCount: cfg.TestkubeProWorkerCount, - LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount, - WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount, - SkipVerify: cfg.TestkubeProSkipVerify, - EnvID: cfg.TestkubeProEnvID, - OrgID: cfg.TestkubeProOrgID, - Migrate: cfg.TestkubeProMigrate, - ConnectionTimeout: cfg.TestkubeProConnectionTimeout, - DashboardURI: cfg.TestkubeDashboardURI, + APIKey: cfg.TestkubeProAPIKey, + URL: cfg.TestkubeProURL, + TLSInsecure: cfg.TestkubeProTLSInsecure, + WorkerCount: cfg.TestkubeProWorkerCount, + LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount, + WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount, + WorkflowServiceNotificationsWorkerCount: 
cfg.TestkubeProWorkflowServiceNotificationsWorkerCount, + SkipVerify: cfg.TestkubeProSkipVerify, + EnvID: cfg.TestkubeProEnvID, + OrgID: cfg.TestkubeProOrgID, + Migrate: cfg.TestkubeProMigrate, + ConnectionTimeout: cfg.TestkubeProConnectionTimeout, + DashboardURI: cfg.TestkubeDashboardURI, } if cfg.TestkubeProAPIKey == "" || grpcClient == nil { diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index f3bcd976f29..8a349b80f52 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -325,6 +325,22 @@ func main() { } return notifications.Channel(), nil } + getTestWorkflowServiceNotificationsStream := func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error) { + execution, err := testWorkflowResultsRepository.Get(ctx, executionID) + if err != nil { + return nil, err + } + notifications := executionWorker.Notifications(ctx, fmt.Sprintf("%s-%s-%d", execution.Id, serviceName, serviceIndex), executionworkertypes.NotificationsOptions{ + Hints: executionworkertypes.Hints{ + Namespace: execution.Namespace, + ScheduledAt: common.Ptr(execution.ScheduledAt), + }, + }) + if notifications.Err() != nil { + return nil, notifications.Err() + } + return notifications.Channel(), nil + } getDeprecatedLogStream := func(ctx context.Context, executionID string) (chan output.Output, error) { return nil, errors.New("deprecated features have been disabled") } @@ -337,6 +353,7 @@ func main() { grpcClient, getDeprecatedLogStream, getTestWorkflowNotificationsStream, + getTestWorkflowServiceNotificationsStream, clusterId, cfg.TestkubeClusterName, features, diff --git a/internal/config/config.go b/internal/config/config.go index 9c2d6fc83f0..0394c8d981d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -40,48 +40,49 @@ type Config struct { LogsStorage string `envconfig:"LOGS_STORAGE" default:""` WorkflowStorage string `envconfig:"WORKFLOW_STORAGE" default:"crd"` // 
WhitelistedContainers is a list of containers from which logs should be collected. - WhitelistedContainers []string `envconfig:"WHITELISTED_CONTAINERS" default:"init,logs,scraper"` - NatsEmbedded bool `envconfig:"NATS_EMBEDDED" default:"false"` - NatsEmbeddedStoreDir string `envconfig:"NATS_EMBEDDED_STORE_DIR" default:"/app/nats"` - NatsURI string `envconfig:"NATS_URI" default:"nats://localhost:4222"` - NatsSecure bool `envconfig:"NATS_SECURE" default:"false"` - NatsSkipVerify bool `envconfig:"NATS_SKIP_VERIFY" default:"false"` - NatsCertFile string `envconfig:"NATS_CERT_FILE" default:""` - NatsKeyFile string `envconfig:"NATS_KEY_FILE" default:""` - NatsCAFile string `envconfig:"NATS_CA_FILE" default:""` - NatsConnectTimeout time.Duration `envconfig:"NATS_CONNECT_TIMEOUT" default:"5s"` - JobServiceAccountName string `envconfig:"JOB_SERVICE_ACCOUNT_NAME" default:""` - JobTemplateFile string `envconfig:"JOB_TEMPLATE_FILE" default:""` - DisableTestTriggers bool `envconfig:"DISABLE_TEST_TRIGGERS" default:"false"` - TestkubeDefaultExecutors string `envconfig:"TESTKUBE_DEFAULT_EXECUTORS" default:""` - TestkubeEnabledExecutors string `envconfig:"TESTKUBE_ENABLED_EXECUTORS" default:""` - TestkubeTemplateJob string `envconfig:"TESTKUBE_TEMPLATE_JOB" default:""` - TestkubeContainerTemplateJob string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_JOB" default:""` - TestkubeContainerTemplateScraper string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_SCRAPER" default:""` - TestkubeContainerTemplatePVC string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_PVC" default:""` - TestkubeTemplateSlavePod string `envconfig:"TESTKUBE_TEMPLATE_SLAVE_POD" default:""` - TestkubeConfigDir string `envconfig:"TESTKUBE_CONFIG_DIR" default:"config"` - TestkubeAnalyticsEnabled bool `envconfig:"TESTKUBE_ANALYTICS_ENABLED" default:"false"` - TestkubeReadonlyExecutors bool `envconfig:"TESTKUBE_READONLY_EXECUTORS" default:"false"` - TestkubeNamespace string `envconfig:"TESTKUBE_NAMESPACE" default:"testkube"` - 
TestkubeProAPIKey string `envconfig:"TESTKUBE_PRO_API_KEY" default:""` - TestkubeProURL string `envconfig:"TESTKUBE_PRO_URL" default:""` - TestkubeProTLSInsecure bool `envconfig:"TESTKUBE_PRO_TLS_INSECURE" default:"false"` - TestkubeProWorkerCount int `envconfig:"TESTKUBE_PRO_WORKER_COUNT" default:"50"` - TestkubeProLogStreamWorkerCount int `envconfig:"TESTKUBE_PRO_LOG_STREAM_WORKER_COUNT" default:"25"` - TestkubeProWorkflowNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"` - TestkubeProSkipVerify bool `envconfig:"TESTKUBE_PRO_SKIP_VERIFY" default:"false"` - TestkubeProEnvID string `envconfig:"TESTKUBE_PRO_ENV_ID" default:""` - TestkubeProOrgID string `envconfig:"TESTKUBE_PRO_ORG_ID" default:""` - TestkubeProMigrate string `envconfig:"TESTKUBE_PRO_MIGRATE" default:"false"` - TestkubeProConnectionTimeout int `envconfig:"TESTKUBE_PRO_CONNECTION_TIMEOUT" default:"10"` - TestkubeProCertFile string `envconfig:"TESTKUBE_PRO_CERT_FILE" default:""` - TestkubeProKeyFile string `envconfig:"TESTKUBE_PRO_KEY_FILE" default:""` - TestkubeProTLSSecret string `envconfig:"TESTKUBE_PRO_TLS_SECRET" default:""` - TestkubeProRunnerCustomCASecret string `envconfig:"TESTKUBE_PRO_RUNNER_CUSTOM_CA_SECRET" default:""` - TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""` - TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""` - TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"` + WhitelistedContainers []string `envconfig:"WHITELISTED_CONTAINERS" default:"init,logs,scraper"` + NatsEmbedded bool `envconfig:"NATS_EMBEDDED" default:"false"` + NatsEmbeddedStoreDir string `envconfig:"NATS_EMBEDDED_STORE_DIR" default:"/app/nats"` + NatsURI string `envconfig:"NATS_URI" default:"nats://localhost:4222"` + NatsSecure bool `envconfig:"NATS_SECURE" default:"false"` + NatsSkipVerify bool `envconfig:"NATS_SKIP_VERIFY" default:"false"` + NatsCertFile string 
`envconfig:"NATS_CERT_FILE" default:""` + NatsKeyFile string `envconfig:"NATS_KEY_FILE" default:""` + NatsCAFile string `envconfig:"NATS_CA_FILE" default:""` + NatsConnectTimeout time.Duration `envconfig:"NATS_CONNECT_TIMEOUT" default:"5s"` + JobServiceAccountName string `envconfig:"JOB_SERVICE_ACCOUNT_NAME" default:""` + JobTemplateFile string `envconfig:"JOB_TEMPLATE_FILE" default:""` + DisableTestTriggers bool `envconfig:"DISABLE_TEST_TRIGGERS" default:"false"` + TestkubeDefaultExecutors string `envconfig:"TESTKUBE_DEFAULT_EXECUTORS" default:""` + TestkubeEnabledExecutors string `envconfig:"TESTKUBE_ENABLED_EXECUTORS" default:""` + TestkubeTemplateJob string `envconfig:"TESTKUBE_TEMPLATE_JOB" default:""` + TestkubeContainerTemplateJob string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_JOB" default:""` + TestkubeContainerTemplateScraper string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_SCRAPER" default:""` + TestkubeContainerTemplatePVC string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_PVC" default:""` + TestkubeTemplateSlavePod string `envconfig:"TESTKUBE_TEMPLATE_SLAVE_POD" default:""` + TestkubeConfigDir string `envconfig:"TESTKUBE_CONFIG_DIR" default:"config"` + TestkubeAnalyticsEnabled bool `envconfig:"TESTKUBE_ANALYTICS_ENABLED" default:"false"` + TestkubeReadonlyExecutors bool `envconfig:"TESTKUBE_READONLY_EXECUTORS" default:"false"` + TestkubeNamespace string `envconfig:"TESTKUBE_NAMESPACE" default:"testkube"` + TestkubeProAPIKey string `envconfig:"TESTKUBE_PRO_API_KEY" default:""` + TestkubeProURL string `envconfig:"TESTKUBE_PRO_URL" default:""` + TestkubeProTLSInsecure bool `envconfig:"TESTKUBE_PRO_TLS_INSECURE" default:"false"` + TestkubeProWorkerCount int `envconfig:"TESTKUBE_PRO_WORKER_COUNT" default:"50"` + TestkubeProLogStreamWorkerCount int `envconfig:"TESTKUBE_PRO_LOG_STREAM_WORKER_COUNT" default:"25"` + TestkubeProWorkflowNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"` + 
TestkubeProWorkflowServiceNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_SERVICE_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"` + TestkubeProSkipVerify bool `envconfig:"TESTKUBE_PRO_SKIP_VERIFY" default:"false"` + TestkubeProEnvID string `envconfig:"TESTKUBE_PRO_ENV_ID" default:""` + TestkubeProOrgID string `envconfig:"TESTKUBE_PRO_ORG_ID" default:""` + TestkubeProMigrate string `envconfig:"TESTKUBE_PRO_MIGRATE" default:"false"` + TestkubeProConnectionTimeout int `envconfig:"TESTKUBE_PRO_CONNECTION_TIMEOUT" default:"10"` + TestkubeProCertFile string `envconfig:"TESTKUBE_PRO_CERT_FILE" default:""` + TestkubeProKeyFile string `envconfig:"TESTKUBE_PRO_KEY_FILE" default:""` + TestkubeProTLSSecret string `envconfig:"TESTKUBE_PRO_TLS_SECRET" default:""` + TestkubeProRunnerCustomCASecret string `envconfig:"TESTKUBE_PRO_RUNNER_CUSTOM_CA_SECRET" default:""` + TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""` + TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""` + TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"` // TestkubeImageCredentialsCacheTTL is the duration for which the image pull credentials should be cached provided as a Go duration string. // If set to 0, the cache is disabled. 
TestkubeImageCredentialsCacheTTL time.Duration `envconfig:"TESTKUBE_IMAGE_CREDENTIALS_CACHE_TTL" default:"30m"` diff --git a/internal/config/procontext.go b/internal/config/procontext.go index aa3b090be81..4bfcb41acf5 100644 --- a/internal/config/procontext.go +++ b/internal/config/procontext.go @@ -1,16 +1,17 @@ package config type ProContext struct { - APIKey string - URL string - TLSInsecure bool - WorkerCount int - LogStreamWorkerCount int - WorkflowNotificationsWorkerCount int - SkipVerify bool - EnvID string - OrgID string - Migrate string - ConnectionTimeout int - DashboardURI string + APIKey string + URL string + TLSInsecure bool + WorkerCount int + LogStreamWorkerCount int + WorkflowNotificationsWorkerCount int + WorkflowServiceNotificationsWorkerCount int + SkipVerify bool + EnvID string + OrgID string + Migrate string + ConnectionTimeout int + DashboardURI string } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index b612770d261..e11fb039838 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -55,6 +55,11 @@ type Agent struct { testWorkflowNotificationsResponseBuffer chan *cloud.TestWorkflowNotificationsResponse testWorkflowNotificationsFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error) + testWorkflowServiceNotificationsWorkerCount int + testWorkflowServiceNotificationsRequestBuffer chan *cloud.TestWorkflowServiceNotificationsRequest + testWorkflowServiceNotificationsResponseBuffer chan *cloud.TestWorkflowServiceNotificationsResponse + testWorkflowServiceNotificationsFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error) + events chan testkube.Event sendTimeout time.Duration receiveTimeout time.Duration @@ -73,6 +78,7 @@ func NewAgent(logger *zap.SugaredLogger, client cloud.TestKubeCloudAPIClient, logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error), 
workflowNotificationsFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error), + workflowServiceNotificationsFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error), clusterID string, clusterName string, features featureflags.FeatureFlags, @@ -99,11 +105,16 @@ func NewAgent(logger *zap.SugaredLogger, testWorkflowNotificationsRequestBuffer: make(chan *cloud.TestWorkflowNotificationsRequest, bufferSizePerWorker*proContext.WorkflowNotificationsWorkerCount), testWorkflowNotificationsResponseBuffer: make(chan *cloud.TestWorkflowNotificationsResponse, bufferSizePerWorker*proContext.WorkflowNotificationsWorkerCount), testWorkflowNotificationsFunc: workflowNotificationsFunc, - clusterID: clusterID, - clusterName: clusterName, - features: features, - proContext: proContext, - dockerImageVersion: dockerImageVersion, + testWorkflowServiceNotificationsWorkerCount: proContext.WorkflowServiceNotificationsWorkerCount, + testWorkflowServiceNotificationsRequestBuffer: make(chan *cloud.TestWorkflowServiceNotificationsRequest, bufferSizePerWorker*proContext.WorkflowServiceNotificationsWorkerCount), + testWorkflowServiceNotificationsResponseBuffer: make(chan *cloud.TestWorkflowServiceNotificationsResponse, bufferSizePerWorker*proContext.WorkflowServiceNotificationsWorkerCount), + testWorkflowServiceNotificationsFunc: workflowServiceNotificationsFunc, + + clusterID: clusterID, + clusterName: clusterName, + features: features, + proContext: proContext, + dockerImageVersion: dockerImageVersion, }, nil } @@ -151,6 +162,13 @@ func (ag *Agent) run(ctx context.Context) (err error) { return ag.runTestWorkflowNotificationsWorker(groupCtx, ag.testWorkflowNotificationsWorkerCount) }) + g.Go(func() error { + return ag.runTestWorkflowServiceNotificationsLoop(groupCtx) + }) + g.Go(func() error { + return ag.runTestWorkflowServiceNotificationsWorker(groupCtx, 
ag.testWorkflowServiceNotificationsWorkerCount) + }) + err = g.Wait() return err diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index fb6c6d17418..47973e09ac1 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -59,10 +59,11 @@ func TestCommandExecution(t *testing.T) { var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error) var workflowNotificationsStreamFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error) + var workflowServiceNotificationsStreamFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error) logger, _ := zap.NewDevelopment() proContext := config.ProContext{APIKey: "api-key", WorkerCount: 5, LogStreamWorkerCount: 5, WorkflowNotificationsWorkerCount: 5} - agent, err := agent.NewAgent(logger.Sugar(), m, grpcClient, logStreamFunc, workflowNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "") + agent, err := agent.NewAgent(logger.Sugar(), m, grpcClient, logStreamFunc, workflowNotificationsStreamFunc, workflowServiceNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "") if err != nil { t.Fatal(err) } @@ -97,6 +98,12 @@ func (cs *CloudServer) GetTestWorkflowNotificationsStream(srv cloud.TestKubeClou return nil } +func (cs *CloudServer) GetTestWorkflowServiceNotificationsStream(srv cloud.TestKubeCloudAPI_GetTestWorkflowServiceNotificationsStreamServer) error { + <-cs.ctx.Done() + + return nil +} + func (cs *CloudServer) ExecuteAsync(srv cloud.TestKubeCloudAPI_ExecuteAsyncServer) error { md, ok := metadata.FromIncomingContext(srv.Context()) if !ok { diff --git a/pkg/agent/events_test.go b/pkg/agent/events_test.go index 663e06ac192..1f52ea26f47 100644 --- a/pkg/agent/events_test.go +++ b/pkg/agent/events_test.go @@ -56,9 +56,10 @@ func TestEventLoop(t *testing.T) { var logStreamFunc func(ctx context.Context, 
executionID string) (chan output.Output, error) var workflowNotificationsStreamFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error) + var workflowServiceNotificationsStreamFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error) proContext := config.ProContext{APIKey: "api-key", WorkerCount: 5, LogStreamWorkerCount: 5, WorkflowNotificationsWorkerCount: 5} - agent, err := agent.NewAgent(logger.Sugar(), nil, grpcClient, logStreamFunc, workflowNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "") + agent, err := agent.NewAgent(logger.Sugar(), nil, grpcClient, logStreamFunc, workflowNotificationsStreamFunc, workflowServiceNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "") assert.NoError(t, err) go func() { l, err := agent.Load() @@ -110,6 +111,12 @@ func (cws *CloudEventServer) GetTestWorkflowNotificationsStream(srv cloud.TestKu return nil } +func (cws *CloudEventServer) GetTestWorkflowServiceNotificationsStream(srv cloud.TestKubeCloudAPI_GetTestWorkflowServiceNotificationsStreamServer) error { + <-cws.ctx.Done() + + return nil +} + func (cws *CloudEventServer) Send(srv cloud.TestKubeCloudAPI_SendServer) error { md, ok := metadata.FromIncomingContext(srv.Context()) if !ok { diff --git a/pkg/agent/logs_test.go b/pkg/agent/logs_test.go index 0491467415c..59eaecf6f0a 100644 --- a/pkg/agent/logs_test.go +++ b/pkg/agent/logs_test.go @@ -66,10 +66,11 @@ func TestLogStream(t *testing.T) { return ch, nil } var workflowNotificationsStreamFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error) + var workflowServiceNotificationsStreamFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error) logger, _ := zap.NewDevelopment() proContext := 
config.ProContext{APIKey: "api-key", WorkerCount: 5, LogStreamWorkerCount: 5, WorkflowNotificationsWorkerCount: 5} - agent, err := agent.NewAgent(logger.Sugar(), m, grpcClient, logStreamFunc, workflowNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "") + agent, err := agent.NewAgent(logger.Sugar(), m, grpcClient, logStreamFunc, workflowNotificationsStreamFunc, workflowServiceNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "") if err != nil { t.Fatal(err) } @@ -102,6 +103,11 @@ func (cs *CloudLogsServer) GetTestWorkflowNotificationsStream(srv cloud.TestKube return nil } +func (cs *CloudLogsServer) GetTestWorkflowServiceNotificationsStream(srv cloud.TestKubeCloudAPI_GetTestWorkflowServiceNotificationsStreamServer) error { + <-cs.ctx.Done() + return nil +} + func (cs *CloudLogsServer) GetLogsStream(srv cloud.TestKubeCloudAPI_GetLogsStreamServer) error { md, ok := metadata.FromIncomingContext(srv.Context()) if !ok { diff --git a/pkg/agent/testworkflows.go b/pkg/agent/testworkflows.go index 2001fa9debd..969f443c6c2 100644 --- a/pkg/agent/testworkflows.go +++ b/pkg/agent/testworkflows.go @@ -74,6 +74,51 @@ func (ag *Agent) runTestWorkflowNotificationsLoop(ctx context.Context) error { return err } +func (ag *Agent) runTestWorkflowServiceNotificationsLoop(ctx context.Context) error { + ctx = agentclient.AddAPIKeyMeta(ctx, ag.apiKey) + + ag.logger.Infow("initiating workflow service notifications streaming connection with Cloud API") + // creates a new Stream from the client side. ctx is used for the lifetime of the stream. + opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)} + stream, err := ag.client.GetTestWorkflowServiceNotificationsStream(ctx, opts...) + if err != nil { + ag.logger.Errorf("failed to execute: %v", err) + return errors.Wrap(err, "failed to setup stream") + } + + // GRPC stream have special requirements for concurrency on SendMsg, and RecvMsg calls. 
+ // Please check https://github.com/grpc/grpc-go/blob/master/Documentation/concurrency.md + g, groupCtx := errgroup.WithContext(ctx) + g.Go(func() error { + for { + cmd, err := ag.receiveTestWorkflowServiceNotificationsRequest(groupCtx, stream) + if err != nil { + return err + } + + ag.testWorkflowServiceNotificationsRequestBuffer <- cmd + } + }) + + g.Go(func() error { + for { + select { + case resp := <-ag.testWorkflowServiceNotificationsResponseBuffer: + err := ag.sendTestWorkflowServiceNotificationsResponse(groupCtx, stream, resp) + if err != nil { + return err + } + case <-groupCtx.Done(): + return groupCtx.Err() + } + } + }) + + err = g.Wait() + + return err +} + func (ag *Agent) runTestWorkflowNotificationsWorker(ctx context.Context, numWorkers int) error { g, groupCtx := errgroup.WithContext(ctx) for i := 0; i < numWorkers; i++ { @@ -102,6 +147,34 @@ func (ag *Agent) runTestWorkflowNotificationsWorker(ctx context.Context, numWork return g.Wait() } +func (ag *Agent) runTestWorkflowServiceNotificationsWorker(ctx context.Context, numWorkers int) error { + g, groupCtx := errgroup.WithContext(ctx) + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + for { + select { + case req := <-ag.testWorkflowServiceNotificationsRequestBuffer: + if req.RequestType == cloud.TestWorkflowNotificationsRequestType_WORKFLOW_STREAM_HEALTH_CHECK { + ag.testWorkflowServiceNotificationsResponseBuffer <- &cloud.TestWorkflowServiceNotificationsResponse{ + StreamId: req.StreamId, + SeqNo: 0, + } + break + } + + err := ag.executeWorkflowServiceNotificationsRequest(groupCtx, req) + if err != nil { + ag.logger.Errorf("error executing workflow service notifications request: %s", err.Error()) + } + case <-groupCtx.Done(): + return groupCtx.Err() + } + } + }) + } + return g.Wait() +} + func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *cloud.TestWorkflowNotificationsRequest) error { notificationsCh, err := ag.testWorkflowNotificationsFunc(ctx, 
req.ExecutionId) for i := 0; i < testWorkflowNotificationsRetryCount; i++ { @@ -162,6 +235,66 @@ func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *c } } +func (ag *Agent) executeWorkflowServiceNotificationsRequest(ctx context.Context, req *cloud.TestWorkflowServiceNotificationsRequest) error { + notificationsCh, err := ag.testWorkflowServiceNotificationsFunc(ctx, req.ExecutionId, req.ServiceName, int(req.ServiceIndex)) + for i := 0; i < testWorkflowNotificationsRetryCount; i++ { + if err != nil { + // We have a race condition here + // Cloud sometimes slow to insert execution or test + // while WorkflowNotifications request from websockets comes in faster + // so we retry up to testWorkflowNotificationsRetryCount times. + time.Sleep(100 * time.Millisecond) + notificationsCh, err = ag.testWorkflowServiceNotificationsFunc(ctx, req.ExecutionId, req.ServiceName, int(req.ServiceIndex)) + } + } + if err != nil { + message := fmt.Sprintf("cannot get pod logs: %s", err.Error()) + ag.testWorkflowServiceNotificationsResponseBuffer <- &cloud.TestWorkflowServiceNotificationsResponse{ + StreamId: req.StreamId, + SeqNo: 0, + Type: cloud.TestWorkflowNotificationType_WORKFLOW_STREAM_ERROR, + Message: fmt.Sprintf("%s %s", time.Now().Format(controller.KubernetesLogTimeFormat), message), + } + return nil + } + + var i uint32 // sequence counter; declared outside the loop so SeqNo keeps increasing instead of resetting to 0 each iteration + for { + select { + case n, ok := <-notificationsCh: + if !ok { + return nil + } + t := getTestWorkflowNotificationType(n) + msg := &cloud.TestWorkflowServiceNotificationsResponse{ + StreamId: req.StreamId, + SeqNo: i, + Timestamp: n.Ts.Format(time.RFC3339Nano), + Ref: n.Ref, + Type: t, + } + if n.Result != nil { + m, _ := json.Marshal(n.Result) + msg.Message = string(m) + } else if n.Output != nil { + m, _ := json.Marshal(n.Output) + msg.Message = string(m) + } else { + msg.Message = n.Log + } + i++ + + select { + case ag.testWorkflowServiceNotificationsResponseBuffer <- msg: + case <-ctx.Done(): + return ctx.Err() + } + case 
<-ctx.Done(): + return ctx.Err() + } + } +} + func (ag *Agent) receiveTestWorkflowNotificationsRequest(ctx context.Context, stream cloud.TestKubeCloudAPI_GetTestWorkflowNotificationsStreamClient) (*cloud.TestWorkflowNotificationsRequest, error) { respChan := make(chan testWorkflowNotificationsRequest, 1) go func() { @@ -191,6 +324,35 @@ type testWorkflowNotificationsRequest struct { err error } +func (ag *Agent) receiveTestWorkflowServiceNotificationsRequest(ctx context.Context, stream cloud.TestKubeCloudAPI_GetTestWorkflowServiceNotificationsStreamClient) (*cloud.TestWorkflowServiceNotificationsRequest, error) { + respChan := make(chan testWorkflowServiceNotificationsRequest, 1) + go func() { + cmd, err := stream.Recv() + respChan <- testWorkflowServiceNotificationsRequest{resp: cmd, err: err} + }() + + var cmd *cloud.TestWorkflowServiceNotificationsRequest + select { + case resp := <-respChan: + cmd = resp.resp + err := resp.err + + if err != nil { + ag.logger.Errorf("agent stream receive: %v", err) + return nil, err + } + case <-ctx.Done(): + return nil, ctx.Err() + } + + return cmd, nil +} + +type testWorkflowServiceNotificationsRequest struct { + resp *cloud.TestWorkflowServiceNotificationsRequest + err error +} + func (ag *Agent) sendTestWorkflowNotificationsResponse(ctx context.Context, stream cloud.TestKubeCloudAPI_GetTestWorkflowNotificationsStreamClient, resp *cloud.TestWorkflowNotificationsResponse) error { errChan := make(chan error, 1) go func() { @@ -215,3 +377,28 @@ func (ag *Agent) sendTestWorkflowNotificationsResponse(ctx context.Context, stre return errors.New("send response too slow") } } + +func (ag *Agent) sendTestWorkflowServiceNotificationsResponse(ctx context.Context, stream cloud.TestKubeCloudAPI_GetTestWorkflowServiceNotificationsStreamClient, resp *cloud.TestWorkflowServiceNotificationsResponse) error { + errChan := make(chan error, 1) + go func() { + errChan <- stream.Send(resp) + close(errChan) + }() + + t := 
time.NewTimer(ag.sendTimeout) + select { + case err := <-errChan: + if !t.Stop() { + <-t.C + } + return err + case <-ctx.Done(): + if !t.Stop() { + <-t.C + } + + return ctx.Err() + case <-t.C: + return errors.New("send response too slow") + } +}