diff --git a/Tiltfile b/Tiltfile index 29dbea7eea0..11bfb85de1c 100644 --- a/Tiltfile +++ b/Tiltfile @@ -134,7 +134,7 @@ k8s_resource('pulsar', resource_deps=['k8s_setup', 'namespace'], labels=["infras k8s_resource('sysdb-migration', resource_deps=['postgres', 'namespace'], labels=["infrastructure"]) k8s_resource('logservice-migration', resource_deps=['postgres', 'namespace'], labels=["infrastructure"]) k8s_resource('logservice', resource_deps=['sysdb-migration'], labels=["chroma"], port_forwards='50052:50051') -k8s_resource('sysdb', resource_deps=['pulsar', 'sysdb-migration'], labels=["chroma"], port_forwards='50051:50051') +k8s_resource('sysdb', resource_deps=['sysdb-migration'], labels=["chroma"], port_forwards='50051:50051') k8s_resource('frontend-service', resource_deps=['pulsar', 'sysdb', 'logservice'],labels=["chroma"], port_forwards='8000:8000') k8s_resource('query-service', resource_deps=['sysdb'], labels=["chroma"]) k8s_resource('compaction-service', resource_deps=['sysdb'], labels=["chroma"]) diff --git a/go/cmd/coordinator/cmd.go b/go/cmd/coordinator/cmd.go index 6eaa05b40eb..ecdc7645f32 100644 --- a/go/cmd/coordinator/cmd.go +++ b/go/cmd/coordinator/cmd.go @@ -41,12 +41,6 @@ func init() { Cmd.Flags().IntVar(&conf.DBConfig.MaxOpenConns, "max-open-conns", 10, "MetaTable max open connections") Cmd.Flags().StringVar(&conf.DBConfig.SslMode, "ssl-mode", "disable", "SSL mode for database connection") - // Pulsar - Cmd.Flags().StringVar(&conf.PulsarAdminURL, "pulsar-admin-url", "http://localhost:8080", "Pulsar admin url") - Cmd.Flags().StringVar(&conf.PulsarURL, "pulsar-url", "pulsar://localhost:6650", "Pulsar url") - Cmd.Flags().StringVar(&conf.PulsarTenant, "pulsar-tenant", "default", "Pulsar tenant") - Cmd.Flags().StringVar(&conf.PulsarNamespace, "pulsar-namespace", "default", "Pulsar namespace") - // Notification Cmd.Flags().StringVar(&conf.NotificationStoreProvider, "notification-store-provider", "memory", "Notification store provider") Cmd.Flags().StringVar(&conf.NotifierProvider, "notifier-provider", "memory", "Notifier provider") diff --git a/go/pkg/coordinator/grpc/server.go b/go/pkg/coordinator/grpc/server.go index 96b4f53ce9e..003802d3eb9 100644 --- a/go/pkg/coordinator/grpc/server.go +++ b/go/pkg/coordinator/grpc/server.go @@ -7,7 +7,6 @@ import ( "github.com/chroma-core/chroma/go/pkg/grpcutils" - "github.com/apache/pulsar-client-go/pulsar" "github.com/chroma-core/chroma/go/pkg/coordinator" "github.com/chroma-core/chroma/go/pkg/memberlist_manager" "github.com/chroma-core/chroma/go/pkg/metastore/db/dao" @@ -37,12 +36,6 @@ type Config struct { NotifierProvider string NotificationTopic string - // Pulsar config - PulsarAdminURL string - PulsarURL string - PulsarTenant string - PulsarNamespace string - // Kubernetes config KubernetesNamespace string @@ -106,32 +99,12 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor } var notifier notification.Notifier - var client pulsar.Client - var producer pulsar.Producer if config.NotifierProvider == "memory" { log.Info("Using memory notifier") notifier = notification.NewMemoryNotifier() - } else if config.NotifierProvider == "pulsar" { - log.Info("Using pulsar notifier") - pulsarNotifier, pulsarClient, pulsarProducer, err := createPulsarNotifer(config.PulsarURL, config.NotificationTopic) - notifier = pulsarNotifier - client = pulsarClient - producer = pulsarProducer - if err != nil { - log.Error("Failed to create pulsar notifier", zap.Error(err)) - return nil, err - } } else { - return nil, errors.New("invalid notifier provider, only memory and pulsar are supported") + return nil, errors.New("invalid notifier provider, only memory are supported") } - - if client != nil { - defer client.Close() - } - if producer != nil { - defer producer.Close() - } - coordinator, err := coordinator.NewCoordinator(ctx, db, notificationStore, notifier) if err != nil { return nil, err @@ -189,27 +162,6 @@ func createMemberlistManager(namespace string, memberlistName string, podLabel s return memberlist_manager, nil } -func createPulsarNotifer(pulsarURL string, notificationTopic string) (*notification.PulsarNotifier, pulsar.Client, pulsar.Producer, error) { - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: pulsarURL, - }) - if err != nil { - log.Error("Failed to create pulsar client", zap.Error(err)) - return nil, nil, nil, err - } - - producer, err := client.CreateProducer(pulsar.ProducerOptions{ - Topic: notificationTopic, - }) - if err != nil { - log.Error("Failed to create producer", zap.Error(err)) - return nil, nil, nil, err - } - - notifier := notification.NewPulsarNotifier(producer) - return notifier, client, producer, nil -} - func (s *Server) Close() error { s.healthServer.Shutdown() s.coordinator.Stop() diff --git a/k8s/distributed-chroma/values.yaml b/k8s/distributed-chroma/values.yaml index 6ae7d7d9b11..54ebeae0922 100644 --- a/k8s/distributed-chroma/values.yaml +++ b/k8s/distributed-chroma/values.yaml @@ -39,9 +39,6 @@ sysdb: replicaCount: 1 env: flags: - pulsar-admin-url: "http://pulsar.chroma:8080" - pulsar-url: "pulsar://pulsar.chroma:6650" - notifier-provider: "pulsar" logService: image: @@ -76,4 +73,3 @@ logServiceMigration: repository: 'local' tag: 'logservice-migration' databaseUrl: 'postgres://chroma:chroma@postgres:5432/log?sslmode=disable' -