Skip to content

Commit

Permalink
stores: fix deadlock in RefreshHealth
Browse files: browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Feb 28, 2024
1 parent 1ef39bc commit 555d963
Showing 1 changed file with 22 additions and 35 deletions.
57 changes: 22 additions & 35 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,8 +1961,8 @@ func (s *SQLStore) RefreshHealth(ctx context.Context) error {
// Update slab health in batches.
now := time.Now()

for {
healthQuery := s.db.Raw(`
// build health query
healthQuery := s.db.Raw(`
SELECT slabs.id, slabs.db_contract_set_id, CASE WHEN (slabs.min_shards = slabs.total_shards)
THEN
CASE WHEN (COUNT(DISTINCT(CASE WHEN cs.name IS NULL THEN NULL ELSE c.host_id END)) < slabs.min_shards)
Expand All @@ -1981,50 +1981,37 @@ WHERE slabs.health_valid_until <= ?
GROUP BY slabs.id
LIMIT ?
`, now.Unix(), refreshHealthBatchSize)

for {
var rowsAffected int64
err := s.retryTransaction(func(tx *gorm.DB) error {
// create temp table from the health query since we will reuse it
if err := tx.Exec("DROP TABLE IF EXISTS src").Error; err != nil {
return err
} else if err = tx.Exec("CREATE TEMPORARY TABLE src AS ?", healthQuery).Error; err != nil {
return err
} else if err = tx.Exec("CREATE INDEX src_id ON src (id)").Error; err != nil {
return err
}

var res *gorm.DB
if isSQLite(s.db) {
res = tx.Exec("UPDATE slabs SET health = src.health, health_valid_until = (?) FROM src WHERE slabs.id=src.id", sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity))
res = tx.Exec("UPDATE slabs SET health = inner.health, health_valid_until = (?) FROM (?) AS inner WHERE slabs.id=inner.id", sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity), healthQuery)
} else {
res = tx.Exec("UPDATE slabs sla INNER JOIN src h ON sla.id = h.id SET sla.health = h.health, health_valid_until = (?)", sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity))
res = tx.Exec("UPDATE slabs sla INNER JOIN (?) h ON sla.id = h.id SET sla.health = h.health, health_valid_until = (?)", healthQuery, sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity))
}
if res.Error != nil {
return res.Error
}
rowsAffected = res.RowsAffected

// Update the health of the objects associated with the updated slabs.
if isSQLite(s.db) {
return tx.Exec(`UPDATE objects SET health = i.health FROM (
SELECT slices.db_object_id, MIN(s.health) AS health
FROM slices
INNER JOIN src s ON s.id = slices.db_slab_id
INNER JOIN objects o ON o.id = slices.db_object_id
GROUP BY slices.db_object_id
) i
WHERE i.db_object_id = objects.id AND objects.health != i.health`).Error
} else {
return tx.Exec(`UPDATE objects
INNER JOIN (
SELECT slices.db_object_id, MIN(s.health) as health
FROM slices
INNER JOIN src s ON s.id = slices.db_slab_id
GROUP BY slices.db_object_id
) i ON objects.id = i.db_object_id
SET objects.health = i.health
WHERE objects.health != i.health
`).Error
}
// Update the health of objects with outdated health.
return tx.Exec(`
UPDATE objects
SET health = (
SELECT MIN(slabs.health)
FROM slabs
INNER JOIN slices ON slices.db_slab_id = slabs.id
INNER JOIN objects ON slices.db_object_id = objects.id
)
WHERE EXISTS (
SELECT 1 FROM slabs
INNER JOIN slices ON slices.db_slab_id = slabs.id
INNER JOIN objects ON slices.db_object_id = objects.id
WHERE slabs.health < objects.health
)
`).Error
})
if err != nil {
return err
Expand Down

0 comments on commit 555d963

Please sign in to comment.