From b00bc037917af6aafc1635ddb178888bd0ebbb67 Mon Sep 17 00:00:00 2001 From: Cosimo Matteini Date: Mon, 11 Nov 2024 17:07:37 +0100 Subject: [PATCH] Collection+DocumentCollection: add aggregate --- .../effect-mongodb/src/AggregationCursor.ts | 37 +++++++++++ packages/effect-mongodb/src/Collection.ts | 30 +++++++++ .../src/DocumentAggregationCursor.ts | 30 +++++++++ .../effect-mongodb/src/DocumentCollection.ts | 26 ++++++++ packages/effect-mongodb/src/FindCursor.ts | 2 + packages/effect-mongodb/src/index.ts | 10 +++ .../effect-mongodb/test/Collection.test.ts | 62 +++++++++++++++++++ 7 files changed, 197 insertions(+) create mode 100644 packages/effect-mongodb/src/AggregationCursor.ts create mode 100644 packages/effect-mongodb/src/DocumentAggregationCursor.ts diff --git a/packages/effect-mongodb/src/AggregationCursor.ts b/packages/effect-mongodb/src/AggregationCursor.ts new file mode 100644 index 0000000..83c7ff8 --- /dev/null +++ b/packages/effect-mongodb/src/AggregationCursor.ts @@ -0,0 +1,37 @@ +/** + * @since 0.0.1 + */ +import type * as ParseResult from "@effect/schema/ParseResult" +import * as Schema from "@effect/schema/Schema" +import * as Data from "effect/Data" +import * as Effect from "effect/Effect" +import * as F from "effect/Function" +import * as Stream from "effect/Stream" +import type { AggregationCursor as MongoAggregationCursor } from "mongodb" +import * as MongoError from "./MongoError.js" + +export class AggregationCursor extends Data.TaggedClass("AggregationCursor")< + { cursor: MongoAggregationCursor; schema: Schema.Schema } +> { +} + +export const toArray = ( + cursor: AggregationCursor +): Effect.Effect, MongoError.MongoError | ParseResult.ParseError, R> => { + const decode = Schema.decodeUnknown(cursor.schema) + return Effect.tryPromise({ try: () => cursor.cursor.toArray(), catch: F.identity }).pipe( + Effect.catchAll(MongoError.mongoErrorDie>("Unable to get array from mongodb aggregate cursor")), + Effect.flatMap(Effect.forEach((x) => decode(x))) + ) +} + +export const toStream = ( + cursor: AggregationCursor +): Stream.Stream => { + const decode = Schema.decodeUnknown(cursor.schema) + return F.pipe( + Stream.fromAsyncIterable(cursor.cursor, F.identity), + Stream.catchAll(MongoError.mongoErrorStream("Unable to get stream from mongodb aggregate cursor")), + Stream.mapEffect((x) => decode(x)) + ) +} diff --git a/packages/effect-mongodb/src/Collection.ts b/packages/effect-mongodb/src/Collection.ts index 7c71d4a..8e7f160 100644 --- a/packages/effect-mongodb/src/Collection.ts +++ b/packages/effect-mongodb/src/Collection.ts @@ -8,6 +8,7 @@ import * as Effect from "effect/Effect" import * as F from "effect/Function" import * as O from "effect/Option" import type { + AggregateOptions, BulkWriteOptions, Collection as MongoCollection, CreateIndexesOptions, @@ -25,6 +26,7 @@ import type { UpdateOptions, UpdateResult } from "mongodb" +import * as AggregationCursor from "./AggregationCursor.js" import * as FindCursor from "./FindCursor.js" import type { Filter } from "./internal/filter.js" import * as MongoError from "./MongoError.js" @@ -299,4 +301,32 @@ export const createIndexes: { ) ) +export const aggregate: { + ( + pipeline: Array, + schema: Schema.Schema, + options?: AggregateOptions + ): ( + collection: Collection + ) => AggregationCursor.AggregationCursor + ( + collection: Collection, + pipeline: Array, + schema: Schema.Schema, + options?: AggregateOptions + ): AggregationCursor.AggregationCursor +} = F.dual( + (args) => isCollection(args[0]), + ( + collection: Collection, + pipeline: Array, + schema: Schema.Schema, + options?: AggregateOptions + ): AggregationCursor.AggregationCursor => + new AggregationCursor.AggregationCursor({ + cursor: collection.collection.aggregate(pipeline, options), + schema + }) +) + const isCollection = (x: unknown) => x instanceof Collection diff --git a/packages/effect-mongodb/src/DocumentAggregationCursor.ts b/packages/effect-mongodb/src/DocumentAggregationCursor.ts new file mode 100644 index 0000000..00b9827 --- /dev/null +++ b/packages/effect-mongodb/src/DocumentAggregationCursor.ts @@ -0,0 +1,30 @@ +/** + * @since 0.0.1 + */ +import * as Data from "effect/Data" +import * as Effect from "effect/Effect" +import * as F from "effect/Function" +import * as Stream from "effect/Stream" +import type { AggregationCursor as MongoAggregationCursor } from "mongodb" +import * as MongoError from "./MongoError.js" + +export class DocumentAggregationCursor + extends Data.TaggedClass("DocumentAggregationCursor")<{ cursor: MongoAggregationCursor }> +{ +} + +export const toArray = ( + cursor: DocumentAggregationCursor +): Effect.Effect, MongoError.MongoError> => + F.pipe( + Effect.tryPromise({ try: () => cursor.cursor.toArray(), catch: F.identity }), + Effect.catchAll(MongoError.mongoErrorDie>(`${cursor._tag}.toArray error`)) + ) + +export const toStream = ( + cursor: DocumentAggregationCursor +): Stream.Stream => + F.pipe( + Stream.fromAsyncIterable(cursor.cursor, F.identity), + Stream.catchAll(MongoError.mongoErrorStream(`${cursor._tag}.toStream error`)) + ) diff --git a/packages/effect-mongodb/src/DocumentCollection.ts b/packages/effect-mongodb/src/DocumentCollection.ts index 1a754ad..66bf333 100644 --- a/packages/effect-mongodb/src/DocumentCollection.ts +++ b/packages/effect-mongodb/src/DocumentCollection.ts @@ -7,6 +7,7 @@ import * as Effect from "effect/Effect" import * as F from "effect/Function" import * as O from "effect/Option" import type { + AggregateOptions, BulkWriteOptions, Collection as MongoCollection, CreateIndexesOptions, @@ -28,6 +29,7 @@ import type { WithoutId } from "mongodb" import * as Collection from "./Collection.js" +import * as DocumentAggregationCursor from "./DocumentAggregationCursor.js" import * as DocumentFindCursor from "./DocumentFindCursor.js" import * as MongoError from "./MongoError.js" @@ -278,6 +280,30 @@ export const createIndexes: { ) ) +export const aggregate: { + ( + pipeline?: Array, + options?: AggregateOptions + ): ( + collection: DocumentCollection + ) => DocumentAggregationCursor.DocumentAggregationCursor + ( + collection: DocumentCollection, + pipeline?: Array, + options?: AggregateOptions + ): DocumentAggregationCursor.DocumentAggregationCursor +} = F.dual( + (args) => isDocumentCollection(args[0]), + ( + collection: DocumentCollection, + pipeline?: Array, + options?: AggregateOptions + ): DocumentAggregationCursor.DocumentAggregationCursor => + new DocumentAggregationCursor.DocumentAggregationCursor({ + cursor: collection.collection.aggregate(pipeline, options) + }) +) + export const typed: { ( schema: Schema.Schema diff --git a/packages/effect-mongodb/src/FindCursor.ts b/packages/effect-mongodb/src/FindCursor.ts index eea2756..c3df419 100644 --- a/packages/effect-mongodb/src/FindCursor.ts +++ b/packages/effect-mongodb/src/FindCursor.ts @@ -82,6 +82,7 @@ export const limit: { new FindCursor({ cursor: cursor.cursor.limit(value), schema: cursor.schema }) ) +// TODO: add explicit return type for public API export const toArray = (cursor: FindCursor) => { const decode = Schema.decodeUnknown(cursor.schema) return Effect.tryPromise({ try: () => cursor.cursor.toArray(), catch: F.identity }).pipe( @@ -90,6 +91,7 @@ export const toArray = (cursor: FindCursor) => { ) } +// TODO: add explicit return type for public API export const toArrayEither = (cursor: FindCursor) => { const decode = Schema.decodeUnknown(cursor.schema) return Effect.tryPromise({ try: () => cursor.cursor.toArray(), catch: F.identity }).pipe( diff --git a/packages/effect-mongodb/src/index.ts b/packages/effect-mongodb/src/index.ts index 3bf3bf4..6c4e9c8 100644 --- a/packages/effect-mongodb/src/index.ts +++ b/packages/effect-mongodb/src/index.ts @@ -1,3 +1,8 @@ +/** + * @since 0.0.1 + */ +export * as AggregationCursor from "./AggregationCursor.js" + /** * @since 0.0.1 */ @@ -8,6 +13,11 @@ export * as Collection from "./Collection.js" */ export * as Db from "./Db.js" +/** + * @since 0.0.1 + */ +export * as DocumentAggregationCursor from "./DocumentAggregationCursor.js" + /** * @since 0.0.1 */ diff --git a/packages/effect-mongodb/test/Collection.test.ts b/packages/effect-mongodb/test/Collection.test.ts index 1c74c8d..c1d58c8 100644 --- a/packages/effect-mongodb/test/Collection.test.ts +++ b/packages/effect-mongodb/test/Collection.test.ts @@ -1,4 +1,5 @@ import * as Schema from "@effect/schema/Schema" +import * as AggregationCursor from "effect-mongodb/AggregationCursor" import * as Collection from "effect-mongodb/Collection" import * as Db from "effect-mongodb/Db" import * as Effect from "effect/Effect" @@ -25,9 +26,70 @@ describeMongo("Collection", (ctx) => { expect(result).toEqual(O.some(user)) }) + + test("aggregate", async () => { + const user1 = User.make({ name: "user1", birthday: new Date(1977, 11, 27) }) + const user2 = User.make({ name: "user2", birthday: new Date(1977, 11, 27) }) + const user3 = User.make({ name: "user3", birthday: new Date(1985, 6, 16) }) + const user4 = User.make({ name: "user4", birthday: new Date(1989, 11, 28) }) + const user5 = User.make({ name: "user5", birthday: new Date(1995, 3, 21) }) + const user6 = User.make({ name: "user6", birthday: new Date(2000, 5, 30) }) + + const program = Effect.gen(function*(_) { + const db = yield* _(ctx.database) + const collection = Db.collection(db, "aggregate", User) + + yield* _( + Collection.insertMany(collection, [user1, user2, user3, user4, user5, user6]) + ) + + const _1990 = "1990-01-01T00:00:00.000Z" + return yield* _( + Collection.aggregate(collection, [ + { + $match: { + birthday: { $lt: _1990 } + } + }, + { + $group: { + _id: "$birthday", + names: { $addToSet: "$name" } + } + }, + { + $sort: { _id: 1 } + } + ], UserAggregation), + AggregationCursor.toArray + ) + }) + + const result = await Effect.runPromise(program) + + expect(result).toEqual([ + { + _id: new Date(1977, 11, 27), + names: expect.arrayContaining(["user1", "user2"]) + }, + { + _id: new Date(1985, 6, 16), + names: ["user3"] + }, + { + _id: new Date(1989, 11, 28), + names: ["user4"] + } + ]) + }) }) const User = Schema.Struct({ name: Schema.String, birthday: Schema.Date }) + +const UserAggregation = Schema.Struct({ + _id: Schema.Date, + names: Schema.Array(Schema.String) +})