Skip to content

Commit

Permalink
Merge pull request #13 from n-marton/feat/cleanup
Browse files Browse the repository at this point in the history
add functionality cleanup tags on other nodes
  • Loading branch information
ncode authored Feb 8, 2024
2 parents f66b2b6 + e4d341b commit 5fcc416
Showing 1 changed file with 83 additions and 12 deletions.
95 changes: 83 additions & 12 deletions internal/ballot/ballot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package ballot
import (
"encoding/json"
"fmt"
"os/exec"
"sync/atomic"
"time"

"github.com/google/shlex"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
log "github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
"golang.org/x/net/context"
"os/exec"
"sync/atomic"
"time"
)

type ElectionPayload struct {
Expand Down Expand Up @@ -86,6 +87,31 @@ func (b *Ballot) copyServiceToRegistration(service *api.AgentService) *api.Agent
}
}

func (b *Ballot) copyCatalogServiceToRegistration(service *api.CatalogService) *api.CatalogRegistration {
return &api.CatalogRegistration{
ID: service.ID,
Node: service.Node,
Address: service.ServiceAddress,
TaggedAddresses: service.TaggedAddresses,
NodeMeta: service.NodeMeta,
Datacenter: service.Datacenter,
Service: &api.AgentService{
ID: service.ServiceID,
Service: service.ServiceName,
Tags: service.ServiceTags,
Meta: service.ServiceMeta,
Port: service.ServicePort,
Address: service.ServiceAddress,
TaggedAddresses: service.ServiceTaggedAddresses,
Weights: api.AgentWeights{
Passing: service.ServiceWeights.Passing,
Warning: service.ServiceWeights.Warning,
},
EnableTagOverride: service.ServiceEnableTagOverride,
},
}
}

// Run starts the leader election.
func (b *Ballot) Run() (err error) {
b.ctx = context.Background()
Expand All @@ -102,22 +128,26 @@ func (b *Ballot) Run() (err error) {
}

// getService returns the registered service.
func (b *Ballot) getService() (service *api.AgentService, err error) {
func (b *Ballot) getService() (service *api.AgentService, catalogServices []*api.CatalogService, err error) {
agent := b.client.Agent()
services, err := agent.Services()
service, _, err = agent.Service(b.ID, &api.QueryOptions{})
if err != nil {
return service, err
return service, nil, err
}
service, ok := services[b.ID]
if !ok {
return service, fmt.Errorf("service %s not found", b.ID)
if service == nil {
return service, nil, fmt.Errorf("service %s not found", b.ID)
}
return service, err
catalog := b.client.Catalog()
catalogServices, _, err = catalog.Service(b.ID, b.PrimaryTag, &api.QueryOptions{})
if err != nil {
return service, nil, err
}
return service, catalogServices, err
}

// updateServiceTags updates the service tags.
func (b *Ballot) updateServiceTags() error {
service, err := b.getService()
service, _, err := b.getService()
if err != nil {
return err
}
Expand Down Expand Up @@ -156,6 +186,42 @@ func (b *Ballot) updateServiceTags() error {
return err
}

// cleanup is called on promote, it cleans up tags on the instances of the service, useful if an other ballot stopped unexpectedly
func (b *Ballot) cleanup() error {
service, catalogServices, err := b.getService()
if err != nil {
return err
}
for _, catcatalogService := range catalogServices {
if catcatalogService.Address != service.Address {
p := slices.Index(catcatalogService.ServiceTags, b.PrimaryTag)
if p == -1 {
return nil
}
log.WithFields(log.Fields{
"caller": "cleanupCatalogServiceTags",
"service": b.ID,
"node": catcatalogService.Node,
"tags": catcatalogService.ServiceTags,
}).Debug("current service tags")
catcatalogService.ServiceTags = slices.Delete(catcatalogService.ServiceTags, p, p+1)
catalog := b.client.Catalog()
reg := b.copyCatalogServiceToRegistration(catcatalogService)
_, err := catalog.Register(reg, &api.WriteOptions{})
if err != nil {
return err
}
log.WithFields(log.Fields{
"caller": "cleanupCatalogServiceTags",
"service": b.ID,
"node": catcatalogService.Node,
"tags": catcatalogService.ServiceTags,
}).Debug("new service tags")
}
}
return nil
}

// onPromote is called when the node is promoted to leader.
func (b *Ballot) onPromote() (err error) {
b.leader.Store(true)
Expand All @@ -164,6 +230,11 @@ func (b *Ballot) onPromote() (err error) {
b.releaseSession()
return fmt.Errorf("failed to update service tags: %s", err)
}
err = b.cleanup()
if err != nil {
b.releaseSession()
return fmt.Errorf("failed to cleanup old service tags: %s", err)
}
return err
}

Expand All @@ -182,7 +253,7 @@ func (b *Ballot) election() (err error) {

if !b.leader.Load() {
if b.sessionID.Load() != nil {
service, err := b.getService()
service, _, err := b.getService()
if err != nil {
return fmt.Errorf("failed to get service: %s", err)
}
Expand Down

0 comments on commit 5fcc416

Please sign in to comment.