Skip to content

Commit

Permalink
Collection+DocumentCollection: add aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
devmatteini committed Nov 11, 2024
1 parent efa6c05 commit b00bc03
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 0 deletions.
37 changes: 37 additions & 0 deletions packages/effect-mongodb/src/AggregationCursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* @since 0.0.1
*/
import type * as ParseResult from "@effect/schema/ParseResult"
import * as Schema from "@effect/schema/Schema"
import * as Data from "effect/Data"
import * as Effect from "effect/Effect"
import * as F from "effect/Function"
import * as Stream from "effect/Stream"
import type { AggregationCursor as MongoAggregationCursor } from "mongodb"
import * as MongoError from "./MongoError.js"

export class AggregationCursor<A, I = A, R = never> extends Data.TaggedClass("AggregationCursor")<
{ cursor: MongoAggregationCursor<unknown>; schema: Schema.Schema<A, I, R> }
> {
}

export const toArray = <A, I, R>(
cursor: AggregationCursor<A, I, R>
): Effect.Effect<ReadonlyArray<A>, MongoError.MongoError | ParseResult.ParseError, R> => {
const decode = Schema.decodeUnknown(cursor.schema)
return Effect.tryPromise({ try: () => cursor.cursor.toArray(), catch: F.identity }).pipe(
Effect.catchAll(MongoError.mongoErrorDie<ReadonlyArray<A>>("Unable to get array from mongodb aggregate cursor")),
Effect.flatMap(Effect.forEach((x) => decode(x)))
)
}

export const toStream = <A, I, R>(
cursor: AggregationCursor<A, I, R>
): Stream.Stream<A, MongoError.MongoError | ParseResult.ParseError, R> => {
const decode = Schema.decodeUnknown(cursor.schema)
return F.pipe(
Stream.fromAsyncIterable(cursor.cursor, F.identity),
Stream.catchAll(MongoError.mongoErrorStream<A>("Unable to get stream from mongodb aggregate cursor")),
Stream.mapEffect((x) => decode(x))
)
}
30 changes: 30 additions & 0 deletions packages/effect-mongodb/src/Collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as Effect from "effect/Effect"
import * as F from "effect/Function"
import * as O from "effect/Option"
import type {
AggregateOptions,
BulkWriteOptions,
Collection as MongoCollection,
CreateIndexesOptions,
Expand All @@ -25,6 +26,7 @@ import type {
UpdateOptions,
UpdateResult
} from "mongodb"
import * as AggregationCursor from "./AggregationCursor.js"
import * as FindCursor from "./FindCursor.js"
import type { Filter } from "./internal/filter.js"
import * as MongoError from "./MongoError.js"
Expand Down Expand Up @@ -299,4 +301,32 @@ export const createIndexes: {
)
)

export const aggregate: {
<B extends Document, BI extends Document, BR>(
pipeline: Array<Document>,
schema: Schema.Schema<B, BI, BR>,
options?: AggregateOptions
): <A extends Document, I extends Document, R>(
collection: Collection<A, I, R>
) => AggregationCursor.AggregationCursor<B, BI, BR>
<A extends Document, I extends Document, R, B extends Document, BI extends Document, BR>(
collection: Collection<A, I, R>,
pipeline: Array<Document>,
schema: Schema.Schema<B, BI, BR>,
options?: AggregateOptions
): AggregationCursor.AggregationCursor<B, BI, BR>
} = F.dual(
(args) => isCollection(args[0]),
<A extends Document, I extends Document, R, B extends Document, BI extends Document, BR>(
collection: Collection<A, I, R>,
pipeline: Array<Document>,
schema: Schema.Schema<B, BI, BR>,
options?: AggregateOptions
): AggregationCursor.AggregationCursor<B, BI, BR> =>
new AggregationCursor.AggregationCursor<B, BI, BR>({
cursor: collection.collection.aggregate(pipeline, options),
schema
})
)

const isCollection = (x: unknown) => x instanceof Collection
30 changes: 30 additions & 0 deletions packages/effect-mongodb/src/DocumentAggregationCursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* @since 0.0.1
*/
import * as Data from "effect/Data"
import * as Effect from "effect/Effect"
import * as F from "effect/Function"
import * as Stream from "effect/Stream"
import type { AggregationCursor as MongoAggregationCursor } from "mongodb"
import * as MongoError from "./MongoError.js"

export class DocumentAggregationCursor
extends Data.TaggedClass("DocumentAggregationCursor")<{ cursor: MongoAggregationCursor<Document> }>
{
}

export const toArray = (
cursor: DocumentAggregationCursor
): Effect.Effect<ReadonlyArray<Document>, MongoError.MongoError> =>
F.pipe(
Effect.tryPromise({ try: () => cursor.cursor.toArray(), catch: F.identity }),
Effect.catchAll(MongoError.mongoErrorDie<ReadonlyArray<Document>>(`${cursor._tag}.toArray error`))
)

export const toStream = (
cursor: DocumentAggregationCursor
): Stream.Stream<Document, MongoError.MongoError> =>
F.pipe(
Stream.fromAsyncIterable(cursor.cursor, F.identity),
Stream.catchAll(MongoError.mongoErrorStream<Document>(`${cursor._tag}.toStream error`))
)
26 changes: 26 additions & 0 deletions packages/effect-mongodb/src/DocumentCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as Effect from "effect/Effect"
import * as F from "effect/Function"
import * as O from "effect/Option"
import type {
AggregateOptions,
BulkWriteOptions,
Collection as MongoCollection,
CreateIndexesOptions,
Expand All @@ -28,6 +29,7 @@ import type {
WithoutId
} from "mongodb"
import * as Collection from "./Collection.js"
import * as DocumentAggregationCursor from "./DocumentAggregationCursor.js"
import * as DocumentFindCursor from "./DocumentFindCursor.js"
import * as MongoError from "./MongoError.js"

Expand Down Expand Up @@ -278,6 +280,30 @@ export const createIndexes: {
)
)

export const aggregate: {
(
pipeline?: Array<Document>,
options?: AggregateOptions
): (
collection: DocumentCollection
) => DocumentAggregationCursor.DocumentAggregationCursor
(
collection: DocumentCollection,
pipeline?: Array<Document>,
options?: AggregateOptions
): DocumentAggregationCursor.DocumentAggregationCursor
} = F.dual(
(args) => isDocumentCollection(args[0]),
(
collection: DocumentCollection,
pipeline?: Array<Document>,
options?: AggregateOptions
): DocumentAggregationCursor.DocumentAggregationCursor =>
new DocumentAggregationCursor.DocumentAggregationCursor({
cursor: collection.collection.aggregate(pipeline, options)
})
)

export const typed: {
<A extends Document, I extends Document = A, R = never>(
schema: Schema.Schema<A, I, R>
Expand Down
2 changes: 2 additions & 0 deletions packages/effect-mongodb/src/FindCursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export const limit: {
new FindCursor({ cursor: cursor.cursor.limit(value), schema: cursor.schema })
)

// TODO: add explicit return type for public API
export const toArray = <A, I, R>(cursor: FindCursor<A, I, R>) => {
const decode = Schema.decodeUnknown(cursor.schema)
return Effect.tryPromise({ try: () => cursor.cursor.toArray(), catch: F.identity }).pipe(
Expand All @@ -90,6 +91,7 @@ export const toArray = <A, I, R>(cursor: FindCursor<A, I, R>) => {
)
}

// TODO: add explicit return type for public API
export const toArrayEither = <A, I, R>(cursor: FindCursor<A, I, R>) => {
const decode = Schema.decodeUnknown(cursor.schema)
return Effect.tryPromise({ try: () => cursor.cursor.toArray(), catch: F.identity }).pipe(
Expand Down
10 changes: 10 additions & 0 deletions packages/effect-mongodb/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/**
* @since 0.0.1
*/
export * as AggregationCursor from "./AggregationCursor.js"

/**
* @since 0.0.1
*/
Expand All @@ -8,6 +13,11 @@ export * as Collection from "./Collection.js"
*/
export * as Db from "./Db.js"

/**
* @since 0.0.1
*/
export * as DocumentAggregationCursor from "./DocumentAggregationCursor.js"

/**
* @since 0.0.1
*/
Expand Down
62 changes: 62 additions & 0 deletions packages/effect-mongodb/test/Collection.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as Schema from "@effect/schema/Schema"
import * as AggregationCursor from "effect-mongodb/AggregationCursor"
import * as Collection from "effect-mongodb/Collection"
import * as Db from "effect-mongodb/Db"
import * as Effect from "effect/Effect"
Expand All @@ -25,9 +26,70 @@ describeMongo("Collection", (ctx) => {

expect(result).toEqual(O.some(user))
})

test("aggregate", async () => {
const user1 = User.make({ name: "user1", birthday: new Date(1977, 11, 27) })
const user2 = User.make({ name: "user2", birthday: new Date(1977, 11, 27) })
const user3 = User.make({ name: "user3", birthday: new Date(1985, 6, 16) })
const user4 = User.make({ name: "user4", birthday: new Date(1989, 11, 28) })
const user5 = User.make({ name: "user5", birthday: new Date(1995, 3, 21) })
const user6 = User.make({ name: "user6", birthday: new Date(2000, 5, 30) })

const program = Effect.gen(function*(_) {
const db = yield* _(ctx.database)
const collection = Db.collection(db, "aggregate", User)

yield* _(
Collection.insertMany(collection, [user1, user2, user3, user4, user5, user6])
)

const _1990 = "1990-01-01T00:00:00.000Z"
return yield* _(
Collection.aggregate(collection, [
{
$match: {
birthday: { $lt: _1990 }
}
},
{
$group: {
_id: "$birthday",
names: { $addToSet: "$name" }
}
},
{
$sort: { _id: 1 }
}
], UserAggregation),
AggregationCursor.toArray
)
})

const result = await Effect.runPromise(program)

expect(result).toEqual([
{
_id: new Date(1977, 11, 27),
names: expect.arrayContaining(["user1", "user2"])
},
{
_id: new Date(1985, 6, 16),
names: ["user3"]
},
{
_id: new Date(1989, 11, 28),
names: ["user4"]
}
])
})
})

const User = Schema.Struct({
name: Schema.String,
birthday: Schema.Date
})

const UserAggregation = Schema.Struct({
_id: Schema.Date,
names: Schema.Array(Schema.String)
})

0 comments on commit b00bc03

Please sign in to comment.