Skip to content

Commit

Permalink
Improvements on the stores
Browse files Browse the repository at this point in the history
  • Loading branch information
guillemcordoba committed Feb 7, 2024
1 parent e5e4937 commit 7db81fa
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 36 deletions.
40 changes: 35 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/stores/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@holochain-open-dev/stores",
"version": "0.8.10",
"version": "0.8.11",
"description": "Re-export of svelte/store, with additional utilities to build reusable holochain-open-dev modules",
"author": "[email protected]",
"main": "dist/index.js",
Expand All @@ -19,7 +19,7 @@
},
"dependencies": {
"@alenaksu/json-viewer": "^2.0.1",
"@holochain-open-dev/utils": "^0.16.2",
"@holochain-open-dev/utils": "^0.16.5",
"@holochain/client": "^0.16.6",
"@scoped-elements/cytoscape": "^0.2.0",
"@shoelace-style/shoelace": "^2.11.2",
Expand Down
81 changes: 56 additions & 25 deletions packages/stores/src/holochain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import cloneDeep from "lodash-es/cloneDeep.js";
import { asyncReadable, AsyncReadable, AsyncStatus } from "./async-readable.js";
import { retryUntilSuccess } from "./retry-until-success.js";

const DEFAULT_POLL_INTERVAL_MS = 20_000; // 20 seconds

export function createLinkToLink(
createLink: SignedActionHashed<CreateLink>
): Link {
Expand Down Expand Up @@ -55,9 +57,11 @@ export function collectionStore<
>(
client: ZomeClient<S>,
fetchCollection: () => Promise<Link[]>,
linkType: string
linkType: string,
pollIntervalMs: number = DEFAULT_POLL_INTERVAL_MS
): AsyncReadable<Array<Link>> {
return asyncReadable<Link[]>(async (set) => {
let active = true;
let links: Link[];

const maybeSet = (newLinksValue: Link[]) => {
Expand All @@ -77,11 +81,14 @@ export function collectionStore<
};

const fetch = async () => {
const nlinks = await fetchCollection();
const nlinks = await fetchCollection().finally(() => {
if (active) {
setTimeout(() => fetch(), pollIntervalMs);
}
});
maybeSet(nlinks);
};
await fetch();
const interval = setInterval(() => fetch(), 4000);
const unsubs = client.onSignal((originalSignal) => {
if (!(originalSignal as ActionCommittedSignal<any, any>).type) return;
const signal = originalSignal as ActionCommittedSignal<any, any>;
Expand All @@ -103,7 +110,7 @@ export function collectionStore<
}
});
return () => {
clearInterval(interval);
active = false;
unsubs();
};
});
Expand Down Expand Up @@ -151,9 +158,11 @@ export function latestVersionOfEntryStore<
S extends ActionCommittedSignal<any, any> & any
>(
client: ZomeClient<S>,
fetchLatestVersion: () => Promise<EntryRecord<T> | undefined>
fetchLatestVersion: () => Promise<EntryRecord<T> | undefined>,
pollIntervalMs: number = DEFAULT_POLL_INTERVAL_MS
): AsyncReadable<EntryRecord<T>> {
return readable<AsyncStatus<EntryRecord<T>>>({ status: "pending" }, (set) => {
let active = true;
let latestVersion: EntryRecord<T> | undefined;
const fetch = async () => {
try {
Expand All @@ -180,10 +189,13 @@ export function latestVersionOfEntryStore<
status: "error",
error: e,
});
} finally {
if (active) {
setTimeout(() => fetch(), pollIntervalMs);
}
}
};
fetch();
const interval = setInterval(() => fetch(), 4000);
const unsubs = client.onSignal((originalSignal) => {
if (!(originalSignal as ActionCommittedSignal<any, any>).type) return;
const signal = originalSignal as ActionCommittedSignal<any, any>;
Expand Down Expand Up @@ -211,7 +223,7 @@ export function latestVersionOfEntryStore<
});
return () => {
set({ status: "pending" });
clearInterval(interval);
active = false;
unsubs();
};
});
Expand All @@ -231,12 +243,18 @@ export function allRevisionsOfEntryStore<
S extends ActionCommittedSignal<any, any> & any
>(
client: ZomeClient<S>,
fetchAllRevisions: () => Promise<Array<EntryRecord<T>>>
fetchAllRevisions: () => Promise<Array<EntryRecord<T>>>,
pollIntervalMs: number = DEFAULT_POLL_INTERVAL_MS
): AsyncReadable<Array<EntryRecord<T>>> {
return asyncReadable(async (set) => {
let active = true;
let allRevisions: Array<EntryRecord<T>>;
const fetch = async () => {
const nAllRevisions = await fetchAllRevisions();
const nAllRevisions = await fetchAllRevisions().finally(() => {
if (active) {
setTimeout(() => fetch(), pollIntervalMs);
}
});
if (
allRevisions === undefined ||
!areArrayHashesEqual(
Expand All @@ -249,7 +267,6 @@ export function allRevisionsOfEntryStore<
}
};
await fetch();
const interval = setInterval(() => fetch(), 4000);
const unsubs = client.onSignal(async (originalSignal) => {
if (!(originalSignal as ActionCommittedSignal<any, any>).type) return;
const signal = originalSignal as ActionCommittedSignal<any, any>;
Expand Down Expand Up @@ -277,7 +294,7 @@ export function allRevisionsOfEntryStore<
}
});
return () => {
clearInterval(interval);
active = false;
unsubs();
};
});
Expand All @@ -297,12 +314,18 @@ export function deletesForEntryStore<
>(
client: ZomeClient<S>,
originalActionHash: ActionHash,
fetchDeletes: () => Promise<Array<SignedActionHashed<Delete>>>
fetchDeletes: () => Promise<Array<SignedActionHashed<Delete>>>,
pollIntervalMs: number = DEFAULT_POLL_INTERVAL_MS
): AsyncReadable<Array<SignedActionHashed<Delete>>> {
return asyncReadable(async (set) => {
let active = true;
let deletes: Array<SignedActionHashed<Delete>>;
const fetch = async () => {
const ndeletes = await fetchDeletes();
const ndeletes = await fetchDeletes().finally(() => {
if (active) {
setTimeout(() => fetch(), pollIntervalMs);
}
});
if (
deletes === undefined ||
!areArrayHashesEqual(
Expand All @@ -315,7 +338,6 @@ export function deletesForEntryStore<
}
};
await fetch();
const interval = setInterval(() => fetch(), 4000);
const unsubs = client.onSignal((originalSignal) => {
if (!(originalSignal as ActionCommittedSignal<any, any>).type) return;
const signal = originalSignal as ActionCommittedSignal<any, any>;
Expand All @@ -330,7 +352,7 @@ export function deletesForEntryStore<
}
});
return () => {
clearInterval(interval);
active = false;
unsubs();
};
});
Expand Down Expand Up @@ -392,7 +414,7 @@ function uniquifyActions<T extends Action>(
* Keeps an up to date list of the links for the non-deleted links in this DHT
* Makes requests only while it has some subscriber
*
* Will do so by calling the given every 4 seconds calling the given fetch function,
* Will do so by calling the given fetch callback every 20 seconds,
* and listening to `LinkCreated` and `LinkDeleted` signals
*
* Useful for link types
Expand All @@ -404,14 +426,16 @@ export function liveLinksStore<
client: ZomeClient<S>,
baseAddress: BASE,
fetchLinks: () => Promise<Array<Link>>,
linkType: LinkTypeForSignal<S>
linkType: LinkTypeForSignal<S>,
pollIntervalMs: number = DEFAULT_POLL_INTERVAL_MS
): AsyncReadable<Array<Link>> {
let innerBaseAddress = baseAddress;
if (getHashType(innerBaseAddress) === HashType.AGENT) {
innerBaseAddress = retype(innerBaseAddress, HashType.ENTRY) as BASE;
}
return asyncReadable(async (set) => {
let links: Link[];
let active = true;

const maybeSet = (newLinksValue: Link[]) => {
const orderedNewLinks = uniquifyLinks(newLinksValue).sort(
Expand All @@ -430,13 +454,15 @@ export function liveLinksStore<
}
};
const fetch = async () => {
const nlinks = await fetchLinks();
const nlinks = await fetchLinks().finally(() => {
if (active) {
setTimeout(() => fetch(), pollIntervalMs);
}
});
maybeSet(nlinks);
};

await fetch();

const interval = setInterval(() => fetch(), 4000);
const unsubs = client.onSignal((originalSignal) => {
if (!(originalSignal as ActionCommittedSignal<any, any>).type) return;
const signal = originalSignal as ActionCommittedSignal<any, any>;
Expand Down Expand Up @@ -466,7 +492,7 @@ export function liveLinksStore<
}
});
return () => {
clearInterval(interval);
active = false;
unsubs();
};
});
Expand All @@ -492,7 +518,8 @@ export function deletedLinksStore<
[SignedActionHashed<CreateLink>, Array<SignedActionHashed<DeleteLink>>]
>
>,
linkType: LinkTypeForSignal<S>
linkType: LinkTypeForSignal<S>,
pollIntervalMs: number = DEFAULT_POLL_INTERVAL_MS
): AsyncReadable<
Array<[SignedActionHashed<CreateLink>, Array<SignedActionHashed<DeleteLink>>]>
> {
Expand All @@ -504,6 +531,7 @@ export function deletedLinksStore<
let deletedLinks: Array<
[SignedActionHashed<CreateLink>, Array<SignedActionHashed<DeleteLink>>]
>;
let active = true;

const maybeSet = (
newDeletedLinks: Array<
Expand Down Expand Up @@ -539,11 +567,14 @@ export function deletedLinksStore<
}
};
const fetch = async () => {
const ndeletedLinks = await fetchDeletedLinks();
const ndeletedLinks = await fetchDeletedLinks().finally(() => {
if (active) {
setTimeout(() => fetch(), pollIntervalMs);
}
});
maybeSet(ndeletedLinks);
};
await fetch();
const interval = setInterval(() => fetch(), 4000);
const unsubs = client.onSignal((originalSignal) => {
if (!(originalSignal as ActionCommittedSignal<any, any>).type) return;
const signal = originalSignal as ActionCommittedSignal<any, any>;
Expand Down Expand Up @@ -582,7 +613,7 @@ export function deletedLinksStore<
}
});
return () => {
clearInterval(interval);
active = true;
unsubs();
};
});
Expand Down
Loading

0 comments on commit 7db81fa

Please sign in to comment.