Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve retry mechanism for transactions when WriteConflict error #27

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions sstxn/sstxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package sstxn
import (
"errors"
"fmt"
"time"

"github.com/juju/mgo/v3"
"github.com/juju/mgo/v3/bson"
Expand Down Expand Up @@ -54,6 +55,10 @@ func (nilLogger) Criticalf(message string, args ...interface{}) {}

var _ Logger = nilLogger{}

const TRANSACTION_TIMEOUT = 120 * time.Second

var ErrTimeout = fmt.Errorf("transaction failed after retrying for 120 seconds")

// A Runner applies operations as part of a transaction onto any number
// of collections within a database. See the Run method for details.
type Runner struct {
Expand Down Expand Up @@ -106,6 +111,9 @@ func NewRunner(db *mgo.Database, logger Logger) *Runner {
// Any number of transactions may be run concurrently, with one
// runner or many.
func (r *Runner) Run(ops []txn.Op, id bson.ObjectId, info interface{}) (err error) {
timeout := time.NewTimer(TRANSACTION_TIMEOUT)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern for this is that Juju is also going to be doing a retry (after rebuilding the TXN) on top of this layer. And IIRC our default ends up being 20 retries (Harry and I dug into it).
I don't want to end up with 20 * 120s of retries.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are returning a new type of error, juju should not be retrying this (failed) transactions, if it is then we should probably change that logic on the upper layers right?
Also, since we are dealing with an error that the driver layer is seeing and that we don't propagate (WriteConflict), the retries should be at this level IMO.
But I'm happy to go back to what I had done in juju/juju#16159 and retry only at juju level and only at EnterScope.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a context should be passed into here, then in the juju/txn package, the runner should start the timer there.

defer timeout.Stop()

const efmt = "error in transaction op %d: %s"
for i := range ops {
op := &ops[i]
Expand Down Expand Up @@ -133,21 +141,19 @@ func (r *Runner) Run(ops []txn.Op, id bson.ObjectId, info interface{}) (err erro
id = bson.NewObjectId()
}

// Sometimes the mongo server will return an error code 112 (write conflict).
// This is a signal the transaction needs to be retried.
// We'll retry 3 times but not forever.
for i := 0; i < 3; i++ {
err = r.runTxn(ops, id)
if err == errWriteConflict {
r.logger.Tracef("attempt %d retrying txn ops", i)
continue
for {
err := r.runTxn(ops, id)
if err != errWriteConflict {
return err
}
break
}
if err == errWriteConflict {
err = txn.ErrAborted
select {
case <-timeout.C:
r.logger.Debugf("transaction failed after retrying for 120 seconds, ops '%+v'", ops)
return ErrTimeout
default:
}
r.logger.Tracef("retrying txn ops '%+v'", ops)
}
return err
}

func (r *Runner) runTxn(ops []txn.Op, id bson.ObjectId) error {
Expand Down Expand Up @@ -474,7 +480,7 @@ func (r *Runner) updateLog(ops []txn.Op, revnos []int64, txnId bson.ObjectId) er
//
// Saved documents are in the format:
//
// {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}}
// {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}}
//
// The document revision is the value of the txn-revno field after
// the change has been applied. Negative values indicate the document
Expand Down
4 changes: 3 additions & 1 deletion sstxn/sstxn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,9 @@ func (s *S) TestConcurrentRemoveUpdatePostAssertFailure(c *C) {
Id: 0,
Remove: true,
}}, "", nil)
c.Assert(err, Equals, txn.ErrAborted)
// Since we are getting a WriteConflict, we retry for 120 seconds
// and then fail with timeout error.
c.Assert(err, Equals, sstxn.ErrTimeout)
}

type NotMarshallable struct {
Expand Down