Skip to content

Commit

Permalink
fix to start a single spicedb stream for each batch and close stream …
Browse files Browse the repository at this point in the history
…for no inputs
  • Loading branch information
akoserwal committed Oct 4, 2024
1 parent f3b7cfd commit d75289e
Showing 1 changed file with 21 additions and 17 deletions.
38 changes: 21 additions & 17 deletions internal/data/spicedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,26 +250,36 @@ func (s *SpiceDbRepository) ImportBulkTuples(stream grpc.ClientStreamingServer[a
}

var totalImported uint64
client, err := s.client.ImportBulkRelationships(context.Background())
if err != nil {
return fmt.Errorf("failed to create SpiceDB client: %w", err)
}

for {
req, err := stream.Recv()
if err != nil {
if !errors.Is(err, io.EOF) {
if err := stream.SendAndClose(&apiV1beta1.ImportBulkTuplesResponse{NumImported: totalImported}); err != nil {
return status.Errorf(codes.Internal, "failed to send response: %v", err)
req, streamErr := stream.Recv()
if streamErr != nil {
if req == nil && errors.Is(streamErr, io.EOF) {
if res, closeErr := client.CloseAndRecv(); closeErr != nil {
return fmt.Errorf("error receiving response from Spicedb for bulkimport request: %w", closeErr)
} else {
log.Infof("total number of relationships loaded: %d", res.NumLoaded)
totalImported = res.NumLoaded
return stream.SendAndClose(&apiV1beta1.ImportBulkTuplesResponse{NumImported: totalImported})
}
}
return err
if !errors.Is(streamErr, io.EOF) {
if streamErr = stream.SendAndClose(&apiV1beta1.ImportBulkTuplesResponse{NumImported: totalImported}); err != nil {
return status.Errorf(codes.Internal, "failed to send response: %v", streamErr)
}
}
return streamErr
}
inputRelationships := (*req).Tuples
batch := []*v1.Relationship{}
for _, tuple := range inputRelationships {
tuple.Relation = addRelationPrefix(tuple.Relation, relationPrefix)
batch = append(batch, createSpiceDbRelationship(tuple))
}
client, err := s.client.ImportBulkRelationships(context.Background())
if err != nil {
return fmt.Errorf("failed to create SpiceDB client: %w", err)
}
if err = client.Send((*v1.ImportBulkRelationshipsRequest)(&v1.BulkImportRelationshipsRequest{
Relationships: batch,
})); err != nil {
Expand All @@ -278,14 +288,8 @@ func (s *SpiceDbRepository) ImportBulkTuples(stream grpc.ClientStreamingServer[a
}
return err
}
if res, err := client.CloseAndRecv(); err != nil {
return fmt.Errorf("error receiving response from Spicedb for bulkimport request: %w", err)
} else {
log.Infof("total number of relationships loaded: %d", res.NumLoaded)
totalImported = res.NumLoaded
return stream.SendAndClose(&apiV1beta1.ImportBulkTuplesResponse{NumImported: totalImported})
}
}

}

func (s *SpiceDbRepository) CreateRelationships(ctx context.Context, rels []*apiV1beta1.Relationship, touch biz.TouchSemantics) error {
Expand Down

0 comments on commit d75289e

Please sign in to comment.