diff --git a/src/telemetry_basic.rs b/src/telemetry_basic.rs index 89139ea..da7beb8 100644 --- a/src/telemetry_basic.rs +++ b/src/telemetry_basic.rs @@ -135,13 +135,13 @@ pub async fn compress_basic_telemetry_to_file( } // even if there's an error with i/o, storage is now clear, preventing infinite memory growth info!("basic telemetry save \"{}\"", fn_net.to_str().unwrap()); - let io_result = _file_save(fn_net.clone(), big_json_net).await; + let io_result = file_save(fn_net.clone(), big_json_net).await; if io_result.is_err() { error!("error: {}", io_result.err().unwrap()); } } -async fn _file_save(path: PathBuf, json: serde_json::Value) -> Result<(), String> { +pub async fn file_save(path: PathBuf, json: serde_json::Value) -> Result<(), String> { let mut f = tokio::fs::File::create(path).await.map_err(|e| format!("{:?}", e))?; f.write_all(serde_json::to_string_pretty(&json).unwrap().as_bytes()).await.map_err(|e| format!("{}", e))?; Ok(()) diff --git a/src/telemetry_snippets.rs b/src/telemetry_snippets.rs index 3cf05be..780e6d5 100644 --- a/src/telemetry_snippets.rs +++ b/src/telemetry_snippets.rs @@ -1,16 +1,20 @@ -use tracing::info; +use tracing::{error, info}; use std::sync::Arc; use tokio::sync::RwLock as ARwLock; use std::sync::RwLock as StdRwLock; +use chrono::{Local}; +use std::path::PathBuf; // use std::collections::HashMap; // use reqwest_eventsource::Event; // use futures::StreamExt; // use async_stream::stream; // use serde_json::json; // use crate::caps::CodeAssistantCaps; +use crate::telemetry_basic::file_save; use crate::call_validation; use serde::Deserialize; use serde::Serialize; +use serde_json::json; use crate::global_context; use crate::completion_cache; use crate::telemetry_storage; @@ -56,6 +60,7 @@ pub struct SnippetTelemetry { // pub remaining_percent_300s: f64, // pub remaining_percent_walkaway: f64, // pub walkaway_ms: u64, + pub created_at: i64 } pub fn snippet_register( @@ -71,6 +76,7 @@ pub fn snippet_register( accepted: false, corrected_by_user: "".to_string(), remaining_percent_30s: 0.0, + created_at: Local::now().timestamp(), }; storage_locked.tele_snippet_next_id += 1; storage_locked.tele_snippets.push(snip); @@ -108,7 +114,7 @@ pub async fn snippet_accepted( return false; } -pub async fn sources_changed( +pub async fn sources_changed( // TODO gcx: Arc>, uri: &String, text: &String, @@ -135,10 +141,12 @@ pub async fn sources_changed( if !orig_text.is_some() { continue; } + let time_from_creation = Local::now().timestamp() - snip.created_at; let (valid1, mut gray_suggested) = if_head_tail_equal_return_added_text( orig_text.unwrap(), text ); + snip.corrected_by_user = gray_suggested.clone(); gray_suggested = gray_suggested.replace("\r", ""); info!("valid1: {:?}, gray_suggested: {:?}", valid1, gray_suggested); info!("orig grey_text: {:?}", snip.grey_text); @@ -219,3 +227,55 @@ pub fn unchanged_percentage( let largest_of_two = text_a.len().max(text_b.len()); (common as f64) / (largest_of_two as f64) } + +fn _compress_telemetry_snippets( + storage: Arc>, +) -> serde_json::Value { + let mut records = serde_json::json!([]); + { + let storage_locked = storage.read().unwrap(); + for rec in storage_locked.tele_snippets.iter() { + let json_dict = serde_json::to_value(rec).unwrap(); + records.as_array_mut().unwrap().push(json_dict); + } + } + records +} + + +pub async fn compress_telemetry_snippets_to_file( + cx: Arc>, +) { + let now = chrono::Local::now(); + let cache_dir: PathBuf; + let storage: Arc>; + let enduser_client_version; + { + let cx_locked = cx.read().await; + storage = cx_locked.telemetry.clone(); + cache_dir = cx_locked.cache_dir.clone(); + enduser_client_version = cx_locked.cmdline.enduser_client_version.clone(); + } + let dir = cache_dir.join("telemetry").join("compressed"); + tokio::fs::create_dir_all(dir.clone()).await.unwrap_or_else(|_| {}); + + let records = _compress_telemetry_snippets(storage.clone()); + let fn_snip = dir.join(format!("{}-snip.json", now.format("%Y%m%d-%H%M%S"))); + let mut big_json_snip = json!({ + "records": records, + "ts_end": now.timestamp(), + "teletype": "snippets", + "enduser_client_version": enduser_client_version, + }); + { + let mut storage_locked = storage.write().unwrap(); + storage_locked.tele_snippets.clear(); + big_json_snip.as_object_mut().unwrap().insert("ts_start".to_string(), json!(storage_locked.last_flushed_ts)); + storage_locked.last_flushed_ts = now.timestamp(); + } + let io_result = file_save(fn_snip, big_json_snip).await; + if io_result.is_err() { + error!("cannot save telemetry file: {}", io_result.err().unwrap()); + } +} + diff --git a/src/telemetry_storage.rs b/src/telemetry_storage.rs index a4b8451..982cc8a 100644 --- a/src/telemetry_storage.rs +++ b/src/telemetry_storage.rs @@ -5,6 +5,8 @@ use std::sync::RwLock as StdRwLock; use std::path::PathBuf; use tokio::sync::RwLock as ARwLock; use serde_json::json; +use tracing::log::Level; +use tracing::log::Level::Error; use crate::caps::CodeAssistantCaps; use crate::global_context; @@ -87,40 +89,51 @@ pub async fn send_telemetry_files_to_mothership( dir_compressed: PathBuf, dir_sent: PathBuf, telemetry_basic_dest: String, + telemetry_corrected_snippets_dest: String, api_key: String, ) { - // Send files found in dir_compressed, move to dir_sent if successful. - let files = _sorted_files(dir_compressed.clone()).await; - let http_client = reqwest::Client::new(); - for path in files { + async fn send_telemetry_file( + path: &PathBuf, + telemetry_dest: &String, + api_key: &String, + ) -> Result<(), String>{ let contents_maybe = _read_file(path.clone()).await; if contents_maybe.is_err() { - error!("cannot read {}: {}", path.display(), contents_maybe.err().unwrap()); - break; + return Err(format!("cannot read {}: {}", path.display(), contents_maybe.err().unwrap())); } let contents = contents_maybe.unwrap(); - info!("sending telemetry file\n{}\nto url\n{}", path.to_str().unwrap(), telemetry_basic_dest); - let resp_maybe = http_client.post(telemetry_basic_dest.clone()) - .body(contents) - .header(reqwest::header::AUTHORIZATION, format!("Bearer {}", api_key)) - .header(reqwest::header::CONTENT_TYPE, format!("application/json")) - .send().await; + info!("sending telemetry file\n{}\nto url\n{}", path.to_str().unwrap(), telemetry_dest); + let resp_maybe = reqwest::Client::new().post(telemetry_dest.clone()) + .body(contents) + .header(reqwest::header::AUTHORIZATION, format!("Bearer {}", api_key)) + .header(reqwest::header::CONTENT_TYPE, format!("application/json")) + .send().await; if resp_maybe.is_err() { - error!("telemetry send failed: {}\ndest url was\n{}", resp_maybe.err().unwrap(), telemetry_basic_dest); - break; + return Err(format!("telemetry send failed: {}\ndest url was\n{}", resp_maybe.err().unwrap(), telemetry_dest)); } let resp = resp_maybe.unwrap(); if resp.status()!= reqwest::StatusCode::OK { - error!("telemetry send failed: {}\ndest url was\n{}", resp.status(), telemetry_basic_dest); - break; + return Err(format!("telemetry send failed: {}\ndest url was\n{}", resp.status(), telemetry_dest)); } let resp_body = resp.text().await.unwrap_or_else(|_| "-empty-".to_string()); info!("telemetry send success, response:\n{}", resp_body); let resp_json = serde_json::from_str::(&resp_body).unwrap_or_else(|_| json!({})); let retcode = resp_json["retcode"].as_str().unwrap_or("").to_string(); if retcode != "OK" { - error!("retcode is not OK"); - break; + return Err("retcode is not OK".to_string()); + } + Ok(()) + } + + // Send files found in dir_compressed, move to dir_sent if successful. + let files = _sorted_files(dir_compressed.clone()).await; + for path in files { + if path.to_str().unwrap().ends_with("-net.json") { + send_telemetry_file(&path, &telemetry_basic_dest, &api_key).await; + } else if path.to_str().unwrap().ends_with("-snip.json") { + send_telemetry_file(&path, &telemetry_corrected_snippets_dest, &api_key).await; + } else { + continue; } let new_path = dir_sent.join(path.file_name().unwrap()); info!("success, moving file to {}", new_path.to_str().unwrap()); @@ -154,6 +167,7 @@ pub async fn telemetry_full_cycle( telemetry_basic_dest = caps.unwrap().read().unwrap().telemetry_basic_dest.clone(); } telemetry_basic::compress_basic_telemetry_to_file(global_context.clone()).await; + telemetry_snippets::compress_telemetry_snippets_to_file(global_context.clone()).await; let dir_compressed = cache_dir.join("telemetry").join("compressed"); let dir_sent = cache_dir.join("telemetry").join("sent"); if mothership_enabled && !telemetry_basic_dest.is_empty() && !skip_sending_part {