Skip to content

Commit

Permalink
feat!: dont batch queries
Browse files Browse the repository at this point in the history
  • Loading branch information
samsiegart committed Dec 14, 2023
1 parent 42625b2 commit ebfc261
Show file tree
Hide file tree
Showing 7 changed files with 386 additions and 264 deletions.
3 changes: 3 additions & 0 deletions packages/rpc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
},
"dependencies": {
"@endo/marshal": "^0.8.9",
"axios": "^1.6.2",
"axios-retry": "^4.0.0",
"vite": "^4.3.2",
"vite-tsconfig-paths": "^4.2.0"
},
"devDependencies": {
"@typescript-eslint/eslint-plugin": "^5.35.1",
"@typescript-eslint/parser": "^5.35.1",
"@vitest/coverage-c8": "^0.25.3",
"axios-mock-adapter": "^1.22.0",
"eslint": "^8.22.0",
"eslint-plugin-import": "^2.26.0",
"happy-dom": "^9.20.3",
Expand Down
78 changes: 0 additions & 78 deletions packages/rpc/src/batchQuery.ts

This file was deleted.

185 changes: 77 additions & 108 deletions packages/rpc/src/chainStorageWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/* eslint-disable import/extensions */
import { makeClientMarshaller } from './marshal';
import { AgoricChainStoragePathKind } from './types';
import { batchVstorageQuery, keyToPath, pathToKey } from './batchQuery';
import { vstorageQuery, keyToPath, pathToKey } from './vstorageQuery';
import type { UpdateHandler } from './types';

type Subscriber<T> = {
Expand Down Expand Up @@ -35,13 +35,20 @@ export type ChainStorageWatcher = ReturnType<
typeof makeAgoricChainStorageWatcher
>;

/**
This is used to avoid notifying subscribers for already-seen
values. For `data` queries, it is the stringified blockheight of where the
value appeared. For `children` queries, it is the stringified array of
children.
*/
type LatestValueIdentifier = string;

/**
* Periodically queries the most recent data from chain storage.
* @param apiAddr API server URL
* @param chainId the chain id to use
* @param onError
* @param marshaller CapData marshal to use
* @param newPathQueryDelayMs
* @param refreshLowerBoundMs
* @param refreshUpperBoundMs
* @returns
Expand All @@ -51,136 +58,101 @@ export const makeAgoricChainStorageWatcher = (
chainId: string,
onError?: (e: Error) => void,
marshaller = makeClientMarshaller(),
newPathQueryDelayMs = defaults.newPathQueryDelayMs,
refreshLowerBoundMs = defaults.refreshLowerBoundMs,
refreshUpperBoundMs = defaults.refreshUpperBoundMs,
) => {
// Map of paths to [identifier, value] pairs of most recent response values.
//
// The 'identifier' is used to avoid notifying subscribers for already-seen
// values. For 'data' queries, 'identifier' is the blockheight of the
// response. For 'children' queries, 'identifier' is the stringified array
// of children.
/**
* Map from vstorage paths to `[identifier, value]` pairs containing their
* most recent response values and their {@link LatestValueIdentifier}.
*/
const latestValueCache = new Map<
string,
[identifier: string, value: unknown]
[identifier: LatestValueIdentifier, value: unknown]
>();

const watchedPathsToSubscribers = new Map<string, Set<Subscriber<unknown>>>();
let isNewPathWatched = false;
let isQueryInProgress = false;
let nextQueryTimeout: number | null = null;

const queueNextQuery = () => {
if (isQueryInProgress || !watchedPathsToSubscribers.size) {
return;
}
const watchedPathsToRefreshTimeouts = new Map<string, number>();

if (isNewPathWatched) {
// If there is any new path to watch, schedule another query very soon.
if (nextQueryTimeout) {
window.clearTimeout(nextQueryTimeout);
}
nextQueryTimeout = window.setTimeout(queryUpdates, newPathQueryDelayMs);
} else {
// Otherwise, refresh after a normal interval.
nextQueryTimeout = window.setTimeout(
queryUpdates,
const refreshDataForPath = async (
path: [AgoricChainStoragePathKind, string],
) => {
const queueNextRefresh = () => {
window.clearTimeout(watchedPathsToRefreshTimeouts.get(pathKey));
const timeout = window.setTimeout(
() => refreshDataForPath(path),
randomRefreshPeriod(refreshLowerBoundMs, refreshUpperBoundMs),
);
}
};
watchedPathsToRefreshTimeouts.set(pathKey, timeout);
};

const queryUpdates = async () => {
isQueryInProgress = true;
nextQueryTimeout = null;
isNewPathWatched = false;

const paths = [...watchedPathsToSubscribers.keys()].map(keyToPath);

if (!paths.length) {
isQueryInProgress = false;
const pathKey = pathToKey(path);
let response;
try {
response = await vstorageQuery(apiAddr, marshaller.fromCapData, path);
} catch (e: unknown) {
console.error(`Error querying vstorage for path ${path}:`, e);
if (onError && e instanceof Error) {
onError(e);
}
// Try again later until client tells us to stop.
queueNextRefresh();
return;
}

try {
const responses = await batchVstorageQuery(
apiAddr,
marshaller.fromCapData,
paths,
);
const data = Object.fromEntries(responses);
watchedPathsToSubscribers.forEach((subscribers, path) => {
// Path was watched after query fired, wait until next round.
if (!data[path]) return;

if (data[path].error) {
subscribers.forEach(s => {
if (s.onError) {
s.onError(harden(data[path].error));
}
});
return;
}

const { blockHeight, value } = data[path];
const lastValue = latestValueCache.get(path);

if (
lastValue &&
(blockHeight === lastValue[0] ||
(blockHeight === undefined &&
JSON.stringify(value) === lastValue[0]))
) {
// The value isn't new, don't emit.
return;
}

latestValueCache.set(path, [
blockHeight ?? JSON.stringify(value),
value,
]);

subscribers.forEach(s => {
s.onUpdate(harden(value));
});
});
} catch (e) {
onError && onError(e as Error);
} finally {
isQueryInProgress = false;
queueNextQuery();
const { value, blockHeight } = response;
const [latestValueIdentifier, latestValue] =
latestValueCache.get(pathKey) || [];

if (
latestValue &&
(blockHeight === latestValueIdentifier ||
// Blockheight is undefined so fallback to using the stringified value
// as the identifier, as is the case for `children` queries.
(blockHeight === undefined && JSON.stringify(value) === latestValue))
) {
// The value isn't new, don't emit.
queueNextRefresh();
return;
}

latestValueCache.set(pathKey, [
// Fallback to using stringified value as identifier if no blockHeight,
// as is the case for `children` queries.
blockHeight ?? JSON.stringify(value),
value,
]);

const subscribersForPath = watchedPathsToSubscribers.get(pathKey);
subscribersForPath?.forEach(s => {
s.onUpdate(harden(value));
});
queueNextRefresh();
};

const stopWatching = (pathKey: string, subscriber: Subscriber<unknown>) => {
const subscribersForPath = watchedPathsToSubscribers.get(pathKey);
if (!subscribersForPath?.size) {
throw new Error(`cannot unsubscribe from unwatched path ${pathKey}`);
throw new Error(
`already stopped watching path ${pathKey}, nothing to do`,
);
}

if (subscribersForPath.size === 1) {
watchedPathsToSubscribers.delete(pathKey);
latestValueCache.delete(pathKey);
window.clearTimeout(watchedPathsToRefreshTimeouts.get(pathKey));
watchedPathsToRefreshTimeouts.delete(pathKey);
} else {
subscribersForPath.delete(subscriber);
}
};

const queueNewPathForQuery = () => {
if (!isNewPathWatched) {
isNewPathWatched = true;
queueNextQuery();
}
};

const watchLatest = <T>(
path: [AgoricChainStoragePathKind, string],
onUpdate: (latestValue: T) => void,
onPathError?: (log: string) => void,
) => {
const pathKey = pathToKey(path);
const subscriber = makePathSubscriber(onUpdate, onPathError);
const subscriber = makePathSubscriber(onUpdate);

const latestValue = latestValueCache.get(pathKey);
if (latestValue) {
Expand All @@ -195,23 +167,20 @@ export const makeAgoricChainStorageWatcher = (
pathKey,
new Set([subscriber as Subscriber<unknown>]),
);
queueNewPathForQuery();
refreshDataForPath(path);
}

return () => stopWatching(pathKey, subscriber as Subscriber<unknown>);
};

const queryOnce = <T>(path: [AgoricChainStoragePathKind, string]) =>
new Promise<T>((res, rej) => {
const stop = watchLatest<T>(
path,
val => {
stop();
res(val);
},
e => rej(e),
);
});
const queryOnce = async <T>(path: [AgoricChainStoragePathKind, string]) => {
const { value } = await vstorageQuery<T>(
apiAddr,
marshaller.fromCapData,
path,
);
return value;
};

// Assumes argument is an unserialized presence.
const presenceToSlot = (o: unknown) => marshaller.toCapData(o).slots[0];
Expand Down
Loading

0 comments on commit ebfc261

Please sign in to comment.