[ENH]: enable purging with leader election api k8s (#2064)
## Description of changes

*Summarize the changes made by this PR.*
 - New functionality
	 - Add a background worker that purges log records, using the Kubernetes leader election API so that only one logservice replica runs the purge at a time

## Test plan
*How are these changes tested?*
Tested locally with a property test.
nicolasgere authored May 2, 2024 · 1 parent b0b089a · commit d278a35
Showing 4 changed files with 145 additions and 1 deletion.
4 changes: 3 additions & 1 deletion Tiltfile
@@ -122,7 +122,9 @@ k8s_resource(
'test-memberlist:MemberList',
'test-memberlist-reader:ClusterRole',
'test-memberlist-reader-binding:ClusterRoleBinding',
-'compaction-service-config:configmap',
+'lease-watcher:role',
+'logservice-serviceaccount-rolebinding:rolebinding',
+'compaction-service-config:configmap'
],
new_name='k8s_setup',
labels=["infrastructure"],
2 changes: 2 additions & 0 deletions go/cmd/logservice/main.go
@@ -3,6 +3,7 @@ package main
import (
"context"
"github.com/chroma-core/chroma/go/pkg/log/configuration"
"github.com/chroma-core/chroma/go/pkg/log/purging"
"github.com/chroma-core/chroma/go/pkg/log/repository"
"github.com/chroma-core/chroma/go/pkg/log/server"
"github.com/chroma-core/chroma/go/pkg/proto/logservicepb"
@@ -49,6 +50,7 @@ func main() {
s := grpc.NewServer(grpc.UnaryInterceptor(otel.ServerGrpcInterceptor))
logservicepb.RegisterLogServiceServer(s, server)
log.Info("log service started", zap.String("address", listener.Addr().String()))
go purging.RunPurging(ctx, lr)
if err := s.Serve(listener); err != nil {
log.Fatal("failed to serve", zap.Error(err))
}
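A design note on the hunk above: RunPurging runs on its own goroutine and shares the server's root context, so cancelling that context is what stops the purger and, because the elector sets ReleaseOnCancel (see go/pkg/log/purging/main.go below), frees the lease for another replica. A minimal sketch of shutdown-aware wiring, assuming signal handling that this PR does not add (withShutdownContext is a hypothetical helper):

```go
package main

import (
	"context"
	"os/signal"
	"syscall"
)

// withShutdownContext returns a context that is cancelled on SIGTERM or
// SIGINT, so a terminating pod releases the purging lease promptly instead
// of making the next leader wait out the full LeaseDuration.
// Hypothetical helper; not part of this change.
func withShutdownContext() (context.Context, context.CancelFunc) {
	return signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
}
```

With that in place, main could call `ctx, stop := withShutdownContext()` and `defer stop()` before `go purging.RunPurging(ctx, lr)`.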
108 changes: 108 additions & 0 deletions go/pkg/log/purging/main.go
@@ -0,0 +1,108 @@
package purging

import (
	"context"
	"os"
	"time"

	"github.com/chroma-core/chroma/go/pkg/log/repository"
	"github.com/pingcap/log"
	"go.uber.org/zap"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// RunPurging runs leader election and, while this pod holds the lease,
// periodically purges log records. It blocks until ctx is cancelled, so
// callers typically run it on its own goroutine.
func RunPurging(ctx context.Context, lg *repository.LogRepository) {
	log.Info("starting purging")
	podName, _ := os.LookupEnv("POD_NAME")
	if podName == "" {
		log.Error("POD_NAME environment variable is not set")
		return
	}
	namespace, _ := os.LookupEnv("POD_NAMESPACE")
	if namespace == "" {
		log.Error("POD_NAMESPACE environment variable is not set")
		return
	}
	client, err := createKubernetesClient()
	if err != nil {
		log.Error("failed to create kubernetes client", zap.Error(err))
		return
	}

	elector, err := setupLeaderElection(client, namespace, podName, lg)
	if err != nil {
		log.Error("failed to setup leader election", zap.Error(err))
		return
	}

	elector.Run(ctx)
}

// createKubernetesClient builds a clientset from the in-cluster service
// account config, so it only works when running inside a pod.
func createKubernetesClient() (*kubernetes.Clientset, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(config)
}

// setupLeaderElection configures a LeaseLock-based elector on the
// "log-purging-lock" lease, identified by this pod's name. The named
// return value lr is deliberately captured by the OnStartedLeading
// closure: callbacks only fire after elector.Run is called, by which
// time NewLeaderElector has assigned lr.
func setupLeaderElection(client *kubernetes.Clientset, namespace, podName string, lg *repository.LogRepository) (lr *leaderelection.LeaderElector, err error) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      "log-purging-lock",
			Namespace: namespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: podName,
		},
	}

	lr, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock: lock,
		// Release the lease on context cancellation so another replica can
		// take over immediately instead of waiting out LeaseDuration.
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				log.Info("started leading")
				performPurgingLoop(ctx, lr, lg)
			},
			OnStoppedLeading: func() {
				log.Info("stopped leading")
			},
		},
	})
	return
}

// performPurgingLoop purges records on a 10-second ticker for as long as
// ctx stays live, skipping any tick on which this pod is no longer leader.
func performPurgingLoop(ctx context.Context, le *leaderelection.LeaderElector, lg *repository.LogRepository) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			log.Info("checking leader status")
			if !le.IsLeader() {
				// Not (or no longer) the leader: skip this tick.
				log.Info("leader is inactive")
				continue
			}
			log.Info("leader is active")
			if err := lg.PurgeRecords(ctx); err != nil {
				log.Error("failed to purge records", zap.Error(err))
				continue
			}
			log.Info("purged records")
		}
	}
}
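Note that createKubernetesClient relies on rest.InClusterConfig, so RunPurging can only join leader election from inside a pod. For pointing the worker at a cluster from a developer machine, a kubeconfig fallback is one option; a sketch under that assumption (the fallback and the KUBECONFIG variable are not part of this change):

```go
package purging

import (
	"os"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// createKubernetesClientWithFallback tries the in-cluster service account
// config first and falls back to a local kubeconfig when running outside
// a pod. Hypothetical variant for local development; not part of this change.
func createKubernetesClientWithFallback() (*kubernetes.Clientset, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		config, err = clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
		if err != nil {
			return nil, err
		}
	}
	return kubernetes.NewForConfig(config)
}
```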
32 changes: 32 additions & 0 deletions k8s/distributed-chroma/templates/logservice.yaml
@@ -21,6 +21,14 @@ spec:
# TODO properly use flow control here to check which type of value we need.
{{ .value | nindent 14 }}
{{ end }}
- name: POD_NAME
  valueFrom:
    fieldRef:
      fieldPath: metadata.name
- name: POD_NAMESPACE
  valueFrom:
    fieldRef:
      fieldPath: metadata.namespace
image: "{{ .Values.logService.image.repository }}:{{ .Values.logService.image.tag }}"
imagePullPolicy: IfNotPresent
name: logservice
@@ -51,3 +59,27 @@ metadata:
namespace: {{ .Values.namespace }}

---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: logservice-serviceaccount-rolebinding
  namespace: {{ .Values.namespace }}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: lease-watcher
subjects:
  - kind: ServiceAccount
    name: logservice-serviceaccount
    namespace: {{ .Values.namespace }}

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: {{ .Values.namespace }}
  name: lease-watcher
rules:
  - apiGroups: ["coordination.k8s.io"]
    resources: ["leases"]
    verbs: ["get", "create", "watch", "update", "delete"]
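The get, create, and update verbs are what the LeaseLock needs to acquire and renew the lease; watch and delete are extra headroom. To check which replica currently holds the lock, one can read the lease's holderIdentity with the same client-go APIs the worker already uses; a sketch (printLeaseHolder is a hypothetical debugging helper, not part of this change):

```go
package main

import (
	"context"

	"github.com/pingcap/log"
	"go.uber.org/zap"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// printLeaseHolder logs which pod currently holds the log-purging-lock
// lease. Hypothetical helper for debugging; not part of this change.
func printLeaseHolder(ctx context.Context, client kubernetes.Interface, namespace string) {
	lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, "log-purging-lock", metav1.GetOptions{})
	if err != nil {
		log.Error("failed to read lease", zap.Error(err))
		return
	}
	if holder := lease.Spec.HolderIdentity; holder != nil {
		log.Info("current purging leader", zap.String("holder", *holder))
	}
}
```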
