A Hazelcast Jet connector for MongoDB that enables Hazelcast Jet pipelines to read documents from and write documents to MongoDB.
Source Attribute | Value |
---|---|
Has Source | Yes |
Batch | Yes |
Stream | Yes |
Distributed | No |

Sink Attribute | Value |
---|---|
Has Sink | Yes |
Distributed | Yes |
The MongoDB connector artifacts are published to Maven repositories. To include the connector in your project, add the following dependency to your `pom.xml`:
```xml
<dependency>
    <groupId>com.hazelcast.jet.contrib</groupId>
    <artifactId>mongodb</artifactId>
    <version>${version}</version>
</dependency>
```
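Or, if you are using Gradle:

```groovy
implementation group: 'com.hazelcast.jet.contrib', name: 'mongodb', version: '${version}'
```

In both snippets, replace `${version}` with the connector version you want to use.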
The MongoDB batch source (`MongoDBSources.batch()`) executes the query and emits the results as they arrive.
Here's an example that queries the documents in a collection whose `age` field is greater than 10 and applies a projection so that only the `age` field (plus `_id`, which MongoDB includes unless it is explicitly excluded) is returned in each emitted document.
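```java
import com.hazelcast.jet.contrib.mongodb.MongoDBSources;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import org.bson.Document;

BatchSource<Document> batchSource =
        MongoDBSources.batch(
                "batch-source",                               // source name
                "mongodb://127.0.0.1:27017",                  // connection string
                "myDatabase",
                "myCollection",
                new Document("age", new Document("$gt", 10)), // filter: age > 10
                new Document("age", 1)                        // projection: age (and _id)
        );
Pipeline p = Pipeline.create();
BatchStage<Document> srcStage = p.readFrom(batchSource);
```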
For more details, see `MongoDBSources`, `MongoDBSourceBuilder` and `MongoDBSourceTest`.
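To make the filter and projection semantics concrete, here is a minimal in-memory sketch in plain Java. It does not use the connector or the MongoDB driver: documents are modeled as `Map`s, the class and method names (`BatchQueryModel`, `matchesAgeGt10`, `projectAge`) are hypothetical, and it assumes `age` values are numeric, as in the example above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory model of the batch query above. It does NOT use the
// connector or the MongoDB driver; documents are plain maps.
class BatchQueryModel {

    // Models the filter {age: {$gt: 10}}: keep documents whose numeric "age" is > 10.
    static boolean matchesAgeGt10(Map<String, Object> doc) {
        Object age = doc.get("age");
        return age instanceof Number && ((Number) age).doubleValue() > 10;
    }

    // Models the projection {age: 1}: keep "age" and, as MongoDB does by default, "_id".
    static Map<String, Object> projectAge(Map<String, Object> doc) {
        Map<String, Object> out = new LinkedHashMap<>();
        if (doc.containsKey("_id")) {
            out.put("_id", doc.get("_id"));
        }
        if (doc.containsKey("age")) {
            out.put("age", doc.get("age"));
        }
        return out;
    }

    // Applies the filter first, then the projection, to every document.
    static List<Map<String, Object>> run(List<Map<String, Object>> collection) {
        return collection.stream()
                .filter(BatchQueryModel::matchesAgeGt10)
                .map(BatchQueryModel::projectAge)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, Object>> collection = List.of(
                Map.of("_id", 1, "name", "a", "age", 5),
                Map.of("_id", 2, "name", "b", "age", 11),
                Map.of("_id", 3, "name", "c"));
        System.out.println(run(collection)); // prints [{_id=2, age=11}]
    }
}
```

Note that the document without an `age` field is dropped by the filter, and `name` is dropped by the projection while `_id` survives.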
The MongoDB stream source (`MongoDBSources.stream()`) watches changes to the documents in a collection and emits the changes as they arrive. The source uses the cluster time of each change event (`ChangeStreamDocument.getClusterTime()`) as the native timestamp.
Change streams are available for replica sets and sharded clusters that use the WiredTiger storage engine and replica set protocol version 1 (pv1). Change streams can also be used on deployments that employ MongoDB's encryption-at-rest feature. The stream source is built on change streams, so it does not work against a deployment that does not support them, such as a standalone `mongod`. See MongoDB Change Streams for more information.
You can watch changes on a single collection, on all collections in a single database, or on all collections across all databases. You cannot watch system collections or any collection in the `admin`, `local` and `config` databases.
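The restriction above can be expressed as a small check. The sketch below is a hypothetical helper, not part of the connector or the MongoDB driver, and it only encodes the rule as stated here: collections in the `admin`, `local` and `config` databases, and collections whose names start with the reserved `system.` prefix, cannot be watched.

```java
import java.util.Set;

// Hypothetical helper (not part of the connector) encoding the rule above:
// no change streams on system collections or on collections in admin, local or config.
class ChangeStreamTargets {

    private static final Set<String> RESTRICTED_DATABASES = Set.of("admin", "local", "config");

    // Returns true if a change stream on database.collection is allowed by the rule above.
    static boolean canWatch(String database, String collection) {
        if (RESTRICTED_DATABASES.contains(database)) {
            return false;
        }
        // System collections use the reserved "system." name prefix.
        return !collection.startsWith("system.");
    }

    public static void main(String[] args) {
        System.out.println(canWatch("myDatabase", "myCollection")); // true
        System.out.println(canWatch("myDatabase", "system.views")); // false
        System.out.println(canWatch("config", "myCollection"));     // false
    }
}
```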
The following example pipeline watches changes on `myCollection`. The source filters the change events so that only inserts whose `age` field is greater than 10 are fetched, and applies a projection so that, apart from the change event's `_id` (kept by default; it is the event's resume token), only the `age` field of the inserted document is returned.
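```java
import com.hazelcast.jet.contrib.mongodb.MongoDBSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamSourceStage;
import org.bson.Document;

StreamSource<? extends Document> streamSource =
        MongoDBSources.stream(
                "stream-source",                  // source name
                "mongodb://127.0.0.1:27017",      // connection string
                "myDatabase",
                "myCollection",
                // filter on change events: inserts whose document has age > 10
                new Document("fullDocument.age", new Document("$gt", 10))
                        .append("operationType", "insert"),
                // projection on change events: fullDocument.age (and the event _id)
                new Document("fullDocument.age", 1)
        );
Pipeline p = Pipeline.create();
StreamSourceStage<? extends Document> srcStage = p.readFrom(streamSource);
```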
For more details, see `MongoDBSources`, `MongoDBSourceBuilder` and `MongoDBSourceTest`.
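In a change event, the changed document is nested under `fullDocument`, which is why the stream example filters on `fullDocument.age` while matching `operationType` at the top level. The following plain-Java sketch models that filter on nested `Map`s. It is an illustration under these assumptions, not how the connector or the server evaluates filters, and `ChangeEventFilterModel` is a hypothetical name.

```java
import java.util.Map;

// Hypothetical model (not the connector or driver API) of evaluating the filter
// {"fullDocument.age": {$gt: 10}, "operationType": "insert"} against a change event.
class ChangeEventFilterModel {

    // Resolves a dotted path such as "fullDocument.age" through nested maps; null if absent.
    static Object resolve(Map<String, Object> doc, String dottedPath) {
        Object current = doc;
        for (String key : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    // Both conditions must hold, like the implicit AND of fields in a filter document.
    static boolean matches(Map<String, Object> event) {
        Object age = resolve(event, "fullDocument.age");
        return "insert".equals(event.get("operationType"))
                && age instanceof Number
                && ((Number) age).doubleValue() > 10;
    }

    public static void main(String[] args) {
        Map<String, Object> insert = Map.of(
                "operationType", "insert",
                "fullDocument", Map.of("_id", 1, "age", 12));
        // fullDocument is included here only to show that operationType alone rejects it.
        Map<String, Object> update = Map.of(
                "operationType", "update",
                "fullDocument", Map.of("_id", 2, "age", 30));
        System.out.println(matches(insert)); // true
        System.out.println(matches(update)); // false: not an insert
    }
}
```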
The MongoDB sink (`MongoDBSinks.mongodb()`) writes documents from a Hazelcast Jet pipeline to MongoDB.
The following example pipeline reads items from a Hazelcast `IList`, maps them to `Document` instances and writes them to MongoDB.
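```java
import com.hazelcast.jet.contrib.mongodb.MongoDBSinks;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sources;
import org.bson.Document;

Pipeline p = Pipeline.create();
p.readFrom(Sources.list(list))              // list: an IList (or its name) defined elsewhere
 .map(i -> new Document("key", i))          // wrap each item in a Document
 .writeTo(
         MongoDBSinks.mongodb(
                 "sink",                    // sink name
                 "mongodb://localhost:27017",
                 "myDatabase",
                 "myCollection"
         )
 );
```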
For more details, see `MongoDBSinks`, `MongoDBSinkBuilder` and `MongoDBSinkTest`.
The MongoDB stream source saves the resume token of the last emitted item as part of its state in the job snapshot. If the job restarts, the source resumes the change stream from that resume token.
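The snapshot and restart behaviour can be illustrated with a small in-memory model. The sketch below is hypothetical and not the connector's implementation: tokens are modeled as positions in a list (real resume tokens are opaque values taken from each change event's `_id`), and snapshots and restarts are triggered by hand rather than by Jet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of resume-token fault tolerance; positions stand in for tokens.
class ResumeTokenModel {

    private final List<String> changeLog;   // stand-in for the ordered change stream
    private Integer lastEmittedToken;       // in-memory token of the last emitted event
    private Integer snapshottedToken;       // token saved in the (modeled) snapshot

    ResumeTokenModel(List<String> changeLog) {
        this.changeLog = changeLog;
    }

    // Emits up to maxEvents, continuing after the last emitted token (or from the start).
    List<String> emit(int maxEvents) {
        int start = lastEmittedToken == null ? 0 : lastEmittedToken + 1;
        List<String> emitted = new ArrayList<>();
        for (int i = start; i < changeLog.size() && emitted.size() < maxEvents; i++) {
            emitted.add(changeLog.get(i));
            lastEmittedToken = i;
        }
        return emitted;
    }

    // Models taking a snapshot: the token of the last emitted event becomes saved state.
    void snapshot() {
        snapshottedToken = lastEmittedToken;
    }

    // Models a job restart: in-memory state is lost and restored from the snapshot.
    void restart() {
        lastEmittedToken = snapshottedToken;
    }

    public static void main(String[] args) {
        ResumeTokenModel source = new ResumeTokenModel(List.of("e0", "e1", "e2", "e3"));
        System.out.println(source.emit(2));  // [e0, e1]
        source.snapshot();                   // saved token now points at e1
        System.out.println(source.emit(1));  // [e2], emitted after the snapshot
        source.restart();                    // job restarts from the snapshot
        System.out.println(source.emit(10)); // [e2, e3]: e2 is emitted again
    }
}
```

In this model, events emitted after the last snapshot (here `e2`) are emitted again after a restart, because the saved token only reflects what had been emitted when the snapshot was taken.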
To run the tests, run the command below:

```shell
./gradlew test
```