diff --git a/service/adapters/relay_connection.go b/service/adapters/relay_connection.go index 3fced03..4825e06 100644 --- a/service/adapters/relay_connection.go +++ b/service/adapters/relay_connection.go @@ -17,6 +17,10 @@ import ( "github.com/planetary-social/nos-crossposting-service/service/domain" ) +const ( + reconnectAfter = 1 * time.Minute +) + type RelayConnection struct { address domain.RelayAddress logger logging.Logger @@ -24,15 +28,18 @@ type RelayConnection struct { state app.RelayConnectionState stateMutex sync.Mutex - subscriptions map[string]subscription - subscriptionsMutex sync.Mutex + subscriptions map[string]subscription + subscriptionsUpdatedCh chan struct{} + subscriptionsUpdatedChClosed bool + subscriptionsMutex sync.Mutex } func NewRelayConnection(address domain.RelayAddress, logger logging.Logger) *RelayConnection { return &RelayConnection{ - address: address, - logger: logger.New(fmt.Sprintf("relayConnection(%s)", address.String())), - subscriptions: make(map[string]subscription), + address: address, + logger: logger.New(fmt.Sprintf("relayConnection(%s)", address.String())), + subscriptions: make(map[string]subscription), + subscriptionsUpdatedCh: make(chan struct{}), } } @@ -82,6 +89,8 @@ func (r *RelayConnection) GetEvents(ctx context.Context, publicKey domain.Public maxAge: maxAge, } + r.triggerSubscriptionUpdate() + go func() { <-ctx.Done() if err := r.removeChannel(ch); err != nil { @@ -110,6 +119,7 @@ func (r *RelayConnection) removeChannel(chToRemove chan app.EventOrEndOfSavedEve if chToRemove == subscription.ch { close(subscription.ch) delete(r.subscriptions, uuid) + r.triggerSubscriptionUpdate() return nil } } @@ -117,6 +127,18 @@ func (r *RelayConnection) removeChannel(chToRemove chan app.EventOrEndOfSavedEve return errors.New("somehow the channel was already removed") } +func (r *RelayConnection) triggerSubscriptionUpdate() { + if !r.subscriptionsUpdatedChClosed { + r.subscriptionsUpdatedChClosed = true + close(r.subscriptionsUpdatedCh) + } +} + +func (r *RelayConnection) resetSubscriptionUpdateCh() { + r.subscriptionsUpdatedChClosed = false + r.subscriptionsUpdatedCh = make(chan struct{}) +} + func (r *RelayConnection) run(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -227,7 +249,7 @@ func (r *RelayConnection) manageSubs( } select { - case <-time.After(manageSubscriptionsEvery): + case <-r.subscriptionsUpdatedCh: continue case <-ctx.Done(): return ctx.Err() @@ -242,6 +264,8 @@ func (r *RelayConnection) updateSubs( r.subscriptionsMutex.Lock() defer r.subscriptionsMutex.Unlock() + r.resetSubscriptionUpdateCh() + for _, uuid := range activeSubscriptions.List() { if _, ok := r.subscriptions[uuid]; !ok { r.logger.Trace(). diff --git a/service/adapters/relay_event_downloader.go b/service/adapters/relay_event_downloader.go index b830d5f..8736025 100644 --- a/service/adapters/relay_event_downloader.go +++ b/service/adapters/relay_event_downloader.go @@ -12,9 +12,6 @@ import ( const ( storeMetricsEvery = 30 * time.Second - - reconnectAfter = 1 * time.Minute - manageSubscriptionsEvery = 10 * time.Second ) type RelayEventDownloader struct {