Skip to content

Commit

Permalink
Analytics plug-in support (#635)
Browse files Browse the repository at this point in the history
  • Loading branch information
pahearn73 authored Nov 1, 2024
1 parent f110279 commit b1e3f9d
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 1 deletion.
1 change: 1 addition & 0 deletions core/main/src/firebolt/firebolt_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl FireboltGateway {
}
}
let platform_state = self.state.platform_state.clone();

/*
* The reason for spawning a new thread is that when request-1 comes, and it waits for
* user grant. The response from user grant, (eg ChallengeResponse) comes as rpc which
Expand Down
2 changes: 2 additions & 0 deletions core/main/src/processor/metrics_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub async fn send_metric(
debug!("drop data is true, not sending BI metrics");
return Ok(());
}

if let Some(session) = platform_state.session_state.get_account_session() {
let request = BehavioralMetricRequest {
context: Some(platform_state.metrics.get_context()),
Expand All @@ -69,6 +70,7 @@ pub async fn send_metric(
.get_client()
.send_extn_request_transient(request);
}

Err(ripple_sdk::utils::error::RippleError::ProcessorError)
}

Expand Down
16 changes: 16 additions & 0 deletions core/sdk/src/api/firebolt/fb_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,22 @@ pub fn get_metrics_tags(
Some(tags)
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BehavioralMetricsEvent {
pub event_name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub event_version: Option<String>,
pub event_source: String,
pub event_source_version: String,
pub cet_list: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub epoch_timestamp: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub uptime_timestamp: Option<u64>,
pub event_payload: Value,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions core/sdk/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub mod firebolt {
}

pub mod observability {
pub mod analytics;
pub mod metrics_util;
pub mod operational_metrics;
}
29 changes: 29 additions & 0 deletions core/sdk/src/api/observability/analytics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};

use crate::{
api::firebolt::fb_metrics::BehavioralMetricsEvent,
extn::extn_client_message::{ExtnPayload, ExtnPayloadProvider, ExtnRequest},
framework::ripple_contract::RippleContract,
};

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub enum AnalyticsRequest {
SendMetrics(BehavioralMetricsEvent),
}

impl ExtnPayloadProvider for AnalyticsRequest {
fn get_extn_payload(&self) -> ExtnPayload {
ExtnPayload::Request(ExtnRequest::Analytics(self.clone()))
}

fn get_from_payload(payload: ExtnPayload) -> Option<Self> {
if let ExtnPayload::Request(ExtnRequest::Analytics(analytics_request)) = payload {
return Some(analytics_request);
}
None
}

fn contract() -> RippleContract {
RippleContract::Analytics
}
}
1 change: 0 additions & 1 deletion core/sdk/src/extn/client/extn_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ impl ExtnClient {
if let Some(sender) = req_sender {
let _ = new_message.callback.insert(sender);
}
Self::handle_vec_stream(message, self.event_processors.clone());
}

tokio::spawn(async move {
Expand Down
2 changes: 2 additions & 0 deletions core/sdk/src/extn/extn_client_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use crate::{
},
gateway::rpc_gateway_api::RpcRequest,
manifest::device_manifest::AppLibraryEntry,
observability::analytics::AnalyticsRequest,
protocol::BridgeProtocolRequest,
pubsub::{PubSubEvents, PubSubRequest, PubSubResponse},
session::{AccountSessionRequest, AccountSessionResponse, SessionTokenRequest},
Expand Down Expand Up @@ -314,6 +315,7 @@ pub enum ExtnRequest {
DistributorToken(DistributorTokenRequest),
Context(RippleContextUpdateRequest),
AppCatalog(AppCatalogRequest),
Analytics(AnalyticsRequest),
}

impl ExtnPayloadProvider for ExtnRequest {
Expand Down
1 change: 1 addition & 0 deletions core/sdk/src/framework/ripple_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub enum RippleContract {
Apps,
// Runtime ability for a given distributor to turn off a certian feature
RemoteFeatureControl,
Analytics,
}

pub trait ContractAdjective: serde::ser::Serialize + DeserializeOwned {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use ripple_sdk::api::firebolt::fb_telemetry::OperationalMetricRequest;
use ripple_sdk::api::status_update::ExtnStatus;
use ripple_sdk::log::error;

use crate::processors::thunder_analytics::ThunderAnalyticsProcessor;
use crate::processors::thunder_package_manager::ThunderPackageManagerRequestProcessor;
use crate::processors::thunder_rfc::ThunderRFCProcessor;
use crate::processors::thunder_telemetry::ThunderTelemetryProcessor;
Expand Down Expand Up @@ -70,6 +71,9 @@ impl SetupThunderProcessor {
Err(_) => error!("Telemetry not setup"),
}
}

let thunder_analytics_processor = ThunderAnalyticsProcessor::new(state.clone().state);
extn_client.add_request_processor(thunder_analytics_processor);
extn_client.add_request_processor(ThunderRFCProcessor::new(state.clone().state));
let _ = extn_client.event(ExtnStatus::Ready);
}
Expand Down
3 changes: 3 additions & 0 deletions device/thunder_ripple_sdk/src/client/thunder_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub enum ThunderPlugin {
Hdcp,
Telemetry,
PackageManager,
Analytics,
}
const CONTROLLER_CFG: Cfg = Cfg::new("Controller", false, true);
const DEVICE_INFO_CFG: Cfg = Cfg::new("DeviceInfo", true, false);
Expand All @@ -50,6 +51,7 @@ const LOCATION_SYNC: Cfg = Cfg::new("LocationSync", false, false);
const TTS_CFG: Cfg = Cfg::new("org.rdk.TextToSpeech", false, true);
const TELEMETRY_CFG: Cfg = Cfg::new("org.rdk.Telemetry", false, false);
const PACKAGE_MANAGER_CFG: Cfg = Cfg::new("org.rdk.PackageManager", false, false);
const ANALYTICS_CFG: Cfg = Cfg::new("org.rdk.Analytics", false, false);

impl ThunderPlugin {
pub fn cfg(&self) -> Cfg {
Expand All @@ -69,6 +71,7 @@ impl ThunderPlugin {
TextToSpeech => TTS_CFG,
Telemetry => TELEMETRY_CFG,
PackageManager => PACKAGE_MANAGER_CFG,
Analytics => ANALYTICS_CFG,
}
}
pub fn callsign(&self) -> &str {
Expand Down
1 change: 1 addition & 0 deletions device/thunder_ripple_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub mod processors {
pub mod events {
pub mod thunder_event_handlers;
}
pub mod thunder_analytics;
pub mod thunder_package_manager;
pub mod thunder_persistent_store;
pub mod thunder_remote;
Expand Down
122 changes: 122 additions & 0 deletions device/thunder_ripple_sdk/src/processors/thunder_analytics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2023 Comcast Cable Communications Management, LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
//

use ripple_sdk::{
api::{
device::device_operator::{
DeviceCallRequest, DeviceChannelParams, DeviceOperator, DeviceResponseMessage,
},
firebolt::fb_metrics::BehavioralMetricsEvent,
observability::analytics::AnalyticsRequest,
},
async_trait::async_trait,
extn::{
client::{
extn_client::ExtnClient,
extn_processor::{
DefaultExtnStreamer, ExtnRequestProcessor, ExtnStreamProcessor, ExtnStreamer,
},
},
extn_client_message::{ExtnMessage, ExtnResponse},
},
serde_json,
tokio::sync::mpsc::{Receiver as MReceiver, Sender as MSender},
utils::error::RippleError,
};

use crate::{client::thunder_plugin::ThunderPlugin, thunder_state::ThunderState};

#[derive(Debug)]
pub struct ThunderAnalyticsProcessor {
state: ThunderState,
streamer: DefaultExtnStreamer,
}

impl ThunderAnalyticsProcessor {
pub fn new(state: ThunderState) -> ThunderAnalyticsProcessor {
ThunderAnalyticsProcessor {
state,
streamer: DefaultExtnStreamer::new(),
}
}
}

impl ExtnStreamProcessor for ThunderAnalyticsProcessor {
type VALUE = AnalyticsRequest;
type STATE = ThunderState;

fn get_state(&self) -> Self::STATE {
self.state.clone()
}

fn sender(&self) -> MSender<ExtnMessage> {
self.streamer.sender()
}

fn receiver(&mut self) -> MReceiver<ExtnMessage> {
self.streamer.receiver()
}
}

#[async_trait]
impl ExtnRequestProcessor for ThunderAnalyticsProcessor {
fn get_client(&self) -> ExtnClient {
self.state.get_client()
}

async fn process_request(
state: Self::STATE,
msg: ExtnMessage,
extracted_message: Self::VALUE,
) -> bool {
let response_message = match extracted_message {
AnalyticsRequest::SendMetrics(event) => send_metrics(state.clone(), event).await,
};

let extn_response = match response_message.message["success"].as_bool() {
Some(success) => {
if success {
ExtnResponse::None(())
} else {
ExtnResponse::Error(RippleError::ExtnError)
}
}
None => ExtnResponse::Error(RippleError::ExtnError),
};

Self::respond(state.get_client(), msg, extn_response)
.await
.is_ok()
}
}

async fn send_metrics(
thunder_state: ThunderState,
metrics_event: BehavioralMetricsEvent,
) -> DeviceResponseMessage {
let method: String = ThunderPlugin::Analytics.method("sendEvent");

thunder_state
.get_thunder_client()
.call(DeviceCallRequest {
method,
params: Some(DeviceChannelParams::Json(
serde_json::to_string(&metrics_event).unwrap(),
)),
})
.await
}

0 comments on commit b1e3f9d

Please sign in to comment.