Skip to content

🚀 The set of async flow control structures and promise utils.

License

Notifications You must be signed in to change notification settings

smikhalevski/parallel-universe

Repository files navigation

Spaceman

npm install --save-prod parallel-universe

🚀 API documentation is available here.

AbortablePromise

The promise that can be aborted.

const promise = new AbortablePromise((resolve, reject, signal) => {
  signal.addEventListener('abort', () => {
    // Listen to the signal being aborted
  });
  
  // Resolve or reject the promise
});

promise.abort();

When abort is called, the promise is instantly rejected with an AbortError if it isn't settled yet.

Provide a custom abort reason:

promise.abort(new Error('Operation aborted'));

Abort promise if an external signal is aborted:

promise.withSignal(signal);

Deferred

The promise that can be resolved externally.

const promise = new Deferred<string>();

promise.then(value => {
  doSomething(value);
});

promise.resolve('Mars');

AsyncQueue

Asynchronous queue decouples value providers and value consumers.

const queue = new AsyncQueue();

// Provider adds a value
queue.append('Mars');

// Consumer takes a value
queue.take();
// ⮕ AbortablePromise { 'Mars' }

append appends the value to the queue, while take removes the value from the queue as soon as it is available. If there are no values in the queue upon take call then the returned promise is resolved after the next append call.

const queue = new AsyncQueue();

// The returned promise would be resolved after the append call
queue.take();
// ⮕ AbortablePromise { 'Mars' }

queue.append('Mars');

Consumers receive values from the queue in the same order they were added by providers:

const queue = new AsyncQueue();

queue.append('Mars');
queue.append('Venus');

queue.take();
// ⮕ AbortablePromise { 'Mars' }

queue.take();
// ⮕ AbortablePromise { 'Venus' }

Acknowledgements

In some cases removing the value from the queue isn't the desirable behavior, since the consumer may not be able to process the taken value. Use takeAck to examine available value and acknowledge that it can be processed. takeAck returns a tuple of the available value and the acknowledgement callback. The consumer should call ack to notify the queue on weather to remove the value from the queue or to retain it.

queue.takeAck().then(([value, ack]) => {
  try {
    if (doSomeChecks()) {
      ack(true);
      doSomething(value);
    } 
  } finally {
    ack(false);
  }
});

To guarantee that consumers receive values in the same order as they were provided, acknowledgements prevent subsequent consumers from being fulfilled until ack is called. Be sure to call ack to prevent the queue from being stuck indefinitely.

Calling ack multiple times is safe, since only the first call would have an effect.

To acknowledge that the consumer can process the value, and the value must be removed from the queue use:

ack(true);

To acknowledge that the value should be retained by the queue use:

ack(false);

The value that was retained in the queue becomes available for the subsequent consumer.

const queue = new AsyncQueue();

queue.append('Pluto');

queue.takeAck(([value, ack]) => {
  ack(false); // Tells queue to retain the value
});

queue.take();
// ⮕ AbortablePromise { 'Pluto' }

WorkPool

The callback execution pool that executes the limited number of callbacks in parallel while other submitted callbacks wait in the queue.

// The pool that processes 5 callbacks in parallel at maximum
const pool = new WorkPool(5);

pool.submit(signal => {
  return Promise.resolve('Mars');
});
// ⮕ AbortablePromise<string>

You can change how many callbacks can the pool process in parallel:

pool.setSize(2);
// ⮕ Promise<void>

setSize returns the promise that is resolved when there are no excessive callbacks being processed in parallel.

If you resize the pool down, callbacks that are pending and exceed the new size limit, are notified via signal that they must be aborted.

To abort all callbacks that are being processed by the pool and wait for their completion use:

// Resolved when all pending callbacks are fulfilled
pool.setSize(0);
// ⮕ Promise<void>

Lock

Promise-based lock implementation.

When someone tries to acquire a Lock they receive a promise for a release callback that is resolved as soon as previous lock owner invokes their release callback.

const lock = new Lock();

lock.acquire();
// ⮕ Promise<() => void>

You can check that the lock is locked before acquiring a lock.

For example, if you want to force an async callback executions to be sequential you can use an external lock:

const lock = new Lock();

async function doSomething() {
  const release = await lock.acquire();
  try {
    // Long process is handled here
  } finally {
    release();
  }
}

// Long process would be executed three times sequentially
doSomething();
doSomething();
doSomething();

Blocker

Provides a mechanism for blocking an async process and unblocking it from the outside.

const blocker = new Blocker<string>();

blocker.block();
// ⮕ Promise<string>

You can later unblock it passing a value that would fulfill the promise returned from the block call:

blocker.unblock('Mars');

PubSub

Publish–subscribe pattern implementation:

const pubSub = new PubSub<string>();

pubSub.subscribe(message => {
  // Process the message
});

pubSub.publish('Pluto');

repeat

Invokes a callback periodically with the given delay between settlements of returned promises until the condition is met. By default, the callback is invoked indefinitely with no delay between settlements:

repeat(async () => {
  await doSomething();
});
// ⮕ AbortablePromise<void>

Specify a delay between invocations:

repeat(doSomething, 3000);
// ⮕ AbortablePromise<void>

Abort the loop:

const promise = repeat(doSomething, 3000);

promise.abort();

Specify the condition when the loop must be stopped. The example below randomly picks a planet name once every 3 seconds and fulfills the returned promise when 'Mars' is picked:

repeat(
  () => ['Mars', 'Pluto', 'Venus'][Math.floor(Math.random() * 3)],
  3000,
  value => value === 'Mars'
);
// ⮕ AbortablePromise<string>

You can combine repeat with timeout to limit the repeat duration:

timeout(
  repeat(async () => {
    await doSomething();
  }),
  5000
);

retry

Invokes a callback periodically until it successfully returns the result. If a callback throws an error or returns a promise that is rejected then it is invoked again after a delay.

retry(async () => {
  await doSomethingOrThrow();
});
// ⮕ AbortablePromise<void>

Specify a delay between tries:

retry(doSomethingOrThrow, 3000);
// ⮕ AbortablePromise<void>

Specify maximum number of tries:

retry(doSomethingOrThrow, 3000, 5);
// ⮕ AbortablePromise<void>

Abort the retry prematurely:

const promise = retry(doSomethingOrThrow, 3000);

promise.abort();

You can combine retry with timeout to limit the retry duration:

timeout(
  retry(async () => {
    await doSomethingOrThrow();
  }),
  5000
);

waitFor

Returns a promise that is fulfilled when a callback returns a truthy value:

waitFor(async () => doSomething());
// ⮕ AbortablePromise<ReturnType<typeof doSomething>>

If you don't want waitFor to invoke the callback too frequently, provide a delay in milliseconds:

waitFor(doSomething, 1_000);

delay

Returns a promise that resolves after a timeout. If signal is aborted then the returned promise is rejected with an error.

delay(100);
// ⮕ AbortablePromise<void>

Delay can be resolved with a value:

delay(100, 'Pluto');
// ⮕ AbortablePromise<string>

timeout

Rejects with an error if the execution time exceeds the timeout.

timeout(async signal => doSomething(), 100);
// ⮕ Promise<ReturnType<typeof doSomething>>

timeout(
  new AbortablePromise(resolve => {
    // Resolve the promise value
  }),
  100
);