-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(job-distributor): add exp. backoff retry to feeds.SyncNodeInfo()
#15752
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"chainlink": patch | ||
--- | ||
|
||
#added add exponential backoff retry to feeds.SyncNodeInfo() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,9 @@ import ( | |
"database/sql" | ||
"encoding/hex" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/avast/retry-go/v4" | ||
"github.com/ethereum/go-ethereum/common" | ||
"github.com/google/uuid" | ||
"github.com/lib/pq" | ||
|
@@ -141,6 +143,7 @@ type service struct { | |
lggr logger.Logger | ||
version string | ||
loopRegistrarConfig plugins.RegistrarConfig | ||
syncNodeInfoCancel context.CancelFunc | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think instead of using this to pass context, we should just have |
||
} | ||
|
||
// NewService constructs a new feeds service | ||
|
@@ -183,6 +186,7 @@ func NewService( | |
lggr: lggr, | ||
version: version, | ||
loopRegistrarConfig: rc, | ||
syncNodeInfoCancel: func() {}, | ||
} | ||
|
||
return svc | ||
|
@@ -254,8 +258,45 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar | |
return id, nil | ||
} | ||
|
||
// SyncNodeInfo syncs the node's information with FMS | ||
// syncNodeInfoWithRetry syncs the node's information with FMS using a goroutine. | ||
// In case of failures, it retries with an exponential backoff for up to 24h. | ||
func (s *service) syncNodeInfoWithRetry(id int64) { | ||
// cancel the previous context -- and, by extension, the existing goroutine -- | ||
// so that we can start anew | ||
s.syncNodeInfoCancel() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need to do this, right? If the caller of |
||
|
||
var ctx context.Context | ||
ctx, s.syncNodeInfoCancel = context.WithCancel(context.Background()) | ||
|
||
retryOpts := []retry.Option{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason we didn't use |
||
retry.Context(ctx), | ||
retry.Delay(5 * time.Second), | ||
retry.Delay(10 * time.Second), | ||
Comment on lines
+273
to
+274
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Delay is configured twice. |
||
retry.MaxDelay(30 * time.Minute), | ||
retry.Attempts(48 + 8), // 30m * 48 =~ 24h; plus the initial 8 shorter retries | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where did you derive the 8? Where do we configure the shorter retries? |
||
retry.LastErrorOnly(true), | ||
retry.OnRetry(func(attempt uint, err error) { | ||
s.lggr.Info("failed to sync node info", "attempt", attempt, "err", err) | ||
}), | ||
} | ||
|
||
go func() { | ||
err := retry.Do(func() error { return s.SyncNodeInfo(ctx, id) }, retryOpts...) | ||
if err != nil { | ||
s.lggr.Errorw("failed to sync node info; aborting", "err", err) | ||
} else { | ||
s.lggr.Info("successfully synced node info") | ||
} | ||
|
||
s.syncNodeInfoCancel() | ||
s.syncNodeInfoCancel = func() {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, won't this introduce a race condition, as each request that wants to update node info will try to set this variable? |
||
}() | ||
} | ||
|
||
func (s *service) SyncNodeInfo(ctx context.Context, id int64) error { | ||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second) | ||
defer cancel() | ||
|
||
// Get the FMS RPC client | ||
fmsClient, err := s.connMgr.GetClient(id) | ||
if err != nil { | ||
|
@@ -401,9 +442,7 @@ func (s *service) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64 | |
return 0, errors.Wrap(err, "CreateChainConfig: failed to fetch manager") | ||
} | ||
|
||
if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil { | ||
s.lggr.Infof("FMS: Unable to sync node info: %v", err) | ||
} | ||
s.syncNodeInfoWithRetry(mgr.ID) | ||
|
||
return id, nil | ||
} | ||
|
@@ -425,9 +464,7 @@ func (s *service) DeleteChainConfig(ctx context.Context, id int64) (int64, error | |
return 0, errors.Wrap(err, "DeleteChainConfig: failed to fetch manager") | ||
} | ||
|
||
if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil { | ||
s.lggr.Infof("FMS: Unable to sync node info: %v", err) | ||
} | ||
s.syncNodeInfoWithRetry(mgr.ID) | ||
|
||
return id, nil | ||
} | ||
|
@@ -466,9 +503,7 @@ func (s *service) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64 | |
return 0, errors.Wrap(err, "UpdateChainConfig failed: could not get chain config") | ||
} | ||
|
||
if err := s.SyncNodeInfo(ctx, ccfg.FeedsManagerID); err != nil { | ||
s.lggr.Infof("FMS: Unable to sync node info: %v", err) | ||
} | ||
s.syncNodeInfoWithRetry(ccfg.FeedsManagerID) | ||
|
||
return id, nil | ||
} | ||
|
@@ -1145,6 +1180,8 @@ func (s *service) Close() error { | |
// This blocks until it finishes | ||
s.connMgr.Close() | ||
|
||
s.syncNodeInfoCancel() | ||
|
||
return nil | ||
}) | ||
} | ||
|
@@ -1162,10 +1199,7 @@ func (s *service) connectFeedManager(ctx context.Context, mgr FeedsManager, priv | |
}, | ||
OnConnect: func(pb.FeedsManagerClient) { | ||
// Sync the node's information with FMS once connected | ||
err := s.SyncNodeInfo(ctx, mgr.ID) | ||
if err != nil { | ||
s.lggr.Infof("Error syncing node info: %v", err) | ||
} | ||
s.syncNodeInfoWithRetry(mgr.ID) | ||
}, | ||
}) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unit tests were skipped in this draft PR, as I want to get some feedback about the approach before finishing the PR.