Skip to content

Commit

Permalink
Time window based replication (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq authored Dec 13, 2023
1 parent 0a04e99 commit 6ef1a99
Show file tree
Hide file tree
Showing 17 changed files with 1,124 additions and 105 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ fmt:
test:
go test -race ./...

.PHONY: test-nocache
test-nocache:
go test -race -count=1 ./...

.PHONY: test-bench
test-bench:
go test -v -race -run="^$$" -bench=. -benchtime=1x ./...
Expand Down
4 changes: 4 additions & 0 deletions cmd/event-service/di/inject_adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/boreq/errors"
"github.com/google/wire"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/adapters"
"github.com/planetary-social/nos-event-service/service/adapters/prometheus"
"github.com/planetary-social/nos-event-service/service/adapters/sqlite"
"github.com/planetary-social/nos-event-service/service/app"
Expand Down Expand Up @@ -64,6 +65,9 @@ var adaptersSet = wire.NewSet(
wire.Bind(new(app.Metrics), new(*prometheus.Prometheus)),
wire.Bind(new(relays.Metrics), new(*prometheus.Prometheus)),
wire.Bind(new(downloader.Metrics), new(*prometheus.Prometheus)),

adapters.NewCurrentTimeProvider,
wire.Bind(new(downloader.CurrentTimeProvider), new(*adapters.CurrentTimeProvider)),
)

func newAdaptersFactoryFn(deps buildTransactionSqliteAdaptersDependencies) sqlite.AdaptersFactoryFn {
Expand Down
8 changes: 8 additions & 0 deletions cmd/event-service/di/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Service struct {
eventSavedEventSubscriber *sqlitepubsub.EventSavedEventSubscriber
metricsTimer *timer.Metrics
transactionRunner *sqlite.TransactionRunner
taskScheduler *downloader.TaskScheduler
migrationsRunner *migrations.Runner
migrations migrations.Migrations
migrationsProgressCallback migrations.ProgressCallback
Expand All @@ -36,6 +37,7 @@ func NewService(
eventSavedEventSubscriber *sqlitepubsub.EventSavedEventSubscriber,
metricsTimer *timer.Metrics,
transactionRunner *sqlite.TransactionRunner,
taskScheduler *downloader.TaskScheduler,
migrationsRunner *migrations.Runner,
migrations migrations.Migrations,
migrationsProgressCallback migrations.ProgressCallback,
Expand All @@ -49,6 +51,7 @@ func NewService(
metricsTimer: metricsTimer,
transactionRunner: transactionRunner,
migrationsRunner: migrationsRunner,
taskScheduler: taskScheduler,
migrations: migrations,
migrationsProgressCallback: migrationsProgressCallback,
}
Expand Down Expand Up @@ -99,6 +102,11 @@ func (s Service) Run(ctx context.Context) error {
errCh <- errors.Wrap(s.transactionRunner.Run(ctx), "transaction runner error")
}()

runners++
go func() {
errCh <- errors.Wrap(s.taskScheduler.Run(ctx), "task scheduler error")
}()

var err error
for i := 0; i < runners; i++ {
err = multierror.Append(err, errors.Wrap(<-errCh, "error returned by runner"))
Expand Down
10 changes: 9 additions & 1 deletion cmd/event-service/di/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,20 @@ var downloaderSet = wire.NewSet(
wire.Bind(new(downloader.RelayConnections), new(*relays.RelayConnections)),

app.NewDatabasePublicKeySource,
wire.Bind(new(downloader.PublicKeySource), new(*app.DatabasePublicKeySource)),
newCachedPublicKeySource,
wire.Bind(new(downloader.PublicKeySource), new(*app.CachedDatabasePublicKeySource)),

relays.NewEventSender,
wire.Bind(new(app.EventSender), new(*relays.EventSender)),

downloader.NewTaskScheduler,
wire.Bind(new(downloader.Scheduler), new(*downloader.TaskScheduler)),
)

func newCachedPublicKeySource(underlying *app.DatabasePublicKeySource) *app.CachedDatabasePublicKeySource {
return app.NewCachedDatabasePublicKeySource(underlying)
}

var domainSet = wire.NewSet(
domain.NewRelaysExtractor,
wire.Bind(new(app.RelaysExtractor), new(*domain.RelaysExtractor)),
Expand Down
15 changes: 11 additions & 4 deletions cmd/event-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions internal/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"math/rand"
"os"
"testing"
"time"

"github.com/nbd-wtf/go-nostr"
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/downloader"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -186,6 +188,18 @@ func SomeMaybeRelayAddress() domain.MaybeRelayAddress {
return domain.NewMaybeRelayAddress(SomeString())
}

func SomeTimeWindow() downloader.TimeWindow {
return downloader.MustNewTimeWindow(SomeTime(), SomeDuration())
}

func SomeTime() time.Time {
return time.Unix(int64(rand.Intn(10000000)), 0)
}

func SomeDuration() time.Duration {
return time.Duration(1+rand.Intn(100)) * time.Second
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randSeq(n int) string {
Expand Down
16 changes: 16 additions & 0 deletions service/adapters/current_time_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package adapters

import (
"time"
)

type CurrentTimeProvider struct {
}

func NewCurrentTimeProvider() *CurrentTimeProvider {
return &CurrentTimeProvider{}
}

func (c *CurrentTimeProvider) GetCurrentTime() time.Time {
return time.Now()
}
21 changes: 21 additions & 0 deletions service/adapters/mocks/current_time_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package mocks

import (
"time"
)

type CurrentTimeProvider struct {
t time.Time
}

func NewCurrentTimeProvider() *CurrentTimeProvider {
return &CurrentTimeProvider{}
}

func (c *CurrentTimeProvider) SetCurrentTime(t time.Time) {
c.t = t
}

func (c *CurrentTimeProvider) GetCurrentTime() time.Time {
return c.t
}
29 changes: 29 additions & 0 deletions service/app/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/planetary-social/nos-event-service/service/domain/downloader"
)

const cachePublicKeysFor = 1 * time.Minute

type DatabaseRelaySource struct {
transactionProvider TransactionProvider
logger logging.Logger
Expand Down Expand Up @@ -110,3 +112,30 @@ func (d *DatabasePublicKeySource) GetPublicKeys(ctx context.Context) (downloader
publicKeysToMonitorFollowees.List(),
), nil
}

type CachedDatabasePublicKeySource struct {
keys *downloader.PublicKeys
t time.Time
source downloader.PublicKeySource
}

func NewCachedDatabasePublicKeySource(
source downloader.PublicKeySource,
) *CachedDatabasePublicKeySource {
return &CachedDatabasePublicKeySource{
source: source,
}
}

func (d *CachedDatabasePublicKeySource) GetPublicKeys(ctx context.Context) (downloader.PublicKeys, error) {
if d.keys == nil || time.Since(d.t) > cachePublicKeysFor {
newKeys, err := d.source.GetPublicKeys(ctx)
if err != nil {
return downloader.PublicKeys{}, errors.Wrap(err, "error getting new public keys")
}
d.keys = &newKeys
d.t = time.Now()
}

return *d.keys, nil
}
Loading

0 comments on commit 6ef1a99

Please sign in to comment.