diff --git a/cmd/send-all-events-to-relay/main.go b/cmd/send-all-events-to-relay/main.go index 3bf7c69..78366db 100644 --- a/cmd/send-all-events-to-relay/main.go +++ b/cmd/send-all-events-to-relay/main.go @@ -148,7 +148,7 @@ func (u *EventUploader) worker(ctx context.Context) { func (u *EventUploader) sendEvent(ctx context.Context, event domain.Event) error { for { if err := u.eventSender.SendEvent(ctx, u.address, event); err != nil { - if errors.Is(err, relays.ErrEventReplaced) { + if errors.Is(err, relays.BackPressureError) { u.eventsRelayReplaced.Add(1) u.allEvents.Add(1) } else { diff --git a/service/adapters/mocks/transaction.go b/service/adapters/mocks/transaction.go index 122acd1..1a2f7a3 100644 --- a/service/adapters/mocks/transaction.go +++ b/service/adapters/mocks/transaction.go @@ -19,3 +19,7 @@ func NewTransactionProvider(adapters app.Adapters) *TransactionProvider { func (t TransactionProvider) Transact(ctx context.Context, f func(context.Context, app.Adapters) error) error { return f(ctx, t.adapters) } + +func (t TransactionProvider) ReadOnly(ctx context.Context, f func(context.Context, app.Adapters) error) error { + return f(ctx, t.adapters) +} diff --git a/service/adapters/sqlite/contact_repository_test.go b/service/adapters/sqlite/contact_repository_test.go index fc214c7..2ae630d 100644 --- a/service/adapters/sqlite/contact_repository_test.go +++ b/service/adapters/sqlite/contact_repository_test.go @@ -18,7 +18,7 @@ func TestContactRepository_GetCurrentContactsEventReturnsPredefinedError(t *test ctx := fixtures.TestContext(t) adapters := NewTestAdapters(ctx, t) - err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { _, err := adapters.ContactRepository.GetCurrentContactsEvent(ctx, fixtures.SomePublicKey()) require.ErrorIs(t, err, app.ErrNoContactsEvent) @@ -31,7 +31,7 @@ func TestContactRepository_GetFollowwesReturnsEmptyListWhenThereIsNoData(t *test ctx := fixtures.TestContext(t) adapters := NewTestAdapters(ctx, t) - err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { result, err := adapters.ContactRepository.GetFollowees(ctx, fixtures.SomePublicKey()) require.NoError(t, err) require.Empty(t, result) @@ -87,7 +87,7 @@ func TestContactRepository_ContactsAreReplacedForGivenPublicKey(t *testing.T) { return strings.Compare(a.Hex(), b.Hex()) } - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { current1, err := adapters.ContactRepository.GetCurrentContactsEvent(ctx, pk1) require.NoError(t, err) require.Equal(t, event1.Id(), current1.Id()) @@ -132,7 +132,7 @@ func TestContactRepository_ContactsAreReplacedForGivenPublicKey(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { current1, err := adapters.ContactRepository.GetCurrentContactsEvent(ctx, pk1) require.NoError(t, err) require.Equal(t, event1.Id(), current1.Id()) @@ -195,7 +195,7 @@ func TestContactRepository_GrabbingAnEventForFolloweesMeansTheyAreInPublicKeysBu }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { currentFollowerEvent, err := adapters.ContactRepository.GetCurrentContactsEvent(ctx, pk1) require.NoError(t, err) require.Equal(t, event.Id(), currentFollowerEvent.Id()) @@ -251,7 +251,7 @@ func TestContactRepository_IsFolloweeOfMonitoredPublicKey(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { ok, err := adapters.ContactRepository.IsFolloweeOfMonitoredPublicKey(ctx, followee11) require.NoError(t, err) require.True(t, ok) @@ -314,7 +314,7 @@ func BenchmarkContactRepository_GetCurrentContactsEvent(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { event, err := adapters.ContactRepository.GetCurrentContactsEvent(ctx, eventToLookUp.PubKey()) require.NoError(b, err) require.Equal(b, eventToLookUp.Id(), event.Id()) @@ -337,7 +337,7 @@ func TestContactRepository_CountFolloweesReturnsNumberOfFollowees(t *testing.T) pk2, sk2 := fixtures.SomeKeyPair() event2 := fixtures.SomeEventWithAuthor(sk2) - err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { n, err := adapters.ContactRepository.CountFollowees(ctx, pk1) require.NoError(t, err) require.Equal(t, 0, n) @@ -364,7 +364,7 @@ func TestContactRepository_CountFolloweesReturnsNumberOfFollowees(t *testing.T) }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { n, err := adapters.ContactRepository.CountFollowees(ctx, pk1) require.NoError(t, err) require.Equal(t, 2, n) @@ -392,7 +392,7 @@ func TestContactRepository_CountFollowersReturnsNumberOfFollowers(t *testing.T) followee2 := fixtures.SomePublicKey() followee3 := fixtures.SomePublicKey() - err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { n, err := adapters.ContactRepository.CountFollowers(ctx, followee1) require.NoError(t, err) require.Equal(t, 0, n) @@ -426,7 +426,7 @@ func TestContactRepository_CountFollowersReturnsNumberOfFollowers(t *testing.T) }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { n, err := adapters.ContactRepository.CountFollowers(ctx, followee1) require.NoError(t, err) require.Equal(t, 1, n) diff --git a/service/adapters/sqlite/event_repository_test.go b/service/adapters/sqlite/event_repository_test.go index 4368cee..eb6c348 100644 --- a/service/adapters/sqlite/event_repository_test.go +++ b/service/adapters/sqlite/event_repository_test.go @@ -18,7 +18,7 @@ func TestEventRepository_GetReturnsPredefinedError(t *testing.T) { ctx := fixtures.TestContext(t) adapters := NewTestAdapters(ctx, t) - err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { _, err := adapters.EventRepository.Get(ctx, fixtures.SomeEventID()) require.ErrorIs(t, err, app.ErrEventNotFound) @@ -64,7 +64,7 @@ func TestEventRepository_ItIsPossibleToSaveAndGetEvents(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { readEvent, err := adapters.EventRepository.Get(ctx, event.Id()) require.NoError(t, err) require.Equal(t, event.Raw(), readEvent.Raw()) @@ -78,7 +78,7 @@ func TestEventRepository_CountCountsSavedEvents(t *testing.T) { ctx := fixtures.TestContext(t) adapters := NewTestAdapters(ctx, t) - err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { n, err := adapters.EventRepository.Count(ctx) require.NoError(t, err) require.Equal(t, 0, n) @@ -96,7 +96,7 @@ func TestEventRepository_CountCountsSavedEvents(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { n, err := adapters.EventRepository.Count(ctx) require.NoError(t, err) require.Equal(t, i+1, n) @@ -114,7 +114,7 @@ func TestEventRepository_ExistsChecksIfEventsExist(t *testing.T) { event1 := fixtures.SomeEvent() event2 := fixtures.SomeEvent() - err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { ok, err := adapters.EventRepository.Exists(ctx, event1.Id()) require.NoError(t, err) require.False(t, ok) @@ -135,7 +135,7 @@ func TestEventRepository_ExistsChecksIfEventsExist(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { ok, err := adapters.EventRepository.Exists(ctx, event1.Id()) require.NoError(t, err) require.True(t, ok) @@ -153,7 +153,7 @@ func TestEventRepository_ListReturnsNoEventsIfRepositoryIsEmpty(t *testing.T) { ctx := fixtures.TestContext(t) adapters := NewTestAdapters(ctx, t) - err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { events, err := adapters.EventRepository.List(ctx, nil, 10) require.NoError(t, err) require.Empty(t, events) @@ -185,7 +185,7 @@ func TestEventRepository_ListReturnsEventsIfRepositoryIsNotEmpty(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { events, err := adapters.EventRepository.List(ctx, nil, 2) require.NoError(t, err) fixtures.RequireEqualEventSlices(t, diff --git a/service/adapters/sqlite/public_keys_to_monitor_repository_test.go b/service/adapters/sqlite/public_keys_to_monitor_repository_test.go index 871488b..f62c6e0 100644 --- a/service/adapters/sqlite/public_keys_to_monitor_repository_test.go +++ b/service/adapters/sqlite/public_keys_to_monitor_repository_test.go @@ -18,7 +18,7 @@ func TestPublicKeysToMonitorRepository_GetReturnsPredefinedError(t *testing.T) { ctx := fixtures.TestContext(t) adapters := NewTestAdapters(ctx, t) - err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { _, err := adapters.PublicKeysToMonitorRepository.Get(ctx, fixtures.SomePublicKey()) require.ErrorIs(t, err, app.ErrPublicKeyToMonitorNotFound) @@ -31,7 +31,7 @@ func TestPublicKeysToMonitorRepository_ListReturnsNoDataWhenRepositoryIsEmpty(t ctx := fixtures.TestContext(t) adapters := NewTestAdapters(ctx, t) - err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { result, err := adapters.PublicKeysToMonitorRepository.List(ctx) require.NoError(t, err) require.Empty(t, result) @@ -57,7 +57,7 @@ func TestPublicKeysToMonitorRepository_GetReturnsData(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { result, err := adapters.PublicKeysToMonitorRepository.Get(ctx, publicKey) require.NoError(t, err) @@ -97,7 +97,7 @@ func TestPublicKeysToMonitorRepository_SaveUpdatesData(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { result, err := adapters.PublicKeysToMonitorRepository.List(ctx) require.NoError(t, err) @@ -123,7 +123,7 @@ func TestPublicKeysToMonitorRepository_SaveUpdatesData(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { result, err := adapters.PublicKeysToMonitorRepository.List(ctx) require.NoError(t, err) diff --git a/service/adapters/sqlite/pubsub.go b/service/adapters/sqlite/pubsub.go index 9a79d4d..779cf5f 100644 --- a/service/adapters/sqlite/pubsub.go +++ b/service/adapters/sqlite/pubsub.go @@ -28,6 +28,7 @@ var ErrQueueEmpty = errors.New("queue is empty") type PubsubTransactionProvider interface { Transact(context.Context, func(context.Context, *sql.Tx) error) error + ReadOnly(context.Context, func(context.Context, *sql.Tx) error) error } type Message struct { @@ -152,7 +153,7 @@ func (p *PubSub) Subscribe(ctx context.Context, topic string) <-chan *ReceivedMe func (p *PubSub) QueueLength(ctx context.Context, topic string) (int, error) { var count int - if err := p.transactionProvider.Transact(ctx, func(ctx context.Context, tx *sql.Tx) error { + if err := p.transactionProvider.ReadOnly(ctx, func(ctx context.Context, tx *sql.Tx) error { row := tx.QueryRow( "SELECT COUNT(*) FROM pubsub WHERE topic = ?", topic, @@ -175,7 +176,7 @@ func (p *PubSub) QueueLength(ctx context.Context, topic string) (int, error) { // OldestMessageAge returns ErrQueueEmpty if the queue is empty. func (p *PubSub) OldestMessageAge(ctx context.Context, topic string) (time.Duration, error) { var age time.Duration - if err := p.transactionProvider.Transact(ctx, func(ctx context.Context, tx *sql.Tx) error { + if err := p.transactionProvider.ReadOnly(ctx, func(ctx context.Context, tx *sql.Tx) error { row := tx.QueryRow( "SELECT created_at FROM pubsub WHERE topic = ? ORDER BY created_at ASC LIMIT 1", topic, @@ -256,7 +257,7 @@ func (p *PubSub) subscribe(ctx context.Context, topic string, ch chan *ReceivedM func (p *PubSub) readMsg(ctx context.Context, topic string) (Message, error) { var msg Message - if err := p.transactionProvider.Transact(ctx, func(ctx context.Context, tx *sql.Tx) error { + if err := p.transactionProvider.ReadOnly(ctx, func(ctx context.Context, tx *sql.Tx) error { row := tx.QueryRow( "SELECT uuid, payload FROM pubsub WHERE topic = ? AND (backoff_until IS NULL OR backoff_until <= ?) ORDER BY RANDOM() LIMIT 1", topic, diff --git a/service/adapters/sqlite/relay_repository_test.go b/service/adapters/sqlite/relay_repository_test.go index 8467dc1..af3b6ee 100644 --- a/service/adapters/sqlite/relay_repository_test.go +++ b/service/adapters/sqlite/relay_repository_test.go @@ -63,7 +63,7 @@ func TestRelayRepository_ItIsPossibleToListSavedData(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { addresses, err := adapters.RelayRepository.List(ctx) require.NoError(t, err) require.Len(t, addresses, 1) @@ -81,7 +81,7 @@ func TestRelayRepository_ItIsPossibleToListSavedData(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { addresses, err := adapters.RelayRepository.List(ctx) require.NoError(t, err) require.Len(t, addresses, 2) @@ -116,7 +116,7 @@ func TestRelayRepository_SavingSameDataTwiceDoesNotCreateDuplicates(t *testing.T }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { addresses, err := adapters.RelayRepository.List(ctx) require.NoError(t, err) require.Len(t, addresses, 1) @@ -134,7 +134,7 @@ func TestRelayRepository_SavingSameDataTwiceDoesNotCreateDuplicates(t *testing.T }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { addresses, err := adapters.RelayRepository.List(ctx) require.NoError(t, err) require.Len(t, addresses, 1) @@ -161,7 +161,7 @@ func TestRelayRepository_CountCountsSavedAddresses(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { n, err := adapters.RelayRepository.Count(ctx) require.NoError(t, err) require.Equal(t, 0, n) @@ -178,7 +178,7 @@ func TestRelayRepository_CountCountsSavedAddresses(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { n, err := adapters.RelayRepository.Count(ctx) require.NoError(t, err) require.Equal(t, 1, n) @@ -195,7 +195,7 @@ func TestRelayRepository_CountCountsSavedAddresses(t *testing.T) { }) require.NoError(t, err) - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { n, err := adapters.RelayRepository.Count(ctx) require.NoError(t, err) require.Equal(t, 2, n) diff --git a/service/adapters/sqlite/sqlite.go b/service/adapters/sqlite/sqlite.go index 0d4d5e1..7f863d3 100644 --- a/service/adapters/sqlite/sqlite.go +++ b/service/adapters/sqlite/sqlite.go @@ -96,7 +96,12 @@ type GenericTransactionProvider[T any] struct { func (t *GenericTransactionProvider[T]) Transact(ctx context.Context, fn func(context.Context, T) error) error { transactionFunc := t.makeTransactionFunc(fn) - return t.runner.TryRun(ctx, transactionFunc) + return t.runner.TryRun(ctx, transactionFunc, false) +} + +func (t *GenericTransactionProvider[T]) ReadOnly(ctx context.Context, fn func(context.Context, T) error) error { + transactionFunc := t.makeTransactionFunc(fn) + return t.runner.TryRun(ctx, transactionFunc, true) } func (t *GenericTransactionProvider[T]) makeTransactionFunc(fn func(context.Context, T) error) TransactionFunc { @@ -128,11 +133,11 @@ func NewTransactionRunner(db *sql.DB) *TransactionRunner { } } -func (t *TransactionRunner) TryRun(ctx context.Context, fn TransactionFunc) error { +func (t *TransactionRunner) TryRun(ctx context.Context, fn TransactionFunc, readOnly bool) error { resultCh := make(chan error) select { - case t.chIn <- newTransactionTask(ctx, fn, resultCh): + case t.chIn <- newTransactionTask(ctx, fn, resultCh, readOnly): case <-ctx.Done(): return ctx.Err() } @@ -150,7 +155,7 @@ func (t *TransactionRunner) Run(ctx context.Context) error { select { case task := <-t.chIn: select { - case task.ResultCh <- t.run(task.Ctx, task.Fn): + case task.ResultCh <- t.run(task.Ctx, task.Fn, task.readOnly): continue case <-task.Ctx.Done(): continue @@ -163,8 +168,15 @@ func (t *TransactionRunner) Run(ctx context.Context) error { } } -func (t *TransactionRunner) run(ctx context.Context, fn TransactionFunc) error { - tx, err := t.db.BeginTx(ctx, nil) +func (t *TransactionRunner) run(ctx context.Context, fn TransactionFunc, readOnly bool) error { + var opts sql.TxOptions + if readOnly { + opts = sql.TxOptions{ReadOnly: true} + } else { + opts = sql.TxOptions{Isolation: sql.LevelSerializable} + } + + tx, err := t.db.BeginTx(ctx, &opts) if err != nil { return errors.Wrap(err, "error starting the transaction") } @@ -187,8 +199,9 @@ type transactionTask struct { Ctx context.Context Fn TransactionFunc ResultCh chan<- error + readOnly bool } -func newTransactionTask(ctx context.Context, fn TransactionFunc, resultCh chan<- error) transactionTask { - return transactionTask{Ctx: ctx, Fn: fn, ResultCh: resultCh} +func newTransactionTask(ctx context.Context, fn TransactionFunc, resultCh chan<- error, readOnly bool) transactionTask { + return transactionTask{Ctx: ctx, Fn: fn, ResultCh: resultCh, readOnly: readOnly} } diff --git a/service/app/app.go b/service/app/app.go index 8287e80..e43c2bc 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -20,6 +20,7 @@ const ( type TransactionProvider interface { Transact(context.Context, func(context.Context, Adapters) error) error + ReadOnly(context.Context, func(context.Context, Adapters) error) error } type Adapters struct { @@ -143,6 +144,8 @@ type Subscriber interface { type EventSender interface { // SendEvent returns relays.ErrEventReplaced. SendEvent(ctx context.Context, relayAddress domain.RelayAddress, event domain.Event) error + NotifyBackPressure() + ResolveBackPressure() } var ( diff --git a/service/app/handler_get_event.go b/service/app/handler_get_event.go index bdbcbe3..1065711 100644 --- a/service/app/handler_get_event.go +++ b/service/app/handler_get_event.go @@ -40,7 +40,7 @@ func (h *GetEventHandler) Handle(ctx context.Context, cmd GetEvent) (event domai ctx, cancel := context.WithTimeout(ctx, applicationHandlerTimeout) defer cancel() - if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { + if err := h.transactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters Adapters) error { tmp, err := adapters.Events.Get(ctx, cmd.id) if err != nil { return errors.Wrap(err, "error getting the event") diff --git a/service/app/handler_get_events.go b/service/app/handler_get_events.go index 0201f9c..e429ed3 100644 --- a/service/app/handler_get_events.go +++ b/service/app/handler_get_events.go @@ -63,7 +63,7 @@ func (h *GetEventsHandler) Handle(ctx context.Context, cmd GetEvents) (result Ge defer cancel() var events []domain.Event - if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { + if err := h.transactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters Adapters) error { tmp, err := adapters.Events.List(ctx, cmd.after, getEventsLimit+1) if err != nil { return errors.Wrap(err, "error getting the event") diff --git a/service/app/handler_get_public_key_info.go b/service/app/handler_get_public_key_info.go index 9bb19be..20031a4 100644 --- a/service/app/handler_get_public_key_info.go +++ b/service/app/handler_get_public_key_info.go @@ -59,7 +59,7 @@ func (h *GetPublicKeyInfoHandler) Handle(ctx context.Context, cmd GetPublicKeyIn var followeesCount, followersCount int - if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { + if err := h.transactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters Adapters) error { tmp, err := adapters.Contacts.CountFollowees(ctx, cmd.publicKey) if err != nil { return errors.Wrap(err, "error counting followees") diff --git a/service/app/handler_process_saved_event.go b/service/app/handler_process_saved_event.go index d0b915f..f751bc2 100644 --- a/service/app/handler_process_saved_event.go +++ b/service/app/handler_process_saved_event.go @@ -94,9 +94,17 @@ func (h *ProcessSavedEventHandler) Handle(ctx context.Context, cmd ProcessSavedE return nil } +func (h *ProcessSavedEventHandler) NotifyBackPressure() { + h.eventSender.NotifyBackPressure() +} + +func (h *ProcessSavedEventHandler) ResolveBackPressure() { + h.eventSender.ResolveBackPressure() +} + func (h *ProcessSavedEventHandler) loadEvent(ctx context.Context, eventId domain.EventId) (domain.Event, error) { var event domain.Event - if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { + if err := h.transactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters Adapters) error { tmp, err := adapters.Events.Get(ctx, eventId) if err != nil { return errors.Wrap(err, "error loading the event") diff --git a/service/app/handler_update_metrics.go b/service/app/handler_update_metrics.go index 5540b76..7838e07 100644 --- a/service/app/handler_update_metrics.go +++ b/service/app/handler_update_metrics.go @@ -51,7 +51,7 @@ func (h *UpdateMetricsHandler) Handle(ctx context.Context) (err error) { h.metrics.ReportQueueOldestMessageAge("eventSaved", age) } - if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { + if err := h.transactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters Adapters) error { n, err := adapters.Relays.Count(ctx) if err != nil { return errors.Wrap(err, "error counting relay addresses") diff --git a/service/app/sources.go b/service/app/sources.go index 61393ca..b7db3bc 100644 --- a/service/app/sources.go +++ b/service/app/sources.go @@ -32,7 +32,7 @@ func (m *DatabaseRelaySource) GetRelays(ctx context.Context) ([]domain.RelayAddr }() var maybeResult []domain.MaybeRelayAddress - if err := m.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { + if err := m.transactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters Adapters) error { tmp, err := adapters.Relays.List(ctx) if err != nil { return errors.Wrap(err, "error listing relays") @@ -86,7 +86,7 @@ func (d *DatabasePublicKeySource) GetPublicKeys(ctx context.Context) (downloader publicKeysToMonitor := *internal.NewEmptySet[domain.PublicKey]() publicKeysToMonitorFollowees := *internal.NewEmptySet[domain.PublicKey]() - if err := d.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { + if err := d.transactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters Adapters) error { values, err := adapters.PublicKeysToMonitor.List(ctx) if err != nil { return errors.Wrap(err, "error getting public keys to monitor") diff --git a/service/domain/downloader/downloader.go b/service/domain/downloader/downloader.go index b6126f0..c4c9bab 100644 --- a/service/domain/downloader/downloader.go +++ b/service/domain/downloader/downloader.go @@ -22,7 +22,11 @@ const ( var relaySuffixesToSkip = []string{ "relay.nos.social", + "127.0.0.1", + "localhost", "nostr.band", + "nostrja-kari-nip50.heguro.com", + "nostr.sebastix.social", } var ( diff --git a/service/domain/relays/event_sender.go b/service/domain/relays/event_sender.go index a89650d..6eeea46 100644 --- a/service/domain/relays/event_sender.go +++ b/service/domain/relays/event_sender.go @@ -27,6 +27,14 @@ func (s *EventSender) SendEvent(ctx context.Context, address domain.RelayAddress return nil } +func (s *EventSender) NotifyBackPressure() { + s.connections.NotifyBackPressure() +} + +func (s *EventSender) ResolveBackPressure() { + s.connections.ResolveBackPressure() +} + func (s *EventSender) maybeConvertError(err error) error { var okResponseErr OKResponseError if !errors.As(err, &okResponseErr) { diff --git a/service/domain/relays/relay_connection.go b/service/domain/relays/relay_connection.go index 24c308a..fc053eb 100644 --- a/service/domain/relays/relay_connection.go +++ b/service/domain/relays/relay_connection.go @@ -70,6 +70,8 @@ type RelayConnection struct { eventsToSendMutex sync.Mutex newEventsCh chan domain.Event rateLimitNoticeBackoffManager *RateLimitNoticeBackoffManager + cancelRun context.CancelFunc + cancelBackPressure context.CancelFunc } func NewRelayConnection( @@ -89,6 +91,8 @@ func NewRelayConnection( eventsToSend: make(map[domain.EventId]*eventToSend), newEventsCh: make(chan domain.Event), rateLimitNoticeBackoffManager: rateLimitNoticeBackoffManager, + cancelRun: nil, + cancelBackPressure: nil, } } @@ -103,9 +107,13 @@ func (r *RelayConnection) Run(ctx context.Context) { backoff := r.backoffManager.GetReconnectionBackoff(err) - // We control relay.nos.social, so we can be more aggressive with the backoff - if r.Address().String() == "wss://relay.nos.social" { - backoff = 1 * time.Minute + if r.Address().HostWithoutPort() == "relay.nos.social" { + // We control relay.nos.social, so we don't backoff here + backoff = 0 + } else if errors.Is(err, BackPressureError) { + // Only calling r.cancelBackPressure() can resolve the backpressure + r.WaitUntilNoBackPressure(ctx) + backoff = 0 } select { @@ -117,6 +125,20 @@ func (r *RelayConnection) Run(ctx context.Context) { } } +func (r *RelayConnection) WaitUntilNoBackPressure(ctx context.Context) { + var resolvedBackPressureCtx context.Context + var cancelBackPressure context.CancelFunc + + r.setStateWithFn(RelayConnectionStateBackPressured, func() { + resolvedBackPressureCtx, cancelBackPressure = context.WithCancel(ctx) + r.cancelBackPressure = cancelBackPressure + }) + + <-resolvedBackPressureCtx.Done() + + r.setState(RelayConnectionStateDisconnected) +} + func (r *RelayConnection) State() RelayConnectionState { r.stateMutex.Lock() defer r.stateMutex.Unlock() @@ -289,6 +311,8 @@ func (r *RelayConnection) triggerSubscriptionUpdate() { func (r *RelayConnection) run(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) + backPressureCtx, cancelFromBackPressure := context.WithCancel(ctx) + r.cancelRun = cancelFromBackPressure defer cancel() defer r.setState(RelayConnectionStateDisconnected) @@ -335,6 +359,19 @@ func (r *RelayConnection) run(ctx context.Context) error { return NewReadMessageError(err) } + select { + case <-backPressureCtx.Done(): + // The backpressure handling code is to avoid overwhelming the queue + // that writes to relay.nos.social so we skip that signal for it or + // the queue will never shrink + if r.Address().HostWithoutPort() == "relay.nos.social" { + continue + } + + return BackPressureError + default: + } + if err := r.handleMessage(messageBytes); err != nil { r.logger. Error(). @@ -585,9 +622,18 @@ func (r *RelayConnection) passSendEventResponseToChannel(eventID domain.EventId, func (r *RelayConnection) setState(state RelayConnectionState) { r.stateMutex.Lock() defer r.stateMutex.Unlock() + r.state = state } +func (r *RelayConnection) setStateWithFn(state RelayConnectionState, fn func()) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + + r.state = state + fn() +} + func (r *RelayConnection) manageSubs(ctx context.Context, conn Connection) error { defer conn.Close() @@ -731,6 +777,8 @@ func (t DialError) Is(target error) bool { return ok1 || ok2 } +var BackPressureError = errors.New("backpressure error") + type ReadMessageError struct { underlying error } diff --git a/service/domain/relays/relay_connection_state.go b/service/domain/relays/relay_connection_state.go index e2de85f..662daa9 100644 --- a/service/domain/relays/relay_connection_state.go +++ b/service/domain/relays/relay_connection_state.go @@ -1,9 +1,10 @@ package relays var ( - RelayConnectionStateInitializing = RelayConnectionState{"initializing"} - RelayConnectionStateConnected = RelayConnectionState{"connected"} - RelayConnectionStateDisconnected = RelayConnectionState{"disconnected"} + RelayConnectionStateInitializing = RelayConnectionState{"initializing"} + RelayConnectionStateConnected = RelayConnectionState{"connected"} + RelayConnectionStateDisconnected = RelayConnectionState{"disconnected"} + RelayConnectionStateBackPressured = RelayConnectionState{"backpressured"} ) type RelayConnectionState struct { diff --git a/service/domain/relays/relay_connections.go b/service/domain/relays/relay_connections.go index aec3b7e..13dea88 100644 --- a/service/domain/relays/relay_connections.go +++ b/service/domain/relays/relay_connections.go @@ -74,6 +74,24 @@ func (d *RelayConnections) SendEvent(ctx context.Context, relayAddress domain.Re return connection.SendEvent(ctx, event) } +func (d *RelayConnections) NotifyBackPressure() { + for _, connection := range d.connections { + if connection.cancelRun != nil && connection.Address().HostWithoutPort() != "relay.nos.social" { + connection.cancelRun() + connection.cancelRun = nil + } + } +} + +func (d *RelayConnections) ResolveBackPressure() { + for _, connection := range d.connections { + if connection.cancelBackPressure != nil { + connection.cancelBackPressure() + connection.cancelBackPressure = nil + } + } +} + func (d *RelayConnections) storeMetricsLoop(ctx context.Context) { for { d.storeMetrics() diff --git a/service/ports/sqlitepubsub/event_saved.go b/service/ports/sqlitepubsub/event_saved.go index c8afb49..faa9600 100644 --- a/service/ports/sqlitepubsub/event_saved.go +++ b/service/ports/sqlitepubsub/event_saved.go @@ -3,6 +3,8 @@ package sqlitepubsub import ( "context" "encoding/json" + "fmt" + "time" "github.com/boreq/errors" "github.com/planetary-social/nos-event-service/internal/logging" @@ -11,8 +13,12 @@ import ( "github.com/planetary-social/nos-event-service/service/domain" ) +const backPressureThreshold = 20000 + type ProcessSavedEventHandler interface { Handle(ctx context.Context, cmd app.ProcessSavedEvent) (err error) + NotifyBackPressure() + ResolveBackPressure() } type EventSavedEventSubscriber struct { @@ -36,6 +42,34 @@ func NewEventSavedEventSubscriber( } } func (s *EventSavedEventSubscriber) Run(ctx context.Context) error { + + //Periodically check for backpressure, if detected, send signal to sql queue + //publisher to disconnect from relays for a while + go func() { + for { + queueSize, err := s.subscriber.EventSavedQueueLength(ctx) + if err != nil { + s.logger.Error().WithError(err).Message("error getting queue length") + } + + if queueSize > backPressureThreshold { + s.logger.Debug().Message( + fmt.Sprintf("Queue size %d > %d. Sending backpressure signal to slow down", queueSize, backPressureThreshold), + ) + s.handler.NotifyBackPressure() + } else if queueSize < backPressureThreshold/2 { + s.handler.ResolveBackPressure() + } + + select { + case <-ctx.Done(): + return + case <-time.After(30 * time.Second): + continue + } + } + }() + for msg := range s.subscriber.SubscribeToEventSaved(ctx) { if err := s.handleMessage(ctx, msg); err != nil { s.logger.Error().WithError(err).Message("error handling a message")