Skip to content

Commit

Permalink
Added better handling of errors thrown in bulk operations
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Mar 3, 2015
1 parent 0642f18 commit 2481de9
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 12 deletions.
19 changes: 13 additions & 6 deletions lib/bulk/ordered.js
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,19 @@ var executeCommands = function(self, callback) {
finalOptions.writeConcern = self.s.writeConcern;
}

if(batch.batchType == common.INSERT) {
self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
} else if(batch.batchType == common.UPDATE) {
self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
} else if(batch.batchType == common.REMOVE) {
self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
try {
if(batch.batchType == common.INSERT) {
self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
} else if(batch.batchType == common.UPDATE) {
self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
} else if(batch.batchType == common.REMOVE) {
self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
}
} catch(err) {
// Force top level error
err.ok = 0;
// Merge top level error and return
callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
}
}

Expand Down
19 changes: 13 additions & 6 deletions lib/bulk/unordered.js
Original file line number Diff line number Diff line change
Expand Up @@ -419,12 +419,19 @@ var executeBatch = function(self, batch, callback) {
callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, result));
}

if(batch.batchType == common.INSERT) {
self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
} else if(batch.batchType == common.UPDATE) {
self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
} else if(batch.batchType == common.REMOVE) {
self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
try {
if(batch.batchType == common.INSERT) {
self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
} else if(batch.batchType == common.UPDATE) {
self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
} else if(batch.batchType == common.REMOVE) {
self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
}
} catch(err) {
// Force top level error
err.ok = 0;
// Merge top level error and return
callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
}
}

Expand Down
59 changes: 59 additions & 0 deletions test/functional/bulk_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1143,3 +1143,62 @@ exports['should correctly return the number of operations in the bulk'] = {
});
}
}

exports['should correctly split unordered bulk batch'] = {
metadata: { requires: { topology: 'single', mongodb: '>2.5.4' }},

// The actual test we wish to run
test: function(configuration, test) {
var db = configuration.newDbInstance({w:1}, {poolSize:1, auto_reconnect:false});
db.open(function(err, db) {
var insertFirst = false;
var batchSize = 1000;
// Get the collection
var collection = db.collection('batch_write_unordered_split_test');
// Create an unordered bulk
var operation = collection.initializeUnorderedBulkOp(),
documents = [];

for(var i = 0; i < 10000; i++) {
var document = { name: 'bob' + i };
documents.push(document);
operation.insert(document);
}

operation.execute(function(err, result) {
test.equal(null, err);

operation = collection.initializeUnorderedBulkOp();

if(insertFirst) {
// if you add the inserts to the batch first, it works fine.
insertDocuments();
replaceDocuments();
} else {
// if you add the updates to the batch first, it fails with the error "insert must contain at least one document"
replaceDocuments();
insertDocuments();
}

operation.execute(function(err, result) {
test.equal(null, err);

db.close();
test.done();
});
});

function insertDocuments() {
for(i = 10000; i < 10200; i++) {
operation.insert({name: 'bob' + i});
}
}

function replaceDocuments() {
for(var i = 0; i < batchSize; i++) {
operation.find({_id: documents[i]._id}).replaceOne({name: 'joe' + i});
}
}
});
}
}

0 comments on commit 2481de9

Please sign in to comment.