diff --git a/.gitignore b/.gitignore index 316c32cb6649..86c51ffb749b 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ dist terraform.tfstate .hypothesis/ .idea +chroma_env/ diff --git a/bin/reset.sh b/bin/reset.sh new file mode 100755 index 000000000000..92fb04d8224f --- /dev/null +++ b/bin/reset.sh @@ -0,0 +1,13 @@ + #!/usr/bin/env bash + +eval $(minikube -p chroma-test docker-env) + +docker build -t chroma-coordinator:latest -f go/coordinator/Dockerfile . + +kubectl delete deployment coordinator -n chroma + +# Apply the kubernetes manifests +kubectl apply -f k8s/deployment +kubectl apply -f k8s/crd +kubectl apply -f k8s/cr +kubectl apply -f k8s/test diff --git a/go/coordinator/internal/coordinator/apis_test.go b/go/coordinator/internal/coordinator/apis_test.go index c2535f0128f1..bbd596d8a4c6 100644 --- a/go/coordinator/internal/coordinator/apis_test.go +++ b/go/coordinator/internal/coordinator/apis_test.go @@ -627,28 +627,30 @@ func TestUpdateSegment(t *testing.T) { assert.Equal(t, []*model.Segment{segment}, result) // Add a new metadata key - metadata.Set("test_str2", &model.SegmentMetadataValueStringType{Value: "str2"}) + segment.Metadata.Set("test_str2", &model.SegmentMetadataValueStringType{Value: "str2"}) c.UpdateSegment(ctx, &model.UpdateSegment{ ID: segment.ID, - Metadata: metadata}) + Metadata: segment.Metadata}) result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) assert.NoError(t, err) assert.Equal(t, []*model.Segment{segment}, result) // Update a metadata key - metadata.Set("test_str", &model.SegmentMetadataValueStringType{Value: "str3"}) + segment.Metadata.Set("test_str", &model.SegmentMetadataValueStringType{Value: "str3"}) c.UpdateSegment(ctx, &model.UpdateSegment{ ID: segment.ID, - Metadata: metadata}) + Metadata: segment.Metadata}) result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) assert.NoError(t, err) assert.Equal(t, []*model.Segment{segment}, result) // Delete a metadata key - metadata.Remove("test_str") + segment.Metadata.Remove("test_str") + newMetadata := model.NewSegmentMetadata[model.SegmentMetadataValueType]() + newMetadata.Set("test_str", nil) c.UpdateSegment(ctx, &model.UpdateSegment{ ID: segment.ID, - Metadata: metadata}) + Metadata: newMetadata}) result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) assert.NoError(t, err) assert.Equal(t, []*model.Segment{segment}, result) diff --git a/go/coordinator/internal/coordinator/coordinator.go b/go/coordinator/internal/coordinator/coordinator.go index da657eb97d0d..40576b90d03b 100644 --- a/go/coordinator/internal/coordinator/coordinator.go +++ b/go/coordinator/internal/coordinator/coordinator.go @@ -4,6 +4,8 @@ import ( "context" "github.com/chroma/chroma-coordinator/internal/metastore/coordinator" + "github.com/chroma/chroma-coordinator/internal/metastore/db/dao" + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbcore" "github.com/chroma/chroma-coordinator/internal/types" "gorm.io/gorm" ) @@ -27,7 +29,8 @@ func NewCoordinator(ctx context.Context, assignmentPolicy CollectionAssignmentPo collectionAssignmentPolicy: assignmentPolicy, } - catalog := coordinator.NewMemoryCatalog() + // catalog := coordinator.NewMemoryCatalog() + catalog := coordinator.NewTableCatalog(dbcore.NewTxImpl(), dao.NewMetaDomain()) meta, err := NewMetaTable(s.ctx, catalog) if err != nil { return nil, err diff --git a/go/coordinator/internal/metastore/coordinator/model_db_convert.go 
b/go/coordinator/internal/metastore/coordinator/model_db_convert.go new file mode 100644 index 000000000000..b2cb610ed01f --- /dev/null +++ b/go/coordinator/internal/metastore/coordinator/model_db_convert.go @@ -0,0 +1,165 @@ +package coordinator + +import ( + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/chroma/chroma-coordinator/internal/model" + "github.com/chroma/chroma-coordinator/internal/types" + "github.com/pingcap/log" + "go.uber.org/zap" +) + +func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAndMetadata) []*model.Collection { + if collectionAndMetadataList == nil { + return nil + } + collections := make([]*model.Collection, 0, len(collectionAndMetadataList)) + for _, collectionAndMetadata := range collectionAndMetadataList { + collection := &model.Collection{ + ID: types.MustParse(collectionAndMetadata.Collection.ID), + Name: *collectionAndMetadata.Collection.Name, + Topic: *collectionAndMetadata.Collection.Topic, + Dimension: collectionAndMetadata.Collection.Dimension, + Ts: collectionAndMetadata.Collection.Ts, + } + collection.Metadata = convertCollectionMetadataToModel(collectionAndMetadata.CollectionMetadata) + collections = append(collections, collection) + } + log.Debug("collection to model", zap.Any("collections", collections)) + return collections +} + +func convertCollectionMetadataToModel(collectionMetadataList []*dbmodel.CollectionMetadata) *model.CollectionMetadata[model.CollectionMetadataValueType] { + metadata := model.NewCollectionMetadata[model.CollectionMetadataValueType]() + if collectionMetadataList == nil { + log.Debug("collection metadata to model", zap.Any("collectionMetadata", nil)) + return nil + } else { + for _, collectionMetadata := range collectionMetadataList { + if collectionMetadata.Key != nil { + switch { + case collectionMetadata.StrValue != nil: + metadata.Add(*collectionMetadata.Key, &model.CollectionMetadataValueStringType{Value: *collectionMetadata.StrValue}) + case collectionMetadata.IntValue != nil: + metadata.Add(*collectionMetadata.Key, &model.CollectionMetadataValueInt64Type{Value: *collectionMetadata.IntValue}) + case collectionMetadata.FloatValue != nil: + metadata.Add(*collectionMetadata.Key, &model.CollectionMetadataValueFloat64Type{Value: *collectionMetadata.FloatValue}) + default: + } + } + } + if metadata.Empty() { + metadata = nil + } + log.Debug("collection metadata to model", zap.Any("collectionMetadata", metadata)) + return metadata + } + +} + +func convertCollectionMetadataToDB(collectionID string, metadata *model.CollectionMetadata[model.CollectionMetadataValueType]) []*dbmodel.CollectionMetadata { + if metadata == nil { + log.Debug("collection metadata to db", zap.Any("collectionMetadata", nil)) + return nil + } + dbCollectionMetadataList := make([]*dbmodel.CollectionMetadata, 0, len(metadata.Metadata)) + for key, value := range metadata.Metadata { + keyCopy := key + dbCollectionMetadata := &dbmodel.CollectionMetadata{ + CollectionID: collectionID, + Key: &keyCopy, + } + switch v := (value).(type) { + case *model.CollectionMetadataValueStringType: + dbCollectionMetadata.StrValue = &v.Value + case *model.CollectionMetadataValueInt64Type: + dbCollectionMetadata.IntValue = &v.Value + case *model.CollectionMetadataValueFloat64Type: + dbCollectionMetadata.FloatValue = &v.Value + default: + log.Error("unknown collection metadata type", zap.Any("value", v)) + } + dbCollectionMetadataList = append(dbCollectionMetadataList, dbCollectionMetadata) + } + 
log.Debug("collection metadata to db", zap.Any("collectionMetadata", dbCollectionMetadataList)) + return dbCollectionMetadataList +} + +func convertSegmentToModel(segmentAndMetadataList []*dbmodel.SegmentAndMetadata) []*model.Segment { + if segmentAndMetadataList == nil { + return nil + } + segments := make([]*model.Segment, 0, len(segmentAndMetadataList)) + for _, segmentAndMetadata := range segmentAndMetadataList { + segment := &model.Segment{ + ID: types.MustParse(segmentAndMetadata.Segment.ID), + Type: segmentAndMetadata.Segment.Type, + Scope: segmentAndMetadata.Segment.Scope, + Topic: segmentAndMetadata.Segment.Topic, + Ts: segmentAndMetadata.Segment.Ts, + } + if segmentAndMetadata.Segment.CollectionID != nil { + segment.CollectionID = types.MustParse(*segmentAndMetadata.Segment.CollectionID) + } else { + segment.CollectionID = types.NilUniqueID() + } + + segment.Metadata = convertSegmentMetadataToModel(segmentAndMetadata.SegmentMetadata) + segments = append(segments, segment) + } + log.Debug("segment to model", zap.Any("segments", segments)) + return segments +} + +func convertSegmentMetadataToModel(segmentMetadataList []*dbmodel.SegmentMetadata) *model.SegmentMetadata[model.SegmentMetadataValueType] { + if segmentMetadataList == nil { + return nil + } else { + metadata := model.NewSegmentMetadata[model.SegmentMetadataValueType]() + for _, segmentMetadata := range segmentMetadataList { + if segmentMetadata.Key != nil { + switch { + case segmentMetadata.StrValue != nil: + metadata.Set(*segmentMetadata.Key, &model.SegmentMetadataValueStringType{Value: *segmentMetadata.StrValue}) + case segmentMetadata.IntValue != nil: + metadata.Set(*segmentMetadata.Key, &model.SegmentMetadataValueInt64Type{Value: *segmentMetadata.IntValue}) + case segmentMetadata.FloatValue != nil: + metadata.Set(*segmentMetadata.Key, &model.SegmentMetadataValueFloat64Type{Value: *segmentMetadata.FloatValue}) + default: + } + } + } + if metadata.Empty() { + metadata = nil + } + log.Debug("segment metadata to model", zap.Any("segmentMetadata", nil)) + return metadata + } +} + +func convertSegmentMetadataToDB(segmentID string, metadata *model.SegmentMetadata[model.SegmentMetadataValueType]) []*dbmodel.SegmentMetadata { + if metadata == nil { + log.Debug("segment metadata db", zap.Any("segmentMetadata", nil)) + return nil + } + dbSegmentMetadataList := make([]*dbmodel.SegmentMetadata, 0, len(metadata.Metadata)) + for key, value := range metadata.Metadata { + keyCopy := key + dbSegmentMetadata := &dbmodel.SegmentMetadata{ + SegmentID: segmentID, + Key: &keyCopy, + } + switch v := (value).(type) { + case *model.SegmentMetadataValueStringType: + dbSegmentMetadata.StrValue = &v.Value + case *model.SegmentMetadataValueInt64Type: + dbSegmentMetadata.IntValue = &v.Value + case *model.SegmentMetadataValueFloat64Type: + dbSegmentMetadata.FloatValue = &v.Value + default: + log.Error("unknown segment metadata type", zap.Any("value", v)) + } + dbSegmentMetadataList = append(dbSegmentMetadataList, dbSegmentMetadata) + } + log.Debug("segment metadata db", zap.Any("segmentMetadata", dbSegmentMetadataList)) + return dbSegmentMetadataList +} diff --git a/go/coordinator/internal/metastore/coordinator/model_db_convert_test.go b/go/coordinator/internal/metastore/coordinator/model_db_convert_test.go new file mode 100644 index 000000000000..66240d2c53a7 --- /dev/null +++ b/go/coordinator/internal/metastore/coordinator/model_db_convert_test.go @@ -0,0 +1,121 @@ +package coordinator + +import ( + "testing" + + 
"github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/chroma/chroma-coordinator/internal/model" + "github.com/chroma/chroma-coordinator/internal/types" + "github.com/stretchr/testify/assert" +) + +func TestConvertCollectionMetadataToModel(t *testing.T) { + // Test case 1: collectionMetadataList is nil + modelCollectionMetadata := convertCollectionMetadataToModel(nil) + assert.Nil(t, modelCollectionMetadata) + + // Test case 2: collectionMetadataList is empty + collectionMetadataList := []*dbmodel.CollectionMetadata{} + modelCollectionMetadata = convertCollectionMetadataToModel(collectionMetadataList) + assert.Nil(t, modelCollectionMetadata) +} + +func TestConvertCollectionMetadataToDB(t *testing.T) { + // Test case 1: metadata is nil + dbCollectionMetadataList := convertCollectionMetadataToDB("collectionID", nil) + assert.Nil(t, dbCollectionMetadataList) + + // Test case 2: metadata is not nil but empty + metadata := &model.CollectionMetadata[model.CollectionMetadataValueType]{ + Metadata: map[string]model.CollectionMetadataValueType{}, + } + dbCollectionMetadataList = convertCollectionMetadataToDB("collectionID", metadata) + assert.NotNil(t, dbCollectionMetadataList) + assert.Len(t, dbCollectionMetadataList, 0) + + // Test case 3: metadata is not nil and contains values + metadata = &model.CollectionMetadata[model.CollectionMetadataValueType]{ + Metadata: map[string]model.CollectionMetadataValueType{ + "key1": &model.CollectionMetadataValueStringType{Value: "value1"}, + "key2": &model.CollectionMetadataValueInt64Type{Value: 123}, + "key3": &model.CollectionMetadataValueFloat64Type{Value: 3.14}, + }, + } + dbCollectionMetadataList = convertCollectionMetadataToDB("collectionID", metadata) + assert.NotNil(t, dbCollectionMetadataList) + assert.Len(t, dbCollectionMetadataList, 3) + assert.Equal(t, "collectionID", dbCollectionMetadataList[0].CollectionID) + assert.Equal(t, "key1", *dbCollectionMetadataList[0].Key) + assert.Equal(t, "value1", *dbCollectionMetadataList[0].StrValue) + assert.Nil(t, dbCollectionMetadataList[0].IntValue) + assert.Nil(t, dbCollectionMetadataList[0].FloatValue) + assert.Equal(t, "collectionID", dbCollectionMetadataList[1].CollectionID) + assert.Equal(t, "key2", *dbCollectionMetadataList[1].Key) + assert.Nil(t, dbCollectionMetadataList[1].StrValue) + assert.Equal(t, int64(123), *dbCollectionMetadataList[1].IntValue) + assert.Nil(t, dbCollectionMetadataList[1].FloatValue) + assert.Equal(t, "collectionID", dbCollectionMetadataList[2].CollectionID) + assert.Equal(t, "key3", *dbCollectionMetadataList[2].Key) + assert.Nil(t, dbCollectionMetadataList[2].StrValue) + assert.Nil(t, dbCollectionMetadataList[2].IntValue) + assert.Equal(t, 3.14, *dbCollectionMetadataList[2].FloatValue) +} +func TestConvertSegmentToModel(t *testing.T) { + // Test case 1: segmentAndMetadataList is nil + modelSegments := convertSegmentToModel(nil) + assert.Nil(t, modelSegments) + + // Test case 2: segmentAndMetadataList is empty + segmentAndMetadataList := []*dbmodel.SegmentAndMetadata{} + modelSegments = convertSegmentToModel(segmentAndMetadataList) + assert.Empty(t, modelSegments) + + // Test case 3: segmentAndMetadataList contains one segment with all fields set + segmentID := types.MustParse("515fc331-e117-4b86-bd84-85341128c337") + segmentTopic := "segment_topic" + collectionID := "d9a75e2e-2929-45c4-af06-75b15630edd0" + segmentAndMetadata := &dbmodel.SegmentAndMetadata{ + Segment: &dbmodel.Segment{ + ID: segmentID.String(), + Type: "segment_type", + Scope: 
"segment_scope", + Topic: &segmentTopic, + CollectionID: &collectionID, + }, + SegmentMetadata: []*dbmodel.SegmentMetadata{}, + } + segmentAndMetadataList = []*dbmodel.SegmentAndMetadata{segmentAndMetadata} + modelSegments = convertSegmentToModel(segmentAndMetadataList) + assert.Len(t, modelSegments, 1) + assert.Equal(t, segmentID, modelSegments[0].ID) + assert.Equal(t, "segment_type", modelSegments[0].Type) + assert.Equal(t, "segment_scope", modelSegments[0].Scope) + assert.Equal(t, "segment_topic", *modelSegments[0].Topic) + assert.Equal(t, types.MustParse(collectionID), modelSegments[0].CollectionID) + assert.Nil(t, modelSegments[0].Metadata) +} + +func TestConvertSegmentMetadataToModel(t *testing.T) { + // Test case 1: segmentMetadataList is nil + modelSegmentMetadata := convertSegmentMetadataToModel(nil) + assert.Nil(t, modelSegmentMetadata) + + // Test case 2: segmentMetadataList is empty + segmentMetadataList := []*dbmodel.SegmentMetadata{} + modelSegmentMetadata = convertSegmentMetadataToModel(segmentMetadataList) + assert.Empty(t, modelSegmentMetadata) + + // Test case 3: segmentMetadataList contains one segment metadata with all fields set + segmentID := types.MustParse("515fc331-e117-4b86-bd84-85341128c337") + strKey := "strKey" + strValue := "strValue" + segmentMetadata := &dbmodel.SegmentMetadata{ + SegmentID: segmentID.String(), + Key: &strKey, + StrValue: &strValue, + } + segmentMetadataList = []*dbmodel.SegmentMetadata{segmentMetadata} + modelSegmentMetadata = convertSegmentMetadataToModel(segmentMetadataList) + assert.Len(t, modelSegmentMetadata.Keys(), 1) + assert.Equal(t, &model.SegmentMetadataValueStringType{Value: strValue}, modelSegmentMetadata.Get(strKey)) +} diff --git a/go/coordinator/internal/metastore/coordinator/table_catalog.go b/go/coordinator/internal/metastore/coordinator/table_catalog.go new file mode 100644 index 000000000000..ce0912619ec1 --- /dev/null +++ b/go/coordinator/internal/metastore/coordinator/table_catalog.go @@ -0,0 +1,342 @@ +package coordinator + +import ( + "context" + + "github.com/chroma/chroma-coordinator/internal/common" + "github.com/chroma/chroma-coordinator/internal/metastore" + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/chroma/chroma-coordinator/internal/model" + "github.com/chroma/chroma-coordinator/internal/types" + "github.com/pingcap/log" + "go.uber.org/zap" +) + +type Catalog struct { + metaDomain dbmodel.IMetaDomain + txImpl dbmodel.ITransaction +} + +func NewTableCatalog(txImpl dbmodel.ITransaction, metaDomain dbmodel.IMetaDomain) *Catalog { + return &Catalog{ + txImpl: txImpl, + metaDomain: metaDomain, + } +} + +var _ metastore.Catalog = (*Catalog)(nil) + +func (tc *Catalog) ResetState(ctx context.Context) error { + return tc.txImpl.Transaction(ctx, func(txCtx context.Context) error { + err := tc.metaDomain.CollectionDb(txCtx).DeleteAll() + if err != nil { + log.Error("error reset collection db", zap.Error(err)) + return err + } + err = tc.metaDomain.CollectionMetadataDb(txCtx).DeleteAll() + if err != nil { + log.Error("error reest collection metadata db", zap.Error(err)) + return err + } + err = tc.metaDomain.SegmentDb(txCtx).DeleteAll() + if err != nil { + log.Error("error reset segment db", zap.Error(err)) + return err + } + err = tc.metaDomain.SegmentMetadataDb(txCtx).DeleteAll() + if err != nil { + log.Error("error reset segment metadata db", zap.Error(err)) + return err + } + return nil + }) +} + +func (tc *Catalog) CreateCollection(ctx context.Context, createCollection 
*model.CreateCollection, ts types.Timestamp) (*model.Collection, error) { + var result *model.Collection + + err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error { + // insert collection + dbCollection := &dbmodel.Collection{ + ID: createCollection.ID.String(), + Name: &createCollection.Name, + Topic: &createCollection.Topic, + Dimension: createCollection.Dimension, + Ts: ts, + } + err := tc.metaDomain.CollectionDb(txCtx).Insert(dbCollection) + if err != nil { + return err + } + // insert collection metadata + metadata := createCollection.Metadata + dbCollectionMetadataList := convertCollectionMetadataToDB(createCollection.ID.String(), metadata) + if len(dbCollectionMetadataList) != 0 { + err = tc.metaDomain.CollectionMetadataDb(txCtx).Insert(dbCollectionMetadataList) + if err != nil { + return err + } + } + // get collection + collectionList, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(types.FromUniqueID(createCollection.ID), nil, nil) + if err != nil { + return err + } + result = convertCollectionToModel(collectionList)[0] + return nil + }) + if err != nil { + return nil, err + } + return result, nil +} + +func (tc *Catalog) GetCollections(ctx context.Context, collectionID types.UniqueID, collectionName *string, collectionTopic *string) ([]*model.Collection, error) { + collectionAndMetadataList, err := tc.metaDomain.CollectionDb(ctx).GetCollections(types.FromUniqueID(collectionID), collectionName, collectionTopic) + if err != nil { + return nil, err + } + collections := convertCollectionToModel(collectionAndMetadataList) + return collections, nil +} + +func (tc *Catalog) DeleteCollection(ctx context.Context, collectionID types.UniqueID) error { + return tc.txImpl.Transaction(ctx, func(txCtx context.Context) error { + err := tc.metaDomain.CollectionDb(txCtx).DeleteCollectionByID(collectionID.String()) + if err != nil { + return err + } + err = tc.metaDomain.CollectionMetadataDb(txCtx).DeleteByCollectionID(collectionID.String()) + if err != nil { + return err + } + return nil + }) +} + +func (tc *Catalog) UpdateCollection(ctx context.Context, updateCollection *model.UpdateCollection, ts types.Timestamp) (*model.Collection, error) { + var result *model.Collection + + err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error { + dbCollection := &dbmodel.Collection{ + ID: updateCollection.ID.String(), + Name: updateCollection.Name, + Topic: updateCollection.Topic, + Dimension: updateCollection.Dimension, + Ts: ts, + } + err := tc.metaDomain.CollectionDb(txCtx).Update(dbCollection) + if err != nil { + return err + } + + // Case 1: if ResetMetadata is true, then delete all metadata for the collection + // Case 2: if ResetMetadata is true and metadata is not nil -> THIS SHOULD NEVER HAPPEN + // Case 3: if ResetMetadata is false, and the metadata is not nil - set the metadata to the value in metadata + // Case 4: if ResetMetadata is false and metadata is nil, then leave the metadata as is + metadata := updateCollection.Metadata + resetMetadata := updateCollection.ResetMetadata + if resetMetadata { + if metadata != nil { // Case 2 + return common.ErrInvalidMetadataUpdate + } else { // Case 1 + err = tc.metaDomain.CollectionMetadataDb(txCtx).DeleteByCollectionID(updateCollection.ID.String()) + if err != nil { + return err + } + } + } else { + if metadata != nil { // Case 3 + err = tc.metaDomain.CollectionMetadataDb(txCtx).DeleteByCollectionID(updateCollection.ID.String()) + if err != nil { + return err + } + dbCollectionMetadataList :=
convertCollectionMetadataToDB(updateCollection.ID.String(), metadata) + if len(dbCollectionMetadataList) != 0 { + err = tc.metaDomain.CollectionMetadataDb(txCtx).Insert(dbCollectionMetadataList) + if err != nil { + return err + } + } + } + } + collectionList, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(types.FromUniqueID(updateCollection.ID), nil, nil) + if err != nil { + return err + } + result = convertCollectionToModel(collectionList)[0] + return nil + }) + if err != nil { + return nil, err + } + log.Info("collection updated", zap.Any("collection", result)) + return result, nil +} + +func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.CreateSegment, ts types.Timestamp) (*model.Segment, error) { + var result *model.Segment + + err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error { + // insert segment + collectionString := createSegment.CollectionID.String() + dbSegment := &dbmodel.Segment{ + ID: createSegment.ID.String(), + CollectionID: &collectionString, + Type: createSegment.Type, + Scope: createSegment.Scope, + Ts: ts, + } + if createSegment.Topic != nil { + dbSegment.Topic = createSegment.Topic + } + err := tc.metaDomain.SegmentDb(txCtx).Insert(dbSegment) + if err != nil { + log.Error("error inserting segment", zap.Error(err)) + return err + } + // insert segment metadata + metadata := createSegment.Metadata + if metadata != nil { + dbSegmentMetadataList := convertSegmentMetadataToDB(createSegment.ID.String(), metadata) + if len(dbSegmentMetadataList) != 0 { + err = tc.metaDomain.SegmentMetadataDb(txCtx).Insert(dbSegmentMetadataList) + if err != nil { + log.Error("error inserting segment metadata", zap.Error(err)) + return err + } + } + } + // get segment + segmentList, err := tc.metaDomain.SegmentDb(txCtx).GetSegments(createSegment.ID, nil, nil, nil, types.NilUniqueID()) + if err != nil { + log.Error("error getting segment", zap.Error(err)) + return err + } + result = convertSegmentToModel(segmentList)[0] + return nil + }) + if err != nil { + log.Error("error creating segment", zap.Error(err)) + return nil, err + } + log.Info("segment created", zap.Any("segment", result)) + return result, nil +} + +func (tc *Catalog) GetSegments(ctx context.Context, segmentID types.UniqueID, segmentType *string, scope *string, topic *string, collectionID types.UniqueID, ts types.Timestamp) ([]*model.Segment, error) { + segmentAndMetadataList, err := tc.metaDomain.SegmentDb(ctx).GetSegments(segmentID, segmentType, scope, topic, collectionID) + if err != nil { + return nil, err + } + segments := make([]*model.Segment, 0, len(segmentAndMetadataList)) + for _, segmentAndMetadata := range segmentAndMetadataList { + segment := &model.Segment{ + ID: types.MustParse(segmentAndMetadata.Segment.ID), + Type: segmentAndMetadata.Segment.Type, + Scope: segmentAndMetadata.Segment.Scope, + Topic: segmentAndMetadata.Segment.Topic, + Ts: segmentAndMetadata.Segment.Ts, + } + + if segmentAndMetadata.Segment.CollectionID != nil { + segment.CollectionID = types.MustParse(*segmentAndMetadata.Segment.CollectionID) + } else { + segment.CollectionID = types.NilUniqueID() + } + segment.Metadata = convertSegmentMetadataToModel(segmentAndMetadata.SegmentMetadata) + segments = append(segments, segment) + } + return segments, nil +} + +func (tc *Catalog) DeleteSegment(ctx context.Context, segmentID types.UniqueID) error { + return tc.txImpl.Transaction(ctx, func(txCtx context.Context) error { + err := tc.metaDomain.SegmentDb(txCtx).DeleteSegmentByID(segmentID.String()) + if err 
!= nil { + log.Error("error deleting segment", zap.Error(err)) + return err + } + err = tc.metaDomain.SegmentMetadataDb(txCtx).DeleteBySegmentID(segmentID.String()) + if err != nil { + log.Error("error deleting segment metadata", zap.Error(err)) + return err + } + return nil + }) +} + +func (tc *Catalog) UpdateSegment(ctx context.Context, updateSegment *model.UpdateSegment, ts types.Timestamp) (*model.Segment, error) { + var result *model.Segment + + err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error { + // update segment + dbSegment := &dbmodel.UpdateSegment{ + ID: updateSegment.ID.String(), + Topic: updateSegment.Topic, + ResetTopic: updateSegment.ResetTopic, + Collection: updateSegment.Collection, + ResetCollection: updateSegment.ResetCollection, + } + + err := tc.metaDomain.SegmentDb(txCtx).Update(dbSegment) + if err != nil { + return err + } + + // Case 1: if ResetMetadata is true, then delete all metadata for the collection + // Case 2: if ResetMetadata is true and metadata is not nil -> THIS SHOULD NEVER HAPPEN + // Case 3: if ResetMetadata is false, and the metadata is not nil - set the metadata to the value in metadata + // Case 4: if ResetMetadata is false and metadata is nil, then leave the metadata as is + metadata := updateSegment.Metadata + resetMetadata := updateSegment.ResetMetadata + if resetMetadata { + if metadata != nil { // Case 2 + return common.ErrInvalidMetadataUpdate + } else { // Case 1 + err := tc.metaDomain.SegmentMetadataDb(txCtx).DeleteBySegmentID(updateSegment.ID.String()) + if err != nil { + return err + } + } + } else { + if metadata != nil { // Case 3 + err := tc.metaDomain.SegmentMetadataDb(txCtx).DeleteBySegmentIDAndKeys(updateSegment.ID.String(), metadata.Keys()) + if err != nil { + log.Error("error deleting segment metadata", zap.Error(err)) + return err + } + newMetadata := model.NewSegmentMetadata[model.SegmentMetadataValueType]() + for _, key := range metadata.Keys() { + if metadata.Get(key) == nil { + metadata.Remove(key) + } else { + newMetadata.Set(key, metadata.Get(key)) + } + } + dbSegmentMetadataList := convertSegmentMetadataToDB(updateSegment.ID.String(), newMetadata) + if len(dbSegmentMetadataList) != 0 { + err = tc.metaDomain.SegmentMetadataDb(txCtx).Insert(dbSegmentMetadataList) + if err != nil { + return err + } + } + } + } + + // get segment + segmentList, err := tc.metaDomain.SegmentDb(txCtx).GetSegments(updateSegment.ID, nil, nil, nil, types.NilUniqueID()) + if err != nil { + log.Error("error getting segment", zap.Error(err)) + return err + } + result = convertSegmentToModel(segmentList)[0] + return nil + }) + if err != nil { + log.Error("error updating segment", zap.Error(err)) + return nil, err + } + log.Debug("segment updated", zap.Any("segment", result)) + return result, nil +} diff --git a/go/coordinator/internal/metastore/coordinator/table_catalog_test.go b/go/coordinator/internal/metastore/coordinator/table_catalog_test.go new file mode 100644 index 000000000000..bcbfa9ebfb3f --- /dev/null +++ b/go/coordinator/internal/metastore/coordinator/table_catalog_test.go @@ -0,0 +1,135 @@ +package coordinator + +import ( + "context" + "testing" + + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel/mocks" + "github.com/chroma/chroma-coordinator/internal/model" + "github.com/chroma/chroma-coordinator/internal/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestCatalog_CreateCollection(t 
*testing.T) { + // create a mock transaction implementation + mockTxImpl := &mocks.ITransaction{} + + // create a mock meta domain implementation + mockMetaDomain := &mocks.IMetaDomain{} + + // create a new catalog instance + catalog := NewTableCatalog(mockTxImpl, mockMetaDomain) + + // create a mock collection + metadata := model.NewCollectionMetadata[model.CollectionMetadataValueType]() + metadata.Add("test_key", &model.CollectionMetadataValueStringType{Value: "test_value"}) + collection := &model.CreateCollection{ + ID: types.MustParse("00000000-0000-0000-0000-000000000001"), + Name: "test_collection", + Metadata: metadata, + } + + // create a mock timestamp + ts := types.Timestamp(1234567890) + + // mock the insert collection method + name := "test_collection" + mockTxImpl.On("Transaction", context.Background(), mock.Anything).Return(nil) + mockMetaDomain.On("CollectionDb", context.Background()).Return(&mocks.ICollectionDb{}) + mockMetaDomain.CollectionDb(context.Background()).(*mocks.ICollectionDb).On("Insert", &dbmodel.Collection{ + ID: "00000000-0000-0000-0000-000000000001", + Name: &name, + // Topic: "test_topic", + Ts: ts, + }).Return(nil) + + // mock the insert collection metadata method + testKey := "test_key" + testValue := "test_value" + mockMetaDomain.On("CollectionMetadataDb", context.Background()).Return(&mocks.ICollectionMetadataDb{}) + mockMetaDomain.CollectionMetadataDb(context.Background()).(*mocks.ICollectionMetadataDb).On("Insert", []*dbmodel.CollectionMetadata{ + { + CollectionID: "00000000-0000-0000-0000-000000000001", + Key: &testKey, + StrValue: &testValue, + Ts: ts, + }, + }).Return(nil) + + // call the CreateCollection method + _, err := catalog.CreateCollection(context.Background(), collection, ts) + + // assert that the method returned no error + assert.NoError(t, err) + + // assert that the mock methods were called as expected + mockMetaDomain.AssertExpectations(t) +} + +func TestCatalog_GetCollections(t *testing.T) { + // create a mock meta domain implementation + mockMetaDomain := &mocks.IMetaDomain{} + + // create a new catalog instance + catalog := NewTableCatalog(nil, mockMetaDomain) + + // create a mock collection ID + collectionID := types.MustParse("00000000-0000-0000-0000-000000000001") + + // create a mock collection name + collectionName := "test_collection" + + // create a mock collection topic + collectionTopic := "test_topic" + + // create a mock collection and metadata list + name := "test_collection" + testKey := "test_key" + testValue := "test_value" + collectionAndMetadataList := []*dbmodel.CollectionAndMetadata{ + { + Collection: &dbmodel.Collection{ + ID: "00000000-0000-0000-0000-000000000001", + Name: &name, + //Topic: "test_topic", + Ts: types.Timestamp(1234567890), + }, + CollectionMetadata: []*dbmodel.CollectionMetadata{ + { + CollectionID: "00000000-0000-0000-0000-000000000001", + Key: &testKey, + StrValue: &testValue, + Ts: types.Timestamp(1234567890), + }, + }, + }, + } + + // mock the get collections method + mockMetaDomain.On("CollectionDb", context.Background()).Return(&mocks.ICollectionDb{}) + mockMetaDomain.CollectionDb(context.Background()).(*mocks.ICollectionDb).On("GetCollections", types.FromUniqueID(collectionID), &collectionName, &collectionTopic).Return(collectionAndMetadataList, nil) + + // call the GetCollections method + collections, err := catalog.GetCollections(context.Background(), collectionID, &collectionName, &collectionTopic) + + // assert that the method returned no error + assert.NoError(t, err) + + // assert 
that the collections were returned as expected + metadata := model.NewCollectionMetadata[model.CollectionMetadataValueType]() + metadata.Add("test_key", &model.CollectionMetadataValueStringType{Value: "test_value"}) + assert.Equal(t, []*model.CreateCollection{ + { + ID: types.MustParse("00000000-0000-0000-0000-000000000001"), + Name: "test_collection", + //Topic: "test_topic", + Ts: types.Timestamp(1234567890), + Metadata: metadata, + }, + }, collections) + + // assert that the mock methods were called as expected + mockMetaDomain.AssertExpectations(t) +} diff --git a/go/coordinator/internal/metastore/db/dao/collection.go b/go/coordinator/internal/metastore/db/dao/collection.go new file mode 100644 index 000000000000..65e3ee36ebfa --- /dev/null +++ b/go/coordinator/internal/metastore/db/dao/collection.go @@ -0,0 +1,163 @@ +package dao + +import ( + "database/sql" + + "go.uber.org/zap" + "gorm.io/gorm" + + "github.com/chroma/chroma-coordinator/internal/common" + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/go-sql-driver/mysql" + "github.com/pingcap/log" +) + +type collectionDb struct { + db *gorm.DB +} + +var _ dbmodel.ICollectionDb = &collectionDb{} + +func (s *collectionDb) DeleteAll() error { + return s.db.Where("1 = 1").Delete(&dbmodel.Collection{}).Error +} + +func (s *collectionDb) GetCollections(id *string, name *string, topic *string) ([]*dbmodel.CollectionAndMetadata, error) { + var collections []*dbmodel.CollectionAndMetadata + + query := s.db.Table("collections"). + Select("collections.id, collections.name, collections.topic, collections.dimension, collection_metadata.key, collection_metadata.str_value, collection_metadata.int_value, collection_metadata.float_value"). + Joins("LEFT JOIN collection_metadata ON collections.id = collection_metadata.collection_id"). 
+ Order("collections.id") + + if id != nil { + query = query.Where("collections.id = ?", *id) + } + if topic != nil { + query = query.Where("collections.topic = ?", *topic) + } + if name != nil { + query = query.Where("collections.name = ?", *name) + } + + rows, err := query.Rows() + if err != nil { + return nil, err + } + defer rows.Close() + + var currentCollectionID string = "" + var metadata []*dbmodel.CollectionMetadata + var currentCollection *dbmodel.CollectionAndMetadata + + for rows.Next() { + var ( + collectionID string + collectionName string + collectionTopic string + collectionDimension sql.NullInt32 + key sql.NullString + strValue sql.NullString + intValue sql.NullInt64 + floatValue sql.NullFloat64 + ) + + err := rows.Scan(&collectionID, &collectionName, &collectionTopic, &collectionDimension, &key, &strValue, &intValue, &floatValue) + if err != nil { + log.Error("scan collection failed", zap.Error(err)) + return nil, err + } + if collectionID != currentCollectionID { + currentCollectionID = collectionID + metadata = nil + + currentCollection = &dbmodel.CollectionAndMetadata{ + Collection: &dbmodel.Collection{ + ID: collectionID, + Name: &collectionName, + Topic: &collectionTopic, + }, + CollectionMetadata: metadata, + } + if collectionDimension.Valid { + currentCollection.Collection.Dimension = &collectionDimension.Int32 + } else { + currentCollection.Collection.Dimension = nil + } + + if currentCollectionID != "" { + collections = append(collections, currentCollection) + } + } + + collectionMetadata := &dbmodel.CollectionMetadata{ + CollectionID: collectionID, + } + + if key.Valid { + collectionMetadata.Key = &key.String + } else { + collectionMetadata.Key = nil + } + + if strValue.Valid { + collectionMetadata.StrValue = &strValue.String + } else { + collectionMetadata.StrValue = nil + } + if intValue.Valid { + collectionMetadata.IntValue = &intValue.Int64 + } else { + collectionMetadata.IntValue = nil + } + if floatValue.Valid { + collectionMetadata.FloatValue = &floatValue.Float64 + } else { + collectionMetadata.FloatValue = nil + } + + metadata = append(metadata, collectionMetadata) + currentCollection.CollectionMetadata = metadata + } + log.Info("collections", zap.Any("collections", collections)) + return collections, nil +} + +func (s *collectionDb) DeleteCollectionByID(collectionID string) error { + return s.db.Where("id = ?", collectionID).Delete(&dbmodel.Collection{}).Error +} + +func (s *collectionDb) Insert(in *dbmodel.Collection) error { + err := s.db.Create(&in).Error + + if err != nil { + // TODO: This only works for MySQL, figure out a way for Postgres. 
+ log.Error("insert collection failed", zap.String("collectionID", in.ID), zap.Int64("ts", in.Ts), zap.Error(err)) + mysqlErr := err.(*mysql.MySQLError) + switch mysqlErr.Number { + case 1062: + return common.ErrCollectionUniqueConstraintViolation + } + return err + } + return nil +} + +func generateCollectionUpdatesWithoutID(in *dbmodel.Collection) map[string]interface{} { + ret := map[string]interface{}{} + if in.Name != nil { + ret["name"] = *in.Name + } + if in.Topic != nil { + ret["topic"] = *in.Topic + } + if in.Dimension != nil { + ret["dimension"] = *in.Dimension + } + return ret +} + +func (s *collectionDb) Update(in *dbmodel.Collection) error { + updates := generateCollectionUpdatesWithoutID(in) + return s.db.Model(&dbmodel.Collection{}).Where("id = ?", in.ID).Updates(updates).Error +} diff --git a/go/coordinator/internal/metastore/db/dao/collection_metadata.go b/go/coordinator/internal/metastore/db/dao/collection_metadata.go new file mode 100644 index 000000000000..0f9ba00057ec --- /dev/null +++ b/go/coordinator/internal/metastore/db/dao/collection_metadata.go @@ -0,0 +1,26 @@ +package dao + +import ( + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type collectionMetadataDb struct { + db *gorm.DB +} + +func (s *collectionMetadataDb) DeleteAll() error { + return s.db.Where("1 = 1").Delete(&dbmodel.CollectionMetadata{}).Error +} + +func (s *collectionMetadataDb) DeleteByCollectionID(collectionID string) error { + return s.db.Where("collection_id = ?", collectionID).Delete(&dbmodel.CollectionMetadata{}).Error +} + +func (s *collectionMetadataDb) Insert(in []*dbmodel.CollectionMetadata) error { + return s.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "collection_id"}, {Name: "key"}}, + DoUpdates: clause.AssignmentColumns([]string{"str_value", "int_value", "float_value"}), + }).Create(in).Error +} diff --git a/go/coordinator/internal/metastore/db/dao/collection_test.go b/go/coordinator/internal/metastore/db/dao/collection_test.go new file mode 100644 index 000000000000..9a5a93f22eeb --- /dev/null +++ b/go/coordinator/internal/metastore/db/dao/collection_test.go @@ -0,0 +1,70 @@ +package dao + +import ( + "testing" + + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/chroma/chroma-coordinator/internal/types" + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func TestCollectionDb_GetCollections(t *testing.T) { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + assert.NoError(t, err) + + err = db.AutoMigrate(&dbmodel.Collection{}, &dbmodel.CollectionMetadata{}) + assert.NoError(t, err) + name := "test_name" + topic := "test_topic" + collection := &dbmodel.Collection{ + ID: types.NewUniqueID().String(), + Name: &name, + Topic: &topic, + } + err = db.Create(collection).Error + assert.NoError(t, err) + testKey := "test" + testValue := "test" + metadata := &dbmodel.CollectionMetadata{ + CollectionID: collection.ID, + Key: &testKey, + StrValue: &testValue, + } + err = db.Create(metadata).Error + assert.NoError(t, err) + + collectionDb := &collectionDb{ + db: db, + } + + // Test when all parameters are nil + collections, err := collectionDb.GetCollections(nil, nil, nil) + assert.NoError(t, err) + assert.Len(t, collections, 1) + assert.Equal(t, collection.ID, collections[0].Collection.ID) + assert.Equal(t, collection.Name, collections[0].Collection.Name) + assert.Equal(t, collection.Topic, collections[0].Collection.Topic) 
+ assert.Len(t, collections[0].CollectionMetadata, 1) + assert.Equal(t, metadata.Key, collections[0].CollectionMetadata[0].Key) + assert.Equal(t, metadata.StrValue, collections[0].CollectionMetadata[0].StrValue) + + // Test when filtering by ID + collections, err = collectionDb.GetCollections(&collection.ID, nil, nil) + assert.NoError(t, err) + assert.Len(t, collections, 1) + assert.Equal(t, collection.ID, collections[0].Collection.ID) + + // Test when filtering by name + collections, err = collectionDb.GetCollections(nil, collection.Name, nil) + assert.NoError(t, err) + assert.Len(t, collections, 1) + assert.Equal(t, collection.ID, collections[0].Collection.ID) + + // Test when filtering by topic + collections, err = collectionDb.GetCollections(nil, nil, collection.Topic) + assert.NoError(t, err) + assert.Len(t, collections, 1) + assert.Equal(t, collection.ID, collections[0].Collection.ID) +} diff --git a/go/coordinator/internal/metastore/db/dao/common.go b/go/coordinator/internal/metastore/db/dao/common.go new file mode 100644 index 000000000000..92ade4bd3b63 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dao/common.go @@ -0,0 +1,30 @@ +package dao + +import ( + "context" + + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbcore" + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" +) + +type metaDomain struct{} + +func NewMetaDomain() *metaDomain { + return &metaDomain{} +} + +func (*metaDomain) CollectionDb(ctx context.Context) dbmodel.ICollectionDb { + return &collectionDb{dbcore.GetDB(ctx)} +} + +func (*metaDomain) CollectionMetadataDb(ctx context.Context) dbmodel.ICollectionMetadataDb { + return &collectionMetadataDb{dbcore.GetDB(ctx)} +} + +func (*metaDomain) SegmentDb(ctx context.Context) dbmodel.ISegmentDb { + return &segmentDb{dbcore.GetDB(ctx)} +} + +func (*metaDomain) SegmentMetadataDb(ctx context.Context) dbmodel.ISegmentMetadataDb { + return &segmentMetadataDb{dbcore.GetDB(ctx)} +} diff --git a/go/coordinator/internal/metastore/db/dao/segment.go b/go/coordinator/internal/metastore/db/dao/segment.go new file mode 100644 index 000000000000..c4c3842e2784 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dao/segment.go @@ -0,0 +1,184 @@ +package dao + +import ( + "database/sql" + + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/chroma/chroma-coordinator/internal/types" + "github.com/pingcap/log" + "go.uber.org/zap" + "gorm.io/gorm" +) + +type segmentDb struct { + db *gorm.DB +} + +func (s *segmentDb) DeleteAll() error { + return s.db.Where("1=1").Delete(&dbmodel.Segment{}).Error +} + +func (s *segmentDb) DeleteSegmentByID(id string) error { + return s.db.Where("id = ?", id).Delete(&dbmodel.Segment{}).Error +} + +func (s *segmentDb) Insert(in *dbmodel.Segment) error { + err := s.db.Create(&in).Error + + if err != nil { + log.Error("insert segment failed", zap.String("segmentID", in.ID), zap.Int64("ts", in.Ts), zap.Error(err)) + return err + } + + return nil +} + +func (s *segmentDb) GetSegments(id types.UniqueID, segmentType *string, scope *string, topic *string, collectionID types.UniqueID) ([]*dbmodel.SegmentAndMetadata, error) { + var segments []*dbmodel.SegmentAndMetadata + + query := s.db.Table("segments"). + Select("segments.id, segments.collection_id, segments.type, segments.scope, segments.topic, segment_metadata.key, segment_metadata.str_value, segment_metadata.int_value, segment_metadata.float_value"). + Joins("LEFT JOIN segment_metadata ON segments.id = segment_metadata.segment_id").
+ Order("segments.id") + + if id != types.NilUniqueID() { + query = query.Where("id = ?", id.String()) + } + if segmentType != nil { + query = query.Where("type = ?", segmentType) + } + if scope != nil { + query = query.Where("scope = ?", scope) + } + if topic != nil { + query = query.Where("topic = ?", topic) + } + if collectionID != types.NilUniqueID() { + query = query.Where("collection_id = ?", collectionID.String()) + } + + rows, err := query.Rows() + if err != nil { + log.Error("get segments failed", zap.String("segmentID", id.String()), zap.String("segmentType", *segmentType), zap.String("scope", *scope), zap.String("collectionTopic", *topic), zap.Error(err)) + return nil, err + } + defer rows.Close() + + var currentSegmentID string = "" + var metadata []*dbmodel.SegmentMetadata + var currentSegment *dbmodel.SegmentAndMetadata + + for rows.Next() { + var ( + segmentID string + collectionID sql.NullString + segmentType string + scope string + topic sql.NullString + key sql.NullString + strValue sql.NullString + intValue sql.NullInt64 + floatValue sql.NullFloat64 + ) + + err := rows.Scan(&segmentID, &collectionID, &segmentType, &scope, &topic, &key, &strValue, &intValue, &floatValue) + if err != nil { + log.Error("scan segment failed", zap.Error(err)) + } + if segmentID != currentSegmentID { + currentSegmentID = segmentID + metadata = nil + + currentSegment = &dbmodel.SegmentAndMetadata{ + Segment: &dbmodel.Segment{ + ID: segmentID, + Type: segmentType, + Scope: scope, + }, + SegmentMetadata: metadata, + } + if collectionID.Valid { + currentSegment.Segment.CollectionID = &collectionID.String + } else { + currentSegment.Segment.CollectionID = nil + } + + if topic.Valid { + currentSegment.Segment.Topic = &topic.String + } else { + currentSegment.Segment.Topic = nil + } + + if currentSegmentID != "" { + segments = append(segments, currentSegment) + } + + } + segmentMetadata := &dbmodel.SegmentMetadata{ + SegmentID: segmentID, + } + if key.Valid { + segmentMetadata.Key = &key.String + } else { + segmentMetadata.Key = nil + } + + if strValue.Valid { + segmentMetadata.StrValue = &strValue.String + } else { + segmentMetadata.StrValue = nil + } + + if intValue.Valid { + segmentMetadata.IntValue = &intValue.Int64 + } else { + segmentMetadata.IntValue = nil + } + + if floatValue.Valid { + segmentMetadata.FloatValue = &floatValue.Float64 + } else { + segmentMetadata.FloatValue = nil + } + + metadata = append(metadata, segmentMetadata) + currentSegment.SegmentMetadata = metadata + } + log.Info("get segments success", zap.Any("segments", segments)) + return segments, nil +} + +func generateSegmentUpdatesWithoutID(in *dbmodel.UpdateSegment) map[string]interface{} { + // Case 1: if ResetTopic is true and topic is nil, then set the topic to nil + // Case 2: if ResetTopic is true and topic is not nil -> THIS SHOULD NEVER HAPPEN + // Case 3: if ResetTopic is false and topic is not nil - set the topic to the value in topic + // Case 4: if ResetTopic is false and topic is nil, then leave the topic as is + log.Info("generate segment updates without id", zap.Any("in", in)) + ret := map[string]interface{}{} + if in.ResetTopic { + if in.Topic == nil { + ret["topic"] = nil + } + } else { + if in.Topic != nil { + ret["topic"] = *in.Topic + } + } + + if in.ResetCollection { + if in.Collection == nil { + ret["collection_id"] = nil + } + } else { + if in.Collection != nil { + ret["collection_id"] = *in.Collection + } + } + log.Info("generate segment updates without id", zap.Any("updates", ret)) + return ret +} + 
+func (s *segmentDb) Update(in *dbmodel.UpdateSegment) error { + updates := generateSegmentUpdatesWithoutID(in) + return s.db.Model(&dbmodel.Segment{}).Where("id = ?", in.ID).Updates(updates).Error +} diff --git a/go/coordinator/internal/metastore/db/dao/segment_metadata.go b/go/coordinator/internal/metastore/db/dao/segment_metadata.go new file mode 100644 index 000000000000..14d4d2ec2d04 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dao/segment_metadata.go @@ -0,0 +1,35 @@ +package dao + +import ( + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type segmentMetadataDb struct { + db *gorm.DB +} + +func (s *segmentMetadataDb) DeleteAll() error { + return s.db.Where("1 = 1").Delete(&dbmodel.SegmentMetadata{}).Error +} + +func (s *segmentMetadataDb) DeleteBySegmentID(segmentID string) error { + return s.db.Where("segment_id = ?", segmentID).Delete(&dbmodel.SegmentMetadata{}).Error +} + +func (s *segmentMetadataDb) DeleteBySegmentIDAndKeys(segmentID string, keys []string) error { + return s.db. + Where("segment_id = ?", segmentID). + Where("`key` IN ?", keys). + Delete(&dbmodel.SegmentMetadata{}).Error +} + +func (s *segmentMetadataDb) Insert(in []*dbmodel.SegmentMetadata) error { + return s.db.Clauses( + clause.OnConflict{ + Columns: []clause.Column{{Name: "segment_id"}, {Name: "key"}}, + DoUpdates: clause.AssignmentColumns([]string{"str_value", "int_value", "float_value", "ts"}), + }, + ).Create(in).Error +} diff --git a/go/coordinator/internal/metastore/db/dao/segment_test.go b/go/coordinator/internal/metastore/db/dao/segment_test.go new file mode 100644 index 000000000000..34522869faaf --- /dev/null +++ b/go/coordinator/internal/metastore/db/dao/segment_test.go @@ -0,0 +1,89 @@ +package dao + +import ( + "testing" + + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/chroma/chroma-coordinator/internal/types" + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func TestSegmentDb_GetSegments(t *testing.T) { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + assert.NoError(t, err) + + err = db.AutoMigrate(&dbmodel.Segment{}, &dbmodel.SegmentMetadata{}) + assert.NoError(t, err) + + uniqueID := types.NewUniqueID() + collectionID := uniqueID.String() + testTopic := "test_topic" + segment := &dbmodel.Segment{ + ID: uniqueID.String(), + CollectionID: &collectionID, + Type: "test_type", + Scope: "test_scope", + Topic: &testTopic, + } + err = db.Create(segment).Error + assert.NoError(t, err) + + testKey := "test" + testValue := "test" + metadata := &dbmodel.SegmentMetadata{ + SegmentID: segment.ID, + Key: &testKey, + StrValue: &testValue, + } + err = db.Create(metadata).Error + assert.NoError(t, err) + + segmentDb := &segmentDb{ + db: db, + } + + // Test when all parameters are nil + segments, err := segmentDb.GetSegments(types.NilUniqueID(), nil, nil, nil, types.NilUniqueID()) + assert.NoError(t, err) + assert.Len(t, segments, 1) + assert.Equal(t, segment.ID, segments[0].Segment.ID) + assert.Equal(t, segment.CollectionID, segments[0].Segment.CollectionID) + assert.Equal(t, segment.Type, segments[0].Segment.Type) + assert.Equal(t, segment.Scope, segments[0].Segment.Scope) + assert.Equal(t, segment.Topic, segments[0].Segment.Topic) + assert.Len(t, segments[0].SegmentMetadata, 1) + assert.Equal(t, metadata.Key, segments[0].SegmentMetadata[0].Key) + assert.Equal(t, metadata.StrValue, segments[0].SegmentMetadata[0].StrValue) + + // Test when 
filtering by ID + segments, err = segmentDb.GetSegments(types.MustParse(segment.ID), nil, nil, nil, types.NilUniqueID()) + assert.NoError(t, err) + assert.Len(t, segments, 1) + assert.Equal(t, segment.ID, segments[0].Segment.ID) + + // Test when filtering by type + segments, err = segmentDb.GetSegments(types.NilUniqueID(), &segment.Type, nil, nil, types.NilUniqueID()) + assert.NoError(t, err) + assert.Len(t, segments, 1) + assert.Equal(t, segment.ID, segments[0].Segment.ID) + + // Test when filtering by scope + segments, err = segmentDb.GetSegments(types.NilUniqueID(), nil, &segment.Scope, nil, types.NilUniqueID()) + assert.NoError(t, err) + assert.Len(t, segments, 1) + assert.Equal(t, segment.ID, segments[0].Segment.ID) + + // Test when filtering by topic + segments, err = segmentDb.GetSegments(types.NilUniqueID(), nil, nil, segment.Topic, types.NilUniqueID()) + assert.NoError(t, err) + assert.Len(t, segments, 1) + assert.Equal(t, segment.ID, segments[0].Segment.ID) + + // Test when filtering by collection ID + segments, err = segmentDb.GetSegments(types.NilUniqueID(), nil, nil, nil, types.MustParse(*segment.CollectionID)) + assert.NoError(t, err) + assert.Len(t, segments, 1) + assert.Equal(t, segment.ID, segments[0].Segment.ID) +} diff --git a/go/coordinator/internal/metastore/db/dbcore/core.go b/go/coordinator/internal/metastore/db/dbcore/core.go new file mode 100644 index 000000000000..c8cb638ac288 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbcore/core.go @@ -0,0 +1,133 @@ +package dbcore + +import ( + "context" + "fmt" + "reflect" + + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/pingcap/log" + "go.uber.org/zap" + "gorm.io/driver/mysql" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +var ( + globalDB *gorm.DB +) + +type DBConfig struct { + Username string + Password string + Address string + DBName string + MaxIdleConns int + MaxOpenConns int +} + +func Connect(cfg DBConfig) (*gorm.DB, error) { + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local&tls=true&interpolateParams=true", + cfg.Username, cfg.Password, cfg.Address, cfg.DBName) + + ormLogger := logger.Default + ormLogger.LogMode(logger.Info) + db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ + Logger: ormLogger, + CreateBatchSize: 100, + }) + if err != nil { + log.Error("fail to connect db", + zap.String("host", cfg.Address), + zap.String("database", cfg.DBName), + zap.Error(err)) + return nil, err + } + + idb, err := db.DB() + if err != nil { + log.Error("fail to create db instance", + zap.String("host", cfg.Address), + zap.String("database", cfg.DBName), + zap.Error(err)) + return nil, err + } + idb.SetMaxIdleConns(cfg.MaxIdleConns) + idb.SetMaxOpenConns(cfg.MaxOpenConns) + + globalDB = db + + log.Info("db connected success", + zap.String("host", cfg.Address), + zap.String("database", cfg.DBName), + zap.Error(err)) + + return db, nil +} + +// SetGlobalDB Only for test +func SetGlobalDB(db *gorm.DB) { + globalDB = db +} + +type ctxTransactionKey struct{} + +func CtxWithTransaction(ctx context.Context, tx *gorm.DB) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, ctxTransactionKey{}, tx) +} + +type txImpl struct{} + +func NewTxImpl() *txImpl { + return &txImpl{} +} + +func (*txImpl) Transaction(ctx context.Context, fn func(txctx context.Context) error) error { + db := globalDB.WithContext(ctx) + + return db.Transaction(func(tx *gorm.DB) error { + txCtx := 
CtxWithTransaction(ctx, tx) + return fn(txCtx) + }) +} + +func GetDB(ctx context.Context) *gorm.DB { + iface := ctx.Value(ctxTransactionKey{}) + + if iface != nil { + tx, ok := iface.(*gorm.DB) + if !ok { + log.Error("unexpected context value type", zap.Any("type", reflect.TypeOf(iface))) + return nil + } + + return tx + } + + return globalDB.WithContext(ctx) +} + +func ConfigDatabaseForTesting() *gorm.DB { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + if err != nil { + panic("failed to connect database") + } + SetGlobalDB(db) + + // Setup collection related tables + db.Migrator().DropTable(&dbmodel.Collection{}) + db.Migrator().DropTable(&dbmodel.CollectionMetadata{}) + db.Migrator().CreateTable(&dbmodel.Collection{}) + db.Migrator().CreateTable(&dbmodel.CollectionMetadata{}) + + // Setup segment related tables + db.Migrator().DropTable(&dbmodel.Segment{}) + db.Migrator().DropTable(&dbmodel.SegmentMetadata{}) + db.Migrator().CreateTable(&dbmodel.Segment{}) + db.Migrator().CreateTable(&dbmodel.SegmentMetadata{}) + return db +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/collection.go b/go/coordinator/internal/metastore/db/dbmodel/collection.go new file mode 100644 index 000000000000..0e8ce7dbea1d --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/collection.go @@ -0,0 +1,36 @@ +package dbmodel + +import ( + "time" + + "github.com/chroma/chroma-coordinator/internal/types" +) + +type Collection struct { + ID string `gorm:"id:primaryKey"` + Name *string `gorm:"name"` + Topic *string `gorm:"topic"` + Dimension *int32 `gorm:"dimension"` + Ts types.Timestamp `gorm:"ts"` + IsDeleted bool `gorm:"is_deleted"` + CreatedAt time.Time `gorm:"created_at, default:CURRENT_TIMESTAMP"` + UpdatedAt time.Time `gorm:"updated_at, default:CURRENT_TIMESTAMP"` +} + +func (v Collection) TableName() string { + return "collections" +} + +type CollectionAndMetadata struct { + Collection *Collection + CollectionMetadata []*CollectionMetadata +} + +//go:generate mockery --name=ICollectionDb +type ICollectionDb interface { + GetCollections(collectionID *string, collectionName *string, collectionTopic *string) ([]*CollectionAndMetadata, error) + DeleteCollectionByID(collectionID string) error + Insert(in *Collection) error + Update(in *Collection) error + DeleteAll() error +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/collection_metadata.go b/go/coordinator/internal/metastore/db/dbmodel/collection_metadata.go new file mode 100644 index 000000000000..4a2b00edd79a --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/collection_metadata.go @@ -0,0 +1,29 @@ +package dbmodel + +import ( + "time" + + "github.com/chroma/chroma-coordinator/internal/types" +) + +type CollectionMetadata struct { + CollectionID string `gorm:"collection_id;primaryKey"` + Key *string `gorm:"key;primaryKey"` + StrValue *string `gorm:"str_value"` + IntValue *int64 `gorm:"int_value"` + FloatValue *float64 `gorm:"float_value"` + Ts types.Timestamp `gorm:"ts"` + CreatedAt time.Time `gorm:"created_at"` + UpdatedAt time.Time `gorm:"updated_at"` +} + +func (v CollectionMetadata) TableName() string { + return "collection_metadata" +} + +//go:generate mockery --name=ICollectionMetadataDb +type ICollectionMetadataDb interface { + DeleteByCollectionID(collectionID string) error + Insert(in []*CollectionMetadata) error + DeleteAll() error +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/common.go b/go/coordinator/internal/metastore/db/dbmodel/common.go new file mode 100644 index
000000000000..75134a1ec2e0 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/common.go @@ -0,0 +1,16 @@ +package dbmodel + +import "context" + +//go:generate mockery --name=IMetaDomain +type IMetaDomain interface { + CollectionDb(ctx context.Context) ICollectionDb + CollectionMetadataDb(ctx context.Context) ICollectionMetadataDb + SegmentDb(ctx context.Context) ISegmentDb + SegmentMetadataDb(ctx context.Context) ISegmentMetadataDb +} + +//go:generate mockery --name=ITransaction +type ITransaction interface { + Transaction(ctx context.Context, fn func(txCtx context.Context) error) error +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/mocks/ICollectionDb.go b/go/coordinator/internal/metastore/db/dbmodel/mocks/ICollectionDb.go new file mode 100644 index 000000000000..ce905a37e755 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/mocks/ICollectionDb.go @@ -0,0 +1,109 @@ +// Code generated by mockery v2.33.3. DO NOT EDIT. + +package mocks + +import ( + dbmodel "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + mock "github.com/stretchr/testify/mock" +) + +// ICollectionDb is an autogenerated mock type for the ICollectionDb type +type ICollectionDb struct { + mock.Mock +} + +// DeleteAll provides a mock function with given fields: +func (_m *ICollectionDb) DeleteAll() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeleteCollectionByID provides a mock function with given fields: collectionID +func (_m *ICollectionDb) DeleteCollectionByID(collectionID string) error { + ret := _m.Called(collectionID) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(collectionID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetCollections provides a mock function with given fields: collectionID, collectionName, collectionTopic +func (_m *ICollectionDb) GetCollections(collectionID *string, collectionName *string, collectionTopic *string) ([]*dbmodel.CollectionAndMetadata, error) { + ret := _m.Called(collectionID, collectionName, collectionTopic) + + var r0 []*dbmodel.CollectionAndMetadata + var r1 error + if rf, ok := ret.Get(0).(func(*string, *string, *string) ([]*dbmodel.CollectionAndMetadata, error)); ok { + return rf(collectionID, collectionName, collectionTopic) + } + if rf, ok := ret.Get(0).(func(*string, *string, *string) []*dbmodel.CollectionAndMetadata); ok { + r0 = rf(collectionID, collectionName, collectionTopic) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*dbmodel.CollectionAndMetadata) + } + } + + if rf, ok := ret.Get(1).(func(*string, *string, *string) error); ok { + r1 = rf(collectionID, collectionName, collectionTopic) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Insert provides a mock function with given fields: in +func (_m *ICollectionDb) Insert(in *dbmodel.Collection) error { + ret := _m.Called(in) + + var r0 error + if rf, ok := ret.Get(0).(func(*dbmodel.Collection) error); ok { + r0 = rf(in) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Update provides a mock function with given fields: in +func (_m *ICollectionDb) Update(in *dbmodel.Collection) error { + ret := _m.Called(in) + + var r0 error + if rf, ok := ret.Get(0).(func(*dbmodel.Collection) error); ok { + r0 = rf(in) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewICollectionDb creates a new instance of ICollectionDb. 
It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewICollectionDb(t interface { + mock.TestingT + Cleanup(func()) +}) *ICollectionDb { + mock := &ICollectionDb{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/mocks/ICollectionMetadataDb.go b/go/coordinator/internal/metastore/db/dbmodel/mocks/ICollectionMetadataDb.go new file mode 100644 index 000000000000..87d71909b064 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/mocks/ICollectionMetadataDb.go @@ -0,0 +1,69 @@ +// Code generated by mockery v2.33.3. DO NOT EDIT. + +package mocks + +import ( + dbmodel "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + mock "github.com/stretchr/testify/mock" +) + +// ICollectionMetadataDb is an autogenerated mock type for the ICollectionMetadataDb type +type ICollectionMetadataDb struct { + mock.Mock +} + +// DeleteAll provides a mock function with given fields: +func (_m *ICollectionMetadataDb) DeleteAll() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeleteByCollectionID provides a mock function with given fields: collectionID +func (_m *ICollectionMetadataDb) DeleteByCollectionID(collectionID string) error { + ret := _m.Called(collectionID) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(collectionID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Insert provides a mock function with given fields: in +func (_m *ICollectionMetadataDb) Insert(in []*dbmodel.CollectionMetadata) error { + ret := _m.Called(in) + + var r0 error + if rf, ok := ret.Get(0).(func([]*dbmodel.CollectionMetadata) error); ok { + r0 = rf(in) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewICollectionMetadataDb creates a new instance of ICollectionMetadataDb. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewICollectionMetadataDb(t interface { + mock.TestingT + Cleanup(func()) +}) *ICollectionMetadataDb { + mock := &ICollectionMetadataDb{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/mocks/IMetaDomain.go b/go/coordinator/internal/metastore/db/dbmodel/mocks/IMetaDomain.go new file mode 100644 index 000000000000..7a7d18d23d94 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/mocks/IMetaDomain.go @@ -0,0 +1,93 @@ +// Code generated by mockery v2.33.3. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + dbmodel "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + mock "github.com/stretchr/testify/mock" +) + +// IMetaDomain is an autogenerated mock type for the IMetaDomain type +type IMetaDomain struct { + mock.Mock +} + +// CollectionDb provides a mock function with given fields: ctx +func (_m *IMetaDomain) CollectionDb(ctx context.Context) dbmodel.ICollectionDb { + ret := _m.Called(ctx) + + var r0 dbmodel.ICollectionDb + if rf, ok := ret.Get(0).(func(context.Context) dbmodel.ICollectionDb); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(dbmodel.ICollectionDb) + } + } + + return r0 +} + +// CollectionMetadataDb provides a mock function with given fields: ctx +func (_m *IMetaDomain) CollectionMetadataDb(ctx context.Context) dbmodel.ICollectionMetadataDb { + ret := _m.Called(ctx) + + var r0 dbmodel.ICollectionMetadataDb + if rf, ok := ret.Get(0).(func(context.Context) dbmodel.ICollectionMetadataDb); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(dbmodel.ICollectionMetadataDb) + } + } + + return r0 +} + +// SegmentDb provides a mock function with given fields: ctx +func (_m *IMetaDomain) SegmentDb(ctx context.Context) dbmodel.ISegmentDb { + ret := _m.Called(ctx) + + var r0 dbmodel.ISegmentDb + if rf, ok := ret.Get(0).(func(context.Context) dbmodel.ISegmentDb); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(dbmodel.ISegmentDb) + } + } + + return r0 +} + +// SegmentMetadataDb provides a mock function with given fields: ctx +func (_m *IMetaDomain) SegmentMetadataDb(ctx context.Context) dbmodel.ISegmentMetadataDb { + ret := _m.Called(ctx) + + var r0 dbmodel.ISegmentMetadataDb + if rf, ok := ret.Get(0).(func(context.Context) dbmodel.ISegmentMetadataDb); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(dbmodel.ISegmentMetadataDb) + } + } + + return r0 +} + +// NewIMetaDomain creates a new instance of IMetaDomain. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewIMetaDomain(t interface { + mock.TestingT + Cleanup(func()) +}) *IMetaDomain { + mock := &IMetaDomain{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/mocks/ISegmentDb.go b/go/coordinator/internal/metastore/db/dbmodel/mocks/ISegmentDb.go new file mode 100644 index 000000000000..1a519766bbab --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/mocks/ISegmentDb.go @@ -0,0 +1,111 @@ +// Code generated by mockery v2.33.3. DO NOT EDIT. 
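//
// Not part of the generated mock: a minimal sketch of how these mockery mocks
// are typically wired together in a unit test. The test name, arguments and
// return values below are assumptions for illustration only.
//
//	func TestCatalogUsesCollectionDb(t *testing.T) {
//		collectionDb := mocks.NewICollectionDb(t)
//		collectionDb.On("GetCollections", mock.Anything, mock.Anything, mock.Anything).
//			Return([]*dbmodel.CollectionAndMetadata{}, nil)
//
//		metaDomain := mocks.NewIMetaDomain(t)
//		metaDomain.On("CollectionDb", mock.Anything).Return(collectionDb)
//
//		// The code under test would resolve the table handle through the domain,
//		// e.g. metaDomain.CollectionDb(ctx).GetCollections(nil, nil, nil); the
//		// cleanup registered by the New* constructors then asserts that every
//		// expectation above was actually met.
//	}
//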
+ +package mocks + +import ( + dbmodel "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + mock "github.com/stretchr/testify/mock" + + types "github.com/chroma/chroma-coordinator/internal/types" +) + +// ISegmentDb is an autogenerated mock type for the ISegmentDb type +type ISegmentDb struct { + mock.Mock +} + +// DeleteAll provides a mock function with given fields: +func (_m *ISegmentDb) DeleteAll() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeleteSegmentByID provides a mock function with given fields: id +func (_m *ISegmentDb) DeleteSegmentByID(id string) error { + ret := _m.Called(id) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetSegments provides a mock function with given fields: id, segmentType, scope, topic, collectionID +func (_m *ISegmentDb) GetSegments(id types.UniqueID, segmentType *string, scope *string, topic *string, collectionID types.UniqueID) ([]*dbmodel.SegmentAndMetadata, error) { + ret := _m.Called(id, segmentType, scope, topic, collectionID) + + var r0 []*dbmodel.SegmentAndMetadata + var r1 error + if rf, ok := ret.Get(0).(func(types.UniqueID, *string, *string, *string, types.UniqueID) ([]*dbmodel.SegmentAndMetadata, error)); ok { + return rf(id, segmentType, scope, topic, collectionID) + } + if rf, ok := ret.Get(0).(func(types.UniqueID, *string, *string, *string, types.UniqueID) []*dbmodel.SegmentAndMetadata); ok { + r0 = rf(id, segmentType, scope, topic, collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*dbmodel.SegmentAndMetadata) + } + } + + if rf, ok := ret.Get(1).(func(types.UniqueID, *string, *string, *string, types.UniqueID) error); ok { + r1 = rf(id, segmentType, scope, topic, collectionID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Insert provides a mock function with given fields: _a0 +func (_m *ISegmentDb) Insert(_a0 *dbmodel.Segment) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*dbmodel.Segment) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Update provides a mock function with given fields: _a0 +func (_m *ISegmentDb) Update(_a0 *dbmodel.UpdateSegment) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*dbmodel.UpdateSegment) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewISegmentDb creates a new instance of ISegmentDb. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewISegmentDb(t interface { + mock.TestingT + Cleanup(func()) +}) *ISegmentDb { + mock := &ISegmentDb{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/mocks/ISegmentMetadataDb.go b/go/coordinator/internal/metastore/db/dbmodel/mocks/ISegmentMetadataDb.go new file mode 100644 index 000000000000..24c56b6d8351 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/mocks/ISegmentMetadataDb.go @@ -0,0 +1,83 @@ +// Code generated by mockery v2.33.3. DO NOT EDIT. 
+ +package mocks + +import ( + dbmodel "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + mock "github.com/stretchr/testify/mock" +) + +// ISegmentMetadataDb is an autogenerated mock type for the ISegmentMetadataDb type +type ISegmentMetadataDb struct { + mock.Mock +} + +// DeleteAll provides a mock function with given fields: +func (_m *ISegmentMetadataDb) DeleteAll() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeleteBySegmentID provides a mock function with given fields: segmentID +func (_m *ISegmentMetadataDb) DeleteBySegmentID(segmentID string) error { + ret := _m.Called(segmentID) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(segmentID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeleteBySegmentIDAndKeys provides a mock function with given fields: segmentID, keys +func (_m *ISegmentMetadataDb) DeleteBySegmentIDAndKeys(segmentID string, keys []string) error { + ret := _m.Called(segmentID, keys) + + var r0 error + if rf, ok := ret.Get(0).(func(string, []string) error); ok { + r0 = rf(segmentID, keys) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Insert provides a mock function with given fields: in +func (_m *ISegmentMetadataDb) Insert(in []*dbmodel.SegmentMetadata) error { + ret := _m.Called(in) + + var r0 error + if rf, ok := ret.Get(0).(func([]*dbmodel.SegmentMetadata) error); ok { + r0 = rf(in) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewISegmentMetadataDb creates a new instance of ISegmentMetadataDb. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewISegmentMetadataDb(t interface { + mock.TestingT + Cleanup(func()) +}) *ISegmentMetadataDb { + mock := &ISegmentMetadataDb{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/mocks/ITransaction.go b/go/coordinator/internal/metastore/db/dbmodel/mocks/ITransaction.go new file mode 100644 index 000000000000..79c20ef32282 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/mocks/ITransaction.go @@ -0,0 +1,42 @@ +// Code generated by mockery v2.33.3. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// ITransaction is an autogenerated mock type for the ITransaction type +type ITransaction struct { + mock.Mock +} + +// Transaction provides a mock function with given fields: ctx, fn +func (_m *ITransaction) Transaction(ctx context.Context, fn func(context.Context) error) error { + ret := _m.Called(ctx, fn) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, func(context.Context) error) error); ok { + r0 = rf(ctx, fn) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewITransaction creates a new instance of ITransaction. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewITransaction(t interface { + mock.TestingT + Cleanup(func()) +}) *ITransaction { + mock := &ITransaction{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/segment.go b/go/coordinator/internal/metastore/db/dbmodel/segment.go new file mode 100644 index 000000000000..84c51f7481b4 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/segment.go @@ -0,0 +1,45 @@ +package dbmodel + +import ( + "time" + + "github.com/chroma/chroma-coordinator/internal/types" +) + +type Segment struct { + ID string `gorm:"id;primaryKey"` + Type string `gorm:"type"` + Scope string `gorm:"scope"` + Topic *string `gorm:"topic"` + Ts types.Timestamp `gorm:"ts"` + IsDeleted bool `gorm:"default:false"` + CreatedAt time.Time `gorm:"created_at;default:CURRENT_TIMESTAMP"` + UpdatedAt time.Time `gorm:"created_at;default:CURRENT_TIMESTAMP"` + CollectionID *string `gorm:"collection_id"` +} + +func (s Segment) TableName() string { + return "segments" +} + +type SegmentAndMetadata struct { + Segment *Segment + SegmentMetadata []*SegmentMetadata +} + +type UpdateSegment struct { + ID string + Topic *string + ResetTopic bool + Collection *string + ResetCollection bool +} + +//go:generate mockery --name=ISegmentDb +type ISegmentDb interface { + GetSegments(id types.UniqueID, segmentType *string, scope *string, topic *string, collectionID types.UniqueID) ([]*SegmentAndMetadata, error) + DeleteSegmentByID(id string) error + Insert(*Segment) error + Update(*UpdateSegment) error + DeleteAll() error +} diff --git a/go/coordinator/internal/metastore/db/dbmodel/segment_metadata.go b/go/coordinator/internal/metastore/db/dbmodel/segment_metadata.go new file mode 100644 index 000000000000..a1054b496cf7 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/segment_metadata.go @@ -0,0 +1,30 @@ +package dbmodel + +import ( + "time" + + "github.com/chroma/chroma-coordinator/internal/types" +) + +type SegmentMetadata struct { + SegmentID string `gorm:"segment_id;primaryKey"` + Key *string `gorm:"key;primaryKey"` + StrValue *string `gorm:"str_value"` + IntValue *int64 `gorm:"int_value"` + FloatValue *float64 `gorm:"float_value"` + Ts types.Timestamp `gorm:"ts"` + CreatedAt time.Time `gorm:"created_at"` + UpdatedAt time.Time `gorm:"updated_at"` +} + +func (SegmentMetadata) TableName() string { + return "segment_metadata" +} + +//go:generate mockery --name=ISegmentMetadataDb +type ISegmentMetadataDb interface { + DeleteBySegmentID(segmentID string) error + DeleteBySegmentIDAndKeys(segmentID string, keys []string) error + Insert(in []*SegmentMetadata) error + DeleteAll() error +} diff --git a/go/coordinator/internal/metastore/mocks/Catalog.go b/go/coordinator/internal/metastore/mocks/Catalog.go new file mode 100644 index 000000000000..5926bc768f0d --- /dev/null +++ b/go/coordinator/internal/metastore/mocks/Catalog.go @@ -0,0 +1,204 @@ +// Code generated by mockery v2.33.3. DO NOT EDIT. 
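//
// Not part of the generated mock: a short note on dbmodel.UpdateSegment above.
// The pointer fields plus Reset* flags are read here under a common convention
// (an assumption, not spelled out in this change): a nil pointer with the flag
// unset means "leave the column unchanged", while a nil pointer with the flag
// set means "clear the column". Illustrative sketch, with segmentID and
// segmentDb as hypothetical variables:
//
//	update := &dbmodel.UpdateSegment{
//		ID:              segmentID,
//		Topic:           nil,   // nil + ResetTopic == false     => topic unchanged
//		ResetTopic:      false,
//		Collection:      nil,   // nil + ResetCollection == true => collection set to NULL
//		ResetCollection: true,
//	}
//	err := segmentDb.Update(update) // segmentDb implements dbmodel.ISegmentDb
//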
+ +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + model "github.com/chroma/chroma-coordinator/internal/model" + + types "github.com/chroma/chroma-coordinator/internal/types" +) + +// Catalog is an autogenerated mock type for the Catalog type +type Catalog struct { + mock.Mock +} + +// CreateCollection provides a mock function with given fields: ctx, collectionInfo, ts +func (_m *Catalog) CreateCollection(ctx context.Context, collectionInfo *model.CreateCollection, ts int64) (*model.Collection, error) { + ret := _m.Called(ctx, collectionInfo, ts) + + var r0 *model.Collection + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *model.CreateCollection, int64) (*model.Collection, error)); ok { + return rf(ctx, collectionInfo, ts) + } + if rf, ok := ret.Get(0).(func(context.Context, *model.CreateCollection, int64) *model.Collection); ok { + r0 = rf(ctx, collectionInfo, ts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.Collection) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *model.CreateCollection, int64) error); ok { + r1 = rf(ctx, collectionInfo, ts) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateSegment provides a mock function with given fields: ctx, segmentInfo, ts +func (_m *Catalog) CreateSegment(ctx context.Context, segmentInfo *model.CreateSegment, ts int64) (*model.Segment, error) { + ret := _m.Called(ctx, segmentInfo, ts) + + var r0 *model.Segment + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *model.CreateSegment, int64) (*model.Segment, error)); ok { + return rf(ctx, segmentInfo, ts) + } + if rf, ok := ret.Get(0).(func(context.Context, *model.CreateSegment, int64) *model.Segment); ok { + r0 = rf(ctx, segmentInfo, ts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.Segment) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *model.CreateSegment, int64) error); ok { + r1 = rf(ctx, segmentInfo, ts) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DeleteCollection provides a mock function with given fields: ctx, collectionID +func (_m *Catalog) DeleteCollection(ctx context.Context, collectionID types.UniqueID) error { + ret := _m.Called(ctx, collectionID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, types.UniqueID) error); ok { + r0 = rf(ctx, collectionID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeleteSegment provides a mock function with given fields: ctx, segmentID +func (_m *Catalog) DeleteSegment(ctx context.Context, segmentID types.UniqueID) error { + ret := _m.Called(ctx, segmentID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, types.UniqueID) error); ok { + r0 = rf(ctx, segmentID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetCollections provides a mock function with given fields: ctx, collectionID, collectionName, collectionTopic +func (_m *Catalog) GetCollections(ctx context.Context, collectionID types.UniqueID, collectionName *string, collectionTopic *string) ([]*model.Collection, error) { + ret := _m.Called(ctx, collectionID, collectionName, collectionTopic) + + var r0 []*model.Collection + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, types.UniqueID, *string, *string) ([]*model.Collection, error)); ok { + return rf(ctx, collectionID, collectionName, collectionTopic) + } + if rf, ok := ret.Get(0).(func(context.Context, types.UniqueID, *string, *string) []*model.Collection); ok { + r0 = rf(ctx, collectionID, collectionName, 
collectionTopic) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.Collection) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, types.UniqueID, *string, *string) error); ok { + r1 = rf(ctx, collectionID, collectionName, collectionTopic) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetSegments provides a mock function with given fields: ctx, segmentID, segmentType, scope, topic, collectionID, ts +func (_m *Catalog) GetSegments(ctx context.Context, segmentID types.UniqueID, segmentType *string, scope *string, topic *string, collectionID types.UniqueID, ts int64) ([]*model.Segment, error) { + ret := _m.Called(ctx, segmentID, segmentType, scope, topic, collectionID, ts) + + var r0 []*model.Segment + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, types.UniqueID, *string, *string, *string, types.UniqueID, int64) ([]*model.Segment, error)); ok { + return rf(ctx, segmentID, segmentType, scope, topic, collectionID, ts) + } + if rf, ok := ret.Get(0).(func(context.Context, types.UniqueID, *string, *string, *string, types.UniqueID, int64) []*model.Segment); ok { + r0 = rf(ctx, segmentID, segmentType, scope, topic, collectionID, ts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.Segment) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, types.UniqueID, *string, *string, *string, types.UniqueID, int64) error); ok { + r1 = rf(ctx, segmentID, segmentType, scope, topic, collectionID, ts) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ResetState provides a mock function with given fields: ctx +func (_m *Catalog) ResetState(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UpdateCollection provides a mock function with given fields: ctx, collectionInfo, ts +func (_m *Catalog) UpdateCollection(ctx context.Context, collectionInfo *model.UpdateCollection, ts int64) (*model.Collection, error) { + ret := _m.Called(ctx, collectionInfo, ts) + + var r0 *model.Collection + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *model.UpdateCollection, int64) (*model.Collection, error)); ok { + return rf(ctx, collectionInfo, ts) + } + if rf, ok := ret.Get(0).(func(context.Context, *model.UpdateCollection, int64) *model.Collection); ok { + r0 = rf(ctx, collectionInfo, ts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.Collection) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *model.UpdateCollection, int64) error); ok { + r1 = rf(ctx, collectionInfo, ts) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewCatalog creates a new instance of Catalog. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewCatalog(t interface { + mock.TestingT + Cleanup(func()) +}) *Catalog { + mock := &Catalog{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/go/coordinator/internal/model/collection_metadata.go b/go/coordinator/internal/model/collection_metadata.go index 6cc5730b1797..9ef3295a48a2 100644 --- a/go/coordinator/internal/model/collection_metadata.go +++ b/go/coordinator/internal/model/collection_metadata.go @@ -43,3 +43,7 @@ func (m *CollectionMetadata[T]) Get(key string) T { func (m *CollectionMetadata[T]) Remove(key string) { delete(m.Metadata, key) } + +func (m *CollectionMetadata[T]) Empty() bool { + return len(m.Metadata) == 0 +} diff --git a/go/coordinator/internal/model/segment_metadata.go b/go/coordinator/internal/model/segment_metadata.go index 15e985919bbc..eda7497063d6 100644 --- a/go/coordinator/internal/model/segment_metadata.go +++ b/go/coordinator/internal/model/segment_metadata.go @@ -51,3 +51,7 @@ func (m *SegmentMetadata[T]) Keys() []string { } return keys } + +func (m *SegmentMetadata[T]) Empty() bool { + return len(m.Metadata) == 0 +} diff --git a/go/coordinator/scripts/collections.sql b/go/coordinator/scripts/collections.sql new file mode 100644 index 000000000000..9eb27865a9ec --- /dev/null +++ b/go/coordinator/scripts/collections.sql @@ -0,0 +1,92 @@ +CREATE DATABASE IF NOT EXISTS chroma_meta CHARACTER SET utf8mb4; + +CREATE TABLE IF NOT EXISTS chroma_meta.collections ( + id VARCHAR(36) NOT NULL, + `name` VARCHAR(256) NOT NULL, + topic TEXT NOT NULL, + dimension INTEGER, + ts BIGINT UNSIGNED DEFAULT 0, + is_deleted BOOL DEFAULT FALSE, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP on update current_timestamp, + UNIQUE (`name`), + PRIMARY KEY (id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +CREATE TABLE IF NOT EXISTS chroma_meta.collection_metadata ( + collection_id VARCHAR(36) NOT NULL, + `key` VARCHAR(256) NOT NULL, + str_value TEXT, + int_value INTEGER, + float_value REAL, + ts BIGINT UNSIGNED DEFAULT 0, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP on update current_timestamp, + PRIMARY KEY (collection_id, `key`), + -- FOREIGN KEY (collection_id) REFERENCES collections(id) ON DELETE CASCADE -- TODO: Decide if we need foreign key constraints +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- Restriction on collection name +-- Record id length is only stored in the HSNW index and metadata segment. +-- We probably still need to restrict the length of record id. More generous than UUID. +-- Needs to store the epoch in the table. + +-- We need to add database and potentially tenant id to the record id. 
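-- Example only (illustrative sample rows; the UUID and values are made up):
-- a collection with one string and one int metadata key occupies a single row in
-- collections plus one row per key in collection_metadata, which is what the
-- left-join queries further below stitch back together.
-- INSERT INTO chroma_meta.collections (id, `name`, topic, dimension)
--   VALUES ('00000000-0000-0000-0000-000000000001', 'demo_collection', 'demo_topic', 128);
-- INSERT INTO chroma_meta.collection_metadata (collection_id, `key`, str_value)
--   VALUES ('00000000-0000-0000-0000-000000000001', 'source', 'demo');
-- INSERT INTO chroma_meta.collection_metadata (collection_id, `key`, int_value)
--   VALUES ('00000000-0000-0000-0000-000000000001', 'version', 1);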
+ +CREATE TABLE IF NOT EXISTS collections ( + id VARCHAR(36) NOT NULL, + `name` VARCHAR(256) NOT NULL, + topic TEXT NOT NULL, + dimension INTEGER, + ts BIGINT UNSIGNED DEFAULT 0, + is_deleted BOOL DEFAULT FALSE, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP on update current_timestamp, + UNIQUE (`name`), + PRIMARY KEY (id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +CREATE TABLE IF NOT EXISTS collection_metadata ( + collection_id VARCHAR(36) NOT NULL, + `key` VARCHAR(256) NOT NULL, + str_value TEXT, + int_value INTEGER, + float_value REAL, + ts BIGINT UNSIGNED DEFAULT 0, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP on update current_timestamp, + PRIMARY KEY (collection_id, `key`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + +select + collections.id, + collections.`name`, + collections.topic, + collections.dimension, + collection_metadata.`key`, + collection_metadata.str_value, + collection_metadata.int_value, + collection_metadata.float_value + from + collections left join collection_metadata + on collections.id = collection_metadata.collection_id + order by + collections.id asc + + +select + collections.id, + collections.`name`, + collections.topic, + collections.dimension, + collection_metadata.`key`, + collection_metadata.str_value, + collection_metadata.int_value, + collection_metadata.float_value + from + collections left join collection_metadata + on collections.id = collection_metadata.collection_id + where collections.id = '43babc1a-e403-4a50-91a9-16621ba29ab0' + order by + collections.id asc; diff --git a/go/coordinator/scripts/deploy.sh b/go/coordinator/scripts/deploy.sh new file mode 100644 index 000000000000..02e3f320aaf0 --- /dev/null +++ b/go/coordinator/scripts/deploy.sh @@ -0,0 +1,18 @@ +make docker + +kind load docker-image chroma-coordinator + +kubectl create namespace chroma + +helm upgrade --install chroma \ + --namespace chroma \ + --set image.repository=chroma-coordinator \ + --set image.tag=latest \ + --set image.pullPolicy=Never \ + deploy/charts/chroma-coordinator + +helm uninstall chroma --namespace chroma + +kubectl logs chroma-coordinator-85f6678657-cndfx -n chroma + +kubectl port-forward -n chroma svc/chroma-coordinator 6649:6649 diff --git a/go/coordinator/scripts/generate_proto.sh b/go/coordinator/scripts/generate_proto.sh new file mode 100644 index 000000000000..f8275ba8fb4b --- /dev/null +++ b/go/coordinator/scripts/generate_proto.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative worker.proto +protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative coordinator.proto +protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative logwriter.proto +protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. 
--go-grpc_opt=paths=source_relative metadata.proto + +grpcurl -import-path internal/proto -proto worker.proto -plaintext localhost:7070 workerpb.WorkerService/CheckHealth + +grpcurl -d '{"term":0}' -import-path internal/proto -proto logwriter.proto -plaintext localhost:9090 logwriterpb.LogWriterService/GetLogWriterAssignment + +export GOPATH=~/go + +protoc --go_out=./coordinatorpb --go_opt paths=source_relative --plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" --go-grpc_out=./coordinatorpb --go-grpc_opt paths=source_relative --plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" coordinator.proto -I. +protoc --go_out=./metadatapb --go_opt paths=source_relative --plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" --go-grpc_out=./metadatapb --go-grpc_opt paths=source_relative --plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" metadata.proto -I. +protoc --go_out=./commonpb --go_opt paths=source_relative --plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" --go-grpc_out=./commonpb --go-grpc_opt paths=source_relative --plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" common.proto -I. + + +protoc --go_out=../../go/coordinator/internal/proto/coordinatorpb --go_opt paths=source_relative --plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" --go-grpc_out=../../go/coordinator/internal/proto/coordinatorpb --go-grpc_opt paths=source_relative --plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" chroma.proto -I. + +protoc --go_out=../../../go/coordinator/internal/proto/coordinatorpb --go_opt paths=source_relative --plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" --go-grpc_out=../../../go/coordinator/internal/proto/coordinatorpb --go-grpc_opt paths=source_relative --plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" chroma.proto -I. + +protoc --go_out=../../../go/coordinator/internal/proto/coordinatorpb --go_opt paths=source_relative --plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" --go-grpc_out=../../../go/coordinator/internal/proto/coordinatorpb --go-grpc_opt paths=source_relative --plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" coordinator.proto -I. + + +protoc --go_out=../go/coordinator/internal/proto/coordinatorpb --go_opt paths=source_relative --plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" --go-grpc_out=../go/coordinator/internal/proto/coordinatorpb --go-grpc_opt paths=source_relative --plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" chromadb/proto/chroma.proto -I. diff --git a/go/coordinator/scripts/gprcurl.sh b/go/coordinator/scripts/gprcurl.sh new file mode 100644 index 000000000000..1eadf9faf1cf --- /dev/null +++ b/go/coordinator/scripts/gprcurl.sh @@ -0,0 +1,27 @@ +grpcurl -d @ -import-path internal/proto -proto coordinator.proto -plaintext localhost:7070 coordinator.MetadataService/CreateCollection <