diff --git a/cmd/kessel-relations/wire_gen.go b/cmd/kessel-relations/wire_gen.go index d3878d5..a855b8a 100644 --- a/cmd/kessel-relations/wire_gen.go +++ b/cmd/kessel-relations/wire_gen.go @@ -31,7 +31,8 @@ func wireApp(confServer *conf.Server, confData *conf.Data, logger log.Logger) (* createRelationshipsUsecase := biz.NewCreateRelationshipsUsecase(spiceDbRepository, logger) readRelationshipsUsecase := biz.NewReadRelationshipsUsecase(spiceDbRepository, logger) deleteRelationshipsUsecase := biz.NewDeleteRelationshipsUsecase(spiceDbRepository, logger) - relationshipsService := service.NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase) + importBulkTuplesUsecase := biz.NewImportBulkTuplesUsecase(spiceDbRepository, logger) + relationshipsService := service.NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase,importBulkTuplesUsecase) isBackendAvaliableUsecase := biz.NewIsBackendAvailableUsecase(spiceDbRepository) healthService := service.NewHealthService(isBackendAvaliableUsecase) checkUsecase := biz.NewCheckUsecase(spiceDbRepository, logger) diff --git a/go.mod b/go.mod index 6c1410e..ddd56f9 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/project-kessel/relations-api go 1.22.7 + require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.34.2-20240920164238-5a7b106cbb87.2 github.com/MicahParks/keyfunc/v3 v3.3.5 @@ -75,6 +76,7 @@ require ( github.com/samber/lo v1.47.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect diff --git a/go.sum b/go.sum index 684f7f0..99ed31b 100644 --- a/go.sum +++ b/go.sum @@ -182,6 +182,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= diff --git a/internal/biz/biz.go b/internal/biz/biz.go index e41d884..dc413cd 100644 --- a/internal/biz/biz.go +++ b/internal/biz/biz.go @@ -5,4 +5,4 @@ import ( ) // ProviderSet is biz providers. -var ProviderSet = wire.NewSet(NewCreateRelationshipsUsecase, NewReadRelationshipsUsecase, NewDeleteRelationshipsUsecase, NewCheckUsecase, NewGetSubjectsUseCase, NewGetResourcesUseCase, NewIsBackendAvailableUsecase) +var ProviderSet = wire.NewSet(NewCreateRelationshipsUsecase, NewReadRelationshipsUsecase, NewDeleteRelationshipsUsecase, NewCheckUsecase, NewGetSubjectsUseCase, NewGetResourcesUseCase, NewIsBackendAvailableUsecase, NewImportBulkTuplesUsecase) diff --git a/internal/biz/relationships.go b/internal/biz/relationships.go index 536a52b..46d63f1 100644 --- a/internal/biz/relationships.go +++ b/internal/biz/relationships.go @@ -2,6 +2,7 @@ package biz import ( "context" + "google.golang.org/grpc" v1beta1 "github.com/project-kessel/relations-api/api/kessel/relations/v1beta1" @@ -34,6 +35,7 @@ type ZanzibarRepository interface { LookupSubjects(ctx context.Context, subjectType *v1beta1.ObjectType, subject_relation, relation string, resource *v1beta1.ObjectReference, limit uint32, continuation ContinuationToken) (chan *SubjectResult, chan error, error) LookupResources(ctx context.Context, resouce_type *v1beta1.ObjectType, relation string, subject *v1beta1.SubjectReference, limit uint32, continuation ContinuationToken) (chan *ResourceResult, chan error, error) IsBackendAvailable() error + ImportBulkTuples(stream grpc.ClientStreamingServer[v1beta1.ImportBulkTuplesRequest, v1beta1.ImportBulkTuplesResponse]) error } type CheckUsecase struct { @@ -106,3 +108,16 @@ func NewDeleteRelationshipsUsecase(repo ZanzibarRepository, logger log.Logger) * func (rc *DeleteRelationshipsUsecase) DeleteRelationships(ctx context.Context, r *v1beta1.RelationTupleFilter) error { return rc.repo.DeleteRelationships(ctx, r) } + +type ImportBulkTuplesUsecase struct { + repo ZanzibarRepository + log *log.Helper +} + +func NewImportBulkTuplesUsecase(repo ZanzibarRepository, logger log.Logger) *ImportBulkTuplesUsecase { + return &ImportBulkTuplesUsecase{repo: repo, log: log.NewHelper(logger)} +} + +func (rc *ImportBulkTuplesUsecase) ImportBulkTuples(client grpc.ClientStreamingServer[v1beta1.ImportBulkTuplesRequest, v1beta1.ImportBulkTuplesResponse]) error { + return rc.repo.ImportBulkTuples(client) +} diff --git a/internal/data/LocalSpiceDbContainer.go b/internal/data/LocalSpiceDbContainer.go index 7b74b5a..b0dc9b1 100644 --- a/internal/data/LocalSpiceDbContainer.go +++ b/internal/data/LocalSpiceDbContainer.go @@ -28,7 +28,7 @@ const ( // SpicedbImage is the image used for containerized spiceDB in tests SpicedbImage = "authzed/spicedb" // SpicedbVersion is the image version used for containerized spiceDB in tests - SpicedbVersion = "v1.22.2" + SpicedbVersion = "v1.37.0" // SpicedbSchemaBootstrapFile specifies an optional bootstrap schema file to be used for testing SpicedbSchemaBootstrapFile = "spicedb-test-data/basic_schema.zed" // SpicedbRelationsBootstrapFile specifies an optional bootstrap file containing relations to be used for testing diff --git a/internal/data/spicedb.go b/internal/data/spicedb.go index c518ad2..34801bd 100644 --- a/internal/data/spicedb.go +++ b/internal/data/spicedb.go @@ -242,6 +242,49 @@ func (s *SpiceDbRepository) LookupResources(ctx context.Context, resouce_type *a return resources, errs, nil } +func (s *SpiceDbRepository) ImportBulkTuples(stream grpc.ClientStreamingServer[apiV1beta1.ImportBulkTuplesRequest, apiV1beta1.ImportBulkTuplesResponse]) error { + if err := s.initialize(); err != nil { + return err + } + + var totalImported uint64 + client, err := s.client.ImportBulkRelationships(context.Background()) + if err != nil { + return fmt.Errorf("failed to create SpiceDB client: %w", err) + } + + for { + req, streamErr := stream.Recv() + if streamErr != nil { + if req == nil && errors.Is(streamErr, io.EOF) { + if res, closeErr := client.CloseAndRecv(); closeErr != nil { + return fmt.Errorf("error receiving response from Spicedb for bulkimport request: %w", closeErr) + } else { + log.Infof("total number of relationships loaded: %d", res.NumLoaded) + totalImported = res.NumLoaded + return stream.SendAndClose(&apiV1beta1.ImportBulkTuplesResponse{NumImported: totalImported}) + } + } + return streamErr + } + inputRelationships := (*req).Tuples + batch := []*v1.Relationship{} + for _, tuple := range inputRelationships { + tuple.Relation = addRelationPrefix(tuple.Relation, relationPrefix) + batch = append(batch, createSpiceDbRelationship(tuple)) + } + if err = client.Send((*v1.ImportBulkRelationshipsRequest)(&v1.BulkImportRelationshipsRequest{ + Relationships: batch, + })); err != nil { + if !errors.Is(err, io.EOF) { + return fmt.Errorf("failed to send bulkimport request: %w", err) + } + return err + } + } + +} + func (s *SpiceDbRepository) CreateRelationships(ctx context.Context, rels []*apiV1beta1.Relationship, touch biz.TouchSemantics) error { if err := s.initialize(); err != nil { return err diff --git a/internal/data/spicedb_test.go b/internal/data/spicedb_test.go index 11cfb48..05a40be 100644 --- a/internal/data/spicedb_test.go +++ b/internal/data/spicedb_test.go @@ -3,6 +3,9 @@ package data import ( "context" "fmt" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/metadata" + "io" "os" "testing" @@ -180,6 +183,80 @@ func TestSecondCreateRelationshipSucceedsWithTouchTrue(t *testing.T) { assert.True(t, exists) } +type MockgRPCClientStream struct { + mock.Mock +} + +func (m *MockgRPCClientStream) SetHeader(md metadata.MD) error { + panic("implement me") +} + +func (m *MockgRPCClientStream) SendHeader(md metadata.MD) error { + panic("implement me") +} + +func (m *MockgRPCClientStream) SetTrailer(md metadata.MD) { + panic("implement me") +} + +func (m *MockgRPCClientStream) Context() context.Context { + panic("implement me") +} + +func (m *MockgRPCClientStream) SendMsg(_ any) error { + panic("implement me") +} + +func (m *MockgRPCClientStream) RecvMsg(_ any) error { + panic("implement me") +} + +func (m *MockgRPCClientStream) Recv() (*apiV1beta1.ImportBulkTuplesRequest, error) { + args := m.Called() + if req, ok := args.Get(0).(*apiV1beta1.ImportBulkTuplesRequest); ok { + return req, args.Error(1) + } + return nil, args.Error(1) +} + +// SendAndClose simulates sending a response and closing the stream +func (m *MockgRPCClientStream) SendAndClose(resp *apiV1beta1.ImportBulkTuplesResponse) error { + args := m.Called(resp) + return args.Error(0) +} + +func (m *MockgRPCClientStream) CloseAndRecv() (*apiV1beta1.ImportBulkTuplesResponse, error) { + args := m.Called() + if res, ok := args.Get(0).(*apiV1beta1.ImportBulkTuplesResponse); ok { + return res, args.Error(1) + } + return nil, args.Error(1) +} + +func TestImportBulkTuples(t *testing.T) { + rels := []*apiV1beta1.Relationship{ + createRelationship("rbac", "group", "bob_club", "member", "rbac", "user", "bob5", ""), + createRelationship("rbac", "group", "bob_club", "member", "rbac", "user", "bob3", ""), + createRelationship("rbac", "group", "bob_club", "member", "rbac", "user", "bob6", ""), + createRelationship("rbac", "group", "bob_club", "member", "rbac", "user", "bob9", ""), + } + + mockgRPCClientStream := new(MockgRPCClientStream) + mockgRPCClientStream.On("Recv").Return(&apiV1beta1.ImportBulkTuplesRequest{Tuples: rels}, nil).Once() + mockgRPCClientStream.On("Recv").Return(nil, io.EOF).Once() + mockgRPCClientStream.On("SendAndClose", &apiV1beta1.ImportBulkTuplesResponse{NumImported: uint64(len(rels))}).Return(nil) + + spiceDbRepo, err := container.CreateSpiceDbRepository() + assert.NoError(t, err) + + err = spiceDbRepo.ImportBulkTuples(mockgRPCClientStream) + assert.NoError(t, err) + container.WaitForQuantizationInterval() + + exists := CheckForRelationship(spiceDbRepo, "bob5", "rbac", "user", "", "member", "rbac", "group", "bob_club") + assert.True(t, exists) +} + func TestIsBackendAvailable(t *testing.T) { t.Parallel() diff --git a/internal/service/relationships.go b/internal/service/relationships.go index 2b118a1..1a736d2 100644 --- a/internal/service/relationships.go +++ b/internal/service/relationships.go @@ -3,6 +3,7 @@ package service import ( "context" "fmt" + "google.golang.org/grpc" "github.com/project-kessel/relations-api/internal/biz" @@ -13,18 +14,20 @@ import ( type RelationshipsService struct { pb.UnimplementedKesselTupleServiceServer - createUsecase *biz.CreateRelationshipsUsecase - readUsecase *biz.ReadRelationshipsUsecase - deleteUsecase *biz.DeleteRelationshipsUsecase - log *log.Helper + createUsecase *biz.CreateRelationshipsUsecase + readUsecase *biz.ReadRelationshipsUsecase + deleteUsecase *biz.DeleteRelationshipsUsecase + importBulkUsecase *biz.ImportBulkTuplesUsecase + log *log.Helper } -func NewRelationshipsService(logger log.Logger, createUseCase *biz.CreateRelationshipsUsecase, readUsecase *biz.ReadRelationshipsUsecase, deleteUsecase *biz.DeleteRelationshipsUsecase) *RelationshipsService { +func NewRelationshipsService(logger log.Logger, createUseCase *biz.CreateRelationshipsUsecase, readUsecase *biz.ReadRelationshipsUsecase, deleteUsecase *biz.DeleteRelationshipsUsecase, importBulkUsecase *biz.ImportBulkTuplesUsecase) *RelationshipsService { return &RelationshipsService{ - log: log.NewHelper(logger), - createUsecase: createUseCase, - readUsecase: readUsecase, - deleteUsecase: deleteUsecase, + log: log.NewHelper(logger), + createUsecase: createUseCase, + readUsecase: readUsecase, + deleteUsecase: deleteUsecase, + importBulkUsecase: importBulkUsecase, } } @@ -72,3 +75,11 @@ func (s *RelationshipsService) DeleteTuples(ctx context.Context, req *pb.DeleteT return &pb.DeleteTuplesResponse{}, nil } + +func (s *RelationshipsService) ImportBulkTuples(stream grpc.ClientStreamingServer[pb.ImportBulkTuplesRequest, pb.ImportBulkTuplesResponse]) error { + err := s.importBulkUsecase.ImportBulkTuples(stream) + if err != nil { + return fmt.Errorf("error import bulk tuples: %w", err) + } + return nil +} diff --git a/internal/service/relationships_test.go b/internal/service/relationships_test.go index 6008e8b..bc4363a 100644 --- a/internal/service/relationships_test.go +++ b/internal/service/relationships_test.go @@ -267,7 +267,8 @@ func setup(t *testing.T) (error, *RelationshipsService) { createRelationshipsUsecase := biz.NewCreateRelationshipsUsecase(spiceDbRepository, logger) readRelationshipsUsecase := biz.NewReadRelationshipsUsecase(spiceDbRepository, logger) deleteRelationshipsUsecase := biz.NewDeleteRelationshipsUsecase(spiceDbRepository, logger) - relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase) + importBulkUsecase := biz.NewImportBulkTuplesUsecase(spiceDbRepository, logger) + relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase, importBulkUsecase) return err, relationshipsService } @@ -287,7 +288,8 @@ func TestRelationshipsService_ReadRelationships(t *testing.T) { createRelationshipsUsecase := biz.NewCreateRelationshipsUsecase(spiceDbRepository, logger) readRelationshipsUsecase := biz.NewReadRelationshipsUsecase(spiceDbRepository, logger) deleteRelationshipsUsecase := biz.NewDeleteRelationshipsUsecase(spiceDbRepository, logger) - relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase) + bulkImportTuplesUsecase := biz.NewImportBulkTuplesUsecase(spiceDbRepository, logger) + relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase, bulkImportTuplesUsecase) expected := createRelationship(rbac_ns_type("group"), "bob_club", "member", rbac_ns_type("user"), "bob", "") @@ -341,7 +343,8 @@ func TestRelationshipsService_ReadRelationships_Paginated(t *testing.T) { createRelationshipsUsecase := biz.NewCreateRelationshipsUsecase(spiceDbRepository, logger) readRelationshipsUsecase := biz.NewReadRelationshipsUsecase(spiceDbRepository, logger) deleteRelationshipsUsecase := biz.NewDeleteRelationshipsUsecase(spiceDbRepository, logger) - relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase) + bulkImportTuplesUsecase := biz.NewImportBulkTuplesUsecase(spiceDbRepository, logger) + relationshipsService := NewRelationshipsService(logger, createRelationshipsUsecase, readRelationshipsUsecase, deleteRelationshipsUsecase, bulkImportTuplesUsecase) expected1 := createRelationship(rbac_ns_type("group"), "bob_club", "member", rbac_ns_type("user"), "bob", "") expected2 := createRelationship(rbac_ns_type("group"), "other_bob_club", "member", rbac_ns_type("user"), "bob", "")