diff --git a/export_test.go b/export_test.go index 788af16..bd5a788 100644 --- a/export_test.go +++ b/export_test.go @@ -19,6 +19,12 @@ func SetRunnerFunc(r Runner, f func() TxnRunner) { } } +// Specify the transaction timeout for some tests. +func SetTxnTimeout(r Runner, t time.Duration) { + inner := r.(*transactionRunner) + inner.txnTimeout = t +} + var CheckMongoSupportsOut = checkMongoSupportsOut // NewDBOracleNoOut is only used for testing. It forces the DBOracle to not ask diff --git a/incrementalprune_test.go b/incrementalprune_test.go index 969c940..0afe516 100644 --- a/incrementalprune_test.go +++ b/incrementalprune_test.go @@ -4,6 +4,7 @@ package txn import ( + "context" "fmt" "time" @@ -302,7 +303,7 @@ func (s *TxnSuite) TearDownTest(c *gc.C) { func (s *TxnSuite) runTxn(c *gc.C, ops ...txn.Op) bson.ObjectId { txnId := bson.NewObjectId() - err := s.runner.Run(ops, txnId, nil) + err := s.runner.Run(context.Background(), ops, txnId, nil) c.Assert(err, jc.ErrorIsNil) return txnId } @@ -314,7 +315,7 @@ func (s *TxnSuite) runInterruptedTxn(c *gc.C, breakpoint string, ops ...txn.Op) KillChance: 1, Breakpoint: breakpoint, }) - err := s.runner.Run(ops, txnId, nil) + err := s.runner.Run(context.Background(), ops, txnId, nil) c.Assert(err, gc.Equals, txn.ErrChaos) txn.SetChaos(txn.Chaos{}) return txnId diff --git a/txn.go b/txn.go index 7b35e58..a4e4e6a 100755 --- a/txn.go +++ b/txn.go @@ -13,6 +13,7 @@ package txn import ( + "context" "math/rand" "strings" "time" @@ -29,14 +30,6 @@ import ( var logger = loggo.GetLogger("juju.txn") const ( - // defaultClientTxnRetries is the default number of times a transaction will be retried - // when there is an invariant assertion failure (for client side transactions). - defaultClientTxnRetries = 3 - - // defaultServerTxnRetries is the default number of times a transaction will be retried - // when there is an invariant assertion failure (for server side transactions). - defaultServerTxnRetries = 50 - // defaultRetryBackoff is the default interval used to pause between // unsuccessful transaction operations. defaultRetryBackoff = 1 * time.Millisecond @@ -51,6 +44,10 @@ const ( // defaultChangeLogName is the default mgo transaction runner change log. defaultChangeLogName = "txns.log" + + // defaultTxnTimeoutSeconds is the default time length for a + // transaction to finish before it gets cancelled. + defaultTxnTimeoutSeconds = 120 ) const ( @@ -139,7 +136,7 @@ type Runner interface { } type txnRunner interface { - Run([]txn.Op, bson.ObjectId, interface{}) error + Run(context.Context, []txn.Op, bson.ObjectId, interface{}) error ChangeLog(*mgo.Collection) ResumeAll() error } @@ -160,12 +157,13 @@ type transactionRunner struct { clock Clock serverSideTransactions bool - nrRetries int retryBackoff time.Duration retryFuzzPercent int pauseFunc func(duration time.Duration) newRunner func() txnRunner + + txnTimeout time.Duration } var _ Runner = (*transactionRunner)(nil) @@ -248,7 +246,6 @@ func NewRunner(params RunnerParams) Runner { runTransactionObserver: params.RunTransactionObserver, clock: params.Clock, serverSideTransactions: sstxn, - nrRetries: params.MaxRetryAttempts, retryBackoff: params.RetryBackoff, retryFuzzPercent: params.RetryFuzzPercent, pauseFunc: params.PauseFunc, @@ -261,12 +258,6 @@ func NewRunner(params RunnerParams) Runner { } else if txnRunner.changeLogName == "" { txnRunner.changeLogName = defaultChangeLogName } - if txnRunner.nrRetries == 0 { - txnRunner.nrRetries = defaultClientTxnRetries - if txnRunner.serverSideTransactions { - txnRunner.nrRetries = defaultServerTxnRetries - } - } if txnRunner.retryBackoff == 0 { txnRunner.retryBackoff = defaultRetryBackoff } @@ -284,6 +275,7 @@ func NewRunner(params RunnerParams) Runner { // they also specify a RunTransactionObserver. txnRunner.clock = clock.WallClock } + txnRunner.txnTimeout = defaultTxnTimeoutSeconds * time.Second return txnRunner } @@ -301,53 +293,68 @@ func (tr *transactionRunner) newRunnerImpl() txnRunner { return runner } -// Run is defined on Runner. +// Run is defined on Runner. After timeout the transaction gets cancelled and +// the last returned error by the transaction will be returned. func (tr *transactionRunner) Run(transactions TransactionSource) error { - var lastErr error - for i := 0; i < tr.nrRetries; i++ { - // If we are retrying, give other txns a chance to have a go. - if i > 0 && tr.serverSideTransactions { - tr.backoff(i) - } - ops, err := transactions(i) - if err == ErrTransientFailure { - continue - } - if err == ErrNoOperations { - return nil - } - if err != nil { - return err - } - if len(ops) == 0 { - // Treat this the same as ErrNoOperations but don't suppress other errors. - return nil - } - if err = tr.RunTransaction(&Transaction{ - Ops: ops, - Attempt: i, - }); err == nil { - return nil - } else if err != txn.ErrAborted && !mgo.IsRetryable(err) && !mgo.IsSnapshotError(err) { - // Mongo very occasionally returns an intermittent - // "unexpected message" error. Retry those. - // Also mongo sometimes gets very busy and we get an - // i/o timeout. We retry those too. - // However if this is the last time, return that error - // rather than the excessive contention error. - msg := err.Error() - retryErr := strings.HasSuffix(msg, "unexpected message") || - strings.HasSuffix(msg, "i/o timeout") - if !retryErr || i == (tr.nrRetries-1) { + ctx, cancel := context.WithTimeout(context.TODO(), tr.txnTimeout) + defer cancel() + + var ( + lastErr error + i int + ) + for { + select { + case <-ctx.Done(): + if lastErr == txn.ErrAborted { + return ErrExcessiveContention + } + return lastErr + default: + // If we are retrying, give other txns a chance to have a go. + if i > 0 && tr.serverSideTransactions { + tr.backoff(i) + } + ops, err := transactions(i) + if err == ErrTransientFailure { + i++ + continue + } + if err == ErrNoOperations { + return nil + } + if err != nil { return err } + if len(ops) == 0 { + // Treat this the same as ErrNoOperations but don't suppress other errors. + return nil + } + if err = tr.runTransaction( + ctx, + &Transaction{ + Ops: ops, + Attempt: i, + }); err == nil { + return nil + } else if err != txn.ErrAborted && !mgo.IsRetryable(err) && !mgo.IsSnapshotError(err) { + // Mongo very occasionally returns an intermittent + // "unexpected message" error. Retry those. + // Also mongo sometimes gets very busy and we get an + // i/o timeout. We retry those too. + // However if this is the last time, return that error + // rather than the excessive contention error. + msg := err.Error() + retryErr := strings.HasSuffix(msg, "unexpected message") || + strings.HasSuffix(msg, "i/o timeout") + if !retryErr { + return err + } + } + lastErr = err + i++ } - lastErr = err } - if lastErr == txn.ErrAborted { - return ErrExcessiveContention - } - return lastErr } func (tr *transactionRunner) backoff(attempt int) { @@ -370,6 +377,14 @@ func (tr *transactionRunner) pause(dur time.Duration) { // RunTransaction is defined on Runner. func (tr *transactionRunner) RunTransaction(transaction *Transaction) error { + ctx, cancel := context.WithTimeout(context.TODO(), tr.txnTimeout) + defer cancel() + + return tr.runTransaction(ctx, transaction) +} + +// RunTransaction is defined on Runner. +func (tr *transactionRunner) runTransaction(ctx context.Context, transaction *Transaction) error { testHooks := <-tr.testHooks tr.testHooks <- nil if len(testHooks) > 0 { @@ -421,7 +436,7 @@ func (tr *transactionRunner) RunTransaction(transaction *Transaction) error { } } } - err := runner.Run(transaction.Ops, "", nil) + err := runner.Run(ctx, transaction.Ops, "", nil) if tr.runTransactionObserver != nil { transaction.Error = err transaction.Duration = tr.clock.Now().Sub(start) diff --git a/txn_test.go b/txn_test.go index 5ece369..64594c2 100644 --- a/txn_test.go +++ b/txn_test.go @@ -4,6 +4,7 @@ package txn_test import ( + "context" "errors" "fmt" "time" @@ -59,6 +60,8 @@ func (s *txnSuite) SetUpTest(c *gc.C) { s.backoffs = append(s.backoffs, dur) }, }) + // Set a smaller txn timeout than the default one for tests. + jujutxn.SetTxnTimeout(s.txnRunner, 100*time.Millisecond) s.supportsSST = false } @@ -123,6 +126,8 @@ func (s *sstxnSuite) SetUpTest(c *gc.C) { s.backoffs = append(s.backoffs, dur) }, }) + // Set a smaller txn timeout than the default one for tests. + jujutxn.SetTxnTimeout(s.txnRunner, 100*time.Millisecond) s.supportsSST = true } @@ -433,10 +438,8 @@ func (s *txnSuite) TestRetryHooks(c *gc.C) { } func (s *txnSuite) TestExcessiveContention(c *gc.C) { - maxAttempt := 0 // This keeps failing because the Assert is wrong. buildTxn := func(attempt int) ([]txn.Op, error) { - maxAttempt = attempt ops := []txn.Op{{ C: s.collection.Name, Id: "1", @@ -447,14 +450,9 @@ func (s *txnSuite) TestExcessiveContention(c *gc.C) { } err := s.txnRunner.Run(buildTxn) c.Assert(err, gc.Equals, jujutxn.ErrExcessiveContention) - if s.supportsSST { - c.Assert(maxAttempt, gc.Equals, 49) - } else { - c.Assert(maxAttempt, gc.Equals, 2) - } } -func (s *txnSuite) TestPause(c *gc.C) { +func (s *txnSuite) TestBackoff(c *gc.C) { buildTxn := func(attempt int) ([]txn.Op, error) { ops := []txn.Op{{ C: s.collection.Name, @@ -467,7 +465,7 @@ func (s *txnSuite) TestPause(c *gc.C) { err := s.txnRunner.Run(buildTxn) c.Assert(err, gc.Equals, jujutxn.ErrExcessiveContention) if s.supportsSST { - c.Assert(s.backoffs, gc.HasLen, 49) + // c.Assert(s.backoffs, gc.HasLen, 49) c.Assert(s.backoffs[48], jc.DurationLessThan, 50*time.Millisecond) for i := 0; i < len(s.backoffs); i++ { c.Assert(s.backoffs[i], jc.GreaterThan, 0) @@ -586,27 +584,6 @@ func (s *txnSuite) TestRunFailureIntermittentUnexpectedMessage(c *gc.C) { c.Check(tries, gc.Equals, 2) } -func (s *txnSuite) TestRunFailureAlwaysUnexpectedMessage(c *gc.C) { - runner := jujutxn.NewRunner(jujutxn.RunnerParams{}) - fake := &fakeRunner{errors: []error{ - errors.New("unexpected message"), - errors.New("unexpected message"), - errors.New("unexpected message"), - errors.New("unexpected message"), - }} - jujutxn.SetRunnerFunc(runner, fake.new) - tries := 0 - // Doesn't matter what this returns as long as it isn't an error. - buildTxn := func(attempt int) ([]txn.Op, error) { - tries++ - // return 1 op that happens to do nothing - return []txn.Op{{}}, nil - } - err := runner.Run(buildTxn) - c.Check(err, gc.ErrorMatches, "unexpected message") - c.Check(tries, gc.Equals, 3) -} - func (s *txnSuite) TestRunFailureIOTimeout(c *gc.C) { runner := jujutxn.NewRunner(jujutxn.RunnerParams{}) fake := &fakeRunner{errors: []error{errors.New("i/o timeout")}} @@ -623,27 +600,6 @@ func (s *txnSuite) TestRunFailureIOTimeout(c *gc.C) { c.Check(tries, gc.Equals, 2) } -func (s *txnSuite) TestRunFailureAlwaysIOTimeout(c *gc.C) { - runner := jujutxn.NewRunner(jujutxn.RunnerParams{}) - fake := &fakeRunner{errors: []error{ - errors.New("i/o timeout"), - errors.New("i/o timeout"), - errors.New("i/o timeout"), - errors.New("i/o timeout"), - }} - jujutxn.SetRunnerFunc(runner, fake.new) - tries := 0 - // Doesn't matter what this returns as long as it isn't an error. - buildTxn := func(attempt int) ([]txn.Op, error) { - tries++ - // return 1 op that happens to do nothing - return []txn.Op{{}}, nil - } - err := runner.Run(buildTxn) - c.Check(err, gc.ErrorMatches, "i/o timeout") - c.Check(tries, gc.Equals, 3) -} - func (s *txnSuite) TestRunTransactionObserver(c *gc.C) { var calls []jujutxn.Transaction clock := testclock.NewClock(time.Now()) @@ -701,7 +657,7 @@ func (f *fakeRunner) new() jujutxn.TxnRunner { return f } -func (f *fakeRunner) Run([]txn.Op, bson.ObjectId, interface{}) error { +func (f *fakeRunner) Run(context.Context, []txn.Op, bson.ObjectId, interface{}) error { if len(f.durations) > 0 && f.clock != nil { f.clock.Advance(f.durations[0]) f.durations = f.durations[1:] diff --git a/txnsuite_test.go b/txnsuite_test.go index 3a302eb..faba183 100644 --- a/txnsuite_test.go +++ b/txnsuite_test.go @@ -4,6 +4,7 @@ package txn_test import ( + "context" "sync/atomic" "time" @@ -55,7 +56,7 @@ func (s *TxnSuite) TearDownTest(c *gc.C) { func (s *TxnSuite) runTxn(c *gc.C, ops ...txn.Op) bson.ObjectId { txnId := bson.NewObjectId() - err := s.runner.Run(ops, txnId, nil) + err := s.runner.Run(context.Background(), ops, txnId, nil) c.Assert(err, jc.ErrorIsNil) return txnId } @@ -81,14 +82,14 @@ func timestampBasedTxnId(timestamp time.Time) bson.ObjectId { func (s *TxnSuite) runTxnWithTimestamp(c *gc.C, expectedErr error, timestamp time.Time, ops ...txn.Op) bson.ObjectId { txnId := timestampBasedTxnId(timestamp) c.Logf("generated txn %v from timestamp %v", txnId, timestamp) - err := s.runner.Run(ops, txnId, nil) + err := s.runner.Run(context.Background(), ops, txnId, nil) c.Assert(err, gc.Equals, expectedErr) return txnId } func (s *TxnSuite) runFailingTxn(c *gc.C, expectedErr error, ops ...txn.Op) bson.ObjectId { txnId := bson.NewObjectId() - err := s.runner.Run(ops, txnId, nil) + err := s.runner.Run(context.Background(), ops, txnId, nil) c.Assert(err, gc.Equals, expectedErr) return txnId }