Skip to content

Commit

Permalink
core,node,server: use common fn for graphman cli & server for unassig…
Browse files Browse the repository at this point in the history
…n and reassign
  • Loading branch information
shiyasmohd committed Dec 28, 2024
1 parent d0d7d5c commit 38ceded
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 29 deletions.
34 changes: 28 additions & 6 deletions core/graphman/src/commands/deployment/reassign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ use crate::deployment::DeploymentSelector;
use crate::deployment::DeploymentVersionSelector;
use crate::GraphmanError;

pub struct ActiveDeployment {
pub struct Deployment {
locator: DeploymentLocator,
site: Site,
}

impl Deployment {
pub fn locator(&self) -> &DeploymentLocator {
&self.locator
}
}

#[derive(Debug, Error)]
pub enum ReassignDeploymentError {
#[error("deployment '{0}' is already assigned to '{1}'")]
Expand All @@ -29,10 +35,16 @@ pub enum ReassignDeploymentError {
Common(#[from] GraphmanError),
}

#[derive(Clone, Debug)]
pub enum ReassignResult {
EmptyResponse,
CompletedWithWarnings(Vec<String>),
}

pub fn load_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<ActiveDeployment, ReassignDeploymentError> {
) -> Result<Deployment, ReassignDeploymentError> {
let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
Expand All @@ -50,15 +62,15 @@ pub fn load_deployment(
GraphmanError::Store(anyhow!("deployment site not found for '{locator}'"))
})?;

Ok(ActiveDeployment { locator, site })
Ok(Deployment { locator, site })
}

pub fn reassign_deployment(
primary_pool: ConnectionPool,
notification_sender: Arc<NotificationSender>,
deployment: ActiveDeployment,
deployment: &Deployment,
node: &NodeId,
) -> Result<(), ReassignDeploymentError> {
) -> Result<ReassignResult, ReassignDeploymentError> {
let primary_conn = primary_pool.get().map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

Expand Down Expand Up @@ -91,5 +103,15 @@ pub fn reassign_deployment(
.send_store_event(&notification_sender, &StoreEvent::new(changes))
.map_err(GraphmanError::from)?;

Ok(())
let mirror = catalog::Mirror::primary_only(primary_pool);
let count = mirror
.assignments(&node)
.map_err(GraphmanError::from)?
.len();
if count == 1 {
let warning_msg = format!("This is the only deployment assigned to '{}'. Please make sure that the node ID is spelled correctly.",node.as_str());
Ok(ReassignResult::CompletedWithWarnings(vec![warning_msg]))
} else {
Ok(ReassignResult::EmptyResponse)
}
}
10 changes: 9 additions & 1 deletion core/graphman/src/commands/deployment/unassign.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use anyhow::anyhow;
use graph::components::store::DeploymentLocator;
use graph::components::store::StoreEvent;
use graph_store_postgres::command_support::catalog;
use graph_store_postgres::command_support::catalog::Site;
Expand All @@ -13,9 +14,16 @@ use crate::deployment::DeploymentVersionSelector;
use crate::GraphmanError;

pub struct AssignedDeployment {
locator: DeploymentLocator,
site: Site,
}

impl AssignedDeployment {
pub fn locator(&self) -> &DeploymentLocator {
&self.locator
}
}

#[derive(Debug, Error)]
pub enum UnassignDeploymentError {
#[error("deployment '{0}' is already unassigned")]
Expand Down Expand Up @@ -50,7 +58,7 @@ pub fn load_assigned_deployment(
.assigned_node(&site)
.map_err(GraphmanError::from)?
{
Some(_) => Ok(AssignedDeployment { site }),
Some(_) => Ok(AssignedDeployment { locator, site }),
None => Err(UnassignDeploymentError::AlreadyUnassigned(
locator.to_string(),
)),
Expand Down
20 changes: 15 additions & 5 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use graph::prelude::{MetricsRegistry, BLOCK_NUMBER_MAX};
use graph::{data::graphql::load_manager::LoadManager, prelude::chrono, prometheus::Registry};
use graph::{
prelude::{
anyhow::{self, Context as AnyhowContextTrait},
anyhow::{self, anyhow, Context as AnyhowContextTrait},
info, tokio, Logger, NodeId,
},
url::Url,
Expand Down Expand Up @@ -1198,12 +1198,22 @@ async fn main() -> anyhow::Result<()> {
Remove { name } => commands::remove::run(ctx.subgraph_store(), &name),
Create { name } => commands::create::run(ctx.subgraph_store(), name),
Unassign { deployment } => {
let sender = ctx.notification_sender();
commands::assign::unassign(ctx.primary_pool(), &sender, &deployment).await
let notifications_sender = ctx.notification_sender();
let primary_pool = ctx.primary_pool();
let deployment = make_deployment_selector(deployment);
commands::deployment::unassign::run(primary_pool, notifications_sender, deployment)
}
Reassign { deployment, node } => {
let sender = ctx.notification_sender();
commands::assign::reassign(ctx.primary_pool(), &sender, &deployment, node)
let notifications_sender = ctx.notification_sender();
let primary_pool = ctx.primary_pool();
let deployment = make_deployment_selector(deployment);
let node = NodeId::new(node).map_err(|node| anyhow!("invalid node id {:?}", node))?;
commands::deployment::reassign::run(
primary_pool,
notifications_sender,
deployment,
&node,
)
}
Pause { deployment } => {
let notifications_sender = ctx.notification_sender();
Expand Down
2 changes: 2 additions & 0 deletions node/src/manager/commands/deployment/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod info;
pub mod pause;
pub mod reassign;
pub mod restart;
pub mod resume;
pub mod unassign;
41 changes: 41 additions & 0 deletions node/src/manager/commands/deployment/reassign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::sync::Arc;

use anyhow::Result;
use graph::prelude::NodeId;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::NotificationSender;
use graphman::commands::deployment::reassign::{
load_deployment, reassign_deployment, ReassignResult,
};
use graphman::deployment::DeploymentSelector;

pub fn run(
primary_pool: ConnectionPool,
notification_sender: Arc<NotificationSender>,
deployment: DeploymentSelector,
node: &NodeId,
) -> Result<()> {
let deployment = load_deployment(primary_pool.clone(), &deployment)?;

println!("Reassigning deployment {}", deployment.locator());

let reassign_result =
reassign_deployment(primary_pool, notification_sender, &deployment, node)?;

match reassign_result {
ReassignResult::EmptyResponse => {
println!(
"Deployment {} assigned to node {}",
deployment.locator(),
node
);
}
ReassignResult::CompletedWithWarnings(warnings) => {
for msg in warnings {
println!("{}", msg);
}
}
}

Ok(())
}
22 changes: 22 additions & 0 deletions node/src/manager/commands/deployment/unassign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::sync::Arc;

use anyhow::Result;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::NotificationSender;
use graphman::commands::deployment::unassign::load_assigned_deployment;
use graphman::commands::deployment::unassign::unassign_deployment;
use graphman::deployment::DeploymentSelector;

pub fn run(
primary_pool: ConnectionPool,
notification_sender: Arc<NotificationSender>,
deployment: DeploymentSelector,
) -> Result<()> {
let assigned_deployment = load_assigned_deployment(primary_pool.clone(), &deployment)?;

println!("Unassigning deployment {}", assigned_deployment.locator());

unassign_deployment(primary_pool, notification_sender, assigned_deployment)?;

Ok(())
}
21 changes: 9 additions & 12 deletions server/graphman/src/resolvers/deployment_mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use async_graphql::Object;
use async_graphql::Result;
use async_graphql::Union;
use graph::prelude::NodeId;
use graph_store_postgres::command_support::catalog;
use graph_store_postgres::graphman::GraphmanStore;
use graphman::commands::deployment::reassign::ReassignResult;

use crate::entities::CompletedWithWarnings;
use crate::entities::DeploymentSelector;
Expand Down Expand Up @@ -119,17 +119,14 @@ impl DeploymentMutation {
let ctx = GraphmanContext::new(ctx)?;
let deployment = deployment.try_into()?;
let node = NodeId::new(node.clone()).map_err(|()| anyhow!("illegal node id `{}`", node))?;
reassign::run(&ctx, &deployment, &node)?;

let mirror = catalog::Mirror::primary_only(ctx.primary_pool);
let count = mirror.assignments(&node)?.len();
if count == 1 {
let warning_msg = format!("This is the only deployment assigned to '{}'. Please make sure that the node ID is spelled correctly.",node.as_str());
Ok(ReassignResponse::CompletedWithWarnings(
CompletedWithWarnings::new(vec![warning_msg]),
))
} else {
Ok(ReassignResponse::EmptyResponse(EmptyResponse::new()))
let reassign_result = reassign::run(&ctx, &deployment, &node)?;
match reassign_result {
ReassignResult::CompletedWithWarnings(warnings) => Ok(
ReassignResponse::CompletedWithWarnings(CompletedWithWarnings::new(warnings)),
),
ReassignResult::EmptyResponse => {
Ok(ReassignResponse::EmptyResponse(EmptyResponse::new()))
}
}
}
}
15 changes: 10 additions & 5 deletions server/graphman/src/resolvers/deployment_mutation/reassign.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
use anyhow::Ok;
use async_graphql::Result;
use graph::prelude::NodeId;
use graphman::commands::deployment::reassign::load_deployment;
use graphman::commands::deployment::reassign::reassign_deployment;
use graphman::commands::deployment::reassign::ReassignResult;
use graphman::deployment::DeploymentSelector;

use crate::resolvers::context::GraphmanContext;

pub fn run(ctx: &GraphmanContext, deployment: &DeploymentSelector, node: &NodeId) -> Result<()> {
pub fn run(
ctx: &GraphmanContext,
deployment: &DeploymentSelector,
node: &NodeId,
) -> Result<ReassignResult, anyhow::Error> {
let deployment = load_deployment(ctx.primary_pool.clone(), deployment)?;
reassign_deployment(
let reassign_result = reassign_deployment(
ctx.primary_pool.clone(),
ctx.notification_sender.clone(),
deployment,
&deployment,
&node,
)?;

Ok(())
Ok(reassign_result)
}

0 comments on commit 38ceded

Please sign in to comment.