Skip to content

Commit

Permalink
Migrate webhooks db code to raw SQL (#1317)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl authored Jun 20, 2024
1 parent dd35323 commit 9225732
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 53 deletions.
13 changes: 13 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/webhooks"
)

// The database interfaces define all methods that a SQL database must implement
Expand Down Expand Up @@ -37,6 +38,10 @@ type (
// AddMultipartPart adds a part to an unfinished multipart upload.
AddMultipartPart(ctx context.Context, bucket, path, contractSet, eTag, uploadID string, partNumber int, slices object.SlabSlices) error

// AddWebhook adds a new webhook to the database. If the webhook already
// exists, it is updated.
AddWebhook(ctx context.Context, wh webhooks.Webhook) error

// ArchiveContract moves a contract from the regular contracts to the
// archived ones.
ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error
Expand Down Expand Up @@ -81,6 +86,11 @@ type (
// contains the root, latest_host is updated to that host.
DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error)

// DeleteWebhook deletes the webhook with the matching module, event and
// URL of the provided webhook. If the webhook doesn't exist,
// webhooks.ErrWebhookNotFound is returned.
DeleteWebhook(ctx context.Context, wh webhooks.Webhook) error

// InsertBufferedSlab inserts a buffered slab into the database. This
// includes the creation of a buffered slab as well as the corresponding
// regular slab it is linked to. It returns the ID of the buffered slab
Expand Down Expand Up @@ -230,6 +240,9 @@ type (
// between 'minValidity' and 'maxValidity' is used to determine the time
// the health of the updated slabs becomes invalid
UpdateSlabHealth(ctx context.Context, limit int64, minValidity, maxValidity time.Duration) (int64, error)

// Webhooks returns all registered webhooks.
Webhooks(ctx context.Context) ([]webhooks.Webhook, error)
}

MetricsDatabase interface {
Expand Down
34 changes: 34 additions & 0 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/sql"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/webhooks"
"lukechampine.com/frand"
)

Expand Down Expand Up @@ -376,6 +377,18 @@ func DeleteHostSector(ctx context.Context, tx sql.Tx, hk types.PublicKey, root t
return int(deletedSectors), nil
}

func DeleteWebhook(ctx context.Context, tx sql.Tx, wh webhooks.Webhook) error {
res, err := tx.Exec(ctx, "DELETE FROM webhooks WHERE module = ? AND event = ? AND url = ?", wh.Module, wh.Event, wh.URL)
if err != nil {
return fmt.Errorf("failed to delete webhook: %w", err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to check rows affected: %w", err)
} else if n == 0 {
return webhooks.ErrWebhookNotFound
}
return nil
}

func HostAllowlist(ctx context.Context, tx sql.Tx) ([]types.PublicKey, error) {
rows, err := tx.Query(ctx, "SELECT entry FROM host_allowlist_entries")
if err != nil {
Expand Down Expand Up @@ -1393,6 +1406,27 @@ func UpdateBucketPolicy(ctx context.Context, tx sql.Tx, bucket string, bp api.Bu
return nil
}

func Webhooks(ctx context.Context, tx sql.Tx) ([]webhooks.Webhook, error) {
rows, err := tx.Query(ctx, "SELECT module, event, url, headers FROM webhooks")
if err != nil {
return nil, fmt.Errorf("failed to fetch webhooks: %w", err)
}
defer rows.Close()

var whs []webhooks.Webhook
for rows.Next() {
var webhook webhooks.Webhook
var headers string
if err := rows.Scan(&webhook.Module, &webhook.Event, &webhook.URL, &headers); err != nil {
return nil, fmt.Errorf("failed to scan webhook: %w", err)
} else if err := json.Unmarshal([]byte(headers), &webhook.Headers); err != nil {
return nil, fmt.Errorf("failed to unmarshal headers: %w", err)
}
whs = append(whs, webhook)
}
return whs, nil
}

func scanAutopilot(s scanner) (api.Autopilot, error) {
var a api.Autopilot
if err := s.Scan(&a.ID, (*AutopilotConfig)(&a.Config), &a.CurrentPeriod); err != nil {
Expand Down
26 changes: 26 additions & 0 deletions stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
ssql "go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/webhooks"
"lukechampine.com/frand"

"go.sia.tech/renterd/internal/sql"
Expand Down Expand Up @@ -134,6 +135,23 @@ func (tx *MainDatabaseTx) AbortMultipartUpload(ctx context.Context, bucket, path
return ssql.AbortMultipartUpload(ctx, tx, bucket, path, uploadID)
}

func (tx *MainDatabaseTx) AddWebhook(ctx context.Context, wh webhooks.Webhook) error {
headers := "{}"
if len(wh.Headers) > 0 {
h, err := json.Marshal(wh.Headers)
if err != nil {
return fmt.Errorf("failed to marshal headers: %w", err)
}
headers = string(h)
}
_, err := tx.Exec(ctx, "INSERT INTO webhooks (created_at, module, event, url, headers) VALUES (?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE headers = VALUES(headers)",
time.Now(), wh.Module, wh.Event, wh.URL, headers)
if err != nil {
return fmt.Errorf("failed to insert webhook: %w", err)
}
return nil
}

func (tx *MainDatabaseTx) ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error {
return ssql.ArchiveContract(ctx, tx, fcid, reason)
}
Expand Down Expand Up @@ -254,6 +272,10 @@ func (tx *MainDatabaseTx) InsertMultipartUpload(ctx context.Context, bucket, key
return ssql.InsertMultipartUpload(ctx, tx, bucket, key, ec, mimeType, metadata)
}

func (tx *MainDatabaseTx) DeleteWebhook(ctx context.Context, wh webhooks.Webhook) error {
return ssql.DeleteWebhook(ctx, tx, wh)
}

func (tx *MainDatabaseTx) DeleteBucket(ctx context.Context, bucket string) error {
return ssql.DeleteBucket(ctx, tx, bucket)
}
Expand Down Expand Up @@ -907,6 +929,10 @@ func (tx *MainDatabaseTx) UpdateSlabHealth(ctx context.Context, limit int64, min
return res.RowsAffected()
}

func (tx *MainDatabaseTx) Webhooks(ctx context.Context) ([]webhooks.Webhook, error) {
return ssql.Webhooks(ctx, tx)
}

func (tx *MainDatabaseTx) insertSlabs(ctx context.Context, objID, partID *int64, contractSet string, slices object.SlabSlices) error {
if (objID == nil) == (partID == nil) {
return errors.New("exactly one of objID and partID must be set")
Expand Down
26 changes: 26 additions & 0 deletions stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.sia.tech/renterd/internal/sql"
"go.sia.tech/renterd/object"
ssql "go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/webhooks"
"lukechampine.com/frand"

"go.uber.org/zap"
Expand Down Expand Up @@ -133,6 +134,23 @@ func (tx *MainDatabaseTx) AbortMultipartUpload(ctx context.Context, bucket, path
return ssql.AbortMultipartUpload(ctx, tx, bucket, path, uploadID)
}

func (tx *MainDatabaseTx) AddWebhook(ctx context.Context, wh webhooks.Webhook) error {
headers := "{}"
if len(wh.Headers) > 0 {
h, err := json.Marshal(wh.Headers)
if err != nil {
return fmt.Errorf("failed to marshal headers: %w", err)
}
headers = string(h)
}
_, err := tx.Exec(ctx, "INSERT INTO webhooks (created_at, module, event, url, headers) VALUES (?, ?, ?, ?, ?) ON CONFLICT DO UPDATE SET headers = EXCLUDED.headers",
time.Now(), wh.Module, wh.Event, wh.URL, headers)
if err != nil {
return fmt.Errorf("failed to insert webhook: %w", err)
}
return nil
}

func (tx *MainDatabaseTx) ArchiveContract(ctx context.Context, fcid types.FileContractID, reason string) error {
return ssql.ArchiveContract(ctx, tx, fcid, reason)
}
Expand Down Expand Up @@ -250,6 +268,10 @@ func (tx *MainDatabaseTx) DeleteHostSector(ctx context.Context, hk types.PublicK
return ssql.DeleteHostSector(ctx, tx, hk, root)
}

func (tx *MainDatabaseTx) DeleteWebhook(ctx context.Context, wh webhooks.Webhook) error {
return ssql.DeleteWebhook(ctx, tx, wh)
}

func (tx *MainDatabaseTx) InsertBufferedSlab(ctx context.Context, fileName string, contractSetID int64, ec object.EncryptionKey, minShards, totalShards uint8) (int64, error) {
return ssql.InsertBufferedSlab(ctx, tx, fileName, contractSetID, ec, minShards, totalShards)
}
Expand Down Expand Up @@ -899,6 +921,10 @@ func (tx *MainDatabaseTx) UpdateSlabHealth(ctx context.Context, limit int64, min
return res.RowsAffected()
}

func (tx *MainDatabaseTx) Webhooks(ctx context.Context) ([]webhooks.Webhook, error) {
return ssql.Webhooks(ctx, tx)
}

func (tx *MainDatabaseTx) insertSlabs(ctx context.Context, objID, partID *int64, contractSet string, slices object.SlabSlices) error {
if (objID == nil) == (partID == nil) {
return errors.New("exactly one of objID and partID must be set")
Expand Down
66 changes: 13 additions & 53 deletions stores/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,66 +3,26 @@ package stores
import (
"context"

sql "go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/webhooks"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type (
dbWebhook struct {
Model

Module string `gorm:"uniqueIndex:idx_module_event_url;NOT NULL;size:255"`
Event string `gorm:"uniqueIndex:idx_module_event_url;NOT NULL;size:255"`
URL string `gorm:"uniqueIndex:idx_module_event_url;NOT NULL;size:255"`

Headers map[string]string `gorm:"serializer:json"`
}
)

func (dbWebhook) TableName() string {
return "webhooks"
}

func (s *SQLStore) DeleteWebhook(ctx context.Context, wb webhooks.Webhook) error {
return s.retryTransaction(ctx, func(tx *gorm.DB) error {
res := tx.Exec("DELETE FROM webhooks WHERE module = ? AND event = ? AND url = ?",
wb.Module, wb.Event, wb.URL)
if res.Error != nil {
return res.Error
} else if res.RowsAffected == 0 {
return gorm.ErrRecordNotFound
}
return nil
func (s *SQLStore) AddWebhook(ctx context.Context, wh webhooks.Webhook) error {
return s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
return tx.AddWebhook(ctx, wh)
})
}

func (s *SQLStore) AddWebhook(ctx context.Context, wh webhooks.Webhook) error {
return s.retryTransaction(ctx, func(tx *gorm.DB) error {
return tx.Clauses(clause.OnConflict{
DoNothing: true,
}).Create(&dbWebhook{
Module: wh.Module,
Event: wh.Event,
URL: wh.URL,
Headers: wh.Headers,
}).Error
func (s *SQLStore) DeleteWebhook(ctx context.Context, wh webhooks.Webhook) error {
return s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
return tx.DeleteWebhook(ctx, wh)
})
}

func (s *SQLStore) Webhooks(ctx context.Context) ([]webhooks.Webhook, error) {
var dbWebhooks []dbWebhook
if err := s.db.WithContext(ctx).Find(&dbWebhooks).Error; err != nil {
return nil, err
}
var whs []webhooks.Webhook
for _, wb := range dbWebhooks {
whs = append(whs, webhooks.Webhook{
Module: wb.Module,
Event: wb.Event,
URL: wb.URL,
Headers: wb.Headers,
})
}
return whs, nil
func (s *SQLStore) Webhooks(ctx context.Context) (whs []webhooks.Webhook, err error) {
err = s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
whs, err = tx.Webhooks(ctx)
return err
})
return
}
6 changes: 6 additions & 0 deletions stores/webhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ func TestWebhooks(t *testing.T) {
Module: "foo",
Event: "bar",
URL: "http://example.com",
Headers: map[string]string{
"foo1": "bar1",
},
}
wh2 := webhooks.Webhook{
Module: "foo2",
Event: "bar2",
URL: "http://example2.com",
Headers: map[string]string{
"foo2": "bar2",
},
}

// Add hook.
Expand Down

0 comments on commit 9225732

Please sign in to comment.