Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add end-key to pool #133

Merged
merged 7 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/protocol/src/methods/checks/isPoolActive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ export function isPoolActive(this: Validator): boolean {
case PoolStatus.POOL_STATUS_UNSPECIFIED:
this.logger.info("Pool status is currently unspecified. Idling ...");
return false;
case PoolStatus.POOL_STATUS_END_KEY_REACHED:
this.logger.info("End key reached. Idling ...");
return false;
default:
this.logger.info("Pool status is currently unknown. Idling ...");
return false;
Expand Down
14 changes: 9 additions & 5 deletions common/protocol/src/methods/main/runCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ export async function runCache(this: Validator): Promise<void> {
this.m.cache_index_tail.set(Math.max(0, currentIndex - 1));

for (let i = currentIndex; i < targetIndex; i++) {
// if the end key is not empty and we have reached the end key of the pool
// we do not sync past this key
if (this.pool.data?.end_key && this.pool.data.end_key === key) {
this.logger.info(
`Reached pool end key "${key}", the node will not continue collecting data past this key.`
);
break;
}

// check if data item was already collected. If it was
// already collected we don't need to retrieve it again
this.logger.debug(`this.cacheProvider.exists(${i.toString()})`);
Expand Down Expand Up @@ -223,11 +232,6 @@ export async function runCache(this: Validator): Promise<void> {
key = nextKey;
}

// indicate that current caching round is done
this.logger.debug(
`Finished caching from index ${currentIndex} to ${targetIndex}. Waiting for next round ...`
);

// wait until a new bundle proposal is available. We don't need
// to sync the pool here because the pool state already gets
// synced in the other main function "runNode" so we only listen
Expand Down
6 changes: 6 additions & 0 deletions common/protocol/src/methods/main/runNode.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { PoolStatus } from "@kyvejs/types/lcd/kyve/pool/v1beta1/pool";
import { Validator } from "../..";
import { IDLE_TIME, sleep } from "../../utils";

Expand Down Expand Up @@ -33,6 +34,11 @@ export async function runNode(this: Validator): Promise<void> {
process.exit(1);
}

if (this.pool.status === PoolStatus.POOL_STATUS_END_KEY_REACHED) {
this.logger.info(`Reached pool end key. Shutting down node ...`);
process.exit(0);
}

// log warnings if storage provider balance is low
await this.isStorageBalanceLow();

Expand Down
13 changes: 13 additions & 0 deletions common/protocol/src/methods/validate/validateBundleProposal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,19 @@ export async function validateBundleProposal(
// validate each data item in bundle with custom runtime validation
for (let i = 0; i < proposedBundle.length; i++) {
if (valid) {
// if the pool has an end key and we find out that a data item
// has the end key and it is not the last data item in the bundle
// we consider the bundle invalid
if (this.pool.data?.end_key) {
if (
i < proposedBundle.length - 1 &&
proposedBundle[i].key === this.pool.data?.end_key
) {
valid = false;
break;
}
}

this.logger.debug(
`this.runtime.validateDataItem($THIS, $PROPOSED_DATA_ITEM, $VALIDATION_DATA_ITEM)`
);
Expand Down
188 changes: 168 additions & 20 deletions common/protocol/test/cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ TEST CASES - cache tests
* start caching from a pool where transformDataItem fails
* start caching from a pool where nextKey fails once
* start caching from a pool where cache methods fail
* TODO: test with pool config that has no source object
* TODO: test with pool config that has zero sources
* TODO: start caching from a pool where node has not cached anything yet
* start caching from a pool which has an endKey

*/

Expand Down Expand Up @@ -249,8 +247,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool which has a bundle proposal ongoing", async () => {
Expand Down Expand Up @@ -423,8 +419,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("continue caching from a pool which has a bundle proposal ongoing", async () => {
Expand Down Expand Up @@ -602,8 +596,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where last bundle proposal was dropped", async () => {
Expand Down Expand Up @@ -767,8 +759,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where getNextDataItem fails once", async () => {
Expand Down Expand Up @@ -931,8 +921,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where getNextDataItem fails multiple times", async () => {
Expand Down Expand Up @@ -1175,8 +1163,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where transformDataItem fails", async () => {
Expand Down Expand Up @@ -1407,8 +1393,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test.skip("start caching from a pool where nextKey fails once", async () => {
Expand Down Expand Up @@ -1527,8 +1511,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(0);

// TODO: assert timeouts
});

test("start caching from a pool where cache methods fail", async () => {
Expand Down Expand Up @@ -1665,7 +1647,173 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(0);
});

test("start caching from a pool which has an endKey", async () => {
// ARRANGE
v.pool = {
...genesis_pool,
data: {
...genesis_pool.data,
current_key: "99",
current_index: "100",
end_key: "152",
},
bundle_proposal: {
...genesis_pool.bundle_proposal,
storage_id: "test_storage_id",
uploader: "test_staker",
next_uploader: "test_staker",
data_size: "123456789",
data_hash: "test_bundle_hash",
bundle_size: "50",
from_key: "100",
to_key: "149",
bundle_summary: "test_summary",
updated_at: "0",
voters_valid: ["test_staker"],
},
} as any;

// ACT
await runCache.call(v);

// ASSERT
const txs = v["client"][0].kyve.bundles.v1beta1;
const queries = v["lcd"][0].kyve.query.v1beta1;
const cacheProvider = v["cacheProvider"];
const runtime = v["runtime"];

// ========================
// ASSERT CLIENT INTERFACES
// ========================

expect(txs.claimUploaderRole).toHaveBeenCalledTimes(0);

expect(txs.voteBundleProposal).toHaveBeenCalledTimes(0);

expect(txs.submitBundleProposal).toHaveBeenCalledTimes(0);

expect(txs.skipUploaderRole).toHaveBeenCalledTimes(0);

// =====================
// ASSERT LCD INTERFACES
// =====================

expect(queries.canVote).toHaveBeenCalledTimes(0);

expect(queries.canPropose).toHaveBeenCalledTimes(0);

// =========================
// ASSERT STORAGE INTERFACES
// =========================

expect(storageProvider.saveBundle).toHaveBeenCalledTimes(0);

expect(storageProvider.retrieveBundle).toHaveBeenCalledTimes(0);

// =======================
// ASSERT CACHE INTERFACES
// =======================

// we use 50 + 3 here because the current bundle is 50 items big
// and because of the end key we only index the next 3 items and stop
// afterwards
expect(cacheProvider.put).toHaveBeenCalledTimes(50 + 3);

for (let n = 0; n < 50 + 3; n++) {
const item = {
key: (n + parseInt(genesis_pool.data.max_bundle_size)).toString(),
value: `${
n + parseInt(genesis_pool.data.max_bundle_size)
}-value-transform`,
};
expect(cacheProvider.put).toHaveBeenNthCalledWith(
n + 1,
(n + parseInt(genesis_pool.data.max_bundle_size)).toString(),
item
);
}

expect(cacheProvider.get).toHaveBeenCalledTimes(0);

expect(cacheProvider.exists).toHaveBeenCalledTimes(
parseInt(genesis_pool.data.max_bundle_size) + 50 + 3
);

for (let n = 0; n < parseInt(genesis_pool.data.max_bundle_size); n++) {
expect(cacheProvider.exists).toHaveBeenNthCalledWith(n + 1, n.toString());
}

for (
let n = parseInt(genesis_pool.data.max_bundle_size);
n < parseInt(genesis_pool.data.max_bundle_size) + 50 + 3;
n++
) {
expect(cacheProvider.exists).toHaveBeenNthCalledWith(n + 1, n.toString());
}

expect(cacheProvider.del).toHaveBeenCalledTimes(0);

expect(cacheProvider.drop).toHaveBeenCalledTimes(0);

// =============================
// ASSERT COMPRESSION INTERFACES
// =============================

expect(compression.compress).toHaveBeenCalledTimes(0);

expect(compression.decompress).toHaveBeenCalledTimes(0);

// =========================
// ASSERT RUNTIME INTERFACES
// =========================

// TODO: assert timeouts
expect(runtime.getDataItem).toHaveBeenCalledTimes(50 + 3);

for (let n = 0; n < 50 + 3; n++) {
expect(runtime.getDataItem).toHaveBeenNthCalledWith(
n + 1,
v,
(n + parseInt(genesis_pool.data.max_bundle_size)).toString()
);
}

expect(runtime.transformDataItem).toHaveBeenCalledTimes(50 + 3);

for (let n = 0; n < 50 + 3; n++) {
const item = {
key: (n + parseInt(genesis_pool.data.max_bundle_size)).toString(),
value: `${n + parseInt(genesis_pool.data.max_bundle_size)}-value`,
};
expect(runtime.transformDataItem).toHaveBeenNthCalledWith(
n + 1,
expect.any(Validator),
item
);
}

expect(runtime.validateDataItem).toHaveBeenCalledTimes(0);

expect(runtime.nextKey).toHaveBeenCalledTimes(50 + 3);

// here we subtract the key - 1 because we start using the
// current key
for (let n = 0; n < 50 + 3; n++) {
expect(runtime.nextKey).toHaveBeenNthCalledWith(
n + 1,
expect.any(Validator),
(n + parseInt(genesis_pool.data.max_bundle_size) - 1).toString()
);
}

expect(runtime.summarizeDataBundle).toHaveBeenCalledTimes(0);

// ========================
// ASSERT NODEJS INTERFACES
// ========================

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);
});
});
14 changes: 13 additions & 1 deletion common/protocol/test/checks/is_pool_active.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Logger } from "tslog";
import { Validator } from "../../src/index";
import { setupMetrics, isPoolActive } from "../../src/methods";
import { isPoolActive, setupMetrics } from "../../src/methods";
import { register } from "prom-client";
import { TestRuntime } from "../mocks/runtime.mock";
import { genesis_pool } from "../mocks/constants";
Expand All @@ -13,6 +13,7 @@ TEST CASES - isPoolActive
* assert if pool status is POOL_STATUS_ACTIVE
* assert if pool status is POOL_STATUS_NO_FUNDS
* assert if pool status is POOL_STATUS_DISABLED
* assert if pool status is POOL_STATUS_END_KEY_REACHED
* assert if pool status is POOL_STATUS_NOT_ENOUGH_DELEGATION
* assert if pool status is POOL_STATUS_UPGRADING
* assert if pool status is POOL_STATUS_UNSPECIFIED
Expand Down Expand Up @@ -80,6 +81,17 @@ describe("isPoolActive", () => {
expect(result).toBeFalsy();
});

test("assert if pool status is POOL_STATUS_END_KEY_REACHED", async () => {
// ARRANGE
v.pool.status = PoolStatus.POOL_STATUS_END_KEY_REACHED;

// ACT
const result = isPoolActive.call(v);

// ASSERT
expect(result).toBeFalsy();
});

test("assert if pool status is POOL_STATUS_NOT_ENOUGH_DELEGATION", async () => {
// ARRANGE
v.pool.status = PoolStatus.POOL_STATUS_NOT_ENOUGH_DELEGATION;
Expand Down
1 change: 1 addition & 0 deletions common/protocol/test/mocks/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const genesis_pool = {
logo: "ar://9FJDam56yBbmvn8rlamEucATH5UcYqSBw468rlCXn8E",
config: "ar://bYvlKiVLb1YY0Gx4mjUhpWBkztqmA4uEN97kTaZtBfY",
start_key: "0",
end_key: "",
current_key: "",
current_summary: "",
current_index: "0",
Expand Down
Loading
Loading