Skip to content

Commit

Permalink
feat: add end-key to pool (#133)
Browse files Browse the repository at this point in the history
* chore: generate new lcd/client

* feat: skipUploaderRole when endKey is reached

* chore: add tests

* chore: properly implemented end key on protocol node

* fix: yarn.lock

---------

Co-authored-by: Troy Kessler <[email protected]>
  • Loading branch information
shifty11 and troykessler authored Jun 12, 2024
1 parent 96837bc commit 52fb22e
Show file tree
Hide file tree
Showing 10 changed files with 2,093 additions and 1,621 deletions.
3 changes: 3 additions & 0 deletions common/protocol/src/methods/checks/isPoolActive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ export function isPoolActive(this: Validator): boolean {
case PoolStatus.POOL_STATUS_UNSPECIFIED:
this.logger.info("Pool status is currently unspecified. Idling ...");
return false;
case PoolStatus.POOL_STATUS_END_KEY_REACHED:
this.logger.info("End key reached. Idling ...");
return false;
default:
this.logger.info("Pool status is currently unknown. Idling ...");
return false;
Expand Down
14 changes: 9 additions & 5 deletions common/protocol/src/methods/main/runCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ export async function runCache(this: Validator): Promise<void> {
this.m.cache_index_tail.set(Math.max(0, currentIndex - 1));

for (let i = currentIndex; i < targetIndex; i++) {
// if the end key is not empty and we have reached the end key of the pool
// we do not sync past this key
if (this.pool.data?.end_key && this.pool.data.end_key === key) {
this.logger.info(
`Reached pool end key "${key}", the node will not continue collecting data past this key.`
);
break;
}

// check if data item was already collected. If it was
// already collected we don't need to retrieve it again
this.logger.debug(`this.cacheProvider.exists(${i.toString()})`);
Expand Down Expand Up @@ -223,11 +232,6 @@ export async function runCache(this: Validator): Promise<void> {
key = nextKey;
}

// indicate that current caching round is done
this.logger.debug(
`Finished caching from index ${currentIndex} to ${targetIndex}. Waiting for next round ...`
);

// wait until a new bundle proposal is available. We don't need
// to sync the pool here because the pool state already gets
// synced in the other main function "runNode" so we only listen
Expand Down
6 changes: 6 additions & 0 deletions common/protocol/src/methods/main/runNode.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { PoolStatus } from "@kyvejs/types/lcd/kyve/pool/v1beta1/pool";
import { Validator } from "../..";
import { IDLE_TIME, sleep } from "../../utils";

Expand Down Expand Up @@ -33,6 +34,11 @@ export async function runNode(this: Validator): Promise<void> {
process.exit(1);
}

if (this.pool.status === PoolStatus.POOL_STATUS_END_KEY_REACHED) {
this.logger.info(`Reached pool end key. Shutting down node ...`);
process.exit(0);
}

// log warnings if storage provider balance is low
await this.isStorageBalanceLow();

Expand Down
13 changes: 13 additions & 0 deletions common/protocol/src/methods/validate/validateBundleProposal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,19 @@ export async function validateBundleProposal(
// validate each data item in bundle with custom runtime validation
for (let i = 0; i < proposedBundle.length; i++) {
if (valid) {
// if the pool has an end key and we find out that a data item
// has the end key and it is not the last data item in the bundle
// we consider the bundle invalid
if (this.pool.data?.end_key) {
if (
i < proposedBundle.length - 1 &&
proposedBundle[i].key === this.pool.data?.end_key
) {
valid = false;
break;
}
}

this.logger.debug(
`this.runtime.validateDataItem($THIS, $PROPOSED_DATA_ITEM, $VALIDATION_DATA_ITEM)`
);
Expand Down
188 changes: 168 additions & 20 deletions common/protocol/test/cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ TEST CASES - cache tests
* start caching from a pool where transformDataItem fails
* start caching from a pool where nextKey fails once
* start caching from a pool where cache methods fail
* TODO: test with pool config that has no source object
* TODO: test with pool config that has zero sources
* TODO: start caching from a pool where node has not cached anything yet
* start caching from a pool which has an endKey
*/

Expand Down Expand Up @@ -249,8 +247,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool which has a bundle proposal ongoing", async () => {
Expand Down Expand Up @@ -423,8 +419,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("continue caching from a pool which has a bundle proposal ongoing", async () => {
Expand Down Expand Up @@ -602,8 +596,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where last bundle proposal was dropped", async () => {
Expand Down Expand Up @@ -767,8 +759,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where getNextDataItem fails once", async () => {
Expand Down Expand Up @@ -931,8 +921,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where getNextDataItem fails multiple times", async () => {
Expand Down Expand Up @@ -1175,8 +1163,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where transformDataItem fails", async () => {
Expand Down Expand Up @@ -1407,8 +1393,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test.skip("start caching from a pool where nextKey fails once", async () => {
Expand Down Expand Up @@ -1527,8 +1511,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(0);

// TODO: assert timeouts
});

test("start caching from a pool where cache methods fail", async () => {
Expand Down Expand Up @@ -1665,7 +1647,173 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(0);
});

test("start caching from a pool which has an endKey", async () => {
// ARRANGE
v.pool = {
...genesis_pool,
data: {
...genesis_pool.data,
current_key: "99",
current_index: "100",
end_key: "152",
},
bundle_proposal: {
...genesis_pool.bundle_proposal,
storage_id: "test_storage_id",
uploader: "test_staker",
next_uploader: "test_staker",
data_size: "123456789",
data_hash: "test_bundle_hash",
bundle_size: "50",
from_key: "100",
to_key: "149",
bundle_summary: "test_summary",
updated_at: "0",
voters_valid: ["test_staker"],
},
} as any;

// ACT
await runCache.call(v);

// ASSERT
const txs = v["client"][0].kyve.bundles.v1beta1;
const queries = v["lcd"][0].kyve.query.v1beta1;
const cacheProvider = v["cacheProvider"];
const runtime = v["runtime"];

// ========================
// ASSERT CLIENT INTERFACES
// ========================

expect(txs.claimUploaderRole).toHaveBeenCalledTimes(0);

expect(txs.voteBundleProposal).toHaveBeenCalledTimes(0);

expect(txs.submitBundleProposal).toHaveBeenCalledTimes(0);

expect(txs.skipUploaderRole).toHaveBeenCalledTimes(0);

// =====================
// ASSERT LCD INTERFACES
// =====================

expect(queries.canVote).toHaveBeenCalledTimes(0);

expect(queries.canPropose).toHaveBeenCalledTimes(0);

// =========================
// ASSERT STORAGE INTERFACES
// =========================

expect(storageProvider.saveBundle).toHaveBeenCalledTimes(0);

expect(storageProvider.retrieveBundle).toHaveBeenCalledTimes(0);

// =======================
// ASSERT CACHE INTERFACES
// =======================

// we use 50 + 3 here because the current bundle is 50 items big
// and because of the end key we only index the next 3 items and stop
// afterwards
expect(cacheProvider.put).toHaveBeenCalledTimes(50 + 3);

for (let n = 0; n < 50 + 3; n++) {
const item = {
key: (n + parseInt(genesis_pool.data.max_bundle_size)).toString(),
value: `${
n + parseInt(genesis_pool.data.max_bundle_size)
}-value-transform`,
};
expect(cacheProvider.put).toHaveBeenNthCalledWith(
n + 1,
(n + parseInt(genesis_pool.data.max_bundle_size)).toString(),
item
);
}

expect(cacheProvider.get).toHaveBeenCalledTimes(0);

expect(cacheProvider.exists).toHaveBeenCalledTimes(
parseInt(genesis_pool.data.max_bundle_size) + 50 + 3
);

for (let n = 0; n < parseInt(genesis_pool.data.max_bundle_size); n++) {
expect(cacheProvider.exists).toHaveBeenNthCalledWith(n + 1, n.toString());
}

for (
let n = parseInt(genesis_pool.data.max_bundle_size);
n < parseInt(genesis_pool.data.max_bundle_size) + 50 + 3;
n++
) {
expect(cacheProvider.exists).toHaveBeenNthCalledWith(n + 1, n.toString());
}

expect(cacheProvider.del).toHaveBeenCalledTimes(0);

expect(cacheProvider.drop).toHaveBeenCalledTimes(0);

// =============================
// ASSERT COMPRESSION INTERFACES
// =============================

expect(compression.compress).toHaveBeenCalledTimes(0);

expect(compression.decompress).toHaveBeenCalledTimes(0);

// =========================
// ASSERT RUNTIME INTERFACES
// =========================

// TODO: assert timeouts
expect(runtime.getDataItem).toHaveBeenCalledTimes(50 + 3);

for (let n = 0; n < 50 + 3; n++) {
expect(runtime.getDataItem).toHaveBeenNthCalledWith(
n + 1,
v,
(n + parseInt(genesis_pool.data.max_bundle_size)).toString()
);
}

expect(runtime.transformDataItem).toHaveBeenCalledTimes(50 + 3);

for (let n = 0; n < 50 + 3; n++) {
const item = {
key: (n + parseInt(genesis_pool.data.max_bundle_size)).toString(),
value: `${n + parseInt(genesis_pool.data.max_bundle_size)}-value`,
};
expect(runtime.transformDataItem).toHaveBeenNthCalledWith(
n + 1,
expect.any(Validator),
item
);
}

expect(runtime.validateDataItem).toHaveBeenCalledTimes(0);

expect(runtime.nextKey).toHaveBeenCalledTimes(50 + 3);

// here we subtract the key - 1 because we start using the
// current key
for (let n = 0; n < 50 + 3; n++) {
expect(runtime.nextKey).toHaveBeenNthCalledWith(
n + 1,
expect.any(Validator),
(n + parseInt(genesis_pool.data.max_bundle_size) - 1).toString()
);
}

expect(runtime.summarizeDataBundle).toHaveBeenCalledTimes(0);

// ========================
// ASSERT NODEJS INTERFACES
// ========================

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);
});
});
14 changes: 13 additions & 1 deletion common/protocol/test/checks/is_pool_active.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Logger } from "tslog";
import { Validator } from "../../src/index";
import { setupMetrics, isPoolActive } from "../../src/methods";
import { isPoolActive, setupMetrics } from "../../src/methods";
import { register } from "prom-client";
import { TestRuntime } from "../mocks/runtime.mock";
import { genesis_pool } from "../mocks/constants";
Expand All @@ -13,6 +13,7 @@ TEST CASES - isPoolActive
* assert if pool status is POOL_STATUS_ACTIVE
* assert if pool status is POOL_STATUS_NO_FUNDS
* assert if pool status is POOL_STATUS_DISABLED
* assert if pool status is POOL_STATUS_END_KEY_REACHED
* assert if pool status is POOL_STATUS_NOT_ENOUGH_DELEGATION
* assert if pool status is POOL_STATUS_UPGRADING
* assert if pool status is POOL_STATUS_UNSPECIFIED
Expand Down Expand Up @@ -80,6 +81,17 @@ describe("isPoolActive", () => {
expect(result).toBeFalsy();
});

test("assert if pool status is POOL_STATUS_END_KEY_REACHED", async () => {
// ARRANGE
v.pool.status = PoolStatus.POOL_STATUS_END_KEY_REACHED;

// ACT
const result = isPoolActive.call(v);

// ASSERT
expect(result).toBeFalsy();
});

test("assert if pool status is POOL_STATUS_NOT_ENOUGH_DELEGATION", async () => {
// ARRANGE
v.pool.status = PoolStatus.POOL_STATUS_NOT_ENOUGH_DELEGATION;
Expand Down
1 change: 1 addition & 0 deletions common/protocol/test/mocks/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const genesis_pool = {
logo: "ar://9FJDam56yBbmvn8rlamEucATH5UcYqSBw468rlCXn8E",
config: "ar://bYvlKiVLb1YY0Gx4mjUhpWBkztqmA4uEN97kTaZtBfY",
start_key: "0",
end_key: "",
current_key: "",
current_summary: "",
current_index: "0",
Expand Down
Loading

0 comments on commit 52fb22e

Please sign in to comment.