From 2481de9221bf0110e9edbae2249b16e6dd8c3663 Mon Sep 17 00:00:00 2001
From: Christian Kvalheim
Date: Tue, 3 Mar 2015 10:35:11 +0100
Subject: [PATCH] Added better handling of errors thrown in bulk operations

---
 lib/bulk/ordered.js           | 19 +++++++----
 lib/bulk/unordered.js         | 19 +++++++----
 test/functional/bulk_tests.js | 59 +++++++++++++++++++++++++++++++++++
 3 files changed, 85 insertions(+), 12 deletions(-)

diff --git a/lib/bulk/ordered.js b/lib/bulk/ordered.js
index aa7849afe3..814a261ebf 100644
--- a/lib/bulk/ordered.js
+++ b/lib/bulk/ordered.js
@@ -405,12 +405,19 @@ var executeCommands = function(self, callback) {
       finalOptions.writeConcern = self.s.writeConcern;
     }
 
-    if(batch.batchType == common.INSERT) {
-      self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
-    } else if(batch.batchType == common.UPDATE) {
-      self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
-    } else if(batch.batchType == common.REMOVE) {
-      self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
+    try {
+      if(batch.batchType == common.INSERT) {
+        self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
+      } else if(batch.batchType == common.UPDATE) {
+        self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
+      } else if(batch.batchType == common.REMOVE) {
+        self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
+      }
+    } catch(err) {
+      // Force top level error
+      err.ok = 0;
+      // Merge top level error and return
+      callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
     }
   }
 
diff --git a/lib/bulk/unordered.js b/lib/bulk/unordered.js
index 225f498d4a..e0dac77302 100644
--- a/lib/bulk/unordered.js
+++ b/lib/bulk/unordered.js
@@ -419,12 +419,19 @@ var executeBatch = function(self, batch, callback) {
     callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, result));
   }
 
-  if(batch.batchType == common.INSERT) {
-    self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
-  } else if(batch.batchType == common.UPDATE) {
-    self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
-  } else if(batch.batchType == common.REMOVE) {
-    self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
+  try {
+    if(batch.batchType == common.INSERT) {
+      self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
+    } else if(batch.batchType == common.UPDATE) {
+      self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
+    } else if(batch.batchType == common.REMOVE) {
+      self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
+    }
+  } catch(err) {
+    // Force top level error
+    err.ok = 0;
+    // Merge top level error and return
+    callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
   }
 }
 
diff --git a/test/functional/bulk_tests.js b/test/functional/bulk_tests.js
index e2be0925c0..d9f59a02fc 100644
--- a/test/functional/bulk_tests.js
+++ b/test/functional/bulk_tests.js
@@ -1143,3 +1143,62 @@ exports['should correctly return the number of operations in the bulk'] = {
     });
   }
 }
+
+exports['should correctly split unordered bulk batch'] = {
+  metadata: { requires: { topology: 'single', mongodb: '>2.5.4' }},
+
+  // The actual test we wish to run
+  test: function(configuration, test) {
+    var db = configuration.newDbInstance({w:1}, {poolSize:1, auto_reconnect:false});
+    db.open(function(err, db) {
+      var insertFirst = false;
+      var batchSize = 1000;
+      // Get the collection
+      var collection = db.collection('batch_write_unordered_split_test');
+      // Create an unordered bulk
+      var operation = collection.initializeUnorderedBulkOp(),
+          documents = [];
+
+      for(var i = 0; i < 10000; i++) {
+        var document = { name: 'bob' + i };
+        documents.push(document);
+        operation.insert(document);
+      }
+
+      operation.execute(function(err, result) {
+        test.equal(null, err);
+
+        operation = collection.initializeUnorderedBulkOp();
+
+        if(insertFirst) {
+          // if you add the inserts to the batch first, it works fine.
+          insertDocuments();
+          replaceDocuments();
+        } else {
+          // if you add the updates to the batch first, it fails with the error "insert must contain at least one document"
+          replaceDocuments();
+          insertDocuments();
+        }
+
+        operation.execute(function(err, result) {
+          test.equal(null, err);
+
+          db.close();
+          test.done();
+        });
+      });
+
+      function insertDocuments() {
+        for(i = 10000; i < 10200; i++) {
+          operation.insert({name: 'bob' + i});
+        }
+      }
+
+      function replaceDocuments() {
+        for(var i = 0; i < batchSize; i++) {
+          operation.find({_id: documents[i]._id}).replaceOne({name: 'joe' + i});
+        }
+      }
+    });
+  }
+}
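
Note, not part of the patch: a minimal usage sketch of the behaviour the try/catch blocks above are intended to provide, assuming a connected db obtained the same way as in the new test. The collection name below is hypothetical. If the topology layer throws synchronously while a batch is dispatched (the situation the test provokes by queueing replacements before inserts), the error is now forced to a top-level error (err.ok = 0), merged into the bulk result via mergeBatchResults, and delivered through the execute callback instead of escaping as an uncaught exception. How exactly the merged error is exposed to the caller may differ between driver versions.

// Sketch only: 'bulk_error_sketch' is a hypothetical collection name.
var collection = db.collection('bulk_error_sketch');
var bulk = collection.initializeUnorderedBulkOp();

// Queue replacements before inserts, mirroring the ordering the new test exercises.
for(var i = 0; i < 1000; i++) {
  bulk.find({name: 'bob' + i}).replaceOne({name: 'joe' + i});
}

for(var j = 0; j < 200; j++) {
  bulk.insert({name: 'bill' + j});
}

bulk.execute(function(err, result) {
  // Any error thrown synchronously while the batches are dispatched is now
  // reported here rather than crashing the process.
  if(err) console.dir(err);
  db.close();
});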