Skip to content

Commit

Permalink
worker: update cache when hosts reannounce
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan authored and ChrisSchinnerl committed Jul 30, 2024
1 parent e0b464f commit 054dd7a
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 0 deletions.
24 changes: 24 additions & 0 deletions api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
ModuleConsensus = "consensus"
ModuleContract = "contract"
ModuleContractSet = "contract_set"
ModuleHost = "host"
ModuleSetting = "setting"

EventUpdate = "update"
Expand Down Expand Up @@ -44,6 +45,12 @@ type (
Timestamp time.Time `json:"timestamp"`
}

EventHostUpdate struct {
HostKey types.PublicKey `json:"hostKey"`
NetAddr string `json:"netAddr"`
Timestamp time.Time `json:"timestamp"`
}

EventContractSetUpdate struct {
Name string `json:"name"`
ContractIDs []types.FileContractID `json:"contractIDs"`
Expand Down Expand Up @@ -99,6 +106,15 @@ var (
}
}

WebhookHostUpdate = func(url string, headers map[string]string) webhooks.Webhook {
return webhooks.Webhook{
Event: EventUpdate,
Headers: headers,
Module: ModuleHost,
URL: url,
}
}

WebhookSettingUpdate = func(url string, headers map[string]string) webhooks.Webhook {
return webhooks.Webhook{
Event: EventUpdate,
Expand Down Expand Up @@ -155,6 +171,14 @@ func ParseEventWebhook(event webhooks.Event) (interface{}, error) {
}
return e, nil
}
case ModuleHost:
if event.Event == EventUpdate {
var e EventHostUpdate
if err := json.Unmarshal(bytes, &e); err != nil {
return nil, err
}
return e, nil
}
case ModuleSetting:
switch event.Event {
case EventUpdate:
Expand Down
11 changes: 11 additions & 0 deletions internal/chain/chainsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ func (s *ChainSubscriber) applyChainUpdate(tx ChainUpdateTx, cau chain.ApplyUpda
for hk, ha := range hus {
if err := tx.UpdateHost(hk, ha, cau.State.Index.Height, b.ID(), b.Timestamp); err != nil {
return fmt.Errorf("failed to update host: %w", err)
} else if IsSynced(b) {
// broadcast host update
s.webhooksMgr.BroadcastAction(s.shutdownCtx, webhooks.Event{
Module: api.ModuleHost,
Event: api.EventUpdate,
Payload: api.EventHostUpdate{
HostKey: hk,
NetAddr: ha.NetAddress,
Timestamp: time.Now().UTC(),
},
})
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions internal/test/e2e/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestEvents(t *testing.T) {
api.WebhookContractArchive,
api.WebhookContractRenew,
api.WebhookContractSetUpdate,
api.WebhookHostUpdate,
api.WebhookSettingDelete,
api.WebhookSettingUpdate,
}
Expand All @@ -49,6 +50,15 @@ func TestEvents(t *testing.T) {
return nil
}

// ignore host updates with net address diff. from the update we assert to receive
if event.Module == api.ModuleHost && event.Event == api.EventUpdate {
if parsed, err := api.ParseEventWebhook(event); err != nil {
t.Fatal(err)
} else if parsed.(api.EventHostUpdate).NetAddr != "127.0.0.1:0" {
return nil
}
}

// check if the event is expected
if !isKnownEvent(event) {
return fmt.Errorf("unexpected event %+v", event)
Expand Down Expand Up @@ -120,6 +130,12 @@ func TestEvents(t *testing.T) {
// delete setting
tt.OK(b.DeleteSetting(context.Background(), api.SettingRedundancy))

// update host setting
h := cluster.hosts[0]
settings := h.settings.Settings()
settings.NetAddress = "127.0.0.1:0"
tt.OK(h.UpdateSettings(settings))

// wait until we received the events
tt.Retry(10, time.Second, func() error {
mu.Lock()
Expand Down Expand Up @@ -152,6 +168,10 @@ func TestEvents(t *testing.T) {
if e.TransactionFee.IsZero() || e.BlockHeight == 0 || e.Timestamp.IsZero() || !e.Synced {
t.Fatalf("unexpected event %+v", e)
}
case api.EventHostUpdate:
if e.HostKey != h.PublicKey() || e.NetAddr != "127.0.0.1:0" || e.Timestamp.IsZero() {
t.Fatalf("unexpected event %+v", e)
}
case api.EventSettingUpdate:
if e.Key != api.SettingGouging || e.Timestamp.IsZero() {
t.Fatalf("unexpected event %+v", e)
Expand Down
23 changes: 23 additions & 0 deletions internal/worker/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (c *cache) HandleEvent(event webhooks.Event) (err error) {
case api.EventContractRenew:
log = log.With("fcid", e.Renewal.ID, "renewedFrom", e.Renewal.RenewedFrom, "ts", e.Timestamp)
c.handleContractRenew(e)
case api.EventHostUpdate:
log = log.With("hk", e.HostKey, "ts", e.Timestamp)
c.handleHostUpdate(e)
case api.EventSettingUpdate:
log = log.With("key", e.Key, "ts", e.Timestamp)
err = c.handleSettingUpdate(e)
Expand Down Expand Up @@ -204,6 +207,7 @@ func (c *cache) Initialize(ctx context.Context, workerAPI string, webhookOpts ..
api.WebhookConsensusUpdate(eventsURL, headers),
api.WebhookContractArchive(eventsURL, headers),
api.WebhookContractRenew(eventsURL, headers),
api.WebhookHostUpdate(eventsURL, headers),
api.WebhookSettingUpdate(eventsURL, headers),
} {
if err := c.b.RegisterWebhook(ctx, wh); err != nil {
Expand Down Expand Up @@ -273,6 +277,25 @@ func (c *cache) handleContractRenew(event api.EventContractRenew) {
c.cache.Set(cacheKeyDownloadContracts, contracts)
}

func (c *cache) handleHostUpdate(e api.EventHostUpdate) {
// return early if the cache doesn't have contracts
value, found, _ := c.cache.Get(cacheKeyDownloadContracts)
if !found {
return
}
contracts := value.([]api.ContractMetadata)

// update the host's IP in the cache
for i, contract := range contracts {
if contract.HostKey == e.HostKey {
contracts[i].HostIP = e.NetAddr
break
}
}

c.cache.Set(cacheKeyDownloadContracts, contracts)
}

func (c *cache) handleSettingDelete(e api.EventSettingDelete) {
if e.Key == api.SettingGouging || e.Key == api.SettingRedundancy {
c.cache.Invalidate(cacheKeyGougingParams)
Expand Down
1 change: 1 addition & 0 deletions internal/worker/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func TestWorkerCache(t *testing.T) {
{Module: api.ModuleConsensus, Event: api.EventUpdate, Payload: nil},
{Module: api.ModuleContract, Event: api.EventArchive, Payload: nil},
{Module: api.ModuleContract, Event: api.EventRenew, Payload: nil},
{Module: api.ModuleHost, Event: api.EventUpdate, Payload: nil},
{Module: api.ModuleSetting, Event: api.EventUpdate, Payload: nil},
{Module: api.ModuleSetting, Event: api.EventDelete, Payload: nil},
} {
Expand Down

0 comments on commit 054dd7a

Please sign in to comment.