From ca9b84ba21d590e62fe594b13be4a621b54176d0 Mon Sep 17 00:00:00 2001 From: Jacob Nguyen <76754747+jacoobes@users.noreply.github.com> Date: Thu, 16 May 2024 23:41:44 -0500 Subject: [PATCH] save b4 thunder and lightning --- src/core/module-loading.ts | 2 +- src/core/operators.ts | 19 +------- src/core/structures/default-services.ts | 12 ++--- src/handlers/event-utils.ts | 64 ++++++++++++------------- src/handlers/interaction.ts | 8 +++- src/handlers/message.ts | 8 +++- test/handlers/dispatchers.test.ts | 5 +- 7 files changed, 52 insertions(+), 66 deletions(-) diff --git a/src/core/module-loading.ts b/src/core/module-loading.ts index 35bd9aa4..4fd2d831 100644 --- a/src/core/module-loading.ts +++ b/src/core/module-loading.ts @@ -59,7 +59,7 @@ export async function* readRecursive(dir: string): AsyncGenerator { const files = await readdir(dir, { withFileTypes: true }); for (const file of files) { - const fullPath = path.posix.resolve(dir, file.name); + const fullPath = path.posix.join(dir, file.name); if (file.isDirectory()) { if (!file.name.startsWith('!')) { yield* readRecursive(fullPath); diff --git a/src/core/operators.ts b/src/core/operators.ts index 31780a99..2e08b2c9 100644 --- a/src/core/operators.ts +++ b/src/core/operators.ts @@ -31,28 +31,11 @@ export function filterMapTo(item: () => V): OperatorFunction { interface PluginExecutable { execute: (...args: unknown[]) => PluginResult; }; -/** - * Calls any plugin with {args}. - * @param args if an array, its spread and plugin called. - */ -export function callPlugin(plugin: PluginExecutable, args: unknown) -{ - if (Array.isArray(args)) { - return plugin.execute(...args); - } - return plugin.execute(args); -} + export const arrayifySource = (src: T) => Array.isArray(src) ? src : [src]; -/** - * Checks if the stream of results is all ok. - */ -export const everyPluginOk: OperatorFunction = - pipe(every(result => result.isOk()), //this shortcircuits - defaultIfEmpty(true)); - export const sharedEventStream = (e: Emitter, eventName: string) => (fromEvent(e, eventName) as Observable).pipe(share()); diff --git a/src/core/structures/default-services.ts b/src/core/structures/default-services.ts index 95e2b47c..e30a6f5b 100644 --- a/src/core/structures/default-services.ts +++ b/src/core/structures/default-services.ts @@ -65,13 +65,11 @@ export class Cron implements Emitter { this.sanityCheck(eventName); const retrievedModule = this.modules.get(eventName); if(!retrievedModule) throw Error("Adding task: module " +eventName +"was not found"); - - cron.schedule(retrievedModule.pattern, - (date) => listener({ date, deps: this.deps }), - { name: retrievedModule?.name!, - runOnInit: retrievedModule.runOnInit, - timezone: retrievedModule.timezone, - }); + const { pattern, name, runOnInit, timezone } = retrievedModule; + const task = cron.schedule(pattern, + (date) => listener({ date, deps: this.deps }), + { name, runOnInit, timezone, scheduled: true }); + task.on('task-failed', console.error) return this; } removeListener(eventName: string | symbol, listener: AnyFunction) { diff --git a/src/handlers/event-utils.ts b/src/handlers/event-utils.ts index 2ea4b193..1c79e17a 100644 --- a/src/handlers/event-utils.ts +++ b/src/handlers/event-utils.ts @@ -4,10 +4,8 @@ import { Observable, concatMap, filter, - from, of, throwError, - tap, fromEvent, map, OperatorFunction, catchError, finalize, @@ -26,7 +24,7 @@ import { CommandType } from '../core/structures/enums' import type { Args } from '../types/utility'; import { inspect } from 'node:util' import { disposeAll } from '../core/ioc/base'; -import { arrayifySource, callPlugin, everyPluginOk, filterMapTo, handleError } from '../core/operators'; +import { arrayifySource, handleError } from '../core/operators'; import { resultPayload, isAutocomplete, treeSearch } from '../core/functions' function contextArgs(wrappable: Message | BaseInteraction, messageArgs?: string[]) { @@ -44,12 +42,8 @@ const createResult = createResultResolver< { module: Processed; args: unknown[] }, unknown[] >({ - createStream: async function* ({ module, args }) { - for(const plugin of module.onEvent) { - - } - //from(module.onEvent).pipe(callPlugin(args)) - }, + //@ts-ignore fix later + callPlugins, onNext: ({ args }) => args, }); /** @@ -63,6 +57,7 @@ export function eventDispatcher(module: Module, source: unknown) { concatMap(async args => module.execute(...args)); //@ts-ignore return fromEvent(source, module.name!) + //@ts-ignore .pipe(intoPayload(module), concatMap(createResult), @@ -166,17 +161,17 @@ interface ExecutePayload { * Wraps the task in a Result as a try / catch. * if the task is ok, an event is emitted and the stream becomes empty * if the task is an error, throw an error down the stream which will be handled by catchError + * thank u kingomes * @param emitter reference to SernEmitter that will emit a successful execution of module * @param module the module that will be executed with task * @param task the deferred execution which will be called */ -export async function executeModule( +export function executeModule( emitter: Emitter, logger: Logging|undefined, errHandler: ErrorHandling, { module, task, args }: ExecutePayload, ) { - const wrappedTask = await Result.wrapAsync(async () => task()); return of(module).pipe( //converting the task into a promise so rxjs can resolve the Awaitable properly concatMap(() => Result.wrapAsync(async () => task())), @@ -186,8 +181,7 @@ export async function executeModule( return EMPTY; } return throwError(() => resultPayload(PayloadType.Failure, module, result.error)); - }), - ); + })); }; /** @@ -206,19 +200,32 @@ export function createResultResolver< >(config: { onStop?: (module: T) => unknown; onNext: (args: Args) => Output; - createStream: (args: Args) => AsyncGenerator; }) { - return (args: Args) => { - const task = config.createStream(args); - return from(task).pipe( - tap(result => { - result.isErr() && config.onStop?.(args.module); - }), - everyPluginOk, - filterMapTo(() => config.onNext(args))); + return async (args: Args) => { + //@ts-ignore fix later + const task = await callPlugins(args); + if(task.isOk()) { + return config.onNext(args) as ExecutePayload; + } else { + config.onStop?.(args.module); + } }; }; +async function callPlugins({ args, module }: { args: unknown[], module: Module }) { + let state = {}; + for(const plugin of module.onEvent) { + const result = await plugin.execute.apply(null, !Array.isArray(args) ? args : args); + if(result.isErr()) { + return result + } + if(typeof result.value === 'object') { + //@ts-ignore TODO + state = { ...result.value, ...state }; + } + } + return Ok(state); +} /** * Creates an executable task ( execute the command ) if all control plugins are successful * @param onStop emits a failure response to the SernEmitter @@ -230,18 +237,7 @@ export function makeModuleExecutor< M extends Processed, Args extends { module, args }); - return createResultResolver({ - onStop, - createStream: async function* ({ args, module }) { - for(const plugin of module.onEvent) { - const result = await callPlugin(plugin, args); - if(result.isErr()) { - return result.error - } - } - }, - onNext, - }) + return createResultResolver({ onStop, onNext }) } export const handleCrash = ({ "@sern/errors": err, diff --git a/src/handlers/interaction.ts b/src/handlers/interaction.ts index 9adf3fb5..db1f4354 100644 --- a/src/handlers/interaction.ts +++ b/src/handlers/interaction.ts @@ -1,5 +1,5 @@ import type { Interaction } from 'discord.js'; -import { mergeMap, merge, concatMap } from 'rxjs'; +import { mergeMap, merge, concatMap, EMPTY } from 'rxjs'; import { PayloadType } from '../core/structures/enums'; import { filterTap, sharedEventStream } from '../core/operators' import { createInteractionHandler, executeModule, makeModuleExecutor } from './event-utils'; @@ -26,5 +26,9 @@ export default function interactionHandler(deps: UnpackedDependencies) { .pipe(filterTap(e => emitter.emit('warning', resultPayload(PayloadType.Warning, undefined, e))), concatMap(makeModuleExecutor(module => emitter.emit('module.activate', resultPayload(PayloadType.Failure, module, SernError.PluginFailure)))), - mergeMap(payload => executeModule(emitter, log, err, payload))); + mergeMap(payload => { + if(payload) + return executeModule(emitter, log, err, payload) + return EMPTY; + })); } diff --git a/src/handlers/message.ts b/src/handlers/message.ts index 26023474..f0563f06 100644 --- a/src/handlers/message.ts +++ b/src/handlers/message.ts @@ -5,7 +5,7 @@ import { PayloadType, SernError } from '../core/structures/enums' import { resultPayload } from '../core/functions' import { filterTap, sharedEventStream } from '../core/operators' import { UnpackedDependencies } from '../types/utility'; -import { Emitter } from '..'; +import type { Emitter } from '../core/interfaces'; /** * Ignores messages from any person / bot except itself @@ -40,5 +40,9 @@ export default function message( const result = resultPayload(PayloadType.Failure, module, SernError.PluginFailure); emitter.emit('module.activate', result); })), - mergeMap(payload => executeModule(emitter, log, err, payload))); + mergeMap(payload => { + if(payload) + executeModule(emitter, log, err, payload) + return EMPTY; + })); } diff --git a/test/handlers/dispatchers.test.ts b/test/handlers/dispatchers.test.ts index dd62d52e..c8bb8ad8 100644 --- a/test/handlers/dispatchers.test.ts +++ b/test/handlers/dispatchers.test.ts @@ -5,12 +5,13 @@ import { Module } from '../../src/types/core-modules'; import { Processed } from '../../src/types/core-modules'; import { CommandType } from '../../src/core/structures/enums'; import { EventEmitter } from 'events'; +import { EventType } from '../../dist/core/structures/enums'; function createRandomModule(): Processed { return { type: faker.number.int({ - min: CommandType.Text, - max: CommandType.ChannelSelect, + min: EventType.Discord, + max: EventType.Cron, }), meta: { id:"", absPath: faker.system.directoryPath() }, description: faker.string.alpha(),