Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add import bulk tuples #217

Merged
merged 6 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/kessel-relations/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/project-kessel/relations-api

go 1.22.7

require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.34.2-20240920164238-5a7b106cbb87.2
github.com/MicahParks/keyfunc/v3 v3.3.5
Expand Down Expand Up @@ -75,6 +76,7 @@ require (
github.com/samber/lo v1.47.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
2 changes: 1 addition & 1 deletion internal/biz/biz.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ import (
)

// ProviderSet is biz providers.
var ProviderSet = wire.NewSet(NewCreateRelationshipsUsecase, NewReadRelationshipsUsecase, NewDeleteRelationshipsUsecase, NewCheckUsecase, NewGetSubjectsUseCase, NewGetResourcesUseCase, NewIsBackendAvailableUsecase)
var ProviderSet = wire.NewSet(NewCreateRelationshipsUsecase, NewReadRelationshipsUsecase, NewDeleteRelationshipsUsecase, NewCheckUsecase, NewGetSubjectsUseCase, NewGetResourcesUseCase, NewIsBackendAvailableUsecase, NewImportBulkTuplesUsecase)
15 changes: 15 additions & 0 deletions internal/biz/relationships.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package biz

import (
"context"
"google.golang.org/grpc"

v1beta1 "github.com/project-kessel/relations-api/api/kessel/relations/v1beta1"

Expand Down Expand Up @@ -34,6 +35,7 @@ type ZanzibarRepository interface {
LookupSubjects(ctx context.Context, subjectType *v1beta1.ObjectType, subject_relation, relation string, resource *v1beta1.ObjectReference, limit uint32, continuation ContinuationToken) (chan *SubjectResult, chan error, error)
LookupResources(ctx context.Context, resouce_type *v1beta1.ObjectType, relation string, subject *v1beta1.SubjectReference, limit uint32, continuation ContinuationToken) (chan *ResourceResult, chan error, error)
IsBackendAvailable() error
ImportBulkTuples(stream grpc.ClientStreamingServer[v1beta1.ImportBulkTuplesRequest, v1beta1.ImportBulkTuplesResponse]) error
}

type CheckUsecase struct {
Expand Down Expand Up @@ -106,3 +108,16 @@ func NewDeleteRelationshipsUsecase(repo ZanzibarRepository, logger log.Logger) *
func (rc *DeleteRelationshipsUsecase) DeleteRelationships(ctx context.Context, r *v1beta1.RelationTupleFilter) error {
return rc.repo.DeleteRelationships(ctx, r)
}

type ImportBulkTuplesUsecase struct {
repo ZanzibarRepository
log *log.Helper
}

func NewImportBulkTuplesUsecase(repo ZanzibarRepository, logger log.Logger) *ImportBulkTuplesUsecase {
return &ImportBulkTuplesUsecase{repo: repo, log: log.NewHelper(logger)}
}

func (rc *ImportBulkTuplesUsecase) ImportBulkTuples(client grpc.ClientStreamingServer[v1beta1.ImportBulkTuplesRequest, v1beta1.ImportBulkTuplesResponse]) error {
return rc.repo.ImportBulkTuples(client)
}
2 changes: 1 addition & 1 deletion internal/data/LocalSpiceDbContainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
// SpicedbImage is the image used for containerized spiceDB in tests
SpicedbImage = "authzed/spicedb"
// SpicedbVersion is the image version used for containerized spiceDB in tests
SpicedbVersion = "v1.22.2"
SpicedbVersion = "v1.37.0"
// SpicedbSchemaBootstrapFile specifies an optional bootstrap schema file to be used for testing
SpicedbSchemaBootstrapFile = "spicedb-test-data/basic_schema.zed"
// SpicedbRelationsBootstrapFile specifies an optional bootstrap file containing relations to be used for testing
Expand Down
45 changes: 45 additions & 0 deletions internal/data/spicedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"os"
"strings"
Expand Down Expand Up @@ -242,6 +244,49 @@ func (s *SpiceDbRepository) LookupResources(ctx context.Context, resouce_type *a
return resources, errs, nil
}

func (s *SpiceDbRepository) ImportBulkTuples(stream grpc.ClientStreamingServer[apiV1beta1.ImportBulkTuplesRequest, apiV1beta1.ImportBulkTuplesResponse]) error {
if err := s.initialize(); err != nil {
return err
}

var totalImported uint64
for {
req, err := stream.Recv()
if err != nil {
if !errors.Is(err, io.EOF) {
if err := stream.SendAndClose(&apiV1beta1.ImportBulkTuplesResponse{NumImported: totalImported}); err != nil {
return status.Errorf(codes.Internal, "failed to send response: %v", err)
}
}
return err
}
inputRelationships := (*req).Tuples
batch := []*v1.Relationship{}
for _, tuple := range inputRelationships {
batch = append(batch, createSpiceDbRelationship(tuple))
akoserwal marked this conversation as resolved.
Show resolved Hide resolved
}
client, err := s.client.ImportBulkRelationships(context.Background())
Copy link
Member

@alechenninger alechenninger Oct 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am reading this right, it looks like this method...

  1. Gets a batch from the input stream
  2. Starts a streaming RPC with SpiceDB
  3. Sends one batch on that stream
  4. Closes that stream

And then repeats for each batch.

However what we want is:

  1. Start the spicedb stream
  2. For each batch, send that batch to SpiceDB
  3. Then close the stream after there are no more input batches (the client closes the stream on Relations end)

In other words, if a client has one streaming RPC with Relations, there should only be one equivalent streaming RPC with SpiceDB.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thank you for the suggest. I update the code.

if err != nil {
return fmt.Errorf("failed to create SpiceDB client: %w", err)
}
if err = client.Send((*v1.ImportBulkRelationshipsRequest)(&v1.BulkImportRelationshipsRequest{
Relationships: batch,
})); err != nil {
if !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send bulkimport request: %w", err)
}
return err
}
if res, err := client.CloseAndRecv(); err != nil {
return fmt.Errorf("error receiving response from Spicedb for bulkimport request: %w", err)
} else {
log.Infof("total number of relationships loaded: %d", res.NumLoaded)
totalImported = res.NumLoaded
return stream.SendAndClose(&apiV1beta1.ImportBulkTuplesResponse{NumImported: totalImported})
}
}
}

func (s *SpiceDbRepository) CreateRelationships(ctx context.Context, rels []*apiV1beta1.Relationship, touch biz.TouchSemantics) error {
if err := s.initialize(); err != nil {
return err
Expand Down
77 changes: 77 additions & 0 deletions internal/data/spicedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package data
import (
"context"
"fmt"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/metadata"
"io"
"os"
"testing"

Expand Down Expand Up @@ -180,6 +183,80 @@ func TestSecondCreateRelationshipSucceedsWithTouchTrue(t *testing.T) {
assert.True(t, exists)
}

type MockgRPCClientStream struct {
mock.Mock
}

func (m *MockgRPCClientStream) SetHeader(md metadata.MD) error {
panic("implement me")
}

func (m *MockgRPCClientStream) SendHeader(md metadata.MD) error {
panic("implement me")
}

func (m *MockgRPCClientStream) SetTrailer(md metadata.MD) {
panic("implement me")
}

func (m *MockgRPCClientStream) Context() context.Context {
panic("implement me")
}

func (m *MockgRPCClientStream) SendMsg(_ any) error {
panic("implement me")
}

func (m *MockgRPCClientStream) RecvMsg(_ any) error {
panic("implement me")
}

func (m *MockgRPCClientStream) Recv() (*apiV1beta1.ImportBulkTuplesRequest, error) {
args := m.Called()
if req, ok := args.Get(0).(*apiV1beta1.ImportBulkTuplesRequest); ok {
return req, args.Error(1)
}
return nil, args.Error(1)
}

// SendAndClose simulates sending a response and closing the stream
func (m *MockgRPCClientStream) SendAndClose(resp *apiV1beta1.ImportBulkTuplesResponse) error {
args := m.Called(resp)
return args.Error(0)
}

func (m *MockgRPCClientStream) CloseAndRecv() (*apiV1beta1.ImportBulkTuplesResponse, error) {
args := m.Called()
if res, ok := args.Get(0).(*apiV1beta1.ImportBulkTuplesResponse); ok {
return res, args.Error(1)
}
return nil, args.Error(1)
}

func TestImportBulkTuples(t *testing.T) {
rels := []*apiV1beta1.Relationship{
createRelationship("rbac", "group", "bob_club", "t_member", "rbac", "user", "bob5", ""),
akoserwal marked this conversation as resolved.
Show resolved Hide resolved
createRelationship("rbac", "group", "bob_club", "t_member", "rbac", "user", "bob3", ""),
createRelationship("rbac", "group", "bob_club", "t_member", "rbac", "user", "bob6", ""),
createRelationship("rbac", "group", "bob_club", "t_member", "rbac", "user", "bob9", ""),
}

mockgRPCClientStream := new(MockgRPCClientStream)
mockgRPCClientStream.On("Recv").Return(&apiV1beta1.ImportBulkTuplesRequest{Tuples: rels}, nil).Once()
mockgRPCClientStream.On("Recv").Return(nil, io.EOF).Once()
mockgRPCClientStream.On("SendAndClose", &apiV1beta1.ImportBulkTuplesResponse{NumImported: uint64(len(rels))}).Return(nil)

spiceDbRepo, err := container.CreateSpiceDbRepository()
assert.NoError(t, err)

err = spiceDbRepo.ImportBulkTuples(mockgRPCClientStream)
assert.NoError(t, err)
container.WaitForQuantizationInterval()

exists := CheckForRelationship(spiceDbRepo, "bob5", "rbac", "user", "", "t_member", "rbac", "group", "bob_club")
assert.True(t, exists)
}

func TestIsBackendAvailable(t *testing.T) {
t.Parallel()

Expand Down
29 changes: 20 additions & 9 deletions internal/service/relationships.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service
import (
"context"
"fmt"
"google.golang.org/grpc"

"github.com/project-kessel/relations-api/internal/biz"

Expand All @@ -13,18 +14,20 @@ import (

type RelationshipsService struct {
pb.UnimplementedKesselTupleServiceServer
createUsecase *biz.CreateRelationshipsUsecase
readUsecase *biz.ReadRelationshipsUsecase
deleteUsecase *biz.DeleteRelationshipsUsecase
log *log.Helper
createUsecase *biz.CreateRelationshipsUsecase
readUsecase *biz.ReadRelationshipsUsecase
deleteUsecase *biz.DeleteRelationshipsUsecase
importBulkUsecase *biz.ImportBulkTuplesUsecase
log *log.Helper
}

func NewRelationshipsService(logger log.Logger, createUseCase *biz.CreateRelationshipsUsecase, readUsecase *biz.ReadRelationshipsUsecase, deleteUsecase *biz.DeleteRelationshipsUsecase) *RelationshipsService {
func NewRelationshipsService(logger log.Logger, createUseCase *biz.CreateRelationshipsUsecase, readUsecase *biz.ReadRelationshipsUsecase, deleteUsecase *biz.DeleteRelationshipsUsecase, importBulkUsecase *biz.ImportBulkTuplesUsecase) *RelationshipsService {
return &RelationshipsService{
log: log.NewHelper(logger),
createUsecase: createUseCase,
readUsecase: readUsecase,
deleteUsecase: deleteUsecase,
log: log.NewHelper(logger),
createUsecase: createUseCase,
readUsecase: readUsecase,
deleteUsecase: deleteUsecase,
importBulkUsecase: importBulkUsecase,
}
}

Expand Down Expand Up @@ -72,3 +75,11 @@ func (s *RelationshipsService) DeleteTuples(ctx context.Context, req *pb.DeleteT

return &pb.DeleteTuplesResponse{}, nil
}

func (s *RelationshipsService) ImportBulkTuples(stream grpc.ClientStreamingServer[pb.ImportBulkTuplesRequest, pb.ImportBulkTuplesResponse]) error {
err := s.importBulkUsecase.ImportBulkTuples(stream)
if err != nil {
return fmt.Errorf("error import bulk tuples: %w", err)
}
return nil
}
9 changes: 6 additions & 3 deletions internal/service/relationships_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ func setup(t *testing.T) (error, *RelationshipsService) {
createRelationshipsUsecase := biz.NewCreateRelationshipsUsecase(spiceDbRepository, logger)
readRelationshipsUsecase := biz.NewReadRelationshipsUsecase(spiceDbRepository, logger)
deleteRelationshipsUsecase := biz.NewDeleteRelationshipsUsecase(spiceDbRepository, logger)
relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase)
importBulkUsecase := biz.NewImportBulkTuplesUsecase(spiceDbRepository, logger)
relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase, importBulkUsecase)
return err, relationshipsService
}

Expand All @@ -287,7 +288,8 @@ func TestRelationshipsService_ReadRelationships(t *testing.T) {
createRelationshipsUsecase := biz.NewCreateRelationshipsUsecase(spiceDbRepository, logger)
readRelationshipsUsecase := biz.NewReadRelationshipsUsecase(spiceDbRepository, logger)
deleteRelationshipsUsecase := biz.NewDeleteRelationshipsUsecase(spiceDbRepository, logger)
relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase)
bulkImportTuplesUsecase := biz.NewImportBulkTuplesUsecase(spiceDbRepository, logger)
relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase, bulkImportTuplesUsecase)

expected := createRelationship(rbac_ns_type("group"), "bob_club", "member", rbac_ns_type("user"), "bob", "")

Expand Down Expand Up @@ -341,7 +343,8 @@ func TestRelationshipsService_ReadRelationships_Paginated(t *testing.T) {
createRelationshipsUsecase := biz.NewCreateRelationshipsUsecase(spiceDbRepository, logger)
readRelationshipsUsecase := biz.NewReadRelationshipsUsecase(spiceDbRepository, logger)
deleteRelationshipsUsecase := biz.NewDeleteRelationshipsUsecase(spiceDbRepository, logger)
relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase)
bulkImportTuplesUsecase := biz.NewImportBulkTuplesUsecase(spiceDbRepository, logger)
relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase, bulkImportTuplesUsecase)

expected1 := createRelationship(rbac_ns_type("group"), "bob_club", "member", rbac_ns_type("user"), "bob", "")
expected2 := createRelationship(rbac_ns_type("group"), "other_bob_club", "member", rbac_ns_type("user"), "bob", "")
Expand Down
Loading