Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support for Firebolt rpc 2.0 #677

Merged
merged 8 commits into from
Nov 21, 2024
Merged
93 changes: 91 additions & 2 deletions core/main/src/broker/endpoint_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use ripple_sdk::{
api::{
firebolt::fb_capabilities::JSON_RPC_STANDARD_ERROR_INVALID_PARAMS,
firebolt::fb_capabilities::{
FireboltPermission, CAPABILITY_NOT_AVAILABLE, JSON_RPC_STANDARD_ERROR_INVALID_PARAMS,
},
gateway::rpc_gateway_api::{
ApiMessage, ApiProtocol, ApiStats, CallContext, JsonRpcApiRequest, JsonRpcApiResponse,
RpcRequest,
Expand Down Expand Up @@ -47,7 +49,7 @@ use crate::{
broker::broker_utils::BrokerUtils,
firebolt::firebolt_gateway::{FireboltGatewayCommand, JsonRpcError},
service::extn::ripple_client::RippleClient,
state::platform_state::PlatformState,
state::{platform_state::PlatformState, session_state::Session},
utils::router_utils::{
add_telemetry_status_code, get_rpc_header, return_api_message_for_transport,
return_extn_response,
Expand All @@ -57,6 +59,7 @@ use crate::{
use super::{
event_management_utility::EventManagementUtility,
http_broker::HttpBroker,
provider_broker_state::{ProvideBrokerState, ProviderResult},
rules_engine::{jq_compile, Rule, RuleEndpoint, RuleEndpointProtocol, RuleEngine},
thunder_broker::ThunderBroker,
websocket_broker::WebsocketBroker,
Expand Down Expand Up @@ -308,6 +311,7 @@ pub struct EndpointBrokerState {
rule_engine: RuleEngine,
cleaner_list: Arc<RwLock<Vec<BrokerCleaner>>>,
reconnect_tx: Sender<BrokerConnectRequest>,
provider_broker_state: ProvideBrokerState,
}
impl Default for EndpointBrokerState {
fn default() -> Self {
Expand All @@ -319,6 +323,7 @@ impl Default for EndpointBrokerState {
rule_engine: RuleEngine::default(),
cleaner_list: Arc::new(RwLock::new(Vec::new())),
reconnect_tx: mpsc::channel(2).0,
provider_broker_state: ProvideBrokerState::default(),
}
}
}
Expand All @@ -338,6 +343,7 @@ impl EndpointBrokerState {
rule_engine,
cleaner_list: Arc::new(RwLock::new(Vec::new())),
reconnect_tx,
provider_broker_state: ProvideBrokerState::default(),
};
state.reconnect_thread(rec_tr, ripple_client);
state
Expand Down Expand Up @@ -535,6 +541,74 @@ impl EndpointBrokerState {
tokio::spawn(async move { callback.sender.send(output).await });
}

fn handle_provided_request(
&self,
rpc_request: &RpcRequest,
rule: Rule,
callback: BrokerCallback,
permission: Vec<FireboltPermission>,
session: Option<Session>,
) {
let (id, request) = self.update_request(rpc_request, rule, None, None);
match self.provider_broker_state.check_provider_request(
rpc_request,
&permission,
session.clone(),
) {
Some(ProviderResult::Registered) => {
// return empty result and handle the rest with jq rule
let data = JsonRpcApiResponse {
id: Some(id),
jsonrpc: "2.0".to_string(),
result: Some(Value::Null),
error: None,
method: None,
params: None,
};

let output = BrokerOutput { data };
tokio::spawn(async move { callback.sender.send(output).await });
}
Some(ProviderResult::Session(s)) => {
ProvideBrokerState::send_to_provider(request, id, s);
}
Some(ProviderResult::NotAvailable(p)) => {
// Not Available
let data = JsonRpcApiResponse {
jsonrpc: "2.0".to_string(),
id: Some(id),
result: None,
error: Some(json!({
"error": CAPABILITY_NOT_AVAILABLE,
"messsage": format!("{} not available", p)
})),
method: None,
params: None,
};

let output = BrokerOutput { data };
tokio::spawn(async move { callback.sender.send(output).await });
}
None => {
// Not Available
let data = JsonRpcApiResponse {
jsonrpc: "2.0".to_string(),
id: Some(id),
result: None,
error: Some(json!({
"error": CAPABILITY_NOT_AVAILABLE,
"messsage": "capability not available".to_string()
})),
method: None,
params: None,
};

let output = BrokerOutput { data };
tokio::spawn(async move { callback.sender.send(output).await });
}
}
}

fn get_sender(&self, hash: &str) -> Option<BrokerSender> {
self.endpoint_map.read().unwrap().get(hash).cloned()
}
Expand All @@ -546,6 +620,8 @@ impl EndpointBrokerState {
rpc_request: RpcRequest,
extn_message: Option<ExtnMessage>,
requestor_callback: Option<BrokerCallback>,
permissions: Vec<FireboltPermission>,
session: Option<Session>,
) -> bool {
let mut handled: bool = true;
let callback = self.callback.clone();
Expand Down Expand Up @@ -576,6 +652,8 @@ impl EndpointBrokerState {
callback,
requestor_callback,
);
} else if rule.alias.eq_ignore_ascii_case("provided") {
self.handle_provided_request(&rpc_request, rule, callback, permissions, session);
} else if broker_sender.is_some() {
trace!("handling not static request for {:?}", rpc_request);
let broker = broker_sender.unwrap();
Expand All @@ -596,6 +674,12 @@ impl EndpointBrokerState {
handled
}

pub fn handle_broker_response(&self, data: JsonRpcApiResponse) {
if let Err(e) = self.callback.sender.try_send(BrokerOutput { data }) {
error!("Cannot forward broker response {:?}", e)
}
}

pub fn get_rule(&self, rpc_request: &RpcRequest) -> Option<Rule> {
self.rule_engine.get_rule(rpc_request)
}
Expand Down Expand Up @@ -871,6 +955,11 @@ impl BrokerOutputForwarder {
.await;
} else {
let tm_str = get_rpc_header(&rpc_request);

if is_event {
response.update_event_message(&rpc_request);
}

// Step 2: Create the message
let mut message = ApiMessage::new(
rpc_request.ctx.protocol.clone(),
Expand Down
1 change: 1 addition & 0 deletions core/main/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod broker_utils;
pub mod endpoint_broker;
pub mod event_management_utility;
pub mod http_broker;
pub mod provider_broker_state;
pub mod rules_engine;
pub mod thunder;
pub mod thunder_broker;
Expand Down
121 changes: 121 additions & 0 deletions core/main/src/broker/provider_broker_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2023 Comcast Cable Communications Management, LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
//

use std::{
collections::HashMap,
sync::{Arc, RwLock},
};

use ripple_sdk::{
api::{
firebolt::fb_capabilities::FireboltPermission,
gateway::rpc_gateway_api::{ApiMessage, ApiProtocol, RpcRequest},
},
log::{debug, error},
tokio,
};

use serde_json::json;

use crate::state::session_state::Session;

use super::endpoint_broker::BrokerRequest;

#[derive(Debug, Clone, Default)]
pub struct ProvideBrokerState {
capability_map: Arc<RwLock<HashMap<String, Session>>>,
}

pub enum ProviderResult {
Session(Session),
Registered,
NotAvailable(String),
}

impl ProvideBrokerState {
pub fn check_provider_request(
&self,
request: &RpcRequest,
permission: &Vec<FireboltPermission>,
session: Option<Session>,
) -> Option<ProviderResult> {
debug!("Method {}", request.method);
if request.method.contains(".provide") {
pahearn73 marked this conversation as resolved.
Show resolved Hide resolved
debug!("inside method before session");
if let Some(s) = session {
debug!("inside session before permission {:?}", permission);
if let Some(p) = Self::get_permission(permission) {
{
debug!("adding permission {}", p);
let mut cap_map = self.capability_map.write().unwrap();
let _ = cap_map.insert(p.clone(), s.clone());
let _ = cap_map
.insert(format!("{}.{}", p, request.ctx.app_id.clone()), s.clone());
}
debug!("return registered");
return Some(ProviderResult::Registered);
}
}
} else if let Some(p) = Self::get_permission(permission) {
debug!("Checking session {}", p);
if let Some(session) = { self.capability_map.read().unwrap().get(&p).cloned() } {
debug!("Returning session");
return Some(ProviderResult::Session(session));
}
return Some(ProviderResult::NotAvailable(p));
}

None
}

fn get_permission(permission: &[FireboltPermission]) -> Option<String> {
if !permission.is_empty() {
if let Some(p) = permission.first() {
return Some(p.cap.as_str());
}
}
None
}

pub fn send_to_provider(request: BrokerRequest, id: u64, session: Session) {
let method = request.clone().rpc.ctx.method;
let r = if let Some(p) = request.rpc.get_params() {
json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": p
})
} else {
json!({
"jsonrpc": "2.0",
"id": id,
"method": request.rpc.ctx.method
})
};
let message = ApiMessage::new(
ApiProtocol::JsonRpc,
serde_json::to_string(&r).unwrap(),
"".into(),
);
tokio::spawn(async move {
if let Err(e) = session.send_json_rpc(message).await {
error!("Couldnt send Provider request {:?}", e)
}
});
}
}
2 changes: 2 additions & 0 deletions core/main/src/broker/workflow_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ async fn subbroker_call(
Some(BrokerCallback {
sender: brokered_tx,
}),
Vec::new(),
None,
);

match brokered_rx.recv().await {
Expand Down
9 changes: 6 additions & 3 deletions core/main/src/firebolt/firebolt_gatekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ impl FireboltGatekeeper {
))
}
// TODO return Deny Reason into ripple error
pub async fn gate(state: PlatformState, request: RpcRequest) -> Result<(), DenyReasonWithCap> {
pub async fn gate(
state: PlatformState,
request: RpcRequest,
) -> Result<Vec<FireboltPermission>, DenyReasonWithCap> {
let caps =
Self::get_resolved_caps_for_method(&state, &request.method, request.ctx.gateway_secure)
.ok_or(DenyReasonWithCap {
Expand All @@ -102,7 +105,7 @@ impl FireboltGatekeeper {
.clear_non_negotiable_permission(&state, &caps);
if filtered_perm_list.is_empty() {
trace!("Role/Capability is cleared based on non-negotiable policy");
return Ok(());
return Ok(caps);
}
// Supported and Availability checks
trace!(
Expand All @@ -121,7 +124,7 @@ impl FireboltGatekeeper {
}
// permission checks
Self::permissions_check(state, request, filtered_perm_list).await?;
Ok(())
Ok(caps)
}

async fn permissions_check(
Expand Down
Loading
Loading