Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Analytics plug-in support #635

Merged
merged 13 commits into from
Nov 1, 2024
Merged
1 change: 1 addition & 0 deletions core/main/src/firebolt/firebolt_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl FireboltGateway {
}
}
let platform_state = self.state.platform_state.clone();

/*
* The reason for spawning a new thread is that when request-1 comes, and it waits for
* user grant. The response from user grant, (eg ChallengeResponse) comes as rpc which
Expand Down
2 changes: 2 additions & 0 deletions core/main/src/processor/metrics_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub async fn send_metric(
debug!("drop data is true, not sending BI metrics");
return Ok(());
}

if let Some(session) = platform_state.session_state.get_account_session() {
let request = BehavioralMetricRequest {
context: Some(platform_state.metrics.get_context()),
Expand All @@ -69,6 +70,7 @@ pub async fn send_metric(
.get_client()
.send_extn_request_transient(request);
}

Err(ripple_sdk::utils::error::RippleError::ProcessorError)
}

Expand Down
16 changes: 16 additions & 0 deletions core/sdk/src/api/firebolt/fb_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,22 @@ pub fn get_metrics_tags(
Some(tags)
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BehavioralMetricsEvent {
pub event_name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub event_version: Option<String>,
pub event_source: String,
pub event_source_version: String,
pub cet_list: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub epoch_timestamp: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub uptime_timestamp: Option<u64>,
pub event_payload: Value,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions core/sdk/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub mod firebolt {
}

pub mod observability {
pub mod analytics;
pub mod metrics_util;
pub mod operational_metrics;
}
29 changes: 29 additions & 0 deletions core/sdk/src/api/observability/analytics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};

use crate::{
api::firebolt::fb_metrics::BehavioralMetricsEvent,
extn::extn_client_message::{ExtnPayload, ExtnPayloadProvider, ExtnRequest},
framework::ripple_contract::RippleContract,
};

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub enum AnalyticsRequest {
SendMetrics(BehavioralMetricsEvent),
}

impl ExtnPayloadProvider for AnalyticsRequest {
fn get_extn_payload(&self) -> ExtnPayload {
ExtnPayload::Request(ExtnRequest::Analytics(self.clone()))
}

fn get_from_payload(payload: ExtnPayload) -> Option<Self> {
if let ExtnPayload::Request(ExtnRequest::Analytics(analytics_request)) = payload {
return Some(analytics_request);
}
None
}

fn contract() -> RippleContract {
RippleContract::Analytics
}
}
1 change: 0 additions & 1 deletion core/sdk/src/extn/client/extn_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ impl ExtnClient {
if let Some(sender) = req_sender {
let _ = new_message.callback.insert(sender);
}
Self::handle_vec_stream(message, self.event_processors.clone());
}

tokio::spawn(async move {
Expand Down
2 changes: 2 additions & 0 deletions core/sdk/src/extn/extn_client_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use crate::{
},
gateway::rpc_gateway_api::RpcRequest,
manifest::device_manifest::AppLibraryEntry,
observability::analytics::AnalyticsRequest,
protocol::BridgeProtocolRequest,
pubsub::{PubSubEvents, PubSubRequest, PubSubResponse},
session::{AccountSessionRequest, AccountSessionResponse, SessionTokenRequest},
Expand Down Expand Up @@ -314,6 +315,7 @@ pub enum ExtnRequest {
DistributorToken(DistributorTokenRequest),
Context(RippleContextUpdateRequest),
AppCatalog(AppCatalogRequest),
Analytics(AnalyticsRequest),
}

impl ExtnPayloadProvider for ExtnRequest {
Expand Down
1 change: 1 addition & 0 deletions core/sdk/src/framework/ripple_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub enum RippleContract {
Apps,
// Runtime ability for a given distributor to turn off a certian feature
RemoteFeatureControl,
Analytics,
}

pub trait ContractAdjective: serde::ser::Serialize + DeserializeOwned {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use ripple_sdk::api::firebolt::fb_telemetry::OperationalMetricRequest;
use ripple_sdk::api::status_update::ExtnStatus;
use ripple_sdk::log::error;

use crate::processors::thunder_analytics::ThunderAnalyticsProcessor;
use crate::processors::thunder_package_manager::ThunderPackageManagerRequestProcessor;
use crate::processors::thunder_rfc::ThunderRFCProcessor;
use crate::processors::thunder_telemetry::ThunderTelemetryProcessor;
Expand Down Expand Up @@ -70,6 +71,9 @@ impl SetupThunderProcessor {
Err(_) => error!("Telemetry not setup"),
}
}

let thunder_analytics_processor = ThunderAnalyticsProcessor::new(state.clone().state);
extn_client.add_request_processor(thunder_analytics_processor);
extn_client.add_request_processor(ThunderRFCProcessor::new(state.clone().state));
let _ = extn_client.event(ExtnStatus::Ready);
}
Expand Down
3 changes: 3 additions & 0 deletions device/thunder_ripple_sdk/src/client/thunder_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub enum ThunderPlugin {
Hdcp,
Telemetry,
PackageManager,
Analytics,
}
const CONTROLLER_CFG: Cfg = Cfg::new("Controller", false, true);
const DEVICE_INFO_CFG: Cfg = Cfg::new("DeviceInfo", true, false);
Expand All @@ -50,6 +51,7 @@ const LOCATION_SYNC: Cfg = Cfg::new("LocationSync", false, false);
const TTS_CFG: Cfg = Cfg::new("org.rdk.TextToSpeech", false, true);
const TELEMETRY_CFG: Cfg = Cfg::new("org.rdk.Telemetry", false, false);
const PACKAGE_MANAGER_CFG: Cfg = Cfg::new("org.rdk.PackageManager", false, false);
const ANALYTICS_CFG: Cfg = Cfg::new("org.rdk.Analytics", false, false);

impl ThunderPlugin {
pub fn cfg(&self) -> Cfg {
Expand All @@ -69,6 +71,7 @@ impl ThunderPlugin {
TextToSpeech => TTS_CFG,
Telemetry => TELEMETRY_CFG,
PackageManager => PACKAGE_MANAGER_CFG,
Analytics => ANALYTICS_CFG,
}
}
pub fn callsign(&self) -> &str {
Expand Down
1 change: 1 addition & 0 deletions device/thunder_ripple_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub mod processors {
pub mod events {
pub mod thunder_event_handlers;
}
pub mod thunder_analytics;
pub mod thunder_package_manager;
pub mod thunder_persistent_store;
pub mod thunder_remote;
Expand Down
122 changes: 122 additions & 0 deletions device/thunder_ripple_sdk/src/processors/thunder_analytics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2023 Comcast Cable Communications Management, LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
//

use ripple_sdk::{
api::{
device::device_operator::{
DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage,
},
firebolt::fb_metrics::BehavioralMetricsEvent,
observability::analytics::AnalyticsRequest,
},
async_trait::async_trait,
extn::{
client::{
extn_client::ExtnClient,
extn_processor::{
DefaultExtnStreamer, ExtnRequestProcessor, ExtnStreamProcessor, ExtnStreamer,
},
},
extn_client_message::{ExtnMessage, ExtnResponse},
},
serde_json,
tokio::sync::mpsc::{Receiver as MReceiver, Sender as MSender},
utils::error::RippleError,
};

use crate::{client::thunder_plugin::ThunderPlugin, thunder_state::ThunderState};

#[derive(Debug)]
pub struct ThunderAnalyticsProcessor {
state: ThunderState,
streamer: DefaultExtnStreamer,
}

impl ThunderAnalyticsProcessor {
pub fn new(state: ThunderState) -> ThunderAnalyticsProcessor {
ThunderAnalyticsProcessor {
state,
streamer: DefaultExtnStreamer::new(),
}
}
}

impl ExtnStreamProcessor for ThunderAnalyticsProcessor {
type VALUE = AnalyticsRequest;
type STATE = ThunderState;

fn get_state(&self) -> Self::STATE {
self.state.clone()
}

fn sender(&self) -> MSender<ExtnMessage> {
self.streamer.sender()
}

fn receiver(&mut self) -> MReceiver<ExtnMessage> {
self.streamer.receiver()
}
}

#[async_trait]
impl ExtnRequestProcessor for ThunderAnalyticsProcessor {
fn get_client(&self) -> ExtnClient {
self.state.get_client()
}

async fn process_request(
state: Self::STATE,
msg: ExtnMessage,
extracted_message: Self::VALUE,
) -> bool {
let response_message = match extracted_message {
AnalyticsRequest::SendMetrics(event) => send_metrics(state.clone(), event).await,
};

let extn_response = match response_message.message["success"].as_bool() {
Some(success) => {
if success {
ExtnResponse::None(())
} else {
ExtnResponse::Error(RippleError::ExtnError)
}
}
None => ExtnResponse::Error(RippleError::ExtnError),
};

Self::respond(state.get_client(), msg, extn_response)
.await
.is_ok()
}
}

async fn send_metrics(
thunder_state: ThunderState,
metrics_event: BehavioralMetricsEvent,
) -> DeviceResponseMessage {
let method: String = ThunderPlugin::Analytics.method("sendEvent");

thunder_state
.get_thunder_client()
.call(DeviceCallRequest {
method,
params: Some(DeviceChannelParams::Json(
serde_json::to_string(&metrics_event).unwrap(),
)),
})
.await
}
Loading