From 2801584946bc23c0b26d14a76aae1d4779c78ddc Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 7 Mar 2023 16:05:25 -0800 Subject: [PATCH] [Pub/Sub] Using unique subscription names (#64) --- processes/consumer/pubsub.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/processes/consumer/pubsub.go b/processes/consumer/pubsub.go index a29767dff..7d4013317 100644 --- a/processes/consumer/pubsub.go +++ b/processes/consumer/pubsub.go @@ -14,11 +14,10 @@ import ( ) const defaultAckDeadline = 5 * time.Minute -const subscriptionName = "transfer" -func findOrCreateSubscription(ctx context.Context, client *gcp_pubsub.Client, topic string) (*gcp_pubsub.Subscription, error) { +func findOrCreateSubscription(ctx context.Context, client *gcp_pubsub.Client, topic, subName string) (*gcp_pubsub.Subscription, error) { log := logger.FromContext(ctx) - sub := client.Subscription(subscriptionName) + sub := client.Subscription(subName) exists, err := sub.Exists(ctx) if err != nil { return nil, fmt.Errorf("failed to fetch subscription, err: %v", err) @@ -33,7 +32,7 @@ func findOrCreateSubscription(ctx context.Context, client *gcp_pubsub.Client, to return nil, fmt.Errorf("failed to fetch gcp topic, exists: %v, err: %v", exists, err) } - sub, err = client.CreateSubscription(ctx, subscriptionName, gcp_pubsub.SubscriptionConfig{ + sub, err = client.CreateSubscription(ctx, subName, gcp_pubsub.SubscriptionConfig{ Topic: gcpTopic, AckDeadline: defaultAckDeadline, // Enable ordering given the `partition key` which is known as ordering key in Pub/Sub @@ -71,14 +70,15 @@ func StartSubscriber(ctx context.Context, flushChan chan bool) { wg.Add(1) go func(ctx context.Context, client *gcp_pubsub.Client, topic string) { defer wg.Done() - sub, err := findOrCreateSubscription(ctx, client, topic) + subName := fmt.Sprintf("transfer_%s", topic) + sub, err := findOrCreateSubscription(ctx, client, topic, subName) if err != nil { log.Fatalf("failed to find or create subscription, err: %v", err) } err = sub.Receive(ctx, func(_ context.Context, pubsubMsg *gcp_pubsub.Message) { msg := artie.NewMessage(nil, pubsubMsg, topic) - msg.EmitIngestionLag(ctx, subscriptionName) + msg.EmitIngestionLag(ctx, subName) logFields := map[string]interface{}{ "topic": msg.Topic(), "msgID": msg.PubSub.ID, @@ -86,7 +86,7 @@ func StartSubscriber(ctx context.Context, flushChan chan bool) { "value": string(msg.Value()), } - shouldFlush, processErr := processMessage(ctx, msg, topicToConfigFmtMap, subscriptionName) + shouldFlush, processErr := processMessage(ctx, msg, topicToConfigFmtMap, subName) if processErr != nil { log.WithError(processErr).WithFields(logFields).Warn("skipping message...") }