Skip to content

Commit

Permalink
Migrate Host scanning code to raw SQL (#1297)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl authored Jun 13, 2024
1 parent 0e6b100 commit d870f60
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 157 deletions.
6 changes: 3 additions & 3 deletions internal/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ func (s *DB) Exec(ctx context.Context, query string, args ...any) (sql.Result, e
func (s *DB) Prepare(ctx context.Context, query string) (*LoggedStmt, error) {
start := time.Now()
stmt, err := s.db.PrepareContext(ctx, query)
if dur := time.Since(start); dur > s.longQueryDuration {
s.log.Debug("slow prepare", zap.String("query", query), zap.Duration("elapsed", dur), zap.Stack("stack"))
} else if err != nil {
if err != nil {
return nil, err
} else if dur := time.Since(start); dur > s.longQueryDuration {
s.log.Debug("slow prepare", zap.String("query", query), zap.Duration("elapsed", dur), zap.Stack("stack"))
}
return &LoggedStmt{
Stmt: stmt,
Expand Down
162 changes: 8 additions & 154 deletions stores/hostdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ const (
// consensusInfoID defines the primary key of the entry in the consensusInfo
// table.
consensusInfoID = 1

// hostRetrievalBatchSize is the number of hosts we fetch from the
// database per batch. Empirically tested to verify that this is a value
// that performs reasonably well.
hostRetrievalBatchSize = 10000
)

var (
Expand Down Expand Up @@ -396,38 +391,12 @@ func (ss *SQLStore) UpdateHostCheck(ctx context.Context, autopilotID string, hk
}

// HostsForScanning returns the address of hosts for scanning.
func (ss *SQLStore) HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error) {
if offset < 0 {
return nil, sql.ErrNegativeOffset
}

var hosts []struct {
PublicKey publicKey `gorm:"unique;index;NOT NULL"`
NetAddress string
}
var hostAddresses []api.HostAddress

err := ss.db.
WithContext(ctx).
Model(&dbHost{}).
Where("last_scan < ?", maxLastScan.UnixNano()).
Offset(offset).
Limit(limit).
Order("last_scan ASC").
FindInBatches(&hosts, hostRetrievalBatchSize, func(tx *gorm.DB, batch int) error {
for _, h := range hosts {
hostAddresses = append(hostAddresses, api.HostAddress{
PublicKey: types.PublicKey(h.PublicKey),
NetAddress: h.NetAddress,
})
}
return nil
}).
Error
if err != nil {
return nil, err
}
return hostAddresses, err
// HostsForScanning runs the HostsForScanning query of the main database
// inside a transaction and returns the host addresses it yields together
// with the error reported by the transaction.
func (ss *SQLStore) HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) (hosts []api.HostAddress, err error) {
	// fetch stores the query result in the named return value so it is
	// available once the transaction has finished.
	fetch := func(tx sql.DatabaseTx) error {
		var txErr error
		hosts, txErr = tx.HostsForScanning(ctx, maxLastScan, offset, limit)
		return txErr
	}
	err = ss.bMain.Transaction(ctx, fetch)
	return hosts, err
}

func (ss *SQLStore) SearchHosts(ctx context.Context, autopilotID, filterMode, usabilityMode, addressContains string, keyIn []types.PublicKey, offset, limit int) ([]api.Host, error) {
Expand Down Expand Up @@ -554,123 +523,8 @@ func (ss *SQLStore) HostBlocklist(ctx context.Context) (blocklist []string, err
}

func (ss *SQLStore) RecordHostScans(ctx context.Context, scans []api.HostScan) error {
if len(scans) == 0 {
return nil // nothing to do
}

// Get keys from input.
keyMap := make(map[publicKey]struct{})
var hks []publicKey
for _, scan := range scans {
if _, exists := keyMap[publicKey(scan.HostKey)]; !exists {
hks = append(hks, publicKey(scan.HostKey))
keyMap[publicKey(scan.HostKey)] = struct{}{}
}
}

// Fetch hosts for which to add scans. This can be done outside the
// transaction to reduce the time we spend in the transaction since we don't
// need it to be perfectly consistent.
var hosts []dbHost
for i := 0; i < len(hks); i += maxSQLVars {
end := i + maxSQLVars
if end > len(hks) {
end = len(hks)
}
var batchHosts []dbHost
if err := ss.db.WithContext(ctx).Where("public_key IN (?)", hks[i:end]).
Find(&batchHosts).Error; err != nil {
return err
}
hosts = append(hosts, batchHosts...)
}
hostMap := make(map[publicKey]dbHost)
for _, h := range hosts {
hostMap[h.PublicKey] = h
}

// Write the interactions and update to the hosts atomically within a single
// transaction.
return ss.retryTransaction(ctx, func(tx *gorm.DB) error {
// Handle scans
for _, scan := range scans {
host, exists := hostMap[publicKey(scan.HostKey)]
if !exists {
continue // host doesn't exist
}
lastScan := time.Unix(0, host.LastScan)

if scan.Success {
// Handle successful scan.
host.SuccessfulInteractions++
if host.LastScan > 0 && lastScan.Before(scan.Timestamp) {
host.Uptime += scan.Timestamp.Sub(lastScan)
}
host.RecentDowntime = 0
host.RecentScanFailures = 0

// overwrite the NetAddress in the settings with the one we
// received through the host announcement
scan.Settings.NetAddress = host.NetAddress
host.Settings = hostSettings(scan.Settings)

// scans can only update the price table if the current
// pricetable is expired anyway, ensuring scans never
// overwrite a valid price table since the price table from
// scans are not paid for and thus not useful for anything
// aside from gouging checks
if time.Now().After(host.PriceTableExpiry.Time) {
host.PriceTable = convertHostPriceTable(scan.PriceTable)
host.PriceTableExpiry = dsql.NullTime{
Time: time.Now(),
Valid: true,
}
}
} else {
// Handle failed scan.
host.FailedInteractions++
host.RecentScanFailures++
if host.LastScan > 0 && lastScan.Before(scan.Timestamp) {
host.Downtime += scan.Timestamp.Sub(lastScan)
host.RecentDowntime += scan.Timestamp.Sub(lastScan)
}
}

host.TotalScans++
host.Scanned = host.Scanned || scan.Success
host.SecondToLastScanSuccess = host.LastScanSuccess
host.LastScanSuccess = scan.Success
host.LastScan = scan.Timestamp.UnixNano()

// Save to map again.
hostMap[host.PublicKey] = host
}

// Persist.
for _, h := range hostMap {
err := tx.Model(&dbHost{}).
Where("public_key", h.PublicKey).
Updates(map[string]interface{}{
"scanned": h.Scanned,
"total_scans": h.TotalScans,
"second_to_last_scan_success": h.SecondToLastScanSuccess,
"last_scan_success": h.LastScanSuccess,
"recent_downtime": h.RecentDowntime,
"recent_scan_failures": h.RecentScanFailures,
"downtime": h.Downtime,
"uptime": h.Uptime,
"last_scan": h.LastScan,
"settings": h.Settings,
"price_table": h.PriceTable,
"price_table_expiry": h.PriceTableExpiry,
"successful_interactions": h.SuccessfulInteractions,
"failed_interactions": h.FailedInteractions,
}).Error
if err != nil {
return err
}
}
return nil
return ss.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
return tx.RecordHostScans(ctx, scans)
})
}

Expand Down
12 changes: 12 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ type (
// InsertObject inserts a new object into the database.
InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error

// HostsForScanning returns the addresses of hosts whose last scan is
// older than maxLastScan, paginated by offset and limit.
HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error)

// ListBuckets returns a list of all buckets in the database.
ListBuckets(ctx context.Context) ([]api.Bucket, error)

Expand Down Expand Up @@ -126,6 +130,14 @@ type (
// or slab buffer.
PruneSlabs(ctx context.Context, limit int64) (int64, error)

// RecordHostScans records the results of host scans in the database
// such as recording the settings and price table of a host in case of
// success and updating the uptime and downtime of a host.
// NOTE: The price table is only updated if the known price table is
// expired since price tables from scans are not paid for and are
// therefore only useful for gouging checks.
RecordHostScans(ctx context.Context, scans []api.HostScan) error

// RemoveOfflineHosts removes all hosts that have been offline for
// longer than maxDownTime and been scanned at least minRecentFailures
// times. The contracts of those hosts are also removed.
Expand Down
80 changes: 80 additions & 0 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,31 @@ func CopyObject(ctx context.Context, tx sql.Tx, srcBucket, dstBucket, srcKey, ds
return fetchMetadata(dstObjID)
}

// HostsForScanning returns the public key and net address of every host whose
// last_scan is strictly older than maxLastScan, least recently scanned first.
//
// offset and limit paginate the result. A negative offset is rejected with
// ErrNegativeOffset and a limit of -1 means "no limit".
func HostsForScanning(ctx context.Context, tx sql.Tx, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error) {
	if offset < 0 {
		return nil, ErrNegativeOffset
	} else if limit == -1 {
		limit = math.MaxInt64
	}

	// The explicit ORDER BY keeps LIMIT/OFFSET pagination deterministic;
	// without it the database is free to return rows in any order, so
	// consecutive pages could overlap or skip hosts.
	rows, err := tx.Query(ctx, "SELECT public_key, net_address FROM hosts WHERE last_scan < ? ORDER BY last_scan ASC LIMIT ? OFFSET ?",
		maxLastScan.UnixNano(), limit, offset)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch hosts for scanning: %w", err)
	}
	defer rows.Close()

	var hosts []api.HostAddress
	for rows.Next() {
		var ha api.HostAddress
		if err := rows.Scan((*PublicKey)(&ha.PublicKey), &ha.NetAddress); err != nil {
			return nil, fmt.Errorf("failed to scan host row: %w", err)
		}
		hosts = append(hosts, ha)
	}
	// Next returns false both at the end of the result set and on failure;
	// only Err tells the two apart, so check it before reporting success.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate hosts for scanning: %w", err)
	}
	return hosts, nil
}

func InsertMultipartUpload(ctx context.Context, tx sql.Tx, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) {
// fetch bucket id
var bucketID int64
Expand Down Expand Up @@ -823,6 +848,61 @@ func ObjectsStats(ctx context.Context, tx sql.Tx, opts api.ObjectsStatsOpts) (ap
}, nil
}

// RecordHostScans applies every scan in scans, in order, to the row of the
// hosts table whose public_key matches the scan's host key. Scans for unknown
// hosts match no row and are therefore ignored.
//
// For each scan the statement updates the scan counters, the success flags,
// the uptime/downtime accounting (only when the host has a previous scan that
// is older than the new one) and last_scan. On success the settings are
// replaced; the price table and its expiry are only replaced when the stored
// price table has no expiry or is already expired, since price tables obtained
// through scans are not paid for and must never overwrite a valid one.
//
// The first failing update aborts the loop and its error is returned.
func RecordHostScans(ctx context.Context, tx sql.Tx, scans []api.HostScan) error {
	if len(scans) == 0 {
		return nil
	}
	// NOTE: The order of the assignments in the UPDATE statement is important
	// for MySQL compatibility. e.g. second_to_last_scan_success must be set
	// before last_scan_success, the columns derived from last_scan must be set
	// before last_scan, and price_table must be set before price_table_expiry
	// because its condition reads the old expiry.
	stmt, err := tx.Prepare(ctx, `
		UPDATE hosts SET
		scanned = scanned OR ?,
		total_scans = total_scans + 1,
		second_to_last_scan_success = last_scan_success,
		last_scan_success = ?,
		recent_downtime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN recent_downtime + ? - last_scan ELSE CASE WHEN ? THEN 0 ELSE recent_downtime END END,
		recent_scan_failures = CASE WHEN ? THEN 0 ELSE recent_scan_failures + 1 END,
		downtime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN downtime + ? - last_scan ELSE downtime END,
		uptime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN uptime + ? - last_scan ELSE uptime END,
		last_scan = ?,
		settings = CASE WHEN ? THEN ? ELSE settings END,
		price_table = CASE WHEN ? AND (price_table_expiry IS NULL OR ? > price_table_expiry) THEN ? ELSE price_table END,
		price_table_expiry = CASE WHEN ? AND (price_table_expiry IS NULL OR ? > price_table_expiry) THEN ? ELSE price_table_expiry END,
		successful_interactions = CASE WHEN ? THEN successful_interactions + 1 ELSE successful_interactions END,
		failed_interactions = CASE WHEN ? THEN failed_interactions + 1 ELSE failed_interactions END
		WHERE public_key = ?
	`)
	if err != nil {
		return fmt.Errorf("failed to prepare statement to update host with scan: %w", err)
	}
	defer stmt.Close()

	// A single timestamp is used for all scans so that every expiry check
	// and every new expiry in this batch is based on the same instant.
	now := time.Now()
	for _, scan := range scans {
		scanTime := scan.Timestamp.UnixNano()
		_, err = stmt.Exec(ctx,
			scan.Success,                                    // scanned
			scan.Success,                                    // last_scan_success
			!scan.Success, scanTime, scanTime, scan.Success, // recent_downtime
			scan.Success,                      // recent_scan_failures
			!scan.Success, scanTime, scanTime, // downtime
			scan.Success, scanTime, scanTime, // uptime
			scanTime,                              // last_scan
			scan.Success, Settings(scan.Settings), // settings
			scan.Success, now, PriceTable(scan.PriceTable), // price_table
			scan.Success, now, now, // price_table_expiry
			scan.Success,  // successful_interactions
			!scan.Success, // failed_interactions
			PublicKey(scan.HostKey),
		)
		if err != nil {
			return fmt.Errorf("failed to update host with scan: %w", err)
		}
	}
	return nil
}

func RemoveOfflineHosts(ctx context.Context, tx sql.Tx, minRecentFailures uint64, maxDownTime time.Duration) (int64, error) {
// fetch contracts
rows, err := tx.Query(ctx, `
Expand Down
8 changes: 8 additions & 0 deletions stores/sql/mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ func (tx *MainDatabaseTx) DeleteObjects(ctx context.Context, bucket string, key
}
}

// HostsForScanning forwards the call to ssql.HostsForScanning, passing this
// transaction as the sql.Tx the query runs against. All arguments and both
// return values are passed through unchanged.
func (tx *MainDatabaseTx) HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error) {
return ssql.HostsForScanning(ctx, tx, maxLastScan, offset, limit)
}

func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error {
// get bucket id
var bucketID int64
Expand Down Expand Up @@ -457,6 +461,10 @@ func (tx *MainDatabaseTx) PruneSlabs(ctx context.Context, limit int64) (int64, e
return res.RowsAffected()
}

// RecordHostScans forwards the call to ssql.RecordHostScans, passing this
// transaction as the sql.Tx the updates run against. The scans and the
// returned error are passed through unchanged.
func (tx *MainDatabaseTx) RecordHostScans(ctx context.Context, scans []api.HostScan) error {
return ssql.RecordHostScans(ctx, tx, scans)
}

func (tx *MainDatabaseTx) RemoveOfflineHosts(ctx context.Context, minRecentFailures uint64, maxDownTime time.Duration) (int64, error) {
return ssql.RemoveOfflineHosts(ctx, tx, minRecentFailures, maxDownTime)
}
Expand Down
8 changes: 8 additions & 0 deletions stores/sql/sqlite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ func (tx *MainDatabaseTx) DeleteObjects(ctx context.Context, bucket string, key
}
}

// HostsForScanning forwards the call to ssql.HostsForScanning, passing this
// transaction as the sql.Tx the query runs against. All arguments and both
// return values are passed through unchanged.
func (tx *MainDatabaseTx) HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error) {
return ssql.HostsForScanning(ctx, tx, maxLastScan, offset, limit)
}

func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error {
// get bucket id
var bucketID int64
Expand Down Expand Up @@ -454,6 +458,10 @@ func (tx *MainDatabaseTx) PruneSlabs(ctx context.Context, limit int64) (int64, e
return res.RowsAffected()
}

// RecordHostScans forwards the call to ssql.RecordHostScans, passing this
// transaction as the sql.Tx the updates run against. The scans and the
// returned error are passed through unchanged.
func (tx *MainDatabaseTx) RecordHostScans(ctx context.Context, scans []api.HostScan) error {
return ssql.RecordHostScans(ctx, tx, scans)
}

func (tx *MainDatabaseTx) RemoveOfflineHosts(ctx context.Context, minRecentFailures uint64, maxDownTime time.Duration) (int64, error) {
return ssql.RemoveOfflineHosts(ctx, tx, minRecentFailures, maxDownTime)
}
Expand Down

0 comments on commit d870f60

Please sign in to comment.