Skip to content

Commit

Permalink
add deps to plugin calls and execute
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoobes committed May 20, 2024
1 parent 699adf2 commit 0beeb4c
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 102 deletions.
1 change: 0 additions & 1 deletion src/core/operators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export function filterMapTo<V>(item: () => V): OperatorFunction<boolean, V> {
return concatMap(keep => keep ? of(item()) : EMPTY);
}


export const arrayifySource = <T>(src: T) =>
Array.isArray(src) ? src : [src];

Expand Down
2 changes: 1 addition & 1 deletion src/core/structures/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class Context extends CoreContext<Message, ChatInputCommandInteraction> {
);
}

static override wrap(wrappable: BaseInteraction | Message, prefix?: string): Context {
static wrap(wrappable: BaseInteraction | Message, prefix?: string): Context {
if ('interaction' in wrappable) {
return new Context(Ok(wrappable), prefix);
}
Expand Down
6 changes: 0 additions & 6 deletions src/core/structures/core-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,4 @@ export abstract class CoreContext<M, I> {
public isSlash(): this is CoreContext<never, I> {
return !this.isMessage();
}
//todo: add agnostic options resolver for Context
abstract get options(): unknown;

static wrap(_: unknown): unknown {
throw Error('You need to override this method; cannot wrap an abstract class');
}
}
7 changes: 3 additions & 4 deletions src/core/structures/default-services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,9 @@ export class Cron implements Emitter {
const retrievedModule = this.modules.get(eventName);
if(!retrievedModule) throw Error("Adding task: module " +eventName +"was not found");
const { pattern, name, runOnInit, timezone } = retrievedModule;
const task = cron.schedule(pattern,
(date) => listener({ date, deps: this.deps }),
{ name, runOnInit, timezone, scheduled: true });
task.on('task-failed', console.error)
cron.schedule(pattern,
(date) => listener({ date, deps: this.deps }),
{ name, runOnInit, timezone, scheduled: true });
return this;
}
removeListener(eventName: string | symbol, listener: AnyFunction) {
Expand Down
137 changes: 71 additions & 66 deletions src/handlers/event-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ import {
Observable,
concatMap,
filter,
of,
throwError,
fromEvent, map, OperatorFunction,
fromEvent,
map,
type OperatorFunction,
catchError,
finalize,
pipe,
from,
} from 'rxjs';
import * as Id from '../core/id'
import type { Emitter, ErrorHandling, Logging } from '../core/interfaces';
import type { Emitter } from '../core/interfaces';
import { PayloadType, SernError } from '../core/structures/enums'
import { Err, Ok, Result } from 'ts-results-es';
import type { Awaitable, UnpackedDependencies, VoidResult } from '../types/utility';
import type { ControlPlugin } from '../types/core-plugin';
import type { UnpackedDependencies, VoidResult } from '../types/utility';
import type { CommandModule, Module, Processed } from '../types/core-modules';
import * as assert from 'node:assert';
import { Context } from '../core/structures/context';
Expand All @@ -26,59 +27,75 @@ import { disposeAll } from '../core/ioc/base';
import { arrayifySource, handleError } from '../core/operators';
import { resultPayload, isAutocomplete, treeSearch } from '../core/functions'

function intoPayload(module: Module) {
interface ExecutePayload {
module: Module;
args: unknown[];
deps: Dependencies
[key: string]: unknown
}

function intoPayload(module: Module, deps: Dependencies) {
return pipe(map(arrayifySource),
map(args => ({ module, args })));
map(args => ({ module, args, deps })));
}
const createResult = createResultResolver<
Processed<Module>,
{ module: Processed<Module>; args: unknown[] },
unknown[]
>({ onNext: (p) => p.args, });
const createResult = (deps: Dependencies) =>
createResultResolver<unknown[]>({
onNext: (p) => p.args,
onStop: (module) => {
//maybe do something when plugins fail?
}
});
/**
* Creates an observable from { source }
* @param module
* @param source
*/
export function eventDispatcher(module: Module, source: unknown) {
export function eventDispatcher(deps: Dependencies, module: Module, source: unknown) {
assert.ok(source && typeof source === 'object',
`${source} cannot be constructed into an event listener`);
const execute: OperatorFunction<unknown[], unknown> =
concatMap(async args => module.execute(...args));
const execute: OperatorFunction<unknown[]|undefined, unknown> =
concatMap(async args => {
if(args)
return module.execute.apply(null, args);
});
//@ts-ignore
return fromEvent(source, module.name!)
//@ts-ignore
.pipe(intoPayload(module),
concatMap(createResult),
.pipe(intoPayload(module, deps),
concatMap(createResult(deps)),
execute);
}

interface DispatchPayload {
module: Processed<CommandModule>;
event: BaseInteraction;
defaultPrefix?: string
defaultPrefix?: string;
deps: Dependencies
};
export function createDispatcher({ module, event, defaultPrefix }: DispatchPayload) {
export function createDispatcher({ module, event, defaultPrefix, deps }: DispatchPayload): ExecutePayload {
assert.ok(CommandType.Text !== module.type,
SernError.MismatchEvent + 'Found text command in interaction stream');

if(isAutocomplete(event)) {
assert.ok(module.type === CommandType.Slash
|| module.type === CommandType.Both);
|| module.type === CommandType.Both, "Autocomplete option on non command interaction");
const option = treeSearch(event, module.options);
assert.ok(option, SernError.NotSupportedInteraction + ` There is no autocomplete tag for ` + inspect(module));
const { command } = option;
return { module: command as Processed<Module>, //autocomplete is not a true "module" warning cast!
args: [event] };
args: [event],
deps };
}

switch (module.type) {
case CommandType.Slash:
case CommandType.Both: {
return {
module,
args: [Context.wrap(event, defaultPrefix)]
args: [Context.wrap(event, defaultPrefix)],
deps
};
}
default: return { module, args: [event] };
default: return { module, args: [event], deps };
}
}
function createGenericHandler<Source, Narrowed extends Source, Output>(
Expand Down Expand Up @@ -114,50 +131,48 @@ export function fmt(msg: string, prefix: string): string[] {
*/
export function createInteractionHandler<T extends Interaction>(
source: Observable<Interaction>,
mg: Map<string, Module>,
deps: Dependencies,
defaultPrefix?: string
) {
const mg = deps['@sern/modules']
return createGenericHandler<Interaction, T, Result<ReturnType<typeof createDispatcher>, void>>(
source,
async event => {
const possibleIds = Id.reconstruct(event);
let fullPaths= possibleIds
let modules = possibleIds
.map(id => mg.get(id))
.filter((id): id is Module => id !== undefined);

if(fullPaths.length == 0) {
if(modules.length == 0) {
return Err.EMPTY;
}
const [ path ] = fullPaths;
const [ module ] = modules;
return Ok(createDispatcher({
module: path as Processed<CommandModule>,
module: module as Processed<CommandModule>,
event,
defaultPrefix
defaultPrefix,
deps
}));
});
}

export function createMessageHandler(
source: Observable<Message>,
defaultPrefix: string,
mg: Map<string, Module>,
deps: Dependencies
) {
const mg = deps['@sern/modules'];
return createGenericHandler(source, async event => {
const [prefix] = fmt(event.content, defaultPrefix);
let module= mg.get(`${prefix}_T`) ?? mg.get(`${prefix}_B`) as Module;
if(!module) {
return Err('Possibly undefined behavior: could not find a static id to resolve');
}
return Ok({ args: [Context.wrap(event, defaultPrefix)], module })
return Ok({ args: [Context.wrap(event, defaultPrefix)], module, deps })
});
}



interface ExecutePayload {
module: Processed<Module>;
task: () => Awaitable<unknown>;
}
/**
* Wraps the task in a Result as a try / catch.
* if the task is ok, an event is emitted and the stream becomes empty
Expand All @@ -169,20 +184,16 @@ interface ExecutePayload {
*/
export function executeModule(
emitter: Emitter,
logger: Logging|undefined,
errHandler: ErrorHandling,
{ module, task }: ExecutePayload,
{ module, args }: ExecutePayload,
) {
return of(module).pipe(
//converting the task into a promise so rxjs can resolve the Awaitable properly
concatMap(() => Result.wrapAsync(async () => task())),
concatMap(result => {
return from(Result.wrapAsync(async () => module.execute(...args)))
.pipe(concatMap(result => {
if (result.isOk()) {
emitter.emit('module.activate', resultPayload(PayloadType.Success, module));
return EMPTY;
}
}
return throwError(() => resultPayload(PayloadType.Failure, module, result.error));
}));
}))
};

/**
Expand All @@ -194,29 +205,25 @@ export function executeModule(
* @param config
* @returns receiver function for flattening a stream of data
*/
export function createResultResolver<
T extends { execute: (...args: any[]) => any; onEvent: ControlPlugin[] },
Args extends { module: T; [key: string]: unknown },
Output,
>(config: {
onStop?: (module: T) => unknown;
onNext: (args: Args, map: Record<string, unknown>) => Output;
export function createResultResolver<Output>(config: {
onStop?: (module: Module) => unknown;
onNext: (args: ExecutePayload, map: Record<string, unknown>) => Output;
}) {
return async (payload: Args) => {
//@ts-ignore fix later
const { onStop, onNext } = config;
return async (payload: ExecutePayload) => {
const task = await callPlugins(payload);
if(task.isOk()) {
return config.onNext(payload, task.value) as ExecutePayload;
return onNext(payload, task.value) as Output;
} else {
config.onStop?.(payload.module);
onStop?.(payload.module);
}
};
};

async function callPlugins({ args, module }: { args: unknown[], module: Module }) {
async function callPlugins({ args, module, deps }: ExecutePayload) {
let state = {};
for(const plugin of module.onEvent) {
const result = await plugin.execute.apply(null, arrayifySource(args));
const result = await plugin.execute(...args, { state, deps });
if(result.isErr()) {
return result;
}
Expand All @@ -230,11 +237,11 @@ async function callPlugins({ args, module }: { args: unknown[], module: Module }
* Creates an executable task ( execute the command ) if all control plugins are successful
* @param onStop emits a failure response to the SernEmitter
*/
export function makeModuleExecutor<M extends Module, Args extends { module: M; args: unknown[]; }>
(onStop: (m: M) => unknown) {
const onNext = ({ args, module }: Args, state: Record<string, unknown>) => ({
task: () => module.execute(...args, state),
export function intoTask(onStop: (m: Module) => unknown) {
const onNext = ({ args, module, deps }: ExecutePayload, state: Record<string, unknown>) => ({
module,
args: [...args, { state }],
deps
});
return createResultResolver({ onStop, onNext })
}
Expand All @@ -243,8 +250,6 @@ export const handleCrash =
({ "@sern/errors": err, '@sern/emitter': sem, '@sern/logger': log } : UnpackedDependencies) =>
pipe(catchError(handleError(err, sem, log)),
finalize(() => {
log?.info({
message: 'A stream closed or reached end of lifetime',
});
log?.info({ message: 'A stream closed or reached end of lifetime' });
disposeAll(log);
}))
16 changes: 7 additions & 9 deletions src/handlers/interaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,31 @@ import type { Interaction } from 'discord.js';
import { mergeMap, merge, concatMap, EMPTY } from 'rxjs';
import { PayloadType } from '../core/structures/enums';
import { filterTap, sharedEventStream } from '../core/operators'
import { createInteractionHandler, executeModule, makeModuleExecutor } from './event-utils';
import { createInteractionHandler, executeModule, intoTask, } from './event-utils';
import { SernError } from '../core/structures/enums'
import { isAutocomplete, isCommand, isMessageComponent, isModal, resultPayload, } from '../core/functions'
import { UnpackedDependencies } from '../types/utility';
import { Emitter } from '../core/interfaces';

export default function interactionHandler(deps: UnpackedDependencies, defaultPrefix?: string) {
//i wish javascript had clojure destructuring
const { '@sern/modules': modules,
'@sern/client': client,
'@sern/logger': log,
'@sern/errors': err,
const { '@sern/client': client,
'@sern/emitter': emitter } = deps
const interactionStream$ = sharedEventStream<Interaction>(client as unknown as Emitter, 'interactionCreate');
const handle = createInteractionHandler(interactionStream$, modules, defaultPrefix);
const handle = createInteractionHandler(interactionStream$, deps, defaultPrefix);

const interactionHandler$ = merge(handle(isMessageComponent),
handle(isAutocomplete),
handle(isCommand),
handle(isModal));
return interactionHandler$
.pipe(filterTap(e => emitter.emit('warning', resultPayload(PayloadType.Warning, undefined, e))),
concatMap(makeModuleExecutor(module =>
emitter.emit('module.activate', resultPayload(PayloadType.Failure, module, SernError.PluginFailure)))),
concatMap(intoTask(module => {
emitter.emit('module.activate', resultPayload(PayloadType.Failure, module, SernError.PluginFailure))
})),
mergeMap(payload => {
if(payload)
return executeModule(emitter, log, err, payload)
return executeModule(emitter, payload)
return EMPTY;
}));
}
Loading

0 comments on commit 0beeb4c

Please sign in to comment.