diff --git a/internal/ballot/ballot.go b/internal/ballot/ballot.go index 2061e38..27f4556 100644 --- a/internal/ballot/ballot.go +++ b/internal/ballot/ballot.go @@ -3,15 +3,16 @@ package ballot import ( "encoding/json" "fmt" + "os/exec" + "sync/atomic" + "time" + "github.com/google/shlex" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api/watch" log "github.com/sirupsen/logrus" "golang.org/x/exp/slices" "golang.org/x/net/context" - "os/exec" - "sync/atomic" - "time" ) type ElectionPayload struct { @@ -86,6 +87,31 @@ func (b *Ballot) copyServiceToRegistration(service *api.AgentService) *api.Agent } } +func (b *Ballot) copyCatalogServiceToRegistration(service *api.CatalogService) *api.CatalogRegistration { + return &api.CatalogRegistration{ + ID: service.ID, + Node: service.Node, + Address: service.ServiceAddress, + TaggedAddresses: service.TaggedAddresses, + NodeMeta: service.NodeMeta, + Datacenter: service.Datacenter, + Service: &api.AgentService{ + ID: service.ServiceID, + Service: service.ServiceName, + Tags: service.ServiceTags, + Meta: service.ServiceMeta, + Port: service.ServicePort, + Address: service.ServiceAddress, + TaggedAddresses: service.ServiceTaggedAddresses, + Weights: api.AgentWeights{ + Passing: service.ServiceWeights.Passing, + Warning: service.ServiceWeights.Warning, + }, + EnableTagOverride: service.ServiceEnableTagOverride, + }, + } +} + // Run starts the leader election. func (b *Ballot) Run() (err error) { b.ctx = context.Background() @@ -102,22 +128,26 @@ func (b *Ballot) Run() (err error) { } // getService returns the registered service. -func (b *Ballot) getService() (service *api.AgentService, err error) { +func (b *Ballot) getService() (service *api.AgentService, catalogServices []*api.CatalogService, err error) { agent := b.client.Agent() - services, err := agent.Services() + service, _, err = agent.Service(b.ID, &api.QueryOptions{}) if err != nil { - return service, err + return service, nil, err } - service, ok := services[b.ID] - if !ok { - return service, fmt.Errorf("service %s not found", b.ID) + if service == nil { + return service, nil, fmt.Errorf("service %s not found", b.ID) } - return service, err + catalog := b.client.Catalog() + catalogServices, _, err = catalog.Service(b.ID, b.PrimaryTag, &api.QueryOptions{}) + if err != nil { + return service, nil, err + } + return service, catalogServices, err } // updateServiceTags updates the service tags. func (b *Ballot) updateServiceTags() error { - service, err := b.getService() + service, _, err := b.getService() if err != nil { return err } @@ -156,6 +186,42 @@ func (b *Ballot) updateServiceTags() error { return err } +// cleanup is called on promote, it cleans up tags on the instances of the service, useful if an other ballot stopped unexpectedly +func (b *Ballot) cleanup() error { + service, catalogServices, err := b.getService() + if err != nil { + return err + } + for _, catcatalogService := range catalogServices { + if catcatalogService.Address != service.Address { + p := slices.Index(catcatalogService.ServiceTags, b.PrimaryTag) + if p == -1 { + return nil + } + log.WithFields(log.Fields{ + "caller": "cleanupCatalogServiceTags", + "service": b.ID, + "node": catcatalogService.Node, + "tags": catcatalogService.ServiceTags, + }).Debug("current service tags") + catcatalogService.ServiceTags = slices.Delete(catcatalogService.ServiceTags, p, p+1) + catalog := b.client.Catalog() + reg := b.copyCatalogServiceToRegistration(catcatalogService) + _, err := catalog.Register(reg, &api.WriteOptions{}) + if err != nil { + return err + } + log.WithFields(log.Fields{ + "caller": "cleanupCatalogServiceTags", + "service": b.ID, + "node": catcatalogService.Node, + "tags": catcatalogService.ServiceTags, + }).Debug("new service tags") + } + } + return nil +} + // onPromote is called when the node is promoted to leader. func (b *Ballot) onPromote() (err error) { b.leader.Store(true) @@ -164,6 +230,11 @@ func (b *Ballot) onPromote() (err error) { b.releaseSession() return fmt.Errorf("failed to update service tags: %s", err) } + err = b.cleanup() + if err != nil { + b.releaseSession() + return fmt.Errorf("failed to cleanup old service tags: %s", err) + } return err } @@ -182,7 +253,7 @@ func (b *Ballot) election() (err error) { if !b.leader.Load() { if b.sessionID.Load() != nil { - service, err := b.getService() + service, _, err := b.getService() if err != nil { return fmt.Errorf("failed to get service: %s", err) }