Skip to content

Commit

Permalink
coordinator_table_catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
Liquan Pei authored and Liquan Pei committed Oct 24, 2023
1 parent 10e0fa7 commit 1689847
Show file tree
Hide file tree
Showing 36 changed files with 2,612 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ dist
terraform.tfstate
.hypothesis/
.idea
chroma_env/
13 changes: 13 additions & 0 deletions bin/reset.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

eval $(minikube -p chroma-test docker-env)

docker build -t chroma-coordinator:latest -f go/coordinator/Dockerfile .

kubectl delete deployment coordinator -n chroma

# Apply the kubernetes manifests
kubectl apply -f k8s/deployment
kubectl apply -f k8s/crd
kubectl apply -f k8s/cr
kubectl apply -f k8s/test
14 changes: 8 additions & 6 deletions go/coordinator/internal/coordinator/apis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,28 +627,30 @@ func TestUpdateSegment(t *testing.T) {
assert.Equal(t, []*model.Segment{segment}, result)

// Add a new metadata key
metadata.Set("test_str2", &model.SegmentMetadataValueStringType{Value: "str2"})
segment.Metadata.Set("test_str2", &model.SegmentMetadataValueStringType{Value: "str2"})
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Metadata: metadata})
Metadata: segment.Metadata})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)

// Update a metadata key
metadata.Set("test_str", &model.SegmentMetadataValueStringType{Value: "str3"})
segment.Metadata.Set("test_str", &model.SegmentMetadataValueStringType{Value: "str3"})
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Metadata: metadata})
Metadata: segment.Metadata})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)

// Delete a metadata key
metadata.Remove("test_str")
segment.Metadata.Remove("test_str")
newMetadata := model.NewSegmentMetadata[model.SegmentMetadataValueType]()
newMetadata.Set("test_str", nil)
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Metadata: metadata})
Metadata: newMetadata})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)
Expand Down
5 changes: 4 additions & 1 deletion go/coordinator/internal/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

"github.com/chroma/chroma-coordinator/internal/metastore/coordinator"
"github.com/chroma/chroma-coordinator/internal/metastore/db/dao"
"github.com/chroma/chroma-coordinator/internal/metastore/db/dbcore"
"github.com/chroma/chroma-coordinator/internal/types"
"gorm.io/gorm"
)
Expand All @@ -27,7 +29,8 @@ func NewCoordinator(ctx context.Context, assignmentPolicy CollectionAssignmentPo
collectionAssignmentPolicy: assignmentPolicy,
}

catalog := coordinator.NewMemoryCatalog()
// catalog := coordinator.NewMemoryCatalog()
catalog := coordinator.NewTableCatalog(dbcore.NewTxImpl(), dao.NewMetaDomain())
meta, err := NewMetaTable(s.ctx, catalog)
if err != nil {
return nil, err
Expand Down
165 changes: 165 additions & 0 deletions go/coordinator/internal/metastore/coordinator/model_db_convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package coordinator

import (
"github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel"
"github.com/chroma/chroma-coordinator/internal/model"
"github.com/chroma/chroma-coordinator/internal/types"
"github.com/pingcap/log"
"go.uber.org/zap"
)

func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAndMetadata) []*model.Collection {
if collectionAndMetadataList == nil {
return nil
}
collections := make([]*model.Collection, 0, len(collectionAndMetadataList))
for _, collectionAndMetadata := range collectionAndMetadataList {
collection := &model.Collection{
ID: types.MustParse(collectionAndMetadata.Collection.ID),
Name: *collectionAndMetadata.Collection.Name,
Topic: *collectionAndMetadata.Collection.Topic,
Dimension: collectionAndMetadata.Collection.Dimension,
Ts: collectionAndMetadata.Collection.Ts,
}
collection.Metadata = convertCollectionMetadataToModel(collectionAndMetadata.CollectionMetadata)
collections = append(collections, collection)
}
log.Debug("collection to model", zap.Any("collections", collections))
return collections
}

func convertCollectionMetadataToModel(collectionMetadataList []*dbmodel.CollectionMetadata) *model.CollectionMetadata[model.CollectionMetadataValueType] {
metadata := model.NewCollectionMetadata[model.CollectionMetadataValueType]()
if collectionMetadataList == nil {
log.Debug("collection metadata to model", zap.Any("collectionMetadata", nil))
return nil
} else {
for _, collectionMetadata := range collectionMetadataList {
if collectionMetadata.Key != nil {
switch {
case collectionMetadata.StrValue != nil:
metadata.Add(*collectionMetadata.Key, &model.CollectionMetadataValueStringType{Value: *collectionMetadata.StrValue})
case collectionMetadata.IntValue != nil:
metadata.Add(*collectionMetadata.Key, &model.CollectionMetadataValueInt64Type{Value: *collectionMetadata.IntValue})
case collectionMetadata.FloatValue != nil:
metadata.Add(*collectionMetadata.Key, &model.CollectionMetadataValueFloat64Type{Value: *collectionMetadata.FloatValue})
default:
}
}
}
if metadata.Empty() {
metadata = nil
}
log.Debug("collection metadata to model", zap.Any("collectionMetadata", metadata))
return metadata
}

}

func convertCollectionMetadataToDB(collectionID string, metadata *model.CollectionMetadata[model.CollectionMetadataValueType]) []*dbmodel.CollectionMetadata {
if metadata == nil {
log.Debug("collection metadata to db", zap.Any("collectionMetadata", nil))
return nil
}
dbCollectionMetadataList := make([]*dbmodel.CollectionMetadata, 0, len(metadata.Metadata))
for key, value := range metadata.Metadata {
keyCopy := key
dbCollectionMetadata := &dbmodel.CollectionMetadata{
CollectionID: collectionID,
Key: &keyCopy,
}
switch v := (value).(type) {
case *model.CollectionMetadataValueStringType:
dbCollectionMetadata.StrValue = &v.Value
case *model.CollectionMetadataValueInt64Type:
dbCollectionMetadata.IntValue = &v.Value
case *model.CollectionMetadataValueFloat64Type:
dbCollectionMetadata.FloatValue = &v.Value
default:
log.Error("unknown collection metadata type", zap.Any("value", v))
}
dbCollectionMetadataList = append(dbCollectionMetadataList, dbCollectionMetadata)
}
log.Debug("collection metadata to db", zap.Any("collectionMetadata", dbCollectionMetadataList))
return dbCollectionMetadataList
}

func convertSegmentToModel(segmentAndMetadataList []*dbmodel.SegmentAndMetadata) []*model.Segment {
if segmentAndMetadataList == nil {
return nil
}
segments := make([]*model.Segment, 0, len(segmentAndMetadataList))
for _, segmentAndMetadata := range segmentAndMetadataList {
segment := &model.Segment{
ID: types.MustParse(segmentAndMetadata.Segment.ID),
Type: segmentAndMetadata.Segment.Type,
Scope: segmentAndMetadata.Segment.Scope,
Topic: segmentAndMetadata.Segment.Topic,
Ts: segmentAndMetadata.Segment.Ts,
}
if segmentAndMetadata.Segment.CollectionID != nil {
segment.CollectionID = types.MustParse(*segmentAndMetadata.Segment.CollectionID)
} else {
segment.CollectionID = types.NilUniqueID()
}

segment.Metadata = convertSegmentMetadataToModel(segmentAndMetadata.SegmentMetadata)
segments = append(segments, segment)
}
log.Debug("segment to model", zap.Any("segments", segments))
return segments
}

func convertSegmentMetadataToModel(segmentMetadataList []*dbmodel.SegmentMetadata) *model.SegmentMetadata[model.SegmentMetadataValueType] {
if segmentMetadataList == nil {
return nil
} else {
metadata := model.NewSegmentMetadata[model.SegmentMetadataValueType]()
for _, segmentMetadata := range segmentMetadataList {
if segmentMetadata.Key != nil {
switch {
case segmentMetadata.StrValue != nil:
metadata.Set(*segmentMetadata.Key, &model.SegmentMetadataValueStringType{Value: *segmentMetadata.StrValue})
case segmentMetadata.IntValue != nil:
metadata.Set(*segmentMetadata.Key, &model.SegmentMetadataValueInt64Type{Value: *segmentMetadata.IntValue})
case segmentMetadata.FloatValue != nil:
metadata.Set(*segmentMetadata.Key, &model.SegmentMetadataValueFloat64Type{Value: *segmentMetadata.FloatValue})
default:
}
}
}
if metadata.Empty() {
metadata = nil
}
log.Debug("segment metadata to model", zap.Any("segmentMetadata", nil))
return metadata
}
}

func convertSegmentMetadataToDB(segmentID string, metadata *model.SegmentMetadata[model.SegmentMetadataValueType]) []*dbmodel.SegmentMetadata {
if metadata == nil {
log.Debug("segment metadata db", zap.Any("segmentMetadata", nil))
return nil
}
dbSegmentMetadataList := make([]*dbmodel.SegmentMetadata, 0, len(metadata.Metadata))
for key, value := range metadata.Metadata {
keyCopy := key
dbSegmentMetadata := &dbmodel.SegmentMetadata{
SegmentID: segmentID,
Key: &keyCopy,
}
switch v := (value).(type) {
case *model.SegmentMetadataValueStringType:
dbSegmentMetadata.StrValue = &v.Value
case *model.SegmentMetadataValueInt64Type:
dbSegmentMetadata.IntValue = &v.Value
case *model.SegmentMetadataValueFloat64Type:
dbSegmentMetadata.FloatValue = &v.Value
default:
log.Error("unknown segment metadata type", zap.Any("value", v))
}
dbSegmentMetadataList = append(dbSegmentMetadataList, dbSegmentMetadata)
}
log.Debug("segment metadata db", zap.Any("segmentMetadata", dbSegmentMetadataList))
return dbSegmentMetadataList
}
121 changes: 121 additions & 0 deletions go/coordinator/internal/metastore/coordinator/model_db_convert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package coordinator

import (
"testing"

"github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel"
"github.com/chroma/chroma-coordinator/internal/model"
"github.com/chroma/chroma-coordinator/internal/types"
"github.com/stretchr/testify/assert"
)

func TestConvertCollectionMetadataToModel(t *testing.T) {
// Test case 1: collectionMetadataList is nil
modelCollectionMetadata := convertCollectionMetadataToModel(nil)
assert.Nil(t, modelCollectionMetadata)

// Test case 2: collectionMetadataList is empty
collectionMetadataList := []*dbmodel.CollectionMetadata{}
modelCollectionMetadata = convertCollectionMetadataToModel(collectionMetadataList)
assert.Nil(t, modelCollectionMetadata)
}

func TestConvertCollectionMetadataToDB(t *testing.T) {
// Test case 1: metadata is nil
dbCollectionMetadataList := convertCollectionMetadataToDB("collectionID", nil)
assert.Nil(t, dbCollectionMetadataList)

// Test case 2: metadata is not nil but empty
metadata := &model.CollectionMetadata[model.CollectionMetadataValueType]{
Metadata: map[string]model.CollectionMetadataValueType{},
}
dbCollectionMetadataList = convertCollectionMetadataToDB("collectionID", metadata)
assert.NotNil(t, dbCollectionMetadataList)
assert.Len(t, dbCollectionMetadataList, 0)

// Test case 3: metadata is not nil and contains values
metadata = &model.CollectionMetadata[model.CollectionMetadataValueType]{
Metadata: map[string]model.CollectionMetadataValueType{
"key1": &model.CollectionMetadataValueStringType{Value: "value1"},
"key2": &model.CollectionMetadataValueInt64Type{Value: 123},
"key3": &model.CollectionMetadataValueFloat64Type{Value: 3.14},
},
}
dbCollectionMetadataList = convertCollectionMetadataToDB("collectionID", metadata)
assert.NotNil(t, dbCollectionMetadataList)
assert.Len(t, dbCollectionMetadataList, 3)
assert.Equal(t, "collectionID", dbCollectionMetadataList[0].CollectionID)
assert.Equal(t, "key1", *dbCollectionMetadataList[0].Key)
assert.Equal(t, "value1", *dbCollectionMetadataList[0].StrValue)
assert.Nil(t, dbCollectionMetadataList[0].IntValue)
assert.Nil(t, dbCollectionMetadataList[0].FloatValue)
assert.Equal(t, "collectionID", dbCollectionMetadataList[1].CollectionID)
assert.Equal(t, "key2", *dbCollectionMetadataList[1].Key)
assert.Nil(t, dbCollectionMetadataList[1].StrValue)
assert.Equal(t, int64(123), *dbCollectionMetadataList[1].IntValue)
assert.Nil(t, dbCollectionMetadataList[1].FloatValue)
assert.Equal(t, "collectionID", dbCollectionMetadataList[2].CollectionID)
assert.Equal(t, "key3", *dbCollectionMetadataList[2].Key)
assert.Nil(t, dbCollectionMetadataList[2].StrValue)
assert.Nil(t, dbCollectionMetadataList[2].IntValue)
assert.Equal(t, 3.14, *dbCollectionMetadataList[2].FloatValue)
}
func TestConvertSegmentToModel(t *testing.T) {
// Test case 1: segmentAndMetadataList is nil
modelSegments := convertSegmentToModel(nil)
assert.Nil(t, modelSegments)

// Test case 2: segmentAndMetadataList is empty
segmentAndMetadataList := []*dbmodel.SegmentAndMetadata{}
modelSegments = convertSegmentToModel(segmentAndMetadataList)
assert.Empty(t, modelSegments)

// Test case 3: segmentAndMetadataList contains one segment with all fields set
segmentID := types.MustParse("515fc331-e117-4b86-bd84-85341128c337")
segmentTopic := "segment_topic"
collectionID := "d9a75e2e-2929-45c4-af06-75b15630edd0"
segmentAndMetadata := &dbmodel.SegmentAndMetadata{
Segment: &dbmodel.Segment{
ID: segmentID.String(),
Type: "segment_type",
Scope: "segment_scope",
Topic: &segmentTopic,
CollectionID: &collectionID,
},
SegmentMetadata: []*dbmodel.SegmentMetadata{},
}
segmentAndMetadataList = []*dbmodel.SegmentAndMetadata{segmentAndMetadata}
modelSegments = convertSegmentToModel(segmentAndMetadataList)
assert.Len(t, modelSegments, 1)
assert.Equal(t, segmentID, modelSegments[0].ID)
assert.Equal(t, "segment_type", modelSegments[0].Type)
assert.Equal(t, "segment_scope", modelSegments[0].Scope)
assert.Equal(t, "segment_topic", *modelSegments[0].Topic)
assert.Equal(t, types.MustParse(collectionID), modelSegments[0].CollectionID)
assert.Nil(t, modelSegments[0].Metadata)
}

func TestConvertSegmentMetadataToModel(t *testing.T) {
// Test case 1: segmentMetadataList is nil
modelSegmentMetadata := convertSegmentMetadataToModel(nil)
assert.Nil(t, modelSegmentMetadata)

// Test case 2: segmentMetadataList is empty
segmentMetadataList := []*dbmodel.SegmentMetadata{}
modelSegmentMetadata = convertSegmentMetadataToModel(segmentMetadataList)
assert.Empty(t, modelSegmentMetadata)

// Test case 3: segmentMetadataList contains one segment metadata with all fields set
segmentID := types.MustParse("515fc331-e117-4b86-bd84-85341128c337")
strKey := "strKey"
strValue := "strValue"
segmentMetadata := &dbmodel.SegmentMetadata{
SegmentID: segmentID.String(),
Key: &strKey,
StrValue: &strValue,
}
segmentMetadataList = []*dbmodel.SegmentMetadata{segmentMetadata}
modelSegmentMetadata = convertSegmentMetadataToModel(segmentMetadataList)
assert.Len(t, modelSegmentMetadata.Keys(), 1)
assert.Equal(t, &model.SegmentMetadataValueStringType{Value: strValue}, modelSegmentMetadata.Get(strKey))
}
Loading

0 comments on commit 1689847

Please sign in to comment.