Skip to content

Commit

Permalink
[ENH] Server side pull logs (#1764)
Browse files Browse the repository at this point in the history
## Description of changes
https://linear.app/trychroma/issue/CHR-296/pull-logs-api
- PullLogs implementation in DAO
- PullLogs API for gRPC
- DB error handling and retry are not included

## Test plan
- [ ] DAO tests
- [ ] gRPC tests
  • Loading branch information
weiligu authored Feb 23, 2024
1 parent fc48ebf commit 765e218
Show file tree
Hide file tree
Showing 15 changed files with 651 additions and 122 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*_pb2.py* linguist-generated
*_pb2_grpc.py* linguist-generated
31 changes: 21 additions & 10 deletions chromadb/proto/logservice_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 45 additions & 3 deletions chromadb/proto/logservice_pb2.pyi

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

108 changes: 83 additions & 25 deletions chromadb/proto/logservice_pb2_grpc.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/coordinator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go/coordinator/internal/grpcutils/response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package grpcutils

import (
"github.com/chroma/chroma-coordinator/internal/types"
"github.com/pingcap/log"
"go.uber.org/zap"
"google.golang.org/genproto/googleapis/rpc/errdetails"
Expand Down Expand Up @@ -29,3 +30,16 @@ func BuildInvalidArgumentGrpcError(fieldName string, desc string) (error, error)
// BuildInternalGrpcError returns a gRPC status error carrying codes.Internal
// and the supplied message.
func BuildInternalGrpcError(msg string) error {
	internalErr := status.Error(codes.Internal, msg)
	return internalErr
}

// BuildErrorForCollectionId validates the outcome of parsing a collection ID
// and converts a failure into a gRPC error.
//
// collectionID and err are the parsed ID and the parse error (callers in this
// change pass the pair returned by types.ToUniqueID). The ID is considered
// invalid when err is non-nil or when collectionID equals types.NilUniqueID().
//
// It returns nil when the ID is valid. Otherwise it returns the error built by
// BuildInvalidArgumentGrpcError for the "collection_id" field, or, if building
// that error itself fails, the build failure.
func BuildErrorForCollectionId(collectionID types.UniqueID, err error) error {
	if err == nil && collectionID != types.NilUniqueID() {
		return nil
	}

	// Include the underlying parse error so the root cause is visible in the
	// log; zap.Error is a no-op field when err is nil (nil-ID case).
	log.Error("collection id format error",
		zap.String("collection.id", collectionID.String()),
		zap.Error(err))

	// Use a distinct name so the parse error parameter is not shadowed.
	grpcError, buildErr := BuildInvalidArgumentGrpcError("collection_id", "wrong collection_id format")
	if buildErr != nil {
		log.Error("error building grpc error", zap.Error(buildErr))
		return buildErr
	}
	return grpcError
}
6 changes: 6 additions & 0 deletions go/coordinator/internal/logservice/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@ package logservice
import (
"context"
"github.com/chroma/chroma-coordinator/internal/common"
"github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel"
"github.com/chroma/chroma-coordinator/internal/types"
)

// IRecordLog is the record-log service contract. It embeds common.Component
// and adds operations for writing and reading per-collection record logs.
type IRecordLog interface {
	common.Component

	// PushLogs stores the given serialized records for collectionID.
	// NOTE(review): the int result looks like the number of records
	// written — confirm against the implementation.
	PushLogs(ctx context.Context, collectionID types.UniqueID, recordContent [][]byte) (int, error)

	// PullLogs reads record logs for collectionID. id is the position to
	// start from and batchSize is presumably the maximum number of records
	// returned — confirm against the implementation.
	PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int) ([]*dbmodel.RecordLog, error)
}

// PushLogs forwards the records to the underlying record-log store and returns
// its result unchanged. ctx is accepted for interface conformance but is not
// passed to the store.
func (s *RecordLog) PushLogs(ctx context.Context, collectionID types.UniqueID, recordsContent [][]byte) (int, error) {
	count, err := s.recordLogDb.PushLogs(collectionID, recordsContent)
	return count, err
}

// PullLogs forwards the read request to the underlying record-log store and
// returns its result unchanged. ctx is accepted for interface conformance but
// is not passed to the store.
func (s *RecordLog) PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int) ([]*dbmodel.RecordLog, error) {
	logs, err := s.recordLogDb.PullLogs(collectionID, id, batchSize)
	return logs, err
}
38 changes: 30 additions & 8 deletions go/coordinator/internal/logservice/grpc/record_log_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"context"
"github.com/chroma/chroma-coordinator/internal/grpcutils"
"github.com/chroma/chroma-coordinator/internal/proto/coordinatorpb"
"github.com/chroma/chroma-coordinator/internal/proto/logservicepb"
"github.com/chroma/chroma-coordinator/internal/types"
"github.com/pingcap/log"
Expand All @@ -13,14 +14,9 @@ import (
func (s *Server) PushLogs(ctx context.Context, req *logservicepb.PushLogsRequest) (*logservicepb.PushLogsResponse, error) {
res := &logservicepb.PushLogsResponse{}
collectionID, err := types.ToUniqueID(&req.CollectionId)
if err != nil || collectionID == types.NilUniqueID() {
log.Error("collection id format error", zap.String("collection.id", req.CollectionId))
grpcError, err := grpcutils.BuildInvalidArgumentGrpcError("collection_id", "wrong collection_id format")
if err != nil {
log.Error("error building grpc error", zap.Error(err))
return nil, err
}
return nil, grpcError
err = grpcutils.BuildErrorForCollectionId(collectionID, err)
if err != nil {
return nil, err
}
var recordsContent [][]byte
for _, record := range req.Records {
Expand All @@ -45,3 +41,29 @@ func (s *Server) PushLogs(ctx context.Context, req *logservicepb.PushLogsRequest
log.Info("PushLogs success", zap.String("collectionID", req.CollectionId), zap.Int("recordCount", recordCount))
return res, nil
}

// PullLogs handles the PullLogs RPC. It validates the request's collection ID,
// fetches record logs from the log service starting at req.GetStartFromId()
// with the requested batch size, and decodes each stored record into a
// coordinatorpb.SubmitEmbeddingRecord.
//
// Errors:
//   - an invalid collection ID yields the error from
//     grpcutils.BuildErrorForCollectionId;
//   - a failure in the log service yields an Internal gRPC error;
//   - a record that cannot be decoded yields the InvalidArgument error built
//     for the "records" field.
//
// On any error the response is nil.
func (s *Server) PullLogs(ctx context.Context, req *logservicepb.PullLogsRequest) (*logservicepb.PullLogsResponse, error) {
	res := &logservicepb.PullLogsResponse{}
	collectionID, err := types.ToUniqueID(&req.CollectionId)
	err = grpcutils.BuildErrorForCollectionId(collectionID, err)
	if err != nil {
		return nil, err
	}

	recordLogs, err := s.logService.PullLogs(ctx, collectionID, req.GetStartFromId(), int(req.BatchSize))
	if err != nil {
		// Previously this error was dropped, so a storage failure was
		// reported to the client as a successful, empty pull.
		log.Error("error pulling logs", zap.String("collectionID", req.CollectionId), zap.Error(err))
		return nil, grpcutils.BuildInternalGrpcError(err.Error())
	}

	// The final length is known up front, so allocate once.
	records := make([]*coordinatorpb.SubmitEmbeddingRecord, 0, len(recordLogs))
	for index := range recordLogs {
		record := &coordinatorpb.SubmitEmbeddingRecord{}
		if unmarshalErr := proto.Unmarshal(*recordLogs[index].Record, record); unmarshalErr != nil {
			log.Error("Unmarshal error", zap.Error(unmarshalErr))
			grpcError, buildErr := grpcutils.BuildInvalidArgumentGrpcError("records", "marshaling error")
			if buildErr != nil {
				return nil, buildErr
			}
			return nil, grpcError
		}
		records = append(records, record)
	}

	res.Records = records
	log.Info("PullLogs success", zap.String("collectionID", req.CollectionId), zap.Int("recordCount", len(records)))
	return res, nil
}
Loading

0 comments on commit 765e218

Please sign in to comment.