Skip to content

Commit

Permalink
replace ReadSTore with SourceableStore
Browse files Browse the repository at this point in the history
  • Loading branch information
zorancv committed Dec 3, 2024
1 parent 0d7e4d5 commit 38c4286
Show file tree
Hide file tree
Showing 17 changed files with 56 additions and 55 deletions.
4 changes: 2 additions & 2 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use graph::blockchain::{
};
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::store::{DeploymentCursorTracker, ReadStore};
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
use graph::firehose::FirehoseEndpoint;
Expand Down Expand Up @@ -121,7 +121,7 @@ impl Blockchain for Chain {
deployment: DeploymentLocator,
store: impl DeploymentCursorTracker,
start_blocks: Vec<BlockNumber>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
filter: Arc<TriggerFilterWrapper<Self>>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
Expand Down
4 changes: 2 additions & 2 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use graph::blockchain::block_stream::{BlockStreamError, BlockStreamMapper, Fireh
use graph::blockchain::client::ChainClient;
use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter};
use graph::cheap_clone::CheapClone;
use graph::components::store::{DeploymentCursorTracker, ReadStore};
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::{
blockchain::{
Expand Down Expand Up @@ -114,7 +114,7 @@ impl Blockchain for Chain {
deployment: DeploymentLocator,
store: impl DeploymentCursorTracker,
start_blocks: Vec<BlockNumber>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
filter: Arc<TriggerFilterWrapper<Self>>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
Expand Down
8 changes: 4 additions & 4 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use graph::blockchain::{
TriggersAdapterSelector,
};
use graph::components::adapter::ChainId;
use graph::components::store::{DeploymentCursorTracker, ReadStore};
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, ForkStep};
use graph::futures03::compat::Future01CompatExt;
Expand Down Expand Up @@ -128,7 +128,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
chain: &Chain,
deployment: DeploymentLocator,
start_blocks: Vec<BlockNumber>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<TriggerFilterWrapper<Chain>>,
unified_api_version: UnifiedMappingApiVersion,
Expand All @@ -150,7 +150,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
chain: &Chain,
deployment: DeploymentLocator,
start_blocks: Vec<BlockNumber>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<TriggerFilterWrapper<Chain>>,
unified_api_version: UnifiedMappingApiVersion,
Expand Down Expand Up @@ -437,7 +437,7 @@ impl Blockchain for Chain {
deployment: DeploymentLocator,
store: impl DeploymentCursorTracker,
start_blocks: Vec<BlockNumber>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
filter: Arc<TriggerFilterWrapper<Self>>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
Expand Down
6 changes: 3 additions & 3 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use graph::blockchain::{
};
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::store::{DeploymentCursorTracker, ReadStore};
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
use graph::firehose::FirehoseEndpoint;
Expand Down Expand Up @@ -152,7 +152,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
_chain: &Chain,
_deployment: DeploymentLocator,
_start_blocks: Vec<BlockNumber>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
_subgraph_current_block: Option<BlockPtr>,
_filter: Arc<TriggerFilterWrapper<Chain>>,
_unified_api_version: UnifiedMappingApiVersion,
Expand Down Expand Up @@ -232,7 +232,7 @@ impl Blockchain for Chain {
deployment: DeploymentLocator,
store: impl DeploymentCursorTracker,
start_blocks: Vec<BlockNumber>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
filter: Arc<TriggerFilterWrapper<Self>>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
Expand Down
6 changes: 3 additions & 3 deletions chain/starknet/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use graph::{
cheap_clone::CheapClone,
components::{
adapter::ChainId,
store::{DeploymentCursorTracker, DeploymentLocator, ReadStore},
store::{DeploymentCursorTracker, DeploymentLocator, SourceableStore},
},
data::subgraph::UnifiedMappingApiVersion,
env::EnvVars,
Expand Down Expand Up @@ -116,7 +116,7 @@ impl Blockchain for Chain {
deployment: DeploymentLocator,
store: impl DeploymentCursorTracker,
start_blocks: Vec<BlockNumber>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
filter: Arc<TriggerFilterWrapper<Self>>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
Expand Down Expand Up @@ -240,7 +240,7 @@ impl BlockStreamBuilder<Chain> for StarknetStreamBuilder {
_chain: &Chain,
_deployment: DeploymentLocator,
_start_blocks: Vec<BlockNumber>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
_subgraph_current_block: Option<BlockPtr>,
_filter: Arc<TriggerFilterWrapper<Chain>>,
_unified_api_version: UnifiedMappingApiVersion,
Expand Down
4 changes: 2 additions & 2 deletions chain/substreams/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use graph::{
substreams_block_stream::SubstreamsBlockStream,
Blockchain, TriggerFilterWrapper,
},
components::store::{DeploymentLocator, ReadStore},
components::store::{DeploymentLocator, SourceableStore},
data::subgraph::UnifiedMappingApiVersion,
prelude::{async_trait, BlockNumber, BlockPtr, DeploymentHash},
schema::InputSchema,
Expand Down Expand Up @@ -104,7 +104,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
_chain: &Chain,
_deployment: DeploymentLocator,
_start_blocks: Vec<BlockNumber>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
_subgraph_current_block: Option<BlockPtr>,
_filter: Arc<TriggerFilterWrapper<Chain>>,
_unified_api_version: UnifiedMappingApiVersion,
Expand Down
4 changes: 2 additions & 2 deletions chain/substreams/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use graph::blockchain::{
NoopRuntimeAdapter, TriggerFilterWrapper,
};
use graph::components::adapter::ChainId;
use graph::components::store::{DeploymentCursorTracker, ReadStore};
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
use graph::env::EnvVars;
use graph::prelude::{
BlockHash, CheapClone, DeploymentHash, Entity, LoggerFactory, MetricsRegistry,
Expand Down Expand Up @@ -142,7 +142,7 @@ impl Blockchain for Chain {
deployment: DeploymentLocator,
store: impl DeploymentCursorTracker,
_start_blocks: Vec<BlockNumber>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
filter: Arc<TriggerFilterWrapper<Self>>,
_unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
Expand Down
4 changes: 2 additions & 2 deletions core/src/subgraph/inputs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use graph::{
blockchain::{block_stream::TriggersAdapterWrapper, Blockchain},
components::{
store::{DeploymentLocator, ReadStore, SubgraphFork, WritableStore},
store::{DeploymentLocator, SourceableStore, SubgraphFork, WritableStore},
subgraph::ProofOfIndexingVersion,
},
data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion},
Expand All @@ -16,7 +16,7 @@ pub struct IndexingInputs<C: Blockchain> {
pub features: BTreeSet<SubgraphFeature>,
pub start_blocks: Vec<BlockNumber>,
pub end_blocks: BTreeSet<BlockNumber>,
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
pub stop_block: Option<BlockNumber>,
pub store: Arc<dyn WritableStore>,
pub debug_fork: Option<Arc<dyn SubgraphFork>>,
Expand Down
6 changes: 3 additions & 3 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::subgraph::runner::SubgraphRunner;
use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper};
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
use graph::components::metrics::gas::GasMetrics;
use graph::components::store::ReadStore;
use graph::components::store::SourceableStore;
use graph::components::subgraph::ProofOfIndexingVersion;
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
use graph::data::value::Word;
Expand Down Expand Up @@ -211,7 +211,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
hashes: Vec<DeploymentHash>,
max_spec_version: Version,
is_runner_test: bool,
) -> anyhow::Result<Vec<(DeploymentHash, Arc<dyn ReadStore>)>> {
) -> anyhow::Result<Vec<(DeploymentHash, Arc<dyn SourceableStore>)>> {
let mut writable_stores = Vec::new();
let subgraph_store = self.subgraph_store.clone();

Expand All @@ -237,7 +237,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {

let readable_store = subgraph_store
.clone()
.readable(
.sourceable(
logger.clone(),
loc.id.clone(),
Arc::new(manifest.template_idx_and_name().collect()),
Expand Down
10 changes: 5 additions & 5 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{
Block, BlockPtr, BlockTime, Blockchain, SubgraphFilter, Trigger, TriggerFilterWrapper,
};
use crate::anyhow::Result;
use crate::components::store::{BlockNumber, DeploymentLocator, ReadStore};
use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore};
use crate::data::subgraph::UnifiedMappingApiVersion;
use crate::firehose::{self, FirehoseEndpoint};
use crate::futures03::stream::StreamExt as _;
Expand Down Expand Up @@ -149,7 +149,7 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
chain: &C,
deployment: DeploymentLocator,
start_blocks: Vec<BlockNumber>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<TriggerFilterWrapper<C>>,
unified_api_version: UnifiedMappingApiVersion,
Expand All @@ -160,7 +160,7 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
chain: &C,
deployment: DeploymentLocator,
start_blocks: Vec<BlockNumber>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<TriggerFilterWrapper<C>>,
unified_api_version: UnifiedMappingApiVersion,
Expand Down Expand Up @@ -320,13 +320,13 @@ impl<C: Blockchain> BlockWithTriggers<C> {
/// logic for each chain, increasing code repetition.
pub struct TriggersAdapterWrapper<C: Blockchain> {
pub adapter: Arc<dyn TriggersAdapter<C>>,
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
}

impl<C: Blockchain> TriggersAdapterWrapper<C> {
pub fn new(
adapter: Arc<dyn TriggersAdapter<C>>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
) -> Self {
Self {
adapter,
Expand Down
4 changes: 2 additions & 2 deletions graph/src/blockchain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
bail,
components::{
link_resolver::LinkResolver,
store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator, ReadStore},
store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator, SourceableStore},
subgraph::InstanceDSTemplateInfo,
},
data::subgraph::UnifiedMappingApiVersion,
Expand Down Expand Up @@ -386,7 +386,7 @@ impl Blockchain for MockBlockchain {
_deployment: DeploymentLocator,
_store: impl DeploymentCursorTracker,
_start_blocks: Vec<BlockNumber>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
_filter: Arc<TriggerFilterWrapper<Self>>,
_unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
Expand Down
6 changes: 4 additions & 2 deletions graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use crate::{
components::{
adapter::ChainId,
metrics::subgraph::SubgraphInstanceMetrics,
store::{DeploymentCursorTracker, DeploymentLocator, ReadStore, StoredDynamicDataSource},
store::{
DeploymentCursorTracker, DeploymentLocator, SourceableStore, StoredDynamicDataSource,
},
subgraph::{HostMetrics, InstanceDSTemplateInfo, MappingError},
trigger_processor::RunnableTriggers,
},
Expand Down Expand Up @@ -189,7 +191,7 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {
deployment: DeploymentLocator,
store: impl DeploymentCursorTracker,
start_blocks: Vec<BlockNumber>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
filter: Arc<TriggerFilterWrapper<Self>>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error>;
Expand Down
7 changes: 1 addition & 6 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,11 @@ pub trait SubgraphStore: Send + Sync + 'static {
manifest_idx_and_name: Arc<Vec<(u32, String)>>,
) -> Result<Arc<dyn WritableStore>, StoreError>;

async fn readable(
async fn sourceable(
self: Arc<Self>,
logger: Logger,
deployment: DeploymentId,
manifest_idx_and_name: Arc<Vec<(u32, String)>>,
) -> Result<Arc<dyn ReadStore>, StoreError>;

async fn sourceable(
self: Arc<Self>,
deployment: DeploymentId,
) -> Result<Arc<dyn SourceableStore>, StoreError>;

/// Initiate a graceful shutdown of the writable that a previous call to
Expand Down
17 changes: 6 additions & 11 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1537,25 +1537,20 @@ impl SubgraphStoreTrait for SubgraphStore {
.map(|store| store as Arc<dyn store::WritableStore>)
}

async fn readable(
async fn sourceable(
self: Arc<Self>,
logger: Logger,
deployment: graph::components::store::DeploymentId,
manifest_idx_and_name: Arc<Vec<(u32, String)>>,
) -> Result<Arc<dyn store::ReadStore>, StoreError> {
self.get_or_create_writable_store(logger, deployment, manifest_idx_and_name)
.await
.map(|store| store as Arc<dyn store::ReadStore>)
}

async fn sourceable(
self: Arc<Self>,
deployment: graph::components::store::DeploymentId,
) -> Result<Arc<dyn store::SourceableStore>, StoreError> {
let writable = self
.clone()
.writable(logger, deployment, manifest_idx_and_name)
.await?;
let deployment = deployment.into();
let site = self.find_site(deployment)?;
let store = self.for_site(&site)?;
let s = Arc::new(SourceableStore::new(site, store.clone()));
let s = Arc::new(SourceableStore::new(site, store.clone(), writable));
Ok(s as Arc<dyn store::SourceableStore>)
}

Expand Down
13 changes: 11 additions & 2 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1572,11 +1572,20 @@ impl ReadStore for WritableStore {
pub struct SourceableStore {
site: Arc<Site>,
store: Arc<DeploymentStore>,
_writable: Arc<dyn store::WritableStore>,
}

impl SourceableStore {
pub fn new(site: Arc<Site>, store: Arc<DeploymentStore>) -> Self {
Self { site, store }
pub fn new(
site: Arc<Site>,
store: Arc<DeploymentStore>,
_writable: Arc<dyn store::WritableStore>,
) -> Self {
Self {
site,
store,
_writable,
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion store/test-store/tests/postgres/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ where
.expect("we can get a writable store");
let sourceable = store
.subgraph_store()
.sourceable(deployment.id)
.sourceable(LOGGER.clone(), deployment.id, Arc::new(Vec::new()))
.await
.expect("we can get a writable store");

Expand Down
6 changes: 3 additions & 3 deletions tests/src/fixture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::link_resolver::{ArweaveClient, ArweaveResolver, FileSizeLimit};
use graph::components::metrics::MetricsRegistry;
use graph::components::store::{BlockStore, DeploymentLocator, EthereumCallCache, ReadStore};
use graph::components::store::{BlockStore, DeploymentLocator, EthereumCallCache, SourceableStore};
use graph::components::subgraph::Settings;
use graph::data::graphql::load_manager::LoadManager;
use graph::data::query::{Query, QueryTarget};
Expand Down Expand Up @@ -723,7 +723,7 @@ impl<C: Blockchain> BlockStreamBuilder<C> for MutexBlockStreamBuilder<C> {
chain: &C,
deployment: DeploymentLocator,
start_blocks: Vec<BlockNumber>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<TriggerFilterWrapper<C>>,
unified_api_version: graph::data::subgraph::UnifiedMappingApiVersion,
Expand Down Expand Up @@ -798,7 +798,7 @@ where
_chain: &C,
_deployment: DeploymentLocator,
_start_blocks: Vec<graph::prelude::BlockNumber>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
subgraph_current_block: Option<graph::blockchain::BlockPtr>,
_filter: Arc<TriggerFilterWrapper<C>>,
_unified_api_version: graph::data::subgraph::UnifiedMappingApiVersion,
Expand Down

0 comments on commit 38c4286

Please sign in to comment.