From 8e8e11f2811d457e9f12e0c266ebb70ca892ae43 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 21 Aug 2024 15:30:17 +0200 Subject: [PATCH 1/3] worker: extend cache with contract additions --- api/events.go | 21 +++++++++++++++++++++ bus/routes.go | 9 +++++++++ internal/worker/cache.go | 21 +++++++++++++++++++++ internal/worker/events.go | 1 + 4 files changed, 52 insertions(+) diff --git a/api/events.go b/api/events.go index 611cf48fa..e9600e53b 100644 --- a/api/events.go +++ b/api/events.go @@ -17,6 +17,7 @@ const ( ModuleHost = "host" ModuleSetting = "setting" + EventAdd = "add" EventUpdate = "update" EventDelete = "delete" EventArchive = "archive" @@ -34,6 +35,11 @@ type ( Timestamp time.Time `json:"timestamp"` } + EventContractAdd struct { + Added ContractMetadata `json:"added"` + Timestamp time.Time `json:"timestamp"` + } + EventContractArchive struct { ContractID types.FileContractID `json:"contractID"` Reason string `json:"reason"` @@ -79,6 +85,15 @@ var ( } } + WebhookContractAdd = func(url string, headers map[string]string) webhooks.Webhook { + return webhooks.Webhook{ + Event: EventAdd, + Headers: headers, + Module: ModuleContract, + URL: url, + } + } + WebhookContractArchive = func(url string, headers map[string]string) webhooks.Webhook { return webhooks.Webhook{ Event: EventArchive, @@ -142,6 +157,12 @@ func ParseEventWebhook(event webhooks.Event) (interface{}, error) { switch event.Module { case ModuleContract: switch event.Event { + case EventAdd: + var e EventContractAdd + if err := json.Unmarshal(bytes, &e); err != nil { + return nil, err + } + return e, nil case EventArchive: var e EventContractArchive if err := json.Unmarshal(bytes, &e); err != nil { diff --git a/bus/routes.go b/bus/routes.go index 262e53a11..542485bd5 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -961,6 +961,15 @@ func (b *Bus) contractIDHandlerPOST(jc jape.Context) { if jc.Check("couldn't store contract", err) == nil { jc.Encode(a) } + + b.broadcastAction(webhooks.Event{ + Module: api.ModuleContract, + Event: api.EventAdd, + Payload: api.EventContractAdd{ + Added: a, + Timestamp: time.Now().UTC(), + }, + }) } func (b *Bus) contractIDRenewedHandlerPOST(jc jape.Context) { diff --git a/internal/worker/cache.go b/internal/worker/cache.go index 5081710c2..dfc749d2a 100644 --- a/internal/worker/cache.go +++ b/internal/worker/cache.go @@ -168,6 +168,9 @@ func (c *cache) HandleEvent(event webhooks.Event) (err error) { case api.EventConsensusUpdate: log = log.With("bh", e.BlockHeight, "ts", e.Timestamp) c.handleConsensusUpdate(e) + case api.EventContractAdd: + log = log.With("fcid", e.Added.ID, "ts", e.Timestamp) + c.handleContractAdd(e) case api.EventContractArchive: log = log.With("fcid", e.ContractID, "ts", e.Timestamp) c.handleContractArchive(e) @@ -234,6 +237,24 @@ func (c *cache) handleConsensusUpdate(event api.EventConsensusUpdate) { c.cache.Set(cacheKeyGougingParams, gp) } +func (c *cache) handleContractAdd(event api.EventContractAdd) { + // return early if the cache doesn't have contracts + value, found, _ := c.cache.Get(cacheKeyDownloadContracts) + if !found { + return + } + contracts := value.([]api.ContractMetadata) + + // add the contract to the cache + for _, contract := range contracts { + if contract.ID == event.Added.ID { + return + } + } + contracts = append(contracts, event.Added) + c.cache.Set(cacheKeyDownloadContracts, contracts) +} + func (c *cache) handleContractArchive(event api.EventContractArchive) { // return early if the cache doesn't have contracts value, found, _ := c.cache.Get(cacheKeyDownloadContracts) diff --git a/internal/worker/events.go b/internal/worker/events.go index eb98018d3..e0960fd5c 100644 --- a/internal/worker/events.go +++ b/internal/worker/events.go @@ -112,6 +112,7 @@ func (e *eventSubscriber) Register(ctx context.Context, eventsURL string, opts . // prepare webhooks webhooks := []webhooks.Webhook{ api.WebhookConsensusUpdate(eventsURL, headers), + api.WebhookContractAdd(eventsURL, headers), api.WebhookContractArchive(eventsURL, headers), api.WebhookContractRenew(eventsURL, headers), api.WebhookHostUpdate(eventsURL, headers), From 8cc1e0fe7c8e10b2a2929f3d7c93c1273ca8a6f2 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Wed, 21 Aug 2024 15:49:07 +0200 Subject: [PATCH 2/3] worker: fix TestEventSubscriber --- internal/worker/events_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/worker/events_test.go b/internal/worker/events_test.go index 3de028acd..bd4fc8c8a 100644 --- a/internal/worker/events_test.go +++ b/internal/worker/events_test.go @@ -156,7 +156,7 @@ func TestEventSubscriber(t *testing.T) { time.Sleep(testRegisterInterval) // assert webhook was registered - if webhooks := w.Webhooks(); len(webhooks) != 5 { + if webhooks := w.Webhooks(); len(webhooks) != 6 { t.Fatal("expected 5 webhooks, got", len(webhooks)) } From 0869172a26ae21134e9279352ae155865ed6c4d9 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Thu, 22 Aug 2024 10:06:04 +0200 Subject: [PATCH 3/3] bus: address comments --- bus/routes.go | 12 ++++++------ internal/worker/events_test.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/bus/routes.go b/bus/routes.go index 542485bd5..b5cbdf368 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -947,19 +947,17 @@ func (b *Bus) contractIDHandlerPOST(jc jape.Context) { var req api.ContractAddRequest if jc.DecodeParam("id", &id) != nil || jc.Decode(&req) != nil { return - } - if req.Contract.ID() != id { + } else if req.Contract.ID() != id { http.Error(jc.ResponseWriter, "contract ID mismatch", http.StatusBadRequest) return - } - if req.TotalCost.IsZero() { + } else if req.TotalCost.IsZero() { http.Error(jc.ResponseWriter, "TotalCost can not be zero", http.StatusBadRequest) return } a, err := b.ms.AddContract(jc.Request.Context(), req.Contract, req.ContractPrice, req.TotalCost, req.StartHeight, req.State) - if jc.Check("couldn't store contract", err) == nil { - jc.Encode(a) + if jc.Check("couldn't store contract", err) != nil { + return } b.broadcastAction(webhooks.Event{ @@ -970,6 +968,8 @@ func (b *Bus) contractIDHandlerPOST(jc jape.Context) { Timestamp: time.Now().UTC(), }, }) + + jc.Encode(a) } func (b *Bus) contractIDRenewedHandlerPOST(jc jape.Context) { diff --git a/internal/worker/events_test.go b/internal/worker/events_test.go index bd4fc8c8a..76c89bbfb 100644 --- a/internal/worker/events_test.go +++ b/internal/worker/events_test.go @@ -157,7 +157,7 @@ func TestEventSubscriber(t *testing.T) { // assert webhook was registered if webhooks := w.Webhooks(); len(webhooks) != 6 { - t.Fatal("expected 5 webhooks, got", len(webhooks)) + t.Fatal("expected 6 webhooks, got", len(webhooks)) } // send the same event again