From 990ef4d65810c4d982a4bed9f24a913dae32fe90 Mon Sep 17 00:00:00 2001 From: encalypto Date: Fri, 20 Sep 2024 13:31:26 -0400 Subject: [PATCH] Store `synced_at_block_number` when a deployment syncs (#5610) This is a follow-up to #5566 --- graph/src/components/store/traits.rs | 2 +- .../down.sql | 2 ++ .../2024-08-14-205601_store_synced_at_block/up.sql | 2 ++ store/postgres/src/deployment.rs | 12 ++++++++++-- store/postgres/src/deployment_store.rs | 8 ++++++-- store/postgres/src/detail.rs | 1 + store/postgres/src/primary.rs | 3 +++ store/postgres/src/writable.rs | 11 ++++++----- store/test-store/tests/graph/entity_cache.rs | 2 +- store/test-store/tests/postgres/graft.rs | 4 ++-- store/test-store/tests/postgres/subgraph.rs | 14 +++++++++----- store/test-store/tests/postgres/writable.rs | 2 +- 12 files changed, 44 insertions(+), 19 deletions(-) create mode 100644 store/postgres/migrations/2024-08-14-205601_store_synced_at_block/down.sql create mode 100644 store/postgres/migrations/2024-08-14-205601_store_synced_at_block/up.sql diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 69ed67c16b2..0ac80902a66 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -350,7 +350,7 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker { ) -> Result<(), StoreError>; /// Force synced status, used for testing. - fn deployment_synced(&self) -> Result<(), StoreError>; + fn deployment_synced(&self, block_ptr: BlockPtr) -> Result<(), StoreError>; /// Return true if the deployment with the given id is fully synced, and return false otherwise. /// Cheap, cached operation. diff --git a/store/postgres/migrations/2024-08-14-205601_store_synced_at_block/down.sql b/store/postgres/migrations/2024-08-14-205601_store_synced_at_block/down.sql new file mode 100644 index 00000000000..5229dc8f425 --- /dev/null +++ b/store/postgres/migrations/2024-08-14-205601_store_synced_at_block/down.sql @@ -0,0 +1,2 @@ +ALTER TABLE subgraphs.subgraph_deployment DROP COLUMN synced_at_block_number; +ALTER TABLE unused_deployments DROP COLUMN synced_at_block_number; diff --git a/store/postgres/migrations/2024-08-14-205601_store_synced_at_block/up.sql b/store/postgres/migrations/2024-08-14-205601_store_synced_at_block/up.sql new file mode 100644 index 00000000000..8f5dcaffe4c --- /dev/null +++ b/store/postgres/migrations/2024-08-14-205601_store_synced_at_block/up.sql @@ -0,0 +1,2 @@ +ALTER TABLE subgraphs.subgraph_deployment ADD COLUMN synced_at_block_number INT4; +ALTER TABLE unused_deployments ADD COLUMN synced_at_block_number INT4; diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 05fc3d59ca1..998070658eb 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -133,6 +133,7 @@ table! { failed -> Bool, health -> crate::deployment::SubgraphHealthMapping, synced_at -> Nullable, + synced_at_block_number -> Nullable, fatal_error -> Nullable, non_fatal_errors -> Array, earliest_block_number -> Integer, @@ -731,7 +732,11 @@ pub fn state(conn: &mut PgConnection, id: DeploymentHash) -> Result Result<(), StoreError> { +pub fn set_synced( + conn: &mut PgConnection, + id: &DeploymentHash, + block_ptr: BlockPtr, +) -> Result<(), StoreError> { use subgraph_deployment as d; update( @@ -739,7 +744,10 @@ pub fn set_synced(conn: &mut PgConnection, id: &DeploymentHash) -> Result<(), St .filter(d::deployment.eq(id.as_str())) .filter(d::synced_at.is_null()), ) - .set(d::synced_at.eq(now)) + .set(( + d::synced_at.eq(now), + d::synced_at_block_number.eq(block_ptr.number), + )) .execute(conn)?; Ok(()) } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index d8b04faac0b..238d51397b2 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -563,9 +563,13 @@ impl DeploymentStore { deployment::exists_and_synced(&mut conn, id.as_str()) } - pub(crate) fn deployment_synced(&self, id: &DeploymentHash) -> Result<(), StoreError> { + pub(crate) fn deployment_synced( + &self, + id: &DeploymentHash, + block_ptr: BlockPtr, + ) -> Result<(), StoreError> { let mut conn = self.get_conn()?; - conn.transaction(|conn| deployment::set_synced(conn, id)) + conn.transaction(|conn| deployment::set_synced(conn, id, block_ptr)) } /// Look up the on_sync action for this deployment diff --git a/store/postgres/src/detail.rs b/store/postgres/src/detail.rs index a0e93933616..807e238f4fe 100644 --- a/store/postgres/src/detail.rs +++ b/store/postgres/src/detail.rs @@ -50,6 +50,7 @@ pub struct DeploymentDetail { pub failed: bool, health: HealthType, pub synced_at: Option>, + pub synced_at_block_number: Option, fatal_error: Option, non_fatal_errors: Vec, /// The earliest block for which we have history diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 0af17928b8b..6017fc093ec 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -180,6 +180,7 @@ table! { latest_ethereum_block_number -> Nullable, failed -> Bool, synced_at -> Nullable, + synced_at_block_number -> Nullable, } } @@ -233,6 +234,7 @@ pub struct UnusedDeployment { pub latest_ethereum_block_number: Option, pub failed: bool, pub synced_at: Option>, + pub synced_at_block_number: Option, } #[derive(Clone, Debug, PartialEq, Eq, Hash, AsExpression, FromSqlRow)] @@ -1681,6 +1683,7 @@ impl<'a> Connection<'a> { u::latest_ethereum_block_number.eq(latest_number), u::failed.eq(detail.failed), u::synced_at.eq(detail.synced_at), + u::synced_at_block_number.eq(detail.synced_at_block_number.clone()), )) .execute(self.conn.as_mut())?; } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index ee7a5e4754f..4bcf434b6ec 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -420,7 +420,7 @@ impl SyncStore { } } - fn deployment_synced(&self) -> Result<(), StoreError> { + fn deployment_synced(&self, block_ptr: BlockPtr) -> Result<(), StoreError> { retry::forever(&self.logger, "deployment_synced", || { let event = { // Make sure we drop `pconn` before we call into the deployment @@ -452,7 +452,8 @@ impl SyncStore { } } - self.writable.deployment_synced(&self.site.deployment)?; + self.writable + .deployment_synced(&self.site.deployment, block_ptr.clone())?; self.store.send_store_event(&event) }) @@ -1659,7 +1660,7 @@ impl WritableStoreTrait for WritableStore { is_caught_up_with_chain_head: bool, ) -> Result<(), StoreError> { if is_caught_up_with_chain_head { - self.deployment_synced()?; + self.deployment_synced(block_ptr_to.clone())?; } else { self.writer.start_batching(); } @@ -1696,10 +1697,10 @@ impl WritableStoreTrait for WritableStore { /// - Disable the time-to-sync metrics gathering. /// - Stop batching writes. /// - Promote it to 'synced' status in the DB, if that hasn't been done already. - fn deployment_synced(&self) -> Result<(), StoreError> { + fn deployment_synced(&self, block_ptr: BlockPtr) -> Result<(), StoreError> { self.writer.deployment_synced(); if !self.is_deployment_synced.load(Ordering::SeqCst) { - self.store.deployment_synced()?; + self.store.deployment_synced(block_ptr)?; self.is_deployment_synced.store(true, Ordering::SeqCst); } Ok(()) diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index d7ebb30785c..b90283f6c93 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -156,7 +156,7 @@ impl WritableStore for MockStore { unimplemented!() } - fn deployment_synced(&self) -> Result<(), StoreError> { + fn deployment_synced(&self, _block_ptr: BlockPtr) -> Result<(), StoreError> { unimplemented!() } diff --git a/store/test-store/tests/postgres/graft.rs b/store/test-store/tests/postgres/graft.rs index 88f77c45b97..1580a62b1aa 100644 --- a/store/test-store/tests/postgres/graft.rs +++ b/store/test-store/tests/postgres/graft.rs @@ -482,7 +482,7 @@ fn on_sync() { .await?; writable.start_subgraph_deployment(&LOGGER).await?; - writable.deployment_synced()?; + writable.deployment_synced(BLOCKS[0].clone())?; let mut primary = primary_connection(); let src_site = primary.locate_site(src)?.unwrap(); @@ -539,7 +539,7 @@ fn on_sync() { store.activate(&dst)?; store.remove_deployment(src.id.into())?; - let res = writable.deployment_synced(); + let res = writable.deployment_synced(BLOCKS[2].clone()); assert!(res.is_ok()); } Ok(()) diff --git a/store/test-store/tests/postgres/subgraph.rs b/store/test-store/tests/postgres/subgraph.rs index f59519b8945..f52e8fa71f9 100644 --- a/store/test-store/tests/postgres/subgraph.rs +++ b/store/test-store/tests/postgres/subgraph.rs @@ -209,14 +209,18 @@ fn create_subgraph() { changes } - fn deployment_synced(store: &Arc, deployment: &DeploymentLocator) { + fn deployment_synced( + store: &Arc, + deployment: &DeploymentLocator, + block_ptr: BlockPtr, + ) { futures03::executor::block_on(store.cheap_clone().writable( LOGGER.clone(), deployment.id, Arc::new(Vec::new()), )) .expect("can get writable") - .deployment_synced() + .deployment_synced(block_ptr) .unwrap(); } @@ -259,7 +263,7 @@ fn create_subgraph() { assert!(pending.is_none()); // Sync deployment - deployment_synced(&store, &deployment2); + deployment_synced(&store, &deployment2, GENESIS_PTR.clone()); // Deploying again still overwrites current let (deployment3, events) = deploy(store.as_ref(), ID3, MODE); @@ -319,7 +323,7 @@ fn create_subgraph() { assert!(pending.is_none()); // Deploy when current is synced leaves current alone and adds pending - deployment_synced(&store, &deployment2); + deployment_synced(&store, &deployment2, GENESIS_PTR.clone()); let (deployment3, events) = deploy(store.as_ref(), ID3, MODE); let expected = deploy_event(&deployment3); assert_eq!(expected, events); @@ -354,7 +358,7 @@ fn create_subgraph() { assert_eq!(None, pending.as_deref()); // Mark `ID3` as synced and deploy that again - deployment_synced(&store, &deployment3); + deployment_synced(&store, &deployment3, GENESIS_PTR.clone()); let expected = HashSet::from([unassigned(&deployment2), assigned(&deployment3)]); let (deployment3_again, events) = deploy(store.as_ref(), ID3, MODE); assert_eq!(&deployment3, &deployment3_again); diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index 4a986e6f3fa..df04615898a 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -146,7 +146,7 @@ where let read_count = || read_count(writable.as_ref()); if !batch { - writable.deployment_synced().unwrap(); + writable.deployment_synced(block_pointer(0)).unwrap(); } for count in 1..4 {