Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for volatile functions #1

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .envrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use flake
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ jspm_packages
# Optional npm cache directory
.npm

# Optional direnv cache directory
.direnv

# Optional REPL history
.node_repl_history

Expand Down
27 changes: 27 additions & 0 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
description = "Development environment";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";

outputs =
{ self, nixpkgs }:
let
inherit (nixpkgs) lib;

systems = [
"aarch64-linux"
"aarch64-darwin"
"x86_64-darwin"
"x86_64-linux"
];

eachSystem = lib.flip lib.mapAttrs (
lib.genAttrs systems (system: nixpkgs.legacyPackages.${system})
);
in
{
devShell = eachSystem (
system: pkgs:
pkgs.mkShell {
packages = [
pkgs.nodejs
pkgs.pnpm_9
];
}
);
};
}
49 changes: 31 additions & 18 deletions src/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export const REACTIVE_NODE: ReactiveNode = {
version: 0 as Version,
lastCleanEpoch: 0 as Version,
dirty: false,
volatile: false,
producerNode: undefined,
producerLastReadVersion: undefined,
producerIndexOfThis: undefined,
Expand Down Expand Up @@ -109,6 +110,12 @@ export interface ReactiveNode {
*/
dirty: boolean;

/**
* Whether this consumer depends on a volatile (i.e. untrackable) source and
* must always recompute.
*/
volatile: boolean;

/**
* Producers which are dependencies of this consumer.
*
Expand Down Expand Up @@ -272,28 +279,31 @@ export function producerIncrementEpoch(): void {
}

/**
* Ensure this producer's `version` is up-to-date.
* Ensure this producer's `version` is up-to-date. Note that we always recompute the value if the
* node depends on volatile sources.
*/
export function producerUpdateValueVersion(node: ReactiveNode): void {
if (consumerIsLive(node) && !node.dirty) {
// A live consumer will be marked dirty by producers, so a clean state means that its version
// is guaranteed to be up-to-date.
return;
}
if (!node.volatile) {
if (consumerIsLive(node) && !node.dirty) {
// A live consumer will be marked dirty by producers, so a clean state means that its version
// is guaranteed to be up-to-date.
return;
}

if (!node.dirty && node.lastCleanEpoch === epoch) {
// Even non-live consumers can skip polling if they previously found themselves to be clean at
// the current epoch, since their dependencies could not possibly have changed (such a change
// would've increased the epoch).
return;
}
if (!node.dirty && node.lastCleanEpoch === epoch) {
// Even non-live consumers can skip polling if they previously found themselves to be clean at
// the current epoch, since their dependencies could not possibly have changed (such a change
// would've increased the epoch).
return;
}

if (!node.producerMustRecompute(node) && !consumerPollProducersForChange(node)) {
// None of our producers report a change since the last time they were read, so no
// recomputation of our value is necessary, and we can consider ourselves clean.
node.dirty = false;
node.lastCleanEpoch = epoch;
return;
if (!node.producerMustRecompute(node) && !consumerPollProducersForChange(node)) {
// None of our producers report a change since the last time they were read, so no
// recomputation of our value is necessary, and we can consider ourselves clean.
node.dirty = false;
node.lastCleanEpoch = epoch;
return;
}
}

node.producerRecomputeValue(node);
Expand Down Expand Up @@ -387,6 +397,9 @@ export function consumerAfterComputation(
node.producerLastReadVersion.pop();
node.producerIndexOfThis.pop();
}

// Depending on a volatile producer makes the consumer volatile also.
node.volatile = node.producerNode.some((producer) => producer.volatile);
}

/**
Expand Down
104 changes: 104 additions & 0 deletions src/volatile.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import type {Signal} from './wrapper';
import {
REACTIVE_NODE,
type ReactiveNode,
producerAccessed,
producerUpdateValueVersion,
producerIncrementEpoch,
producerNotifyConsumers,
consumerBeforeComputation,
consumerAfterComputation,
} from './graph';

/**
* Volatile functions read from external sources. They can change at any time
* without notifying the graph. If the source supports it, optionally we can
* subscribe to changes while observed.
*
* Unless the external source is actively being observed, we have to assume
* it's stale and bust the cache of everything downstream.
*/
export function createVolatile<T>(getSnapshot: () => T): VolatileNode<T> {
const node: VolatileNode<T> = Object.create(VOLATILE_NODE);
node.getSnapshot = getSnapshot;

return node;
}

export function volatileGetFn<T>(node: VolatileNode<T>): T {
// Update the cache if necessary.
producerUpdateValueVersion(node);

// Track who accessed the signal.
producerAccessed(node);

return node.value as T;
}

export interface VolatileNode<T> extends ReactiveNode {
/** Read state from the outside world. May be expensive. */
getSnapshot: () => T;

/** Invalidates the cached value when subscribed to an external source. */
onChange(this: VolatileNode<T>): void;

/**
* Cached value. Only used when being watched and a subscriber is provided.
* Otherwise values are pulled from `getSnapshot`.
*/
value: T;

/**
* If the volatile source supports it, a `subscribe` callback may be
* provided that upgrades to a non-volatile source by tracking changes in
* a versioned cache.
*
* The return value may be an `unsubscribe` callback.
*/
subscribe?: (
this: Signal.Volatile<T>,
onChange: () => void,
) => void | ((this: Signal.Volatile<T>) => void);

/**
* Returned by the `subscribe` callback. Invoked when the volatile source is
* no longer being observed.
*/
unsubscribe(): void;
}

// Note: Using an IIFE here to ensure that the spread assignment is not considered
// a side-effect, ending up preserving `VOLATILE_NODE` and `REACTIVE_NODE`.
// TODO: remove when https://github.com/evanw/esbuild/issues/3392 is resolved.
const VOLATILE_NODE = /* @__PURE__ */ (() => ({
...REACTIVE_NODE,
dirty: true,
volatile: true,

unsubscribe() {},

onChange() {
this.version++;
this.dirty = true;
producerIncrementEpoch();
producerNotifyConsumers(this);
},

producerMustRecompute(node: VolatileNode<unknown>): boolean {
return node.dirty;
},

producerRecomputeValue(node: VolatileNode<unknown>): void {
if (node.volatile || node.dirty) {
const alreadyVolatile = node.volatile;
const prevConsumer = consumerBeforeComputation(node);

try {
node.value = node.getSnapshot();
} finally {
consumerAfterComputation(node, prevConsumer);
node.volatile ||= alreadyVolatile;
}
}
},
}))();
75 changes: 67 additions & 8 deletions src/wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ import {
producerRemoveLiveConsumerAtIndex,
} from './graph.js';
import {createSignal, signalGetFn, signalSetFn, type SignalNode} from './signal.js';
import {createVolatile, volatileGetFn, type VolatileNode} from './volatile';

const NODE: unique symbol = Symbol('node');

let isState: (s: any) => boolean, isComputed: (s: any) => boolean, isWatcher: (s: any) => boolean;
let isState: (s: any) => boolean,
isVolatile: (s: any) => boolean,
isComputed: (s: any) => boolean,
isWatcher: (s: any) => boolean;

// eslint-disable-next-line @typescript-eslint/no-namespace
export namespace Signal {
Expand Down Expand Up @@ -110,10 +114,65 @@ export namespace Signal {
}
}

export class Volatile<T> {
readonly [NODE]: VolatileNode<T>;

#brand() {}

static {
isVolatile = (v: any): v is Volatile<any> => #brand in v;
}

constructor(
getSnapshot: () => T,
options?: {
subscribe?: (onChange: () => void) => void | (() => void);
},
) {
const node = createVolatile(getSnapshot);
node.wrapper = this;
node.subscribe = options?.subscribe;
node.watched = this.#watched;
node.unwatched = this.#unwatched;
this[NODE] = node;
}

get(): T {
if (!isVolatile(this))
throw new TypeError('Wrong receiver type for Signal.Volatile.prototype.get');

return volatileGetFn(this[NODE]);
}

#watched() {
const node = this[NODE];
node.dirty = true; // Force a fresh read before trusting cache.
node.volatile = false;

let unsubscribed = false;
const unsubscribe = node.subscribe?.call(this, () => {
if (!unsubscribed) {
node.onChange.call(node);
}
});

node.unsubscribe = () => {
unsubscribed = true;
unsubscribe?.call(this);
};
}

#unwatched() {
const node = this[NODE];
node.unsubscribe();
node.volatile = true;
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AnySignal<T = any> = State<T> | Computed<T>;
type AnySignal<T = any> = State<T> | Computed<T> | Volatile<T>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AnySink = Computed<any> | subtle.Watcher;
type AnySink = Computed<any> | Volatile<any> | subtle.Watcher;

// eslint-disable-next-line @typescript-eslint/no-namespace
export namespace subtle {
Expand All @@ -133,7 +192,7 @@ export namespace Signal {
// Returns ordered list of all signals which this one referenced
// during the last time it was evaluated
export function introspectSources(sink: AnySink): AnySignal[] {
if (!isComputed(sink) && !isWatcher(sink)) {
if (!isComputed(sink) && !isWatcher(sink) && !isVolatile(sink)) {
throw new TypeError('Called introspectSources without a Computed or Watcher argument');
}
return sink[NODE].producerNode?.map((n) => n.wrapper) ?? [];
Expand All @@ -143,15 +202,15 @@ export namespace Signal {
// lead to an Effect which has not been disposed
// Note: Only watched Computed signals will be in this list.
export function introspectSinks(signal: AnySignal): AnySink[] {
if (!isComputed(signal) && !isState(signal)) {
if (!isComputed(signal) && !isState(signal) && !isVolatile(signal)) {
throw new TypeError('Called introspectSinks without a Signal argument');
}
return signal[NODE].liveConsumerNode?.map((n) => n.wrapper) ?? [];
}

// True iff introspectSinks() is non-empty
export function hasSinks(signal: AnySignal): boolean {
if (!isComputed(signal) && !isState(signal)) {
if (!isComputed(signal) && !isState(signal) && !isVolatile(signal)) {
throw new TypeError('Called hasSinks without a Signal argument');
}
const liveConsumerNode = signal[NODE].liveConsumerNode;
Expand All @@ -161,7 +220,7 @@ export namespace Signal {

// True iff introspectSources() is non-empty
export function hasSources(signal: AnySink): boolean {
if (!isComputed(signal) && !isWatcher(signal)) {
if (!isComputed(signal) && !isWatcher(signal) && !isVolatile(signal)) {
throw new TypeError('Called hasSources without a Computed or Watcher argument');
}
const producerNode = signal[NODE].producerNode;
Expand Down Expand Up @@ -192,7 +251,7 @@ export namespace Signal {

#assertSignals(signals: AnySignal[]): void {
for (const signal of signals) {
if (!isComputed(signal) && !isState(signal)) {
if (!isComputed(signal) && !isState(signal) && !isVolatile(signal)) {
throw new TypeError('Called watch/unwatch without a Computed or State argument');
}
}
Expand Down
Loading