Skip to content

Commit

Permalink
Store synced_at_block_number when a deployment syncs (#5610)
Browse files Browse the repository at this point in the history
This is a follow-up to #5566
  • Loading branch information
encalypto authored Sep 20, 2024
1 parent 6cfc444 commit 990ef4d
Show file tree
Hide file tree
Showing 12 changed files with 44 additions and 19 deletions.
2 changes: 1 addition & 1 deletion graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker {
) -> Result<(), StoreError>;

/// Force synced status, used for testing.
fn deployment_synced(&self) -> Result<(), StoreError>;
fn deployment_synced(&self, block_ptr: BlockPtr) -> Result<(), StoreError>;

/// Return true if the deployment with the given id is fully synced, and return false otherwise.
/// Cheap, cached operation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE subgraphs.subgraph_deployment DROP COLUMN synced_at_block_number;
ALTER TABLE unused_deployments DROP COLUMN synced_at_block_number;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE subgraphs.subgraph_deployment ADD COLUMN synced_at_block_number INT4;
ALTER TABLE unused_deployments ADD COLUMN synced_at_block_number INT4;
12 changes: 10 additions & 2 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ table! {
failed -> Bool,
health -> crate::deployment::SubgraphHealthMapping,
synced_at -> Nullable<Timestamptz>,
synced_at_block_number -> Nullable<Int4>,
fatal_error -> Nullable<Text>,
non_fatal_errors -> Array<Text>,
earliest_block_number -> Integer,
Expand Down Expand Up @@ -731,15 +732,22 @@ pub fn state(conn: &mut PgConnection, id: DeploymentHash) -> Result<DeploymentSt
}

/// Mark the deployment `id` as synced
pub fn set_synced(conn: &mut PgConnection, id: &DeploymentHash) -> Result<(), StoreError> {
pub fn set_synced(
conn: &mut PgConnection,
id: &DeploymentHash,
block_ptr: BlockPtr,
) -> Result<(), StoreError> {
use subgraph_deployment as d;

update(
d::table
.filter(d::deployment.eq(id.as_str()))
.filter(d::synced_at.is_null()),
)
.set(d::synced_at.eq(now))
.set((
d::synced_at.eq(now),
d::synced_at_block_number.eq(block_ptr.number),
))
.execute(conn)?;
Ok(())
}
Expand Down
8 changes: 6 additions & 2 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,9 +563,13 @@ impl DeploymentStore {
deployment::exists_and_synced(&mut conn, id.as_str())
}

pub(crate) fn deployment_synced(&self, id: &DeploymentHash) -> Result<(), StoreError> {
pub(crate) fn deployment_synced(
&self,
id: &DeploymentHash,
block_ptr: BlockPtr,
) -> Result<(), StoreError> {
let mut conn = self.get_conn()?;
conn.transaction(|conn| deployment::set_synced(conn, id))
conn.transaction(|conn| deployment::set_synced(conn, id, block_ptr))
}

/// Look up the on_sync action for this deployment
Expand Down
1 change: 1 addition & 0 deletions store/postgres/src/detail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct DeploymentDetail {
pub failed: bool,
health: HealthType,
pub synced_at: Option<DateTime<Utc>>,
pub synced_at_block_number: Option<i32>,
fatal_error: Option<String>,
non_fatal_errors: Vec<String>,
/// The earliest block for which we have history
Expand Down
3 changes: 3 additions & 0 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ table! {
latest_ethereum_block_number -> Nullable<Integer>,
failed -> Bool,
synced_at -> Nullable<Timestamptz>,
synced_at_block_number -> Nullable<Int4>,
}
}

Expand Down Expand Up @@ -233,6 +234,7 @@ pub struct UnusedDeployment {
pub latest_ethereum_block_number: Option<i32>,
pub failed: bool,
pub synced_at: Option<DateTime<Utc>>,
pub synced_at_block_number: Option<i32>,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash, AsExpression, FromSqlRow)]
Expand Down Expand Up @@ -1681,6 +1683,7 @@ impl<'a> Connection<'a> {
u::latest_ethereum_block_number.eq(latest_number),
u::failed.eq(detail.failed),
u::synced_at.eq(detail.synced_at),
u::synced_at_block_number.eq(detail.synced_at_block_number.clone()),
))
.execute(self.conn.as_mut())?;
}
Expand Down
11 changes: 6 additions & 5 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ impl SyncStore {
}
}

fn deployment_synced(&self) -> Result<(), StoreError> {
fn deployment_synced(&self, block_ptr: BlockPtr) -> Result<(), StoreError> {
retry::forever(&self.logger, "deployment_synced", || {
let event = {
// Make sure we drop `pconn` before we call into the deployment
Expand Down Expand Up @@ -452,7 +452,8 @@ impl SyncStore {
}
}

self.writable.deployment_synced(&self.site.deployment)?;
self.writable
.deployment_synced(&self.site.deployment, block_ptr.clone())?;

self.store.send_store_event(&event)
})
Expand Down Expand Up @@ -1659,7 +1660,7 @@ impl WritableStoreTrait for WritableStore {
is_caught_up_with_chain_head: bool,
) -> Result<(), StoreError> {
if is_caught_up_with_chain_head {
self.deployment_synced()?;
self.deployment_synced(block_ptr_to.clone())?;
} else {
self.writer.start_batching();
}
Expand Down Expand Up @@ -1696,10 +1697,10 @@ impl WritableStoreTrait for WritableStore {
/// - Disable the time-to-sync metrics gathering.
/// - Stop batching writes.
/// - Promote it to 'synced' status in the DB, if that hasn't been done already.
fn deployment_synced(&self) -> Result<(), StoreError> {
fn deployment_synced(&self, block_ptr: BlockPtr) -> Result<(), StoreError> {
self.writer.deployment_synced();
if !self.is_deployment_synced.load(Ordering::SeqCst) {
self.store.deployment_synced()?;
self.store.deployment_synced(block_ptr)?;
self.is_deployment_synced.store(true, Ordering::SeqCst);
}
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl WritableStore for MockStore {
unimplemented!()
}

fn deployment_synced(&self) -> Result<(), StoreError> {
fn deployment_synced(&self, _block_ptr: BlockPtr) -> Result<(), StoreError> {
unimplemented!()
}

Expand Down
4 changes: 2 additions & 2 deletions store/test-store/tests/postgres/graft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ fn on_sync() {
.await?;

writable.start_subgraph_deployment(&LOGGER).await?;
writable.deployment_synced()?;
writable.deployment_synced(BLOCKS[0].clone())?;

let mut primary = primary_connection();
let src_site = primary.locate_site(src)?.unwrap();
Expand Down Expand Up @@ -539,7 +539,7 @@ fn on_sync() {
store.activate(&dst)?;
store.remove_deployment(src.id.into())?;

let res = writable.deployment_synced();
let res = writable.deployment_synced(BLOCKS[2].clone());
assert!(res.is_ok());
}
Ok(())
Expand Down
14 changes: 9 additions & 5 deletions store/test-store/tests/postgres/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,18 @@ fn create_subgraph() {
changes
}

fn deployment_synced(store: &Arc<SubgraphStore>, deployment: &DeploymentLocator) {
fn deployment_synced(
store: &Arc<SubgraphStore>,
deployment: &DeploymentLocator,
block_ptr: BlockPtr,
) {
futures03::executor::block_on(store.cheap_clone().writable(
LOGGER.clone(),
deployment.id,
Arc::new(Vec::new()),
))
.expect("can get writable")
.deployment_synced()
.deployment_synced(block_ptr)
.unwrap();
}

Expand Down Expand Up @@ -259,7 +263,7 @@ fn create_subgraph() {
assert!(pending.is_none());

// Sync deployment
deployment_synced(&store, &deployment2);
deployment_synced(&store, &deployment2, GENESIS_PTR.clone());

// Deploying again still overwrites current
let (deployment3, events) = deploy(store.as_ref(), ID3, MODE);
Expand Down Expand Up @@ -319,7 +323,7 @@ fn create_subgraph() {
assert!(pending.is_none());

// Deploy when current is synced leaves current alone and adds pending
deployment_synced(&store, &deployment2);
deployment_synced(&store, &deployment2, GENESIS_PTR.clone());
let (deployment3, events) = deploy(store.as_ref(), ID3, MODE);
let expected = deploy_event(&deployment3);
assert_eq!(expected, events);
Expand Down Expand Up @@ -354,7 +358,7 @@ fn create_subgraph() {
assert_eq!(None, pending.as_deref());

// Mark `ID3` as synced and deploy that again
deployment_synced(&store, &deployment3);
deployment_synced(&store, &deployment3, GENESIS_PTR.clone());
let expected = HashSet::from([unassigned(&deployment2), assigned(&deployment3)]);
let (deployment3_again, events) = deploy(store.as_ref(), ID3, MODE);
assert_eq!(&deployment3, &deployment3_again);
Expand Down
2 changes: 1 addition & 1 deletion store/test-store/tests/postgres/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ where
let read_count = || read_count(writable.as_ref());

if !batch {
writable.deployment_synced().unwrap();
writable.deployment_synced(block_pointer(0)).unwrap();
}

for count in 1..4 {
Expand Down

0 comments on commit 990ef4d

Please sign in to comment.