Skip to content

Commit

Permalink
save b4 thunder and lightning
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoobes committed May 17, 2024
1 parent d3227e5 commit ca9b84b
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 66 deletions.
2 changes: 1 addition & 1 deletion src/core/module-loading.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export async function* readRecursive(dir: string): AsyncGenerator<string> {
const files = await readdir(dir, { withFileTypes: true });

for (const file of files) {
const fullPath = path.posix.resolve(dir, file.name);
const fullPath = path.posix.join(dir, file.name);
if (file.isDirectory()) {
if (!file.name.startsWith('!')) {
yield* readRecursive(fullPath);
Expand Down
19 changes: 1 addition & 18 deletions src/core/operators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,11 @@ export function filterMapTo<V>(item: () => V): OperatorFunction<boolean, V> {
interface PluginExecutable {
execute: (...args: unknown[]) => PluginResult;
};
/**
* Calls any plugin with {args}.
* @param args if an array, its spread and plugin called.
*/
export function callPlugin(plugin: PluginExecutable, args: unknown)
{
if (Array.isArray(args)) {
return plugin.execute(...args);
}
return plugin.execute(args);
}


export const arrayifySource = <T>(src: T) =>
Array.isArray(src) ? src : [src];

/**
* Checks if the stream of results is all ok.
*/
export const everyPluginOk: OperatorFunction<VoidResult, boolean> =
pipe(every(result => result.isOk()), //this shortcircuits
defaultIfEmpty(true));

export const sharedEventStream = <T>(e: Emitter, eventName: string) =>
(fromEvent(e, eventName) as Observable<T>).pipe(share());

Expand Down
12 changes: 5 additions & 7 deletions src/core/structures/default-services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,11 @@ export class Cron implements Emitter {
this.sanityCheck(eventName);
const retrievedModule = this.modules.get(eventName);
if(!retrievedModule) throw Error("Adding task: module " +eventName +"was not found");

cron.schedule(retrievedModule.pattern,
(date) => listener({ date, deps: this.deps }),
{ name: retrievedModule?.name!,
runOnInit: retrievedModule.runOnInit,
timezone: retrievedModule.timezone,
});
const { pattern, name, runOnInit, timezone } = retrievedModule;
const task = cron.schedule(pattern,
(date) => listener({ date, deps: this.deps }),
{ name, runOnInit, timezone, scheduled: true });
task.on('task-failed', console.error)
return this;
}
removeListener(eventName: string | symbol, listener: AnyFunction) {
Expand Down
64 changes: 30 additions & 34 deletions src/handlers/event-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import {
Observable,
concatMap,
filter,
from,
of,
throwError,
tap,
fromEvent, map, OperatorFunction,
catchError,
finalize,
Expand All @@ -26,7 +24,7 @@ import { CommandType } from '../core/structures/enums'
import type { Args } from '../types/utility';
import { inspect } from 'node:util'
import { disposeAll } from '../core/ioc/base';
import { arrayifySource, callPlugin, everyPluginOk, filterMapTo, handleError } from '../core/operators';
import { arrayifySource, handleError } from '../core/operators';
import { resultPayload, isAutocomplete, treeSearch } from '../core/functions'

function contextArgs(wrappable: Message | BaseInteraction, messageArgs?: string[]) {
Expand All @@ -44,12 +42,8 @@ const createResult = createResultResolver<
{ module: Processed<Module>; args: unknown[] },
unknown[]
>({
createStream: async function* ({ module, args }) {
for(const plugin of module.onEvent) {

}
//from(module.onEvent).pipe(callPlugin(args))
},
//@ts-ignore fix later
callPlugins,
onNext: ({ args }) => args,
});
/**
Expand All @@ -63,6 +57,7 @@ export function eventDispatcher(module: Module, source: unknown) {
concatMap(async args => module.execute(...args));
//@ts-ignore
return fromEvent(source, module.name!)

//@ts-ignore
.pipe(intoPayload(module),
concatMap(createResult),
Expand Down Expand Up @@ -166,17 +161,17 @@ interface ExecutePayload {
* Wraps the task in a Result as a try / catch.
* if the task is ok, an event is emitted and the stream becomes empty
* if the task is an error, throw an error down the stream which will be handled by catchError
* thank u kingomes
* @param emitter reference to SernEmitter that will emit a successful execution of module
* @param module the module that will be executed with task
* @param task the deferred execution which will be called
*/
export async function executeModule(
export function executeModule(
emitter: Emitter,
logger: Logging|undefined,
errHandler: ErrorHandling,
{ module, task, args }: ExecutePayload,
) {
const wrappedTask = await Result.wrapAsync(async () => task());
return of(module).pipe(
//converting the task into a promise so rxjs can resolve the Awaitable properly
concatMap(() => Result.wrapAsync(async () => task())),
Expand All @@ -186,8 +181,7 @@ export async function executeModule(
return EMPTY;
}
return throwError(() => resultPayload(PayloadType.Failure, module, result.error));
}),
);
}));
};

/**
Expand All @@ -206,19 +200,32 @@ export function createResultResolver<
>(config: {
onStop?: (module: T) => unknown;
onNext: (args: Args) => Output;
createStream: (args: Args) => AsyncGenerator<VoidResult>;
}) {
return (args: Args) => {
const task = config.createStream(args);
return from(task).pipe(
tap(result => {
result.isErr() && config.onStop?.(args.module);
}),
everyPluginOk,
filterMapTo(() => config.onNext(args)));
return async (args: Args) => {
//@ts-ignore fix later
const task = await callPlugins(args);
if(task.isOk()) {
return config.onNext(args) as ExecutePayload;
} else {
config.onStop?.(args.module);
}
};
};

async function callPlugins({ args, module }: { args: unknown[], module: Module }) {
let state = {};
for(const plugin of module.onEvent) {
const result = await plugin.execute.apply(null, !Array.isArray(args) ? args : args);
if(result.isErr()) {
return result
}
if(typeof result.value === 'object') {
//@ts-ignore TODO
state = { ...result.value, ...state };
}
}
return Ok(state);
}
/**
* Creates an executable task ( execute the command ) if all control plugins are successful
* @param onStop emits a failure response to the SernEmitter
Expand All @@ -230,18 +237,7 @@ export function makeModuleExecutor< M extends Processed<Module>, Args extends {
module,
args
});
return createResultResolver({
onStop,
createStream: async function* ({ args, module }) {
for(const plugin of module.onEvent) {
const result = await callPlugin(plugin, args);
if(result.isErr()) {
return result.error
}
}
},
onNext,
})
return createResultResolver({ onStop, onNext })
}

export const handleCrash = ({ "@sern/errors": err,
Expand Down
8 changes: 6 additions & 2 deletions src/handlers/interaction.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Interaction } from 'discord.js';
import { mergeMap, merge, concatMap } from 'rxjs';
import { mergeMap, merge, concatMap, EMPTY } from 'rxjs';
import { PayloadType } from '../core/structures/enums';
import { filterTap, sharedEventStream } from '../core/operators'
import { createInteractionHandler, executeModule, makeModuleExecutor } from './event-utils';
Expand All @@ -26,5 +26,9 @@ export default function interactionHandler(deps: UnpackedDependencies) {
.pipe(filterTap(e => emitter.emit('warning', resultPayload(PayloadType.Warning, undefined, e))),
concatMap(makeModuleExecutor(module =>
emitter.emit('module.activate', resultPayload(PayloadType.Failure, module, SernError.PluginFailure)))),
mergeMap(payload => executeModule(emitter, log, err, payload)));
mergeMap(payload => {
if(payload)
return executeModule(emitter, log, err, payload)
return EMPTY;
}));
}
8 changes: 6 additions & 2 deletions src/handlers/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { PayloadType, SernError } from '../core/structures/enums'
import { resultPayload } from '../core/functions'
import { filterTap, sharedEventStream } from '../core/operators'
import { UnpackedDependencies } from '../types/utility';
import { Emitter } from '..';
import type { Emitter } from '../core/interfaces';

/**
* Ignores messages from any person / bot except itself
Expand Down Expand Up @@ -40,5 +40,9 @@ export default function message(
const result = resultPayload(PayloadType.Failure, module, SernError.PluginFailure);
emitter.emit('module.activate', result);
})),
mergeMap(payload => executeModule(emitter, log, err, payload)));
mergeMap(payload => {
if(payload)
executeModule(emitter, log, err, payload)
return EMPTY;
}));
}
5 changes: 3 additions & 2 deletions test/handlers/dispatchers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import { Module } from '../../src/types/core-modules';
import { Processed } from '../../src/types/core-modules';
import { CommandType } from '../../src/core/structures/enums';
import { EventEmitter } from 'events';
import { EventType } from '../../dist/core/structures/enums';

function createRandomModule(): Processed<Module> {
return {
type: faker.number.int({
min: CommandType.Text,
max: CommandType.ChannelSelect,
min: EventType.Discord,
max: EventType.Cron,
}),
meta: { id:"", absPath: faker.system.directoryPath() },
description: faker.string.alpha(),
Expand Down

0 comments on commit ca9b84b

Please sign in to comment.