Skip to content

Commit

Permalink
Merge pull request #87 from planetary-social/backpressure-and-readonly
Browse files Browse the repository at this point in the history
Backpressure signal and ReadOnly tx
  • Loading branch information
dcadenas authored May 2, 2024
2 parents 43c09a8 + e996801 commit 27b9057
Show file tree
Hide file tree
Showing 21 changed files with 198 additions and 56 deletions.
2 changes: 1 addition & 1 deletion cmd/send-all-events-to-relay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (u *EventUploader) worker(ctx context.Context) {
func (u *EventUploader) sendEvent(ctx context.Context, event domain.Event) error {
for {
if err := u.eventSender.SendEvent(ctx, u.address, event); err != nil {
if errors.Is(err, relays.ErrEventReplaced) {
if errors.Is(err, relays.BackPressureError) {
u.eventsRelayReplaced.Add(1)
u.allEvents.Add(1)
} else {
Expand Down
4 changes: 4 additions & 0 deletions service/adapters/mocks/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ func NewTransactionProvider(adapters app.Adapters) *TransactionProvider {
func (t TransactionProvider) Transact(ctx context.Context, f func(context.Context, app.Adapters) error) error {
return f(ctx, t.adapters)
}

func (t TransactionProvider) ReadOnly(ctx context.Context, f func(context.Context, app.Adapters) error) error {
return f(ctx, t.adapters)
}
22 changes: 11 additions & 11 deletions service/adapters/sqlite/contact_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestContactRepository_GetCurrentContactsEventReturnsPredefinedError(t *test
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
_, err := adapters.ContactRepository.GetCurrentContactsEvent(ctx, fixtures.SomePublicKey())
require.ErrorIs(t, err, app.ErrNoContactsEvent)

Expand All @@ -31,7 +31,7 @@ func TestContactRepository_GetFollowwesReturnsEmptyListWhenThereIsNoData(t *test
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
result, err := adapters.ContactRepository.GetFollowees(ctx, fixtures.SomePublicKey())
require.NoError(t, err)
require.Empty(t, result)
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestContactRepository_ContactsAreReplacedForGivenPublicKey(t *testing.T) {
return strings.Compare(a.Hex(), b.Hex())
}

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
current1, err := adapters.ContactRepository.GetCurrentContactsEvent(ctx, pk1)
require.NoError(t, err)
require.Equal(t, event1.Id(), current1.Id())
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestContactRepository_ContactsAreReplacedForGivenPublicKey(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
current1, err := adapters.ContactRepository.GetCurrentContactsEvent(ctx, pk1)
require.NoError(t, err)
require.Equal(t, event1.Id(), current1.Id())
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestContactRepository_GrabbingAnEventForFolloweesMeansTheyAreInPublicKeysBu
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
currentFollowerEvent, err := adapters.ContactRepository.GetCurrentContactsEvent(ctx, pk1)
require.NoError(t, err)
require.Equal(t, event.Id(), currentFollowerEvent.Id())
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestContactRepository_IsFolloweeOfMonitoredPublicKey(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
ok, err := adapters.ContactRepository.IsFolloweeOfMonitoredPublicKey(ctx, followee11)
require.NoError(t, err)
require.True(t, ok)
Expand Down Expand Up @@ -314,7 +314,7 @@ func BenchmarkContactRepository_GetCurrentContactsEvent(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
event, err := adapters.ContactRepository.GetCurrentContactsEvent(ctx, eventToLookUp.PubKey())
require.NoError(b, err)
require.Equal(b, eventToLookUp.Id(), event.Id())
Expand All @@ -337,7 +337,7 @@ func TestContactRepository_CountFolloweesReturnsNumberOfFollowees(t *testing.T)
pk2, sk2 := fixtures.SomeKeyPair()
event2 := fixtures.SomeEventWithAuthor(sk2)

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.ContactRepository.CountFollowees(ctx, pk1)
require.NoError(t, err)
require.Equal(t, 0, n)
Expand All @@ -364,7 +364,7 @@ func TestContactRepository_CountFolloweesReturnsNumberOfFollowees(t *testing.T)
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.ContactRepository.CountFollowees(ctx, pk1)
require.NoError(t, err)
require.Equal(t, 2, n)
Expand Down Expand Up @@ -392,7 +392,7 @@ func TestContactRepository_CountFollowersReturnsNumberOfFollowers(t *testing.T)
followee2 := fixtures.SomePublicKey()
followee3 := fixtures.SomePublicKey()

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.ContactRepository.CountFollowers(ctx, followee1)
require.NoError(t, err)
require.Equal(t, 0, n)
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestContactRepository_CountFollowersReturnsNumberOfFollowers(t *testing.T)
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.ContactRepository.CountFollowers(ctx, followee1)
require.NoError(t, err)
require.Equal(t, 1, n)
Expand Down
16 changes: 8 additions & 8 deletions service/adapters/sqlite/event_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestEventRepository_GetReturnsPredefinedError(t *testing.T) {
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
_, err := adapters.EventRepository.Get(ctx, fixtures.SomeEventID())
require.ErrorIs(t, err, app.ErrEventNotFound)

Expand Down Expand Up @@ -64,7 +64,7 @@ func TestEventRepository_ItIsPossibleToSaveAndGetEvents(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
readEvent, err := adapters.EventRepository.Get(ctx, event.Id())
require.NoError(t, err)
require.Equal(t, event.Raw(), readEvent.Raw())
Expand All @@ -78,7 +78,7 @@ func TestEventRepository_CountCountsSavedEvents(t *testing.T) {
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.EventRepository.Count(ctx)
require.NoError(t, err)
require.Equal(t, 0, n)
Expand All @@ -96,7 +96,7 @@ func TestEventRepository_CountCountsSavedEvents(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.EventRepository.Count(ctx)
require.NoError(t, err)
require.Equal(t, i+1, n)
Expand All @@ -114,7 +114,7 @@ func TestEventRepository_ExistsChecksIfEventsExist(t *testing.T) {
event1 := fixtures.SomeEvent()
event2 := fixtures.SomeEvent()

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
ok, err := adapters.EventRepository.Exists(ctx, event1.Id())
require.NoError(t, err)
require.False(t, ok)
Expand All @@ -135,7 +135,7 @@ func TestEventRepository_ExistsChecksIfEventsExist(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
ok, err := adapters.EventRepository.Exists(ctx, event1.Id())
require.NoError(t, err)
require.True(t, ok)
Expand All @@ -153,7 +153,7 @@ func TestEventRepository_ListReturnsNoEventsIfRepositoryIsEmpty(t *testing.T) {
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
events, err := adapters.EventRepository.List(ctx, nil, 10)
require.NoError(t, err)
require.Empty(t, events)
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestEventRepository_ListReturnsEventsIfRepositoryIsNotEmpty(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
events, err := adapters.EventRepository.List(ctx, nil, 2)
require.NoError(t, err)
fixtures.RequireEqualEventSlices(t,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestPublicKeysToMonitorRepository_GetReturnsPredefinedError(t *testing.T) {
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
_, err := adapters.PublicKeysToMonitorRepository.Get(ctx, fixtures.SomePublicKey())
require.ErrorIs(t, err, app.ErrPublicKeyToMonitorNotFound)

Expand All @@ -31,7 +31,7 @@ func TestPublicKeysToMonitorRepository_ListReturnsNoDataWhenRepositoryIsEmpty(t
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
result, err := adapters.PublicKeysToMonitorRepository.List(ctx)
require.NoError(t, err)
require.Empty(t, result)
Expand All @@ -57,7 +57,7 @@ func TestPublicKeysToMonitorRepository_GetReturnsData(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
result, err := adapters.PublicKeysToMonitorRepository.Get(ctx, publicKey)
require.NoError(t, err)

Expand Down Expand Up @@ -97,7 +97,7 @@ func TestPublicKeysToMonitorRepository_SaveUpdatesData(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
result, err := adapters.PublicKeysToMonitorRepository.List(ctx)
require.NoError(t, err)

Expand All @@ -123,7 +123,7 @@ func TestPublicKeysToMonitorRepository_SaveUpdatesData(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
result, err := adapters.PublicKeysToMonitorRepository.List(ctx)
require.NoError(t, err)

Expand Down
7 changes: 4 additions & 3 deletions service/adapters/sqlite/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var ErrQueueEmpty = errors.New("queue is empty")

type PubsubTransactionProvider interface {
Transact(context.Context, func(context.Context, *sql.Tx) error) error
ReadOnly(context.Context, func(context.Context, *sql.Tx) error) error
}

type Message struct {
Expand Down Expand Up @@ -152,7 +153,7 @@ func (p *PubSub) Subscribe(ctx context.Context, topic string) <-chan *ReceivedMe

func (p *PubSub) QueueLength(ctx context.Context, topic string) (int, error) {
var count int
if err := p.transactionProvider.Transact(ctx, func(ctx context.Context, tx *sql.Tx) error {
if err := p.transactionProvider.ReadOnly(ctx, func(ctx context.Context, tx *sql.Tx) error {
row := tx.QueryRow(
"SELECT COUNT(*) FROM pubsub WHERE topic = ?",
topic,
Expand All @@ -175,7 +176,7 @@ func (p *PubSub) QueueLength(ctx context.Context, topic string) (int, error) {
// OldestMessageAge returns ErrQueueEmpty if the queue is empty.
func (p *PubSub) OldestMessageAge(ctx context.Context, topic string) (time.Duration, error) {
var age time.Duration
if err := p.transactionProvider.Transact(ctx, func(ctx context.Context, tx *sql.Tx) error {
if err := p.transactionProvider.ReadOnly(ctx, func(ctx context.Context, tx *sql.Tx) error {
row := tx.QueryRow(
"SELECT created_at FROM pubsub WHERE topic = ? ORDER BY created_at ASC LIMIT 1",
topic,
Expand Down Expand Up @@ -256,7 +257,7 @@ func (p *PubSub) subscribe(ctx context.Context, topic string, ch chan *ReceivedM

func (p *PubSub) readMsg(ctx context.Context, topic string) (Message, error) {
var msg Message
if err := p.transactionProvider.Transact(ctx, func(ctx context.Context, tx *sql.Tx) error {
if err := p.transactionProvider.ReadOnly(ctx, func(ctx context.Context, tx *sql.Tx) error {
row := tx.QueryRow(
"SELECT uuid, payload FROM pubsub WHERE topic = ? AND (backoff_until IS NULL OR backoff_until <= ?) ORDER BY RANDOM() LIMIT 1",
topic,
Expand Down
14 changes: 7 additions & 7 deletions service/adapters/sqlite/relay_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestRelayRepository_ItIsPossibleToListSavedData(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
addresses, err := adapters.RelayRepository.List(ctx)
require.NoError(t, err)
require.Len(t, addresses, 1)
Expand All @@ -81,7 +81,7 @@ func TestRelayRepository_ItIsPossibleToListSavedData(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
addresses, err := adapters.RelayRepository.List(ctx)
require.NoError(t, err)
require.Len(t, addresses, 2)
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestRelayRepository_SavingSameDataTwiceDoesNotCreateDuplicates(t *testing.T
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
addresses, err := adapters.RelayRepository.List(ctx)
require.NoError(t, err)
require.Len(t, addresses, 1)
Expand All @@ -134,7 +134,7 @@ func TestRelayRepository_SavingSameDataTwiceDoesNotCreateDuplicates(t *testing.T
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
addresses, err := adapters.RelayRepository.List(ctx)
require.NoError(t, err)
require.Len(t, addresses, 1)
Expand All @@ -161,7 +161,7 @@ func TestRelayRepository_CountCountsSavedAddresses(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.RelayRepository.Count(ctx)
require.NoError(t, err)
require.Equal(t, 0, n)
Expand All @@ -178,7 +178,7 @@ func TestRelayRepository_CountCountsSavedAddresses(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.RelayRepository.Count(ctx)
require.NoError(t, err)
require.Equal(t, 1, n)
Expand All @@ -195,7 +195,7 @@ func TestRelayRepository_CountCountsSavedAddresses(t *testing.T) {
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.TransactionProvider.ReadOnly(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.RelayRepository.Count(ctx)
require.NoError(t, err)
require.Equal(t, 2, n)
Expand Down
Loading

0 comments on commit 27b9057

Please sign in to comment.