Skip to content

Commit

Permalink
Add a query to get a feed message by sequence number (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq authored Oct 15, 2022
1 parent 7c4e32c commit 06b2e00
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 18 deletions.
12 changes: 6 additions & 6 deletions di/inject_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ var commandsSet = wire.NewSet(
var queriesSet = wire.NewSet(
wire.Struct(new(app.Queries), "*"),

queries.NewCreateHistoryStreamHandler,
wire.Bind(new(portsrpc.CreateHistoryStreamQueryHandler), new(*queries.CreateHistoryStreamHandler)),
wire.Bind(new(ebtadapters.CreateHistoryStreamHandler), new(*queries.CreateHistoryStreamHandler)),

queries.NewReceiveLogHandler,
queries.NewPublishedLogHandler,
queries.NewStatusHandler,
queries.NewBlobDownloadedEventsHandler,
queries.NewRoomsListAliasesHandler,
queries.NewGetMessageBySequenceHandler,

queries.NewCreateHistoryStreamHandler,
wire.Bind(new(portsrpc.CreateHistoryStreamQueryHandler), new(*queries.CreateHistoryStreamHandler)),
wire.Bind(new(ebtadapters.CreateHistoryStreamHandler), new(*queries.CreateHistoryStreamHandler)),

queries.NewGetBlobHandler,
wire.Bind(new(portsrpc.GetBlobQueryHandler), new(*queries.GetBlobHandler)),

queries.NewRoomsListAliasesHandler,
)
4 changes: 4 additions & 0 deletions di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions service/adapters/bolt/feed_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,30 @@ func (b FeedRepository) DeleteFeed(ref refs.Feed) error {
return nil
}

func (b FeedRepository) GetMessage(feed refs.Feed, sequence message.Sequence) (message.Message, error) {
bucket, err := b.getFeedBucket(feed)
if err != nil {
return message.Message{}, errors.Wrap(err, "could not get the bucket")
}

if bucket == nil {
return message.Message{}, errors.New("bucket is nil")
}

key := messageKey(sequence)
messageRefAsBytes := bucket.Get(key)
if messageRefAsBytes == nil {
return message.Message{}, errors.New("message ref not found")
}

msgId, err := refs.NewMessage(string(messageRefAsBytes))
if err != nil {
return message.Message{}, errors.Wrap(err, "failed to create a message ref")
}

return b.messageRepository.Get(msgId)
}

func (b FeedRepository) removeMessageData(ref refs.Message) error {
if err := b.messageRepository.Delete(ref); err != nil {
return errors.Wrap(err, "failed to remove from message repository")
Expand Down
39 changes: 39 additions & 0 deletions service/adapters/bolt/feed_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,45 @@ import (
"go.etcd.io/bbolt"
)

func TestFeedRepository_GetMessageReturnsMessageWhichIsStoredInRepo(t *testing.T) {
db := fixtures.Bolt(t)

feedRef := fixtures.SomeRefFeed()
sequence := message.NewFirstSequence()
msg := fixtures.SomeMessage(sequence, feedRef)

err := db.Update(func(tx *bbolt.Tx) error {
adapters, err := di.BuildTxTestAdapters(tx)
require.NoError(t, err)

adapters.BanListHasher.Mock(feedRef, fixtures.SomeBanListHash())

err = adapters.FeedRepository.UpdateFeed(feedRef, func(feed *feeds.Feed) error {
return feed.AppendMessage(msg)
})
require.NoError(t, err)

return nil
})
require.NoError(t, err)

err = db.View(func(tx *bbolt.Tx) error {
adapters, err := di.BuildTxTestAdapters(tx)
require.NoError(t, err)

adapters.BanListHasher.Mock(feedRef, fixtures.SomeBanListHash())

retrievedMsg, err := adapters.FeedRepository.GetMessage(feedRef, sequence)
require.NoError(t, err)

// todo returned message will not match the saved message due to the way fixtures.SomeMessage works, this should be fixed
require.Equal(t, msg.Raw(), retrievedMsg.Raw())

return nil
})
require.NoError(t, err)
}

func TestFeedRepository_GetFeed_ReturnsAppropriateErrorWhenEmpty(t *testing.T) {
db := fixtures.Bolt(t)

Expand Down
24 changes: 24 additions & 0 deletions service/adapters/bolt/read_feed_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,27 @@ func (b ReadFeedRepository) Count() (int, error) {

return result, nil
}

func (b ReadFeedRepository) GetMessage(feed refs.Feed, sequence message.Sequence) (message.Message, error) {
var result message.Message

if err := b.db.View(func(tx *bbolt.Tx) error {
r, err := b.factory(tx)
if err != nil {
return errors.Wrap(err, "factory returned an error")
}

msg, err := r.Feed.GetMessage(feed, sequence)
if err != nil {
return errors.Wrap(err, "failed to call the feed repository")
}

result = msg

return nil
}); err != nil {
return message.Message{}, errors.Wrap(err, "transaction failed")
}

return result, nil
}
29 changes: 29 additions & 0 deletions service/adapters/bolt/read_feed_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,35 @@ import (
"go.etcd.io/bbolt"
)

func TestReadFeedRepository_GetMessage(t *testing.T) {
db := fixtures.Bolt(t)

feedRef := fixtures.SomeRefFeed()
sequence := message.NewFirstSequence()
msg := fixtures.SomeMessage(sequence, feedRef)

adapters, err := di.BuildTestAdapters(db)
require.NoError(t, err)

err = db.Update(func(tx *bbolt.Tx) error {
txadapters, err := di.BuildTxTestAdapters(tx)
require.NoError(t, err)

txadapters.BanListHasher.Mock(feedRef, fixtures.SomeBanListHash())

return txadapters.FeedRepository.UpdateFeed(feedRef, func(feed *feeds.Feed) error {
return feed.AppendMessage(msg)
})
})
require.NoError(t, err)

retrievedMsg, err := adapters.FeedRepository.GetMessage(feedRef, sequence)
require.NoError(t, err)

// todo returned message will not match the saved message due to the way fixtures.SomeMessage works, this should be fixed
require.Equal(t, msg.Raw(), retrievedMsg.Raw())
}

func TestReadFeedRepository_Count(t *testing.T) {
db := fixtures.Bolt(t)

Expand Down
13 changes: 13 additions & 0 deletions service/adapters/mocks/feed_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,19 @@ type FeedRepositoryMockGetMessagesCall struct {
Limit *int
}

type FeedRepositoryMockGetMessageCall struct {
Feed refs.Feed
Seq message.Sequence
}

type FeedRepositoryMock struct {
GetMessagesCalls []FeedRepositoryMockGetMessagesCall
GetMessagesReturnValue []message.Message
GetMessagesReturnErr error

GetMessageCalls []FeedRepositoryMockGetMessageCall
GetMessageReturnValue message.Message

CountReturnValue int
}

Expand All @@ -31,3 +39,8 @@ func (m *FeedRepositoryMock) GetMessages(id refs.Feed, seq *message.Sequence, li
m.GetMessagesCalls = append(m.GetMessagesCalls, FeedRepositoryMockGetMessagesCall{Id: id, Seq: seq, Limit: limit})
return m.GetMessagesReturnValue, m.GetMessagesReturnErr
}

func (m *FeedRepositoryMock) GetMessage(feed refs.Feed, sequence message.Sequence) (message.Message, error) {
m.GetMessageCalls = append(m.GetMessageCalls, FeedRepositoryMockGetMessageCall{Feed: feed, Seq: sequence})
return m.GetMessageReturnValue, nil
}
1 change: 1 addition & 0 deletions service/app/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ type Queries struct {
GetBlob *queries.GetBlobHandler
BlobDownloadedEvents *queries.BlobDownloadedEventsHandler
RoomsListAliases *queries.RoomsListAliasesHandler
GetMessageBySequence *queries.GetMessageBySequenceHandler
}
17 changes: 17 additions & 0 deletions service/app/queries/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/planetary-social/scuttlego/service/domain/feeds/message"
"github.com/planetary-social/scuttlego/service/domain/identity"
"github.com/planetary-social/scuttlego/service/domain/network"
"github.com/planetary-social/scuttlego/service/domain/refs"
"github.com/planetary-social/scuttlego/service/domain/transport"
)

Expand All @@ -15,6 +16,22 @@ type LogMessage struct {
Sequence ReceiveLogSequence
}

type FeedRepository interface {
// GetMessages returns messages with a sequence greater or equal to the
// provided sequence. If sequence is nil then messages starting from the
// beginning of the feed are returned. Limit specifies the max number of
// returned messages. If limit is nil then all messages matching the
// sequence criteria are returned.
GetMessages(id refs.Feed, seq *message.Sequence, limit *int) ([]message.Message, error) // todo iterator instead of returning a huge array

// GetMessage returns a message with a given sequence from the specified
// feed.
GetMessage(feed refs.Feed, sequence message.Sequence) (message.Message, error)

// Count returns the number of stored feeds.
Count() (int, error)
}

// ReceiveLogSequence is zero-indexed. This type has nothing to do with the
// sequence field of Scuttlebutt messages. It is a part of the system which
// simulates the behaviour of go-ssb's receive log.
Expand Down
12 changes: 0 additions & 12 deletions service/app/queries/create_history_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@ const (
cleanupDelay = 500 * time.Millisecond
)

type FeedRepository interface {
// GetMessages returns messages with a sequence greater or equal to the
// provided sequence. If sequence is nil then messages starting from the
// beginning of the feed are returned. Limit specifies the max number of
// returned messages. If limit is nil then all messages matching the
// sequence criteria are returned.
GetMessages(id refs.Feed, seq *message.Sequence, limit *int) ([]message.Message, error) // todo iterator instead of returning a huge array

// Count returns the number of stored feeds.
Count() (int, error)
}

type MessageSubscriber interface {
// SubscribeToNewMessages subscribes to all new messages.
SubscribeToNewMessages(ctx context.Context) <-chan message.Message
Expand Down
50 changes: 50 additions & 0 deletions service/app/queries/handler_get_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package queries

import (
"github.com/boreq/errors"
"github.com/planetary-social/scuttlego/service/domain/feeds/message"
"github.com/planetary-social/scuttlego/service/domain/refs"
)

type GetMessageBySequence struct {
feed refs.Feed
sequence message.Sequence
}

func NewGetMessageBySequence(feed refs.Feed, sequence message.Sequence) (GetMessageBySequence, error) {
if feed.IsZero() {
return GetMessageBySequence{}, errors.New("zero value of feed")
}
if sequence.IsZero() {
return GetMessageBySequence{}, errors.New("zero value of sequence")
}
return GetMessageBySequence{feed: feed, sequence: sequence}, nil
}

func (q *GetMessageBySequence) Feed() refs.Feed {
return q.feed
}

func (q *GetMessageBySequence) Sequence() message.Sequence {
return q.sequence
}

func (q *GetMessageBySequence) IsZero() bool {
return q.feed.IsZero()
}

type GetMessageBySequenceHandler struct {
feedRepository FeedRepository
}

func NewGetMessageBySequenceHandler(feedRepository FeedRepository) *GetMessageBySequenceHandler {
return &GetMessageBySequenceHandler{feedRepository: feedRepository}
}

func (h *GetMessageBySequenceHandler) Handle(query GetMessageBySequence) (message.Message, error) {
if query.IsZero() {
return message.Message{}, errors.New("zero value of query")
}

return h.feedRepository.GetMessage(query.Feed(), query.Sequence())
}
38 changes: 38 additions & 0 deletions service/app/queries/handler_get_message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package queries_test

import (
"testing"

"github.com/planetary-social/scuttlego/di"
"github.com/planetary-social/scuttlego/fixtures"
"github.com/planetary-social/scuttlego/service/adapters/mocks"
"github.com/planetary-social/scuttlego/service/app/queries"
"github.com/stretchr/testify/require"
)

func TestGetMessageBySequenceHandler(t *testing.T) {
tq, err := di.BuildTestQueries(t)
require.NoError(t, err)

feed := fixtures.SomeRefFeed()
sequence := fixtures.SomeSequence()

query, err := queries.NewGetMessageBySequence(feed, sequence)
require.NoError(t, err)

expectedMessage := fixtures.SomeMessage(sequence, feed)
tq.FeedRepository.GetMessageReturnValue = expectedMessage

msg, err := tq.Queries.GetMessageBySequence.Handle(query)
require.NoError(t, err)
require.Equal(t, expectedMessage, msg)
require.Equal(t,
[]mocks.FeedRepositoryMockGetMessageCall{
{
Feed: feed,
Seq: sequence,
},
},
tq.FeedRepository.GetMessageCalls,
)
}

0 comments on commit 06b2e00

Please sign in to comment.