From 676ba801fa817adc103cea3cfa20374320b5e73c Mon Sep 17 00:00:00 2001 From: pahearn73 Date: Wed, 18 Sep 2024 10:26:52 -0400 Subject: [PATCH 1/8] Analytics plug-in support: WIP --- core/main/src/broker/endpoint_broker.rs | 3 +++ core/main/src/broker/thunder_broker.rs | 1 + core/main/src/processor/metrics_processor.rs | 22 ++++++++++++++++++- core/sdk/src/api/mod.rs | 6 +++++ core/sdk/src/extn/extn_client_message.rs | 4 ++++ core/sdk/src/framework/ripple_contract.rs | 3 +++ .../src/bootstrap/boot_thunder_channel.rs | 1 + .../src/bootstrap/boot_thunder.rs | 6 +++++ .../src/bootstrap/setup_thunder_processors.rs | 7 ++++++ .../src/client/thunder_client.rs | 4 ++++ device/thunder_ripple_sdk/src/lib.rs | 3 +++ 11 files changed, 59 insertions(+), 1 deletion(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 62c15f37c..9add4e1eb 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -432,6 +432,7 @@ impl EndpointBrokerState { rpc_request: RpcRequest, extn_message: Option, ) -> bool { + println!("*** _DEBUG: handle_brokerage: entry"); let mut handled: bool = true; let callback = self.callback.clone(); let mut broker_sender = None; @@ -458,7 +459,9 @@ impl EndpointBrokerState { let broker = broker_sender.unwrap(); let (_, updated_request) = self.update_request(&rpc_request, rule, extn_message); tokio::spawn(async move { + println!("*** _DEBUG: handle_brokerage: Mark 1"); if let Err(e) = broker.send(updated_request.clone()).await { + println!("*** _DEBUG: handle_brokerage: Mark 2"); callback.send_error(updated_request, e).await } }); diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 4872ad063..4548761c7 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -110,6 +110,7 @@ impl ThunderBroker { match broker_c.prepare_request(&request) { Ok(updated_request) => { debug!("Sending request to broker {:?}", updated_request); + debug!("*** _DEBUG: Sending request to broker {:?}", updated_request); for r in updated_request { let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; let _flush = ws_tx.flush().await; diff --git a/core/main/src/processor/metrics_processor.rs b/core/main/src/processor/metrics_processor.rs index 60d15d9cc..d849341cd 100644 --- a/core/main/src/processor/metrics_processor.rs +++ b/core/main/src/processor/metrics_processor.rs @@ -25,7 +25,7 @@ use ripple_sdk::{ }, fb_telemetry::OperationalMetricRequest, }, - gateway::rpc_gateway_api::CallContext, + gateway::rpc_gateway_api::{CallContext, RpcRequest}, }, async_trait::async_trait, extn::{ @@ -59,6 +59,25 @@ pub async fn send_metric( debug!("drop data is true, not sending BI metrics"); return Ok(()); } + // Added + // debug!("*** _DEBUG: send_metric: Mark 1"); + // if let Ok(payload_value) = serde_json::to_value(&payload) { + // let rpc_request = RpcRequest::get_new_internal( + // InternalRule::AnalyticsProcessor.to_string(), + // Some(payload_value), + // ); + // debug!("*** _DEBUG: send_metric: rpc_request={:?}", rpc_request); + + // if platform_state + // .endpoint_state + // .handle_brokerage(rpc_request, None) + // { + // debug!("*** _DEBUG: send_metric: Brokerage handled"); + // return Ok(()); + // } + // } + // + if let Some(session) = platform_state.session_state.get_account_session() { let request = BehavioralMetricRequest { context: Some(platform_state.metrics.get_context()), @@ -69,6 +88,7 @@ pub async fn send_metric( .get_client() .send_extn_request_transient(request); } + // Err(ripple_sdk::utils::error::RippleError::ProcessorError) } diff --git a/core/sdk/src/api/mod.rs b/core/sdk/src/api/mod.rs index d9a00b5ee..051731623 100644 --- a/core/sdk/src/api/mod.rs +++ b/core/sdk/src/api/mod.rs @@ -32,6 +32,9 @@ pub mod status_update; pub mod storage_property; pub mod usergrant_entry; pub mod wifi; +// Added +//pub mod rules; +// pub mod gateway { pub mod rpc_error; @@ -74,4 +77,7 @@ pub mod firebolt { pub mod observability { pub mod metrics_util; pub mod operational_metrics; + // 2 + pub mod analytics; + // } diff --git a/core/sdk/src/extn/extn_client_message.rs b/core/sdk/src/extn/extn_client_message.rs index d66926eb6..c9a358358 100644 --- a/core/sdk/src/extn/extn_client_message.rs +++ b/core/sdk/src/extn/extn_client_message.rs @@ -62,6 +62,7 @@ use crate::{ }, gateway::rpc_gateway_api::RpcRequest, manifest::device_manifest::AppLibraryEntry, + observability::analytics::AnalyticsEvent, protocol::BridgeProtocolRequest, pubsub::{PubSubEvents, PubSubRequest, PubSubResponse}, session::{AccountSessionRequest, AccountSessionResponse, SessionTokenRequest}, @@ -396,6 +397,9 @@ pub enum ExtnEvent { PubSubEvent(PubSubEvents), TimeZone(TimeZone), AppsUpdate(AppsUpdate), + // 2 + Analytics(AnalyticsEvent), + // } impl ExtnPayloadProvider for ExtnEvent { diff --git a/core/sdk/src/framework/ripple_contract.rs b/core/sdk/src/framework/ripple_contract.rs index 0d24da09b..331f933d1 100644 --- a/core/sdk/src/framework/ripple_contract.rs +++ b/core/sdk/src/framework/ripple_contract.rs @@ -129,6 +129,9 @@ pub enum RippleContract { Apps, // Runtime ability for a given distributor to turn off a certian feature RemoteFeatureControl, + // 2 + Analytics, + // } pub trait ContractAdjective: serde::ser::Serialize + DeserializeOwned { diff --git a/device/thunder/src/bootstrap/boot_thunder_channel.rs b/device/thunder/src/bootstrap/boot_thunder_channel.rs index fb753c9a7..d3c64cffe 100644 --- a/device/thunder/src/bootstrap/boot_thunder_channel.rs +++ b/device/thunder/src/bootstrap/boot_thunder_channel.rs @@ -23,6 +23,7 @@ use thunder_ripple_sdk::{ pub async fn boot_thunder_channel(state: ExtnClient) { info!("Booting thunder"); + println!("*** _DEBUG: Booting thunder 2"); let _ = boot_thunder( state, ThunderPluginBootParam { diff --git a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs index 97e443596..fd6924f5b 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs @@ -31,14 +31,20 @@ pub async fn boot_thunder( plugin_param: ThunderPluginBootParam, ) -> Option { info!("Booting thunder"); + println!("*** _DEBUG: boot_thunder: Mark 0"); if let Ok(state) = ThunderGetConfigStep::setup(state, plugin_param).await { + info!("*** _DEBUG: boot_thunder: Mark 1"); if let Ok(state) = ThunderPoolStep::setup(state).await { + info!("*** _DEBUG: boot_thunder: Mark 2"); + // YAH: See if we get here now SetupThunderProcessor::setup(state.clone()).await; return Some(state); } else { + info!("*** _DEBUG: boot_thunder: Mark 3"); error!("Unable to connect to Thuner, error in ThunderPoolStep"); } } else { + info!("*** _DEBUG: boot_thunder: Mark 4"); error!("Unable to connect to Thuner, error in ThunderGetConfigStep"); } None diff --git a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs index 80e34c7d8..e22bea8e9 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs @@ -19,6 +19,7 @@ use ripple_sdk::api::firebolt::fb_telemetry::OperationalMetricRequest; use ripple_sdk::api::status_update::ExtnStatus; use ripple_sdk::log::error; +use crate::processors::thunder_analytics::ThunderAnalyticsProcessor; use crate::processors::thunder_package_manager::ThunderPackageManagerRequestProcessor; use crate::processors::thunder_rfc::ThunderRFCProcessor; use crate::processors::thunder_telemetry::ThunderTelemetryProcessor; @@ -70,6 +71,12 @@ impl SetupThunderProcessor { Err(_) => error!("Telemetry not setup"), } } + + // 2 + // TODO: Config check here + println!("*** _DEBUG: setup"); + extn_client.add_event_processor(ThunderAnalyticsProcessor::new(state.clone().state)); + // extn_client.add_request_processor(ThunderRFCProcessor::new(state.clone().state)); let _ = extn_client.event(ExtnStatus::Ready); } diff --git a/device/thunder_ripple_sdk/src/client/thunder_client.rs b/device/thunder_ripple_sdk/src/client/thunder_client.rs index 99b1036d4..cdd78e337 100644 --- a/device/thunder_ripple_sdk/src/client/thunder_client.rs +++ b/device/thunder_ripple_sdk/src/client/thunder_client.rs @@ -466,6 +466,10 @@ impl ThunderClient { Ok(()) } fn extract_callsign_from_register_method(method: &str) -> Option { + println!( + "*** _DBEUG: extract_callsign_from_register_method: method={}", + method + ); // capture the initial string before an optional version number, followed by ".register" let re = Regex::new(r"^(.*?)(?:\.\d+)?\.register$").unwrap(); diff --git a/device/thunder_ripple_sdk/src/lib.rs b/device/thunder_ripple_sdk/src/lib.rs index 673dd8bcc..583bbcd52 100644 --- a/device/thunder_ripple_sdk/src/lib.rs +++ b/device/thunder_ripple_sdk/src/lib.rs @@ -48,6 +48,9 @@ pub mod processors { pub mod thunder_telemetry; pub mod thunder_wifi; pub mod thunder_window_manager; + // 2 + pub mod thunder_analytics; + // } pub mod utils; From ffcedb47b2c80d2d5835d6f0b422794ec22d334a Mon Sep 17 00:00:00 2001 From: pahearn73 Date: Wed, 18 Sep 2024 13:21:19 -0400 Subject: [PATCH 2/8] Analytics plug-in support: WIP --- core/sdk/src/api/observability/analytics.rs | 29 ++++++ core/sdk/src/api/rules.rs | 17 ++++ .../src/processors/thunder_analytics.rs | 89 +++++++++++++++++++ 3 files changed, 135 insertions(+) create mode 100644 core/sdk/src/api/observability/analytics.rs create mode 100644 core/sdk/src/api/rules.rs create mode 100644 device/thunder_ripple_sdk/src/processors/thunder_analytics.rs diff --git a/core/sdk/src/api/observability/analytics.rs b/core/sdk/src/api/observability/analytics.rs new file mode 100644 index 000000000..63a193e7d --- /dev/null +++ b/core/sdk/src/api/observability/analytics.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::{ + extn::extn_client_message::{ExtnEvent, ExtnPayload, ExtnPayloadProvider}, + framework::ripple_contract::RippleContract, +}; + +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub enum AnalyticsEvent { + SendMetrics(Value), +} + +impl ExtnPayloadProvider for AnalyticsEvent { + fn get_extn_payload(&self) -> ExtnPayload { + ExtnPayload::Event(ExtnEvent::Analytics(self.clone())) + } + + fn get_from_payload(payload: ExtnPayload) -> Option { + if let ExtnPayload::Event(ExtnEvent::Analytics(analytics_event)) = payload { + return Some(analytics_event); + } + None + } + + fn contract() -> RippleContract { + RippleContract::Analytics + } +} diff --git a/core/sdk/src/api/rules.rs b/core/sdk/src/api/rules.rs new file mode 100644 index 000000000..1e1dd7409 --- /dev/null +++ b/core/sdk/src/api/rules.rs @@ -0,0 +1,17 @@ +pub enum InternalRule { + NotificationServiceOnConnect, + AnalyticsProcessor, +} + +impl ToString for InternalRule { + fn to_string(&self) -> String { + let suffix = match self { + InternalRule::NotificationServiceOnConnect => { + String::from("notification_service_on_connect") + } + InternalRule::AnalyticsProcessor => String::from("analytics_processor"), + }; + + format!("internal.{}", suffix) + } +} diff --git a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs new file mode 100644 index 000000000..c82f72fb5 --- /dev/null +++ b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs @@ -0,0 +1,89 @@ +// Copyright 2023 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +// + +use ripple_sdk::{ + api::observability::analytics::AnalyticsEvent, + async_trait::async_trait, + extn::{ + client::extn_processor::{ + DefaultExtnStreamer, ExtnEventProcessor, ExtnStreamProcessor, ExtnStreamer, + }, + extn_client_message::ExtnMessage, + }, + tokio::sync::mpsc::{Receiver as MReceiver, Sender as MSender}, + utils::error::RippleError, +}; +use serde::{Deserialize, Serialize}; + +use crate::{client::thunder_plugin::ThunderPlugin, thunder_state::ThunderState}; + +#[derive(Debug)] +pub struct ThunderAnalyticsProcessor { + state: ThunderState, + streamer: DefaultExtnStreamer, +} + +impl ThunderAnalyticsProcessor { + pub fn new(state: ThunderState) -> ThunderAnalyticsProcessor { + println!("*** _DEBUG: ThunderAnalyticsProcessor::new: entry"); + ThunderAnalyticsProcessor { + state, + streamer: DefaultExtnStreamer::new(), + } + } +} + +impl ExtnStreamProcessor for ThunderAnalyticsProcessor { + type VALUE = AnalyticsEvent; + type STATE = ThunderState; + + fn get_state(&self) -> Self::STATE { + self.state.clone() + } + + fn sender(&self) -> MSender { + self.streamer.sender() + } + + fn receiver(&mut self) -> MReceiver { + self.streamer.receiver() + } +} + +#[async_trait] +impl ExtnEventProcessor for ThunderAnalyticsProcessor { + async fn process_event( + state: Self::STATE, + _msg: ExtnMessage, + extracted_message: Self::VALUE, + ) -> Option { + println!( + "*** _DEBUG: ThunderAnalyticsProcessor::process_event: extracted_message={:?}", + extracted_message + ); + match extracted_message { + AnalyticsEvent::SendMetrics(data) => { + println!( + "*** _DEBUG: ThunderAnalyticsProcessor::process_event: data={:?}", + data + ); + } + } + + None + } +} From 11c5e2f72c9b79ef1ff5045786731d73de7a669d Mon Sep 17 00:00:00 2001 From: pahearn73 Date: Mon, 23 Sep 2024 21:40:23 -0400 Subject: [PATCH 3/8] Analytics plug-in support: WIP --- core/main/src/broker/endpoint_broker.rs | 3 - core/main/src/broker/thunder_broker.rs | 1 - core/main/src/firebolt/firebolt_gateway.rs | 2 + core/sdk/src/api/firebolt/fb_metrics.rs | 15 +++ core/sdk/src/api/observability/analytics.rs | 16 +-- core/sdk/src/extn/client/extn_client.rs | 1 + core/sdk/src/extn/extn_client_message.rs | 8 +- .../src/bootstrap/boot_thunder.rs | 1 - .../src/bootstrap/setup_thunder_processors.rs | 5 +- .../src/client/thunder_client.rs | 4 - .../src/client/thunder_plugin.rs | 9 ++ .../src/processors/thunder_analytics.rs | 99 ++++++++++++++++--- 12 files changed, 125 insertions(+), 39 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index a8f6d8b17..9431d6ef6 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -432,7 +432,6 @@ impl EndpointBrokerState { rpc_request: RpcRequest, extn_message: Option, ) -> bool { - println!("*** _DEBUG: handle_brokerage: entry"); let mut handled: bool = true; let callback = self.callback.clone(); let mut broker_sender = None; @@ -459,9 +458,7 @@ impl EndpointBrokerState { let broker = broker_sender.unwrap(); let (_, updated_request) = self.update_request(&rpc_request, rule, extn_message); tokio::spawn(async move { - println!("*** _DEBUG: handle_brokerage: Mark 1"); if let Err(e) = broker.send(updated_request.clone()).await { - println!("*** _DEBUG: handle_brokerage: Mark 2"); callback.send_error(updated_request, e).await } }); diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 4548761c7..4872ad063 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -110,7 +110,6 @@ impl ThunderBroker { match broker_c.prepare_request(&request) { Ok(updated_request) => { debug!("Sending request to broker {:?}", updated_request); - debug!("*** _DEBUG: Sending request to broker {:?}", updated_request); for r in updated_request { let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; let _flush = ws_tx.flush().await; diff --git a/core/main/src/firebolt/firebolt_gateway.rs b/core/main/src/firebolt/firebolt_gateway.rs index 34e93f4db..a736ad692 100644 --- a/core/main/src/firebolt/firebolt_gateway.rs +++ b/core/main/src/firebolt/firebolt_gateway.rs @@ -27,6 +27,7 @@ use ripple_sdk::{ rpc_error::RpcError, rpc_gateway_api::{ApiMessage, ApiProtocol, RpcRequest}, }, + observability::analytics::AnalyticsRequest, }, chrono::Utc, extn::extn_client_message::ExtnMessage, @@ -177,6 +178,7 @@ impl FireboltGateway { } } let platform_state = self.state.platform_state.clone(); + /* * The reason for spawning a new thread is that when request-1 comes, and it waits for * user grant. The response from user grant, (eg ChallengeResponse) comes as rpc which diff --git a/core/sdk/src/api/firebolt/fb_metrics.rs b/core/sdk/src/api/firebolt/fb_metrics.rs index e43268747..38f777bb4 100644 --- a/core/sdk/src/api/firebolt/fb_metrics.rs +++ b/core/sdk/src/api/firebolt/fb_metrics.rs @@ -1023,6 +1023,21 @@ pub fn get_metrics_tags( Some(tags) } +// +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub struct BehavioralMetricsEvent { + pub event_name: String, + pub event_version: Option, + pub source_name: String, + pub source_version: String, + pub cet_list: Vec, + pub epoch_timestamp: Option, + pub uptime_timestamp: Option, + //pub payload: BehavioralMetricPayload, + pub payload: String, +} +// + #[cfg(test)] mod tests { use super::*; diff --git a/core/sdk/src/api/observability/analytics.rs b/core/sdk/src/api/observability/analytics.rs index 63a193e7d..950b72581 100644 --- a/core/sdk/src/api/observability/analytics.rs +++ b/core/sdk/src/api/observability/analytics.rs @@ -1,24 +1,24 @@ use serde::{Deserialize, Serialize}; -use serde_json::Value; use crate::{ - extn::extn_client_message::{ExtnEvent, ExtnPayload, ExtnPayloadProvider}, + api::firebolt::fb_metrics::BehavioralMetricsEvent, + extn::extn_client_message::{ExtnPayload, ExtnPayloadProvider, ExtnRequest}, framework::ripple_contract::RippleContract, }; #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] -pub enum AnalyticsEvent { - SendMetrics(Value), +pub enum AnalyticsRequest { + SendMetrics(BehavioralMetricsEvent), } -impl ExtnPayloadProvider for AnalyticsEvent { +impl ExtnPayloadProvider for AnalyticsRequest { fn get_extn_payload(&self) -> ExtnPayload { - ExtnPayload::Event(ExtnEvent::Analytics(self.clone())) + ExtnPayload::Request(ExtnRequest::Analytics(self.clone())) } fn get_from_payload(payload: ExtnPayload) -> Option { - if let ExtnPayload::Event(ExtnEvent::Analytics(analytics_event)) = payload { - return Some(analytics_event); + if let ExtnPayload::Request(ExtnRequest::Analytics(analytics_request)) = payload { + return Some(analytics_request); } None } diff --git a/core/sdk/src/extn/client/extn_client.rs b/core/sdk/src/extn/client/extn_client.rs index 16828945f..f514ae690 100644 --- a/core/sdk/src/extn/client/extn_client.rs +++ b/core/sdk/src/extn/client/extn_client.rs @@ -478,6 +478,7 @@ impl ExtnClient { } } } else { + warn!("*** _DEBUG: No valid processors for the event {:?}", msg); warn!("No valid processors for the event {:?}", msg) } }; diff --git a/core/sdk/src/extn/extn_client_message.rs b/core/sdk/src/extn/extn_client_message.rs index c9a358358..934422c06 100644 --- a/core/sdk/src/extn/extn_client_message.rs +++ b/core/sdk/src/extn/extn_client_message.rs @@ -62,7 +62,7 @@ use crate::{ }, gateway::rpc_gateway_api::RpcRequest, manifest::device_manifest::AppLibraryEntry, - observability::analytics::AnalyticsEvent, + observability::analytics::AnalyticsRequest, protocol::BridgeProtocolRequest, pubsub::{PubSubEvents, PubSubRequest, PubSubResponse}, session::{AccountSessionRequest, AccountSessionResponse, SessionTokenRequest}, @@ -315,6 +315,9 @@ pub enum ExtnRequest { DistributorToken(DistributorTokenRequest), Context(RippleContextUpdateRequest), AppCatalog(AppCatalogRequest), + // 2 + Analytics(AnalyticsRequest), + // } impl ExtnPayloadProvider for ExtnRequest { @@ -397,9 +400,6 @@ pub enum ExtnEvent { PubSubEvent(PubSubEvents), TimeZone(TimeZone), AppsUpdate(AppsUpdate), - // 2 - Analytics(AnalyticsEvent), - // } impl ExtnPayloadProvider for ExtnEvent { diff --git a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs index fd6924f5b..883302287 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs @@ -36,7 +36,6 @@ pub async fn boot_thunder( info!("*** _DEBUG: boot_thunder: Mark 1"); if let Ok(state) = ThunderPoolStep::setup(state).await { info!("*** _DEBUG: boot_thunder: Mark 2"); - // YAH: See if we get here now SetupThunderProcessor::setup(state.clone()).await; return Some(state); } else { diff --git a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs index e22bea8e9..7dd70cb50 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs @@ -17,6 +17,7 @@ use ripple_sdk::api::firebolt::fb_telemetry::OperationalMetricRequest; use ripple_sdk::api::status_update::ExtnStatus; +use ripple_sdk::extn::client::extn_processor::ExtnStreamProcessor; use ripple_sdk::log::error; use crate::processors::thunder_analytics::ThunderAnalyticsProcessor; @@ -74,8 +75,8 @@ impl SetupThunderProcessor { // 2 // TODO: Config check here - println!("*** _DEBUG: setup"); - extn_client.add_event_processor(ThunderAnalyticsProcessor::new(state.clone().state)); + let thunder_analytics_processor = ThunderAnalyticsProcessor::new(state.clone().state); + extn_client.add_request_processor(thunder_analytics_processor); // extn_client.add_request_processor(ThunderRFCProcessor::new(state.clone().state)); let _ = extn_client.event(ExtnStatus::Ready); diff --git a/device/thunder_ripple_sdk/src/client/thunder_client.rs b/device/thunder_ripple_sdk/src/client/thunder_client.rs index cdd78e337..99b1036d4 100644 --- a/device/thunder_ripple_sdk/src/client/thunder_client.rs +++ b/device/thunder_ripple_sdk/src/client/thunder_client.rs @@ -466,10 +466,6 @@ impl ThunderClient { Ok(()) } fn extract_callsign_from_register_method(method: &str) -> Option { - println!( - "*** _DBEUG: extract_callsign_from_register_method: method={}", - method - ); // capture the initial string before an optional version number, followed by ".register" let re = Regex::new(r"^(.*?)(?:\.\d+)?\.register$").unwrap(); diff --git a/device/thunder_ripple_sdk/src/client/thunder_plugin.rs b/device/thunder_ripple_sdk/src/client/thunder_plugin.rs index 893a48e45..ca8e8fde3 100644 --- a/device/thunder_ripple_sdk/src/client/thunder_plugin.rs +++ b/device/thunder_ripple_sdk/src/client/thunder_plugin.rs @@ -35,6 +35,9 @@ pub enum ThunderPlugin { Hdcp, Telemetry, PackageManager, + // + Analytics, + // } const CONTROLLER_CFG: Cfg = Cfg::new("Controller", false, true); const DEVICE_INFO_CFG: Cfg = Cfg::new("DeviceInfo", true, false); @@ -50,6 +53,9 @@ const LOCATION_SYNC: Cfg = Cfg::new("LocationSync", false, false); const TTS_CFG: Cfg = Cfg::new("org.rdk.TextToSpeech", false, true); const TELEMETRY_CFG: Cfg = Cfg::new("org.rdk.Telemetry", false, false); const PACKAGE_MANAGER_CFG: Cfg = Cfg::new("org.rdk.PackageManager", false, false); +// +const ANALYTICS_CFG: Cfg = Cfg::new("org.rdk.Analytics", false, false); +// impl ThunderPlugin { pub fn cfg(&self) -> Cfg { @@ -69,6 +75,9 @@ impl ThunderPlugin { TextToSpeech => TTS_CFG, Telemetry => TELEMETRY_CFG, PackageManager => PACKAGE_MANAGER_CFG, + // + Analytics => ANALYTICS_CFG, + // } } pub fn callsign(&self) -> &str { diff --git a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs index c82f72fb5..8acecb222 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs @@ -16,20 +16,21 @@ // use ripple_sdk::{ - api::observability::analytics::AnalyticsEvent, + api::observability::analytics::AnalyticsRequest, async_trait::async_trait, extn::{ - client::extn_processor::{ - DefaultExtnStreamer, ExtnEventProcessor, ExtnStreamProcessor, ExtnStreamer, + client::{ + extn_client::ExtnClient, + extn_processor::{ + DefaultExtnStreamer, ExtnRequestProcessor, ExtnStreamProcessor, ExtnStreamer, + }, }, - extn_client_message::ExtnMessage, + extn_client_message::{ExtnMessage, ExtnResponse}, }, tokio::sync::mpsc::{Receiver as MReceiver, Sender as MSender}, - utils::error::RippleError, }; -use serde::{Deserialize, Serialize}; -use crate::{client::thunder_plugin::ThunderPlugin, thunder_state::ThunderState}; +use crate::thunder_state::ThunderState; #[derive(Debug)] pub struct ThunderAnalyticsProcessor { @@ -48,7 +49,7 @@ impl ThunderAnalyticsProcessor { } impl ExtnStreamProcessor for ThunderAnalyticsProcessor { - type VALUE = AnalyticsEvent; + type VALUE = AnalyticsRequest; type STATE = ThunderState; fn get_state(&self) -> Self::STATE { @@ -65,25 +66,91 @@ impl ExtnStreamProcessor for ThunderAnalyticsProcessor { } #[async_trait] -impl ExtnEventProcessor for ThunderAnalyticsProcessor { - async fn process_event( +impl ExtnRequestProcessor for ThunderAnalyticsProcessor { + fn get_client(&self) -> ExtnClient { + self.state.get_client() + } + + async fn process_request( state: Self::STATE, - _msg: ExtnMessage, + msg: ExtnMessage, extracted_message: Self::VALUE, - ) -> Option { + ) -> bool { println!( - "*** _DEBUG: ThunderAnalyticsProcessor::process_event: extracted_message={:?}", + "*** _DEBUG: 2 ThunderAnalyticsProcessor::process_request: extracted_message={:?}", extracted_message ); + match extracted_message { - AnalyticsEvent::SendMetrics(data) => { + AnalyticsRequest::SendMetrics(data) => { println!( - "*** _DEBUG: ThunderAnalyticsProcessor::process_event: data={:?}", + "*** _DEBUG: ThunderAnalyticsProcessor::process_request: data={:?}", data ); } } - None + Self::respond(state.get_client(), msg, ExtnResponse::None(())) + .await + .is_ok() + } +} + +async fn send_metrics(state: ThunderState, metrics: Value) -> ExtnResponse { + /* + setup operation at higher scope to allow it to time itself + */ + let operation = Operation::new( + AppsOperationType::Install, + app.clone().id, + AppData::new(app.clone().version), + ); + + let method: String = ThunderPlugin::PackageManager.method("install"); + let request = InstallAppRequest::new(app.clone()); + + let metrics_timer = start_service_metrics_timer( + &state.thunder_state.get_client(), + ThunderMetricsTimerName::PackageManagerInstall.to_string(), + ); + + let device_response = state + .thunder_state + .get_thunder_client() + .call(DeviceCallRequest { + method, + params: Some(DeviceChannelParams::Json( + serde_json::to_string(&request).unwrap(), + )), + }) + .await; + + let thunder_resp = serde_json::from_value::(device_response.message); + + let status = if thunder_resp.is_ok() { + ThunderResponseStatus::Success + } else { + ThunderResponseStatus::Failure + }; + + stop_and_send_service_metrics_timer( + state.thunder_state.get_client().clone(), + metrics_timer, + status.to_string(), + ) + .await; + + match thunder_resp { + Ok(handle) => { + Self::add_or_remove_operation( + state.clone(), + handle.clone(), + operation, + Some(status.to_string()), + ); + + ExtnResponse::String(handle) + } + Err(_) => ExtnResponse::Error(RippleError::ProcessorError), } } From 23659ac3a35c6a3156f6b28314fb305542966828 Mon Sep 17 00:00:00 2001 From: pahearn73 Date: Tue, 24 Sep 2024 12:19:38 -0400 Subject: [PATCH 4/8] Analytics plug-in support: implemented --- core/main/src/firebolt/firebolt_gateway.rs | 1 - core/main/src/processor/metrics_processor.rs | 20 +--- core/sdk/src/api/firebolt/fb_metrics.rs | 13 +-- core/sdk/src/api/mod.rs | 7 +- core/sdk/src/extn/client/extn_client.rs | 1 - core/sdk/src/extn/extn_client_message.rs | 2 - core/sdk/src/framework/ripple_contract.rs | 2 - .../src/bootstrap/boot_thunder_channel.rs | 1 - .../src/bootstrap/boot_thunder.rs | 5 - .../src/bootstrap/setup_thunder_processors.rs | 4 - .../src/client/thunder_plugin.rs | 6 -- device/thunder_ripple_sdk/src/lib.rs | 4 +- .../src/processors/thunder_analytics.rs | 98 ++++++------------- 13 files changed, 42 insertions(+), 122 deletions(-) diff --git a/core/main/src/firebolt/firebolt_gateway.rs b/core/main/src/firebolt/firebolt_gateway.rs index a736ad692..bca316d1e 100644 --- a/core/main/src/firebolt/firebolt_gateway.rs +++ b/core/main/src/firebolt/firebolt_gateway.rs @@ -27,7 +27,6 @@ use ripple_sdk::{ rpc_error::RpcError, rpc_gateway_api::{ApiMessage, ApiProtocol, RpcRequest}, }, - observability::analytics::AnalyticsRequest, }, chrono::Utc, extn::extn_client_message::ExtnMessage, diff --git a/core/main/src/processor/metrics_processor.rs b/core/main/src/processor/metrics_processor.rs index d849341cd..a3d49e031 100644 --- a/core/main/src/processor/metrics_processor.rs +++ b/core/main/src/processor/metrics_processor.rs @@ -59,24 +59,6 @@ pub async fn send_metric( debug!("drop data is true, not sending BI metrics"); return Ok(()); } - // Added - // debug!("*** _DEBUG: send_metric: Mark 1"); - // if let Ok(payload_value) = serde_json::to_value(&payload) { - // let rpc_request = RpcRequest::get_new_internal( - // InternalRule::AnalyticsProcessor.to_string(), - // Some(payload_value), - // ); - // debug!("*** _DEBUG: send_metric: rpc_request={:?}", rpc_request); - - // if platform_state - // .endpoint_state - // .handle_brokerage(rpc_request, None) - // { - // debug!("*** _DEBUG: send_metric: Brokerage handled"); - // return Ok(()); - // } - // } - // if let Some(session) = platform_state.session_state.get_account_session() { let request = BehavioralMetricRequest { @@ -88,7 +70,7 @@ pub async fn send_metric( .get_client() .send_extn_request_transient(request); } - // + Err(ripple_sdk::utils::error::RippleError::ProcessorError) } diff --git a/core/sdk/src/api/firebolt/fb_metrics.rs b/core/sdk/src/api/firebolt/fb_metrics.rs index 38f777bb4..2ee7f298c 100644 --- a/core/sdk/src/api/firebolt/fb_metrics.rs +++ b/core/sdk/src/api/firebolt/fb_metrics.rs @@ -1023,20 +1023,21 @@ pub fn get_metrics_tags( Some(tags) } -// #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] pub struct BehavioralMetricsEvent { pub event_name: String, + #[serde(skip_serializing_if = "Option::is_none")] pub event_version: Option, - pub source_name: String, - pub source_version: String, + pub event_source: String, + pub event_source_version: String, pub cet_list: Vec, + #[serde(skip_serializing_if = "Option::is_none")] pub epoch_timestamp: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub uptime_timestamp: Option, - //pub payload: BehavioralMetricPayload, - pub payload: String, + pub event_payload: Value, } -// #[cfg(test)] mod tests { diff --git a/core/sdk/src/api/mod.rs b/core/sdk/src/api/mod.rs index 051731623..9eabe89b2 100644 --- a/core/sdk/src/api/mod.rs +++ b/core/sdk/src/api/mod.rs @@ -32,9 +32,6 @@ pub mod status_update; pub mod storage_property; pub mod usergrant_entry; pub mod wifi; -// Added -//pub mod rules; -// pub mod gateway { pub mod rpc_error; @@ -75,9 +72,7 @@ pub mod firebolt { } pub mod observability { + pub mod analytics; pub mod metrics_util; pub mod operational_metrics; - // 2 - pub mod analytics; - // } diff --git a/core/sdk/src/extn/client/extn_client.rs b/core/sdk/src/extn/client/extn_client.rs index f514ae690..16828945f 100644 --- a/core/sdk/src/extn/client/extn_client.rs +++ b/core/sdk/src/extn/client/extn_client.rs @@ -478,7 +478,6 @@ impl ExtnClient { } } } else { - warn!("*** _DEBUG: No valid processors for the event {:?}", msg); warn!("No valid processors for the event {:?}", msg) } }; diff --git a/core/sdk/src/extn/extn_client_message.rs b/core/sdk/src/extn/extn_client_message.rs index 934422c06..70953fcd3 100644 --- a/core/sdk/src/extn/extn_client_message.rs +++ b/core/sdk/src/extn/extn_client_message.rs @@ -315,9 +315,7 @@ pub enum ExtnRequest { DistributorToken(DistributorTokenRequest), Context(RippleContextUpdateRequest), AppCatalog(AppCatalogRequest), - // 2 Analytics(AnalyticsRequest), - // } impl ExtnPayloadProvider for ExtnRequest { diff --git a/core/sdk/src/framework/ripple_contract.rs b/core/sdk/src/framework/ripple_contract.rs index 331f933d1..dea302134 100644 --- a/core/sdk/src/framework/ripple_contract.rs +++ b/core/sdk/src/framework/ripple_contract.rs @@ -129,9 +129,7 @@ pub enum RippleContract { Apps, // Runtime ability for a given distributor to turn off a certian feature RemoteFeatureControl, - // 2 Analytics, - // } pub trait ContractAdjective: serde::ser::Serialize + DeserializeOwned { diff --git a/device/thunder/src/bootstrap/boot_thunder_channel.rs b/device/thunder/src/bootstrap/boot_thunder_channel.rs index d3c64cffe..fb753c9a7 100644 --- a/device/thunder/src/bootstrap/boot_thunder_channel.rs +++ b/device/thunder/src/bootstrap/boot_thunder_channel.rs @@ -23,7 +23,6 @@ use thunder_ripple_sdk::{ pub async fn boot_thunder_channel(state: ExtnClient) { info!("Booting thunder"); - println!("*** _DEBUG: Booting thunder 2"); let _ = boot_thunder( state, ThunderPluginBootParam { diff --git a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs index 883302287..97e443596 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs @@ -31,19 +31,14 @@ pub async fn boot_thunder( plugin_param: ThunderPluginBootParam, ) -> Option { info!("Booting thunder"); - println!("*** _DEBUG: boot_thunder: Mark 0"); if let Ok(state) = ThunderGetConfigStep::setup(state, plugin_param).await { - info!("*** _DEBUG: boot_thunder: Mark 1"); if let Ok(state) = ThunderPoolStep::setup(state).await { - info!("*** _DEBUG: boot_thunder: Mark 2"); SetupThunderProcessor::setup(state.clone()).await; return Some(state); } else { - info!("*** _DEBUG: boot_thunder: Mark 3"); error!("Unable to connect to Thuner, error in ThunderPoolStep"); } } else { - info!("*** _DEBUG: boot_thunder: Mark 4"); error!("Unable to connect to Thuner, error in ThunderGetConfigStep"); } None diff --git a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs index 7dd70cb50..94e883420 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_processors.rs @@ -17,7 +17,6 @@ use ripple_sdk::api::firebolt::fb_telemetry::OperationalMetricRequest; use ripple_sdk::api::status_update::ExtnStatus; -use ripple_sdk::extn::client::extn_processor::ExtnStreamProcessor; use ripple_sdk::log::error; use crate::processors::thunder_analytics::ThunderAnalyticsProcessor; @@ -73,11 +72,8 @@ impl SetupThunderProcessor { } } - // 2 - // TODO: Config check here let thunder_analytics_processor = ThunderAnalyticsProcessor::new(state.clone().state); extn_client.add_request_processor(thunder_analytics_processor); - // extn_client.add_request_processor(ThunderRFCProcessor::new(state.clone().state)); let _ = extn_client.event(ExtnStatus::Ready); } diff --git a/device/thunder_ripple_sdk/src/client/thunder_plugin.rs b/device/thunder_ripple_sdk/src/client/thunder_plugin.rs index ca8e8fde3..214994fe4 100644 --- a/device/thunder_ripple_sdk/src/client/thunder_plugin.rs +++ b/device/thunder_ripple_sdk/src/client/thunder_plugin.rs @@ -35,9 +35,7 @@ pub enum ThunderPlugin { Hdcp, Telemetry, PackageManager, - // Analytics, - // } const CONTROLLER_CFG: Cfg = Cfg::new("Controller", false, true); const DEVICE_INFO_CFG: Cfg = Cfg::new("DeviceInfo", true, false); @@ -53,9 +51,7 @@ const LOCATION_SYNC: Cfg = Cfg::new("LocationSync", false, false); const TTS_CFG: Cfg = Cfg::new("org.rdk.TextToSpeech", false, true); const TELEMETRY_CFG: Cfg = Cfg::new("org.rdk.Telemetry", false, false); const PACKAGE_MANAGER_CFG: Cfg = Cfg::new("org.rdk.PackageManager", false, false); -// const ANALYTICS_CFG: Cfg = Cfg::new("org.rdk.Analytics", false, false); -// impl ThunderPlugin { pub fn cfg(&self) -> Cfg { @@ -75,9 +71,7 @@ impl ThunderPlugin { TextToSpeech => TTS_CFG, Telemetry => TELEMETRY_CFG, PackageManager => PACKAGE_MANAGER_CFG, - // Analytics => ANALYTICS_CFG, - // } } pub fn callsign(&self) -> &str { diff --git a/device/thunder_ripple_sdk/src/lib.rs b/device/thunder_ripple_sdk/src/lib.rs index 583bbcd52..f13e8ce01 100644 --- a/device/thunder_ripple_sdk/src/lib.rs +++ b/device/thunder_ripple_sdk/src/lib.rs @@ -41,6 +41,7 @@ pub mod processors { pub mod events { pub mod thunder_event_handlers; } + pub mod thunder_analytics; pub mod thunder_package_manager; pub mod thunder_persistent_store; pub mod thunder_remote; @@ -48,9 +49,6 @@ pub mod processors { pub mod thunder_telemetry; pub mod thunder_wifi; pub mod thunder_window_manager; - // 2 - pub mod thunder_analytics; - // } pub mod utils; diff --git a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs index 8acecb222..0bf919654 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs @@ -16,7 +16,13 @@ // use ripple_sdk::{ - api::observability::analytics::AnalyticsRequest, + api::{ + device::device_operator::{ + DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage, + }, + firebolt::fb_metrics::BehavioralMetricsEvent, + observability::analytics::AnalyticsRequest, + }, async_trait::async_trait, extn::{ client::{ @@ -27,10 +33,12 @@ use ripple_sdk::{ }, extn_client_message::{ExtnMessage, ExtnResponse}, }, + serde_json, tokio::sync::mpsc::{Receiver as MReceiver, Sender as MSender}, + utils::error::RippleError, }; -use crate::thunder_state::ThunderState; +use crate::{client::thunder_plugin::ThunderPlugin, thunder_state::ThunderState}; #[derive(Debug)] pub struct ThunderAnalyticsProcessor { @@ -40,7 +48,6 @@ pub struct ThunderAnalyticsProcessor { impl ThunderAnalyticsProcessor { pub fn new(state: ThunderState) -> ThunderAnalyticsProcessor { - println!("*** _DEBUG: ThunderAnalyticsProcessor::new: entry"); ThunderAnalyticsProcessor { state, streamer: DefaultExtnStreamer::new(), @@ -76,81 +83,40 @@ impl ExtnRequestProcessor for ThunderAnalyticsProcessor { msg: ExtnMessage, extracted_message: Self::VALUE, ) -> bool { - println!( - "*** _DEBUG: 2 ThunderAnalyticsProcessor::process_request: extracted_message={:?}", - extracted_message - ); - - match extracted_message { - AnalyticsRequest::SendMetrics(data) => { - println!( - "*** _DEBUG: ThunderAnalyticsProcessor::process_request: data={:?}", - data - ); + let response_message = match extracted_message { + AnalyticsRequest::SendMetrics(event) => send_metrics(state.clone(), event).await, + }; + + let extn_response = match response_message.message["success"].as_bool() { + Some(success) => { + if success { + ExtnResponse::None(()) + } else { + ExtnResponse::Error(RippleError::ExtnError) + } } - } + None => ExtnResponse::Error(RippleError::ExtnError), + }; - Self::respond(state.get_client(), msg, ExtnResponse::None(())) + Self::respond(state.get_client(), msg, extn_response) .await .is_ok() } } -async fn send_metrics(state: ThunderState, metrics: Value) -> ExtnResponse { - /* - setup operation at higher scope to allow it to time itself - */ - let operation = Operation::new( - AppsOperationType::Install, - app.clone().id, - AppData::new(app.clone().version), - ); +async fn send_metrics( + thunder_state: ThunderState, + metrics_event: BehavioralMetricsEvent, +) -> DeviceResponseMessage { + let method: String = ThunderPlugin::Analytics.method("sendEvent"); - let method: String = ThunderPlugin::PackageManager.method("install"); - let request = InstallAppRequest::new(app.clone()); - - let metrics_timer = start_service_metrics_timer( - &state.thunder_state.get_client(), - ThunderMetricsTimerName::PackageManagerInstall.to_string(), - ); - - let device_response = state - .thunder_state + thunder_state .get_thunder_client() .call(DeviceCallRequest { method, params: Some(DeviceChannelParams::Json( - serde_json::to_string(&request).unwrap(), + serde_json::to_string(&metrics_event).unwrap(), )), }) - .await; - - let thunder_resp = serde_json::from_value::(device_response.message); - - let status = if thunder_resp.is_ok() { - ThunderResponseStatus::Success - } else { - ThunderResponseStatus::Failure - }; - - stop_and_send_service_metrics_timer( - state.thunder_state.get_client().clone(), - metrics_timer, - status.to_string(), - ) - .await; - - match thunder_resp { - Ok(handle) => { - Self::add_or_remove_operation( - state.clone(), - handle.clone(), - operation, - Some(status.to_string()), - ); - - ExtnResponse::String(handle) - } - Err(_) => ExtnResponse::Error(RippleError::ProcessorError), - } + .await } From 1cc6f331906f1f0d55c629adc50b5a1d1765bfd0 Mon Sep 17 00:00:00 2001 From: pahearn73 Date: Tue, 24 Sep 2024 12:41:55 -0400 Subject: [PATCH 5/8] Analytics plug-in support: Removed unused file --- core/sdk/src/api/rules.rs | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 core/sdk/src/api/rules.rs diff --git a/core/sdk/src/api/rules.rs b/core/sdk/src/api/rules.rs deleted file mode 100644 index 1e1dd7409..000000000 --- a/core/sdk/src/api/rules.rs +++ /dev/null @@ -1,17 +0,0 @@ -pub enum InternalRule { - NotificationServiceOnConnect, - AnalyticsProcessor, -} - -impl ToString for InternalRule { - fn to_string(&self) -> String { - let suffix = match self { - InternalRule::NotificationServiceOnConnect => { - String::from("notification_service_on_connect") - } - InternalRule::AnalyticsProcessor => String::from("analytics_processor"), - }; - - format!("internal.{}", suffix) - } -} From b32e749fa67b86f0a36bc4fde0799b55e2133c17 Mon Sep 17 00:00:00 2001 From: pahearn73 Date: Fri, 18 Oct 2024 12:09:26 -0400 Subject: [PATCH 6/8] feat: RPPL-2520: Analytics plug-in support: Clippy --- core/main/src/processor/metrics_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/main/src/processor/metrics_processor.rs b/core/main/src/processor/metrics_processor.rs index a3d49e031..71d94e13f 100644 --- a/core/main/src/processor/metrics_processor.rs +++ b/core/main/src/processor/metrics_processor.rs @@ -25,7 +25,7 @@ use ripple_sdk::{ }, fb_telemetry::OperationalMetricRequest, }, - gateway::rpc_gateway_api::{CallContext, RpcRequest}, + gateway::rpc_gateway_api::CallContext, }, async_trait::async_trait, extn::{ From 72f7b6be3cf297f676bd7d4ad3c6526f43f44a4f Mon Sep 17 00:00:00 2001 From: pahearn73 Date: Thu, 31 Oct 2024 15:07:38 -0400 Subject: [PATCH 7/8] feat: RPPL-2520: Analytics plug-in support: Fixed ExtnClient handle_vec_stream issue. --- core/sdk/src/extn/client/extn_client.rs | 1 - device/thunder_ripple_sdk/src/processors/thunder_analytics.rs | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/sdk/src/extn/client/extn_client.rs b/core/sdk/src/extn/client/extn_client.rs index e0850fb85..b5d219a7c 100644 --- a/core/sdk/src/extn/client/extn_client.rs +++ b/core/sdk/src/extn/client/extn_client.rs @@ -326,7 +326,6 @@ impl ExtnClient { if let Some(sender) = req_sender { let _ = new_message.callback.insert(sender); } - Self::handle_vec_stream(message, self.event_processors.clone()); } tokio::spawn(async move { diff --git a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs index 0bf919654..12d1ec32a 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs @@ -28,7 +28,8 @@ use ripple_sdk::{ client::{ extn_client::ExtnClient, extn_processor::{ - DefaultExtnStreamer, ExtnRequestProcessor, ExtnStreamProcessor, ExtnStreamer, + DefaultExtnStreamer, ExtnEventProcessor, ExtnRequestProcessor, ExtnStreamProcessor, + ExtnStreamer, }, }, extn_client_message::{ExtnMessage, ExtnResponse}, From 491134a3a4ef6e49a8b846698aacc3d26387de11 Mon Sep 17 00:00:00 2001 From: pahearn73 Date: Thu, 31 Oct 2024 15:19:33 -0400 Subject: [PATCH 8/8] feat: RPPL-2520: Analytics plug-in support: Clippy --- device/thunder_ripple_sdk/src/processors/thunder_analytics.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs index 12d1ec32a..0bf919654 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_analytics.rs @@ -28,8 +28,7 @@ use ripple_sdk::{ client::{ extn_client::ExtnClient, extn_processor::{ - DefaultExtnStreamer, ExtnEventProcessor, ExtnRequestProcessor, ExtnStreamProcessor, - ExtnStreamer, + DefaultExtnStreamer, ExtnRequestProcessor, ExtnStreamProcessor, ExtnStreamer, }, }, extn_client_message::{ExtnMessage, ExtnResponse},