From d870f60f46aec06da7fe1368a0ecc5cfeb50199c Mon Sep 17 00:00:00 2001
From: Christopher Schinnerl
Date: Thu, 13 Jun 2024 11:12:34 +0200
Subject: [PATCH] Migrate Host scanning code to raw SQL (#1297)

---
 internal/sql/sql.go       |   6 +-
 stores/hostdb.go          | 162 ++------------------------------------
 stores/sql/database.go    |  12 +++
 stores/sql/main.go        |  96 ++++++++++++++++++++++
 stores/sql/mysql/main.go  |   8 ++
 stores/sql/sqlite/main.go |   8 ++
 6 files changed, 135 insertions(+), 157 deletions(-)

diff --git a/internal/sql/sql.go b/internal/sql/sql.go
index c44d700f1..23b499213 100644
--- a/internal/sql/sql.go
+++ b/internal/sql/sql.go
@@ -94,10 +94,10 @@ func (s *DB) Exec(ctx context.Context, query string, args ...any) (sql.Result, e
 func (s *DB) Prepare(ctx context.Context, query string) (*LoggedStmt, error) {
 	start := time.Now()
 	stmt, err := s.db.PrepareContext(ctx, query)
-	if dur := time.Since(start); dur > s.longQueryDuration {
-		s.log.Debug("slow prepare", zap.String("query", query), zap.Duration("elapsed", dur), zap.Stack("stack"))
-	} else if err != nil {
+	if err != nil {
 		return nil, err
+	} else if dur := time.Since(start); dur > s.longQueryDuration {
+		s.log.Debug("slow prepare", zap.String("query", query), zap.Duration("elapsed", dur), zap.Stack("stack"))
 	}
 	return &LoggedStmt{
 		Stmt: stmt,
diff --git a/stores/hostdb.go b/stores/hostdb.go
index c80fe67ba..9cbd24cb7 100644
--- a/stores/hostdb.go
+++ b/stores/hostdb.go
@@ -28,11 +28,6 @@ const (
 	// consensusInfoID defines the primary key of the entry in the consensusInfo
 	// table.
 	consensusInfoID = 1
-
-	// hostRetrievalBatchSize is the number of hosts we fetch from the
-	// database per batch. Empirically tested to verify that this is a value
-	// that performs reasonably well.
-	hostRetrievalBatchSize = 10000
 )
 
 var (
@@ -396,38 +391,12 @@ func (ss *SQLStore) UpdateHostCheck(ctx context.Context, autopilotID string, hk
 }
 
 // HostsForScanning returns the address of hosts for scanning.
-func (ss *SQLStore) HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error) {
-	if offset < 0 {
-		return nil, sql.ErrNegativeOffset
-	}
-
-	var hosts []struct {
-		PublicKey  publicKey `gorm:"unique;index;NOT NULL"`
-		NetAddress string
-	}
-	var hostAddresses []api.HostAddress
-
-	err := ss.db.
-		WithContext(ctx).
-		Model(&dbHost{}).
-		Where("last_scan < ?", maxLastScan.UnixNano()).
-		Offset(offset).
-		Limit(limit).
-		Order("last_scan ASC").
-		FindInBatches(&hosts, hostRetrievalBatchSize, func(tx *gorm.DB, batch int) error {
-			for _, h := range hosts {
-				hostAddresses = append(hostAddresses, api.HostAddress{
-					PublicKey:  types.PublicKey(h.PublicKey),
-					NetAddress: h.NetAddress,
-				})
-			}
-			return nil
-		}).
-		Error
-	if err != nil {
-		return nil, err
-	}
-	return hostAddresses, err
+func (ss *SQLStore) HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) (hosts []api.HostAddress, err error) {
+	err = ss.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
+		hosts, err = tx.HostsForScanning(ctx, maxLastScan, offset, limit)
+		return err
+	})
+	return
 }
 
 func (ss *SQLStore) SearchHosts(ctx context.Context, autopilotID, filterMode, usabilityMode, addressContains string, keyIn []types.PublicKey, offset, limit int) ([]api.Host, error) {
@@ -554,123 +523,8 @@ func (ss *SQLStore) HostBlocklist(ctx context.Context) (blocklist []string, err
 }
 
 func (ss *SQLStore) RecordHostScans(ctx context.Context, scans []api.HostScan) error {
-	if len(scans) == 0 {
-		return nil // nothing to do
-	}
-
-	// Get keys from input.
-	keyMap := make(map[publicKey]struct{})
-	var hks []publicKey
-	for _, scan := range scans {
-		if _, exists := keyMap[publicKey(scan.HostKey)]; !exists {
-			hks = append(hks, publicKey(scan.HostKey))
-			keyMap[publicKey(scan.HostKey)] = struct{}{}
-		}
-	}
-
-	// Fetch hosts for which to add scans. This can be done outsisde the
-	// transaction to reduce the time we spend in the transaction since we don't
-	// need it to be perfectly consistent.
-	var hosts []dbHost
-	for i := 0; i < len(hks); i += maxSQLVars {
-		end := i + maxSQLVars
-		if end > len(hks) {
-			end = len(hks)
-		}
-		var batchHosts []dbHost
-		if err := ss.db.WithContext(ctx).Where("public_key IN (?)", hks[i:end]).
-			Find(&batchHosts).Error; err != nil {
-			return err
-		}
-		hosts = append(hosts, batchHosts...)
-	}
-	hostMap := make(map[publicKey]dbHost)
-	for _, h := range hosts {
-		hostMap[h.PublicKey] = h
-	}
-
-	// Write the interactions and update to the hosts atomically within a single
-	// transaction.
-	return ss.retryTransaction(ctx, func(tx *gorm.DB) error {
-		// Handle scans
-		for _, scan := range scans {
-			host, exists := hostMap[publicKey(scan.HostKey)]
-			if !exists {
-				continue // host doesn't exist
-			}
-			lastScan := time.Unix(0, host.LastScan)
-
-			if scan.Success {
-				// Handle successful scan.
-				host.SuccessfulInteractions++
-				if host.LastScan > 0 && lastScan.Before(scan.Timestamp) {
-					host.Uptime += scan.Timestamp.Sub(lastScan)
-				}
-				host.RecentDowntime = 0
-				host.RecentScanFailures = 0
-
-				// overwrite the NetAddress in the settings with the one we
-				// received through the host announcement
-				scan.Settings.NetAddress = host.NetAddress
-				host.Settings = hostSettings(scan.Settings)
-
-				// scans can only update the price table if the current
-				// pricetable is expired anyway, ensuring scans never
-				// overwrite a valid price table since the price table from
-				// scans are not paid for and thus not useful for anything
-				// aside from gouging checks
-				if time.Now().After(host.PriceTableExpiry.Time) {
-					host.PriceTable = convertHostPriceTable(scan.PriceTable)
-					host.PriceTableExpiry = dsql.NullTime{
-						Time:  time.Now(),
-						Valid: true,
-					}
-				}
-			} else {
-				// Handle failed scan.
-				host.FailedInteractions++
-				host.RecentScanFailures++
-				if host.LastScan > 0 && lastScan.Before(scan.Timestamp) {
-					host.Downtime += scan.Timestamp.Sub(lastScan)
-					host.RecentDowntime += scan.Timestamp.Sub(lastScan)
-				}
-			}
-
-			host.TotalScans++
-			host.Scanned = host.Scanned || scan.Success
-			host.SecondToLastScanSuccess = host.LastScanSuccess
-			host.LastScanSuccess = scan.Success
-			host.LastScan = scan.Timestamp.UnixNano()
-
-			// Save to map again.
-			hostMap[host.PublicKey] = host
-		}
-
-		// Persist.
-		for _, h := range hostMap {
-			err := tx.Model(&dbHost{}).
-				Where("public_key", h.PublicKey).
-				Updates(map[string]interface{}{
-					"scanned":                     h.Scanned,
-					"total_scans":                 h.TotalScans,
-					"second_to_last_scan_success": h.SecondToLastScanSuccess,
-					"last_scan_success":           h.LastScanSuccess,
-					"recent_downtime":             h.RecentDowntime,
-					"recent_scan_failures":        h.RecentScanFailures,
-					"downtime":                    h.Downtime,
-					"uptime":                      h.Uptime,
-					"last_scan":                   h.LastScan,
-					"settings":                    h.Settings,
-					"price_table":                 h.PriceTable,
-					"price_table_expiry":          h.PriceTableExpiry,
-					"successful_interactions":     h.SuccessfulInteractions,
-					"failed_interactions":         h.FailedInteractions,
-				}).Error
-			if err != nil {
-				return err
-			}
-		}
-		return nil
+	return ss.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
+		return tx.RecordHostScans(ctx, scans)
 	})
 }
 
diff --git a/stores/sql/database.go b/stores/sql/database.go
index a63537d71..c1b0c3745 100644
--- a/stores/sql/database.go
+++ b/stores/sql/database.go
@@ -99,6 +99,10 @@ type (
 	// InsertObject inserts a new object into the database.
 	InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error
 
+	// HostsForScanning returns a list of hosts to scan which haven't been
+	// scanned since at least maxLastScan.
+	HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error)
+
 	// ListBuckets returns a list of all buckets in the database.
 	ListBuckets(ctx context.Context) ([]api.Bucket, error)
 
@@ -126,6 +130,14 @@ type (
 	// or slab buffer.
 	PruneSlabs(ctx context.Context, limit int64) (int64, error)
 
+	// RecordHostScans records the results of host scans in the database
+	// such as recording the settings and price table of a host in case of
+	// success and updating the uptime and downtime of a host.
+	// NOTE: The price table is only updated if the known price table is
+	// expired since price tables from scans are not paid for and are
+	// therefore only useful for gouging checks.
+	RecordHostScans(ctx context.Context, scans []api.HostScan) error
+
 	// RemoveOfflineHosts removes all hosts that have been offline for
 	// longer than maxDownTime and been scanned at least minRecentFailures
 	// times. The contracts of those hosts are also removed.
diff --git a/stores/sql/main.go b/stores/sql/main.go
index eceb7adea..548b5b347 100644
--- a/stores/sql/main.go
+++ b/stores/sql/main.go
@@ -305,6 +305,39 @@ func CopyObject(ctx context.Context, tx sql.Tx, srcBucket, dstBucket, srcKey, ds
 	return fetchMetadata(dstObjID)
 }
 
+// HostsForScanning returns the public key and net address of the hosts whose
+// last scan is older than maxLastScan, least recently scanned hosts first. A
+// negative offset yields ErrNegativeOffset, a limit of -1 lifts the limit.
+func HostsForScanning(ctx context.Context, tx sql.Tx, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error) {
+	if offset < 0 {
+		return nil, ErrNegativeOffset
+	} else if limit == -1 {
+		limit = math.MaxInt64
+	}
+
+	// NOTE: the ORDER BY is required, without it LIMIT and OFFSET paginate
+	// over rows in an unspecified order.
+	rows, err := tx.Query(ctx, "SELECT public_key, net_address FROM hosts WHERE last_scan < ? ORDER BY last_scan ASC LIMIT ? OFFSET ?",
+		maxLastScan.UnixNano(), limit, offset)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch hosts for scanning: %w", err)
+	}
+	defer rows.Close()
+
+	var hosts []api.HostAddress
+	for rows.Next() {
+		var ha api.HostAddress
+		if err := rows.Scan((*PublicKey)(&ha.PublicKey), &ha.NetAddress); err != nil {
+			return nil, fmt.Errorf("failed to scan host row: %w", err)
+		}
+		hosts = append(hosts, ha)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("failed to iterate over hosts for scanning: %w", err)
+	}
+	return hosts, nil
+}
+
 func InsertMultipartUpload(ctx context.Context, tx sql.Tx, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) {
 	// fetch bucket id
 	var bucketID int64
@@ -823,6 +856,69 @@ func ObjectsStats(ctx context.Context, tx sql.Tx, opts api.ObjectsStatsOpts) (ap
 	}, nil
 }
 
+// RecordHostScans applies the given scans to the hosts table using one UPDATE
+// per scan. Scans of hosts that are not in the table match no row and are
+// ignored. The price table is only overwritten if the stored one is expired
+// or was never set since price tables from scans are not paid for.
+// NOTE(review): the gorm implementation this replaces overwrote the
+// NetAddress in the scanned settings with the announced one before persisting
+// them, this statement stores the settings as reported - confirm intended.
+func RecordHostScans(ctx context.Context, tx sql.Tx, scans []api.HostScan) error {
+	if len(scans) == 0 {
+		return nil
+	}
+	// NOTE: The order of the assignments in the UPDATE statement is important
+	// for MySQL compatibility. e.g. second_to_last_scan_success must be set
+	// before last_scan_success and price_table must be set before
+	// price_table_expiry since both depend on the old expiry.
+	stmt, err := tx.Prepare(ctx, `
+		UPDATE hosts SET
+		scanned = scanned OR ?,
+		total_scans = total_scans + 1,
+		second_to_last_scan_success = last_scan_success,
+		last_scan_success = ?,
+		recent_downtime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN recent_downtime + ? - last_scan ELSE CASE WHEN ? THEN 0 ELSE recent_downtime END END,
+		recent_scan_failures = CASE WHEN ? THEN 0 ELSE recent_scan_failures + 1 END,
+		downtime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN downtime + ? - last_scan ELSE downtime END,
+		uptime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN uptime + ? - last_scan ELSE uptime END,
+		last_scan = ?,
+		settings = CASE WHEN ? THEN ? ELSE settings END,
+		price_table = CASE WHEN ? AND (price_table_expiry IS NULL OR ? > price_table_expiry) THEN ? ELSE price_table END,
+		price_table_expiry = CASE WHEN ? AND (price_table_expiry IS NULL OR ? > price_table_expiry) THEN ? ELSE price_table_expiry END,
+		successful_interactions = CASE WHEN ? THEN successful_interactions + 1 ELSE successful_interactions END,
+		failed_interactions = CASE WHEN ? THEN failed_interactions + 1 ELSE failed_interactions END
+		WHERE public_key = ?
+	`)
+	if err != nil {
+		return fmt.Errorf("failed to prepare statement to update host with scan: %w", err)
+	}
+	defer stmt.Close()
+
+	now := time.Now()
+	for _, scan := range scans {
+		scanTime := scan.Timestamp.UnixNano()
+		_, err = stmt.Exec(ctx,
+			scan.Success,                                    // scanned
+			scan.Success,                                    // last_scan_success
+			!scan.Success, scanTime, scanTime, scan.Success, // recent_downtime
+			scan.Success,                                    // recent_scan_failures
+			!scan.Success, scanTime, scanTime,               // downtime
+			scan.Success, scanTime, scanTime,                // uptime
+			scanTime,                                        // last_scan
+			scan.Success, Settings(scan.Settings),           // settings
+			scan.Success, now, PriceTable(scan.PriceTable),  // price_table
+			scan.Success, now, now,                          // price_table_expiry
+			scan.Success,                                    // successful_interactions
+			!scan.Success,                                   // failed_interactions
+			PublicKey(scan.HostKey),
+		)
+		if err != nil {
+			return fmt.Errorf("failed to update host with scan: %w", err)
+		}
+	}
+	return nil
+}
+
 func RemoveOfflineHosts(ctx context.Context, tx sql.Tx, minRecentFailures uint64, maxDownTime time.Duration) (int64, error) {
 	// fetch contracts
 	rows, err := tx.Query(ctx, `
diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go
index 6e8e716b0..5f41cc77d 100644
--- a/stores/sql/mysql/main.go
+++ b/stores/sql/mysql/main.go
@@ -293,6 +293,10 @@ func (tx *MainDatabaseTx) DeleteObjects(ctx context.Context, bucket string, key
 	}
 }
 
+func (tx *MainDatabaseTx) HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error) {
+	return ssql.HostsForScanning(ctx, tx, maxLastScan, offset, limit)
+}
+
 func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error {
 	// get bucket id
 	var bucketID int64
@@ -457,6 +461,10 @@ func (tx *MainDatabaseTx) PruneSlabs(ctx context.Context, limit int64) (int64, e
 	return res.RowsAffected()
 }
 
+func (tx *MainDatabaseTx) RecordHostScans(ctx context.Context, scans []api.HostScan) error {
+	return ssql.RecordHostScans(ctx, tx, scans)
+}
+
 func (tx *MainDatabaseTx) RemoveOfflineHosts(ctx context.Context, minRecentFailures uint64, maxDownTime time.Duration) (int64, error) {
 	return ssql.RemoveOfflineHosts(ctx, tx, minRecentFailures, maxDownTime)
 }
diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go
index 23f27dcd5..de9476495 100644
--- a/stores/sql/sqlite/main.go
+++ b/stores/sql/sqlite/main.go
@@ -282,6 +282,10 @@ func (tx *MainDatabaseTx) DeleteObjects(ctx context.Context, bucket string, key
 	}
 }
 
+func (tx *MainDatabaseTx) HostsForScanning(ctx context.Context, maxLastScan time.Time, offset, limit int) ([]api.HostAddress, error) {
+	return ssql.HostsForScanning(ctx, tx, maxLastScan, offset, limit)
+}
+
 func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error {
 	// get bucket id
 	var bucketID int64
@@ -454,6 +458,10 @@ func (tx *MainDatabaseTx) PruneSlabs(ctx context.Context, limit int64) (int64, e
 	return res.RowsAffected()
 }
 
+func (tx *MainDatabaseTx) RecordHostScans(ctx context.Context, scans []api.HostScan) error {
+	return ssql.RecordHostScans(ctx, tx, scans)
+}
+
 func (tx *MainDatabaseTx) RemoveOfflineHosts(ctx context.Context, minRecentFailures uint64, maxDownTime time.Duration) (int64, error) {
 	return ssql.RemoveOfflineHosts(ctx, tx, minRecentFailures, maxDownTime)
 }