Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
zorancv committed Dec 18, 2024
1 parent 270356d commit 5910d01
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 38 deletions.
4 changes: 1 addition & 3 deletions chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use graph::{
subgraph::{MappingError, ProofOfIndexingEvent, SharedProofOfIndexing},
trigger_processor::HostedTrigger,
},
data::store::EntityV,
prelude::{
anyhow, async_trait, BlockHash, BlockNumber, BlockState, CheapClone, RuntimeHostBuilder,
},
Expand Down Expand Up @@ -238,8 +237,7 @@ where
logger,
);

let vid = state.next_vid(block.number);
state.entity_cache.set(key, EntityV::new(entity, vid))?;
state.entity_cache.set(key, entity, block.number)?;
}
ParsedChanges::Delete(entity_key) => {
let entity_type = entity_key.entity_type.cheap_clone();
Expand Down
7 changes: 4 additions & 3 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use graph::components::{
subgraph::{MappingError, PoICausalityRegion, ProofOfIndexing, SharedProofOfIndexing},
};
use graph::data::store::scalar::Bytes;
use graph::data::store::EntityV;
use graph::data::subgraph::{
schema::{SubgraphError, SubgraphHealth},
SubgraphFeature,
Expand Down Expand Up @@ -1604,6 +1603,7 @@ async fn update_proof_of_indexing(
key: EntityKey,
digest: Bytes,
block_time: BlockTime,
block: BlockNumber,
) -> Result<(), Error> {
let digest_name = entity_cache.schema.poi_digest();
let mut data = vec![
Expand All @@ -1618,12 +1618,12 @@ async fn update_proof_of_indexing(
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
// VID is autogenerated for POI table and our input is ignored
entity_cache.set(key, EntityV::new(poi, 0))
entity_cache.set(key, poi, block)
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");

let block_number = proof_of_indexing.get_block();
let mut proof_of_indexing = proof_of_indexing.take();

for (causality_region, stream) in proof_of_indexing.drain() {
Expand Down Expand Up @@ -1659,6 +1659,7 @@ async fn update_proof_of_indexing(
entity_key,
updated_proof_of_indexing,
block_time,
block_number,
)?;
}

Expand Down
20 changes: 18 additions & 2 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ pub struct EntityCache {
/// generated IDs, the `EntityCache` needs to be newly instantiated for
/// each block
seq: u32,

// Sequence number of the next VID value for this block. The value written
// in the database consist of a block number and this SEQ number.
pub vid_seq: i32,
}

impl Debug for EntityCache {
Expand Down Expand Up @@ -132,6 +136,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: 0,
}
}

Expand All @@ -152,6 +157,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: 0,
}
}

Expand Down Expand Up @@ -349,9 +355,19 @@ impl EntityCache {
/// with existing data. The entity will be validated against the
/// subgraph schema, and any errors will result in an `Err` being
/// returned.
pub fn set(&mut self, key: EntityKey, entity: EntityV) -> Result<(), anyhow::Error> {
pub fn set(
&mut self,
key: EntityKey,
entity: Entity,
block: BlockNumber,
) -> Result<(), anyhow::Error> {
// check the validate for derived fields
let is_valid = entity.e.validate(&key).is_ok();
let is_valid = entity.validate(&key).is_ok();

//The next VID is based on a block number and a sequence withing the block
let vid = ((block as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
let entity = EntityV::new(entity, vid);

self.entity_op(key.clone(), EntityOp::Update(entity));

Expand Down
10 changes: 0 additions & 10 deletions graph/src/components/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ pub struct BlockState {
// data source that have been processed.
pub processed_data_sources: Vec<StoredDynamicDataSource>,

pub vid_seq: i32,

// Marks whether a handler is currently executing.
in_handler: bool,

Expand All @@ -95,7 +93,6 @@ impl BlockState {
persisted_data_sources: Vec::new(),
handler_created_data_sources: Vec::new(),
processed_data_sources: Vec::new(),
vid_seq: 0,
in_handler: false,
metrics: BlockStateMetrics::new(),
}
Expand All @@ -113,7 +110,6 @@ impl BlockState {
persisted_data_sources,
handler_created_data_sources,
processed_data_sources,
vid_seq: _,
in_handler,
metrics,
} = self;
Expand Down Expand Up @@ -183,10 +179,4 @@ impl BlockState {
pub fn persist_data_source(&mut self, ds: StoredDynamicDataSource) {
self.persisted_data_sources.push(ds)
}

pub fn next_vid(&mut self, block_number: BlockNumber) -> i64 {
let vid = ((block_number as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
vid
}
}
4 changes: 4 additions & 0 deletions graph/src/components/subgraph/proof_of_indexing/online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ impl ProofOfIndexing {
pub fn take(self) -> HashMap<Id, BlockEventStream> {
self.per_causality_region
}

pub fn get_block(&self) -> BlockNumber {
self.block_number
}
}

pub struct ProofOfIndexingFinisher {
Expand Down
5 changes: 2 additions & 3 deletions runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use graph::components::store::{EnsLookup, GetScope, LoadRelatedRequest};
use graph::components::subgraph::{
InstanceDSTemplate, PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing,
};
use graph::data::store::{self, EntityV};
use graph::data::store::{self};
use graph::data_source::{CausalityRegion, DataSource, EntityTypeAccess};
use graph::ensure;
use graph::prelude::ethabi::param_type::Reader;
Expand Down Expand Up @@ -248,7 +248,6 @@ impl HostExports {
gas: &GasCounter,
) -> Result<(), HostExportError> {
let entity_type = state.entity_cache.schema.entity_type(&entity_type)?;
let vid = state.next_vid(block);

Self::expect_object_type(&entity_type, "set")?;

Expand Down Expand Up @@ -351,7 +350,7 @@ impl HostExports {

state.metrics.track_entity_write(&entity_type, &entity);

state.entity_cache.set(key, EntityV::new(entity, vid))?;
state.entity_cache.set(key, entity, block)?;

Ok(())
}
Expand Down
28 changes: 11 additions & 17 deletions store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,21 +210,21 @@ fn insert_modifications() {
let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai" };
let mogwai_key = make_band_key("mogwai");
cache
.set(mogwai_key.clone(), EntityV::new(mogwai_data.clone(), 0))
.set(mogwai_key.clone(), mogwai_data.clone(), 0)
.unwrap();

let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros" };
let sigurros_key = make_band_key("sigurros");
cache
.set(sigurros_key.clone(), EntityV::new(sigurros_data.clone(), 0))
.set(sigurros_key.clone(), sigurros_data.clone(), 0)
.unwrap();

let result = cache.as_modifications(0);
assert_eq!(
sort_by_entity_key(result.unwrap().modifications),
sort_by_entity_key(vec![
EntityModification::insert(mogwai_key, mogwai_data, 0, 0),
EntityModification::insert(sigurros_key, sigurros_data, 0, 0)
EntityModification::insert(sigurros_key, sigurros_data, 0, 1)
])
);
}
Expand Down Expand Up @@ -256,21 +256,21 @@ fn overwrite_modifications() {
let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 };
let mogwai_key = make_band_key("mogwai");
cache
.set(mogwai_key.clone(), EntityV::new(mogwai_data.clone(), 0))
.set(mogwai_key.clone(), mogwai_data.clone(), 0)
.unwrap();

let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994 };
let sigurros_key = make_band_key("sigurros");
cache
.set(sigurros_key.clone(), EntityV::new(sigurros_data.clone(), 0))
.set(sigurros_key.clone(), sigurros_data.clone(), 0)
.unwrap();

let result = cache.as_modifications(0);
assert_eq!(
sort_by_entity_key(result.unwrap().modifications),
sort_by_entity_key(vec![
EntityModification::overwrite(mogwai_key, mogwai_data, 0, 0),
EntityModification::overwrite(sigurros_key, sigurros_data, 0, 0)
EntityModification::overwrite(sigurros_key, sigurros_data, 0, 1)
])
);
}
Expand All @@ -293,14 +293,12 @@ fn consecutive_modifications() {
let update_data =
entity! { SCHEMA => id: "mogwai", founded: 1995, label: "Rock Action Records" };
let update_key = make_band_key("mogwai");
cache.set(update_key, EntityV::new(update_data, 0)).unwrap();
cache.set(update_key, update_data, 0).unwrap();

// Then, just reset the "label".
let update_data = entity! { SCHEMA => id: "mogwai", label: Value::Null };
let update_key = make_band_key("mogwai");
cache
.set(update_key.clone(), EntityV::new(update_data, 0))
.unwrap();
cache.set(update_key.clone(), update_data, 0).unwrap();

// We expect a single overwrite modification for the above that leaves "id"
// and "name" untouched, sets "founded" and removes the "label" field.
Expand Down Expand Up @@ -721,9 +719,7 @@ fn scoped_get() {
let account5 = ACCOUNT_TYPE.parse_id("5").unwrap();
let wallet5 = create_wallet_entity("5", &account5, 100);
let key5 = WALLET_TYPE.parse_key("5").unwrap();
cache
.set(key5.clone(), EntityV::new(wallet5.clone(), 5))
.unwrap();
cache.set(key5.clone(), wallet5.clone(), 0).unwrap();

// For the new entity, we can retrieve it with either scope
let act5 = cache.get(&key5, GetScope::InBlock).unwrap();
Expand All @@ -744,9 +740,7 @@ fn scoped_get() {
// But if it gets updated, it becomes visible with either scope
let mut wallet1 = wallet1;
wallet1.set("balance", 70).unwrap();
cache
.set(key1.clone(), EntityV::new(wallet1.clone(), 1))
.unwrap();
cache.set(key1.clone(), wallet1.clone(), 0).unwrap();
let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref()));
let act1 = cache.get(&key1, GetScope::Store).unwrap();
Expand Down Expand Up @@ -793,6 +787,6 @@ fn no_interface_mods() {

let entity = entity! { LOAD_RELATED_SUBGRAPH => id: "1", balance: 100 };

cache.set(key, EntityV::new(entity, 0)).unwrap_err();
cache.set(key, entity, 0).unwrap_err();
})
}

0 comments on commit 5910d01

Please sign in to comment.