From 6041d9e5dfc1aca52ace6f0eb3840e02abdf1fd8 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 3 Oct 2023 18:54:11 -0300 Subject: [PATCH] fix(maestro): fix re-uping errored container without update. --- maestro/service/deploy_service.go | 12 ++++++++++-- sinker/redis/consumer/sink_key_expire.go | 3 +-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/maestro/service/deploy_service.go b/maestro/service/deploy_service.go index 199af6d90..e5e1992b4 100644 --- a/maestro/service/deploy_service.go +++ b/maestro/service/deploy_service.go @@ -106,7 +106,15 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi return errors.New("trying to deploy sink that is not active") } d.logger.Debug("handling sink activity event", zap.String("sink-id", event.SinkID)) - + deploymentEntry, _, err := d.deploymentService.GetDeployment(ctx, event.OwnerID, event.SinkID) + if err != nil { + d.logger.Warn("did not find collector entry for sink", zap.String("sink-id", event.SinkID)) + return err + } + if deploymentEntry.LastStatus == "error" { + d.logger.Warn("collector is in error state, skipping") + return nil + } // async update sink status to provisioning go func() { err := d.deploymentService.UpdateStatus(ctx, event.OwnerID, event.SinkID, "provisioning", "") @@ -114,7 +122,7 @@ func (d *eventService) HandleSinkActivity(ctx context.Context, event maestroredi d.logger.Error("error updating status to provisioning", zap.Error(err)) } }() - _, err := d.deploymentService.NotifyCollector(ctx, event.OwnerID, event.SinkID, "deploy", "", "") + _, err = d.deploymentService.NotifyCollector(ctx, event.OwnerID, event.SinkID, "deploy", "", "") if err != nil { d.logger.Error("error trying to notify collector", zap.Error(err)) err2 := d.deploymentService.UpdateStatus(ctx, event.OwnerID, event.SinkID, "provisioning_error", err.Error()) diff --git a/sinker/redis/consumer/sink_key_expire.go b/sinker/redis/consumer/sink_key_expire.go index b12cfaf08..7fa87bd25 100644 --- a/sinker/redis/consumer/sink_key_expire.go +++ b/sinker/redis/consumer/sink_key_expire.go @@ -2,7 +2,6 @@ package consumer import ( "context" - "fmt" "github.com/go-redis/redis/v8" "github.com/orb-community/orb/sinker/redis/producer" "go.uber.org/zap" @@ -39,7 +38,7 @@ func (s *sinkerKeyExpirationListener) SubscribeToKeyExpiration(ctx context.Conte case <-ctx.Done(): return case msg := <-ch: - s.logger.Info(fmt.Sprintf("key %s expired", msg.Payload)) + s.logger.Info("key expired", zap.String("key", msg.Payload)) subCtx := context.WithValue(ctx, "msg", msg.Payload) err := s.ReceiveMessage(subCtx, msg.Payload) if err != nil {