Skip to content
This repository has been archived by the owner on Oct 19, 2023. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
valaises committed Oct 19, 2023
1 parent 7f2c854 commit 24a774d
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/telemetry_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,13 @@ pub async fn compress_basic_telemetry_to_file(
}
// even if there's an error with i/o, storage is now clear, preventing infinite memory growth
info!("basic telemetry save \"{}\"", fn_net.to_str().unwrap());
let io_result = _file_save(fn_net.clone(), big_json_net).await;
let io_result = file_save(fn_net.clone(), big_json_net).await;
if io_result.is_err() {
error!("error: {}", io_result.err().unwrap());
}
}

async fn _file_save(path: PathBuf, json: serde_json::Value) -> Result<(), String> {
pub async fn file_save(path: PathBuf, json: serde_json::Value) -> Result<(), String> {
let mut f = tokio::fs::File::create(path).await.map_err(|e| format!("{:?}", e))?;
f.write_all(serde_json::to_string_pretty(&json).unwrap().as_bytes()).await.map_err(|e| format!("{}", e))?;
Ok(())
Expand Down
64 changes: 62 additions & 2 deletions src/telemetry_snippets.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use tracing::info;
use tracing::{error, info};
use std::sync::Arc;
use tokio::sync::RwLock as ARwLock;
use std::sync::RwLock as StdRwLock;
use chrono::{Local};
use std::path::PathBuf;
// use std::collections::HashMap;
// use reqwest_eventsource::Event;
// use futures::StreamExt;
// use async_stream::stream;
// use serde_json::json;
// use crate::caps::CodeAssistantCaps;
use crate::telemetry_basic::file_save;
use crate::call_validation;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use crate::global_context;
use crate::completion_cache;
use crate::telemetry_storage;
Expand Down Expand Up @@ -56,6 +60,7 @@ pub struct SnippetTelemetry {
// pub remaining_percent_300s: f64,
// pub remaining_percent_walkaway: f64,
// pub walkaway_ms: u64,
pub created_at: i64
}

pub fn snippet_register(
Expand All @@ -71,6 +76,7 @@ pub fn snippet_register(
accepted: false,
corrected_by_user: "".to_string(),
remaining_percent_30s: 0.0,
created_at: Local::now().timestamp(),
};
storage_locked.tele_snippet_next_id += 1;
storage_locked.tele_snippets.push(snip);
Expand Down Expand Up @@ -108,7 +114,7 @@ pub async fn snippet_accepted(
return false;
}

pub async fn sources_changed(
pub async fn sources_changed( // TODO
gcx: Arc<ARwLock<global_context::GlobalContext>>,
uri: &String,
text: &String,
Expand All @@ -135,10 +141,12 @@ pub async fn sources_changed(
if !orig_text.is_some() {
continue;
}
let time_from_creation = Local::now().timestamp() - snip.created_at;
let (valid1, mut gray_suggested) = if_head_tail_equal_return_added_text(
orig_text.unwrap(),
text
);
snip.corrected_by_user = gray_suggested.clone();
gray_suggested = gray_suggested.replace("\r", "");
info!("valid1: {:?}, gray_suggested: {:?}", valid1, gray_suggested);
info!("orig grey_text: {:?}", snip.grey_text);
Expand Down Expand Up @@ -219,3 +227,55 @@ pub fn unchanged_percentage(
let largest_of_two = text_a.len().max(text_b.len());
(common as f64) / (largest_of_two as f64)
}

fn _compress_telemetry_snippets(
storage: Arc<StdRwLock<telemetry_storage::Storage>>,
) -> serde_json::Value {
let mut records = serde_json::json!([]);
{
let storage_locked = storage.read().unwrap();
for rec in storage_locked.tele_snippets.iter() {
let json_dict = serde_json::to_value(rec).unwrap();
records.as_array_mut().unwrap().push(json_dict);
}
}
records
}


pub async fn compress_telemetry_snippets_to_file(
cx: Arc<ARwLock<global_context::GlobalContext>>,
) {
let now = chrono::Local::now();
let cache_dir: PathBuf;
let storage: Arc<StdRwLock<telemetry_storage::Storage>>;
let enduser_client_version;
{
let cx_locked = cx.read().await;
storage = cx_locked.telemetry.clone();
cache_dir = cx_locked.cache_dir.clone();
enduser_client_version = cx_locked.cmdline.enduser_client_version.clone();
}
let dir = cache_dir.join("telemetry").join("compressed");
tokio::fs::create_dir_all(dir.clone()).await.unwrap_or_else(|_| {});

let records = _compress_telemetry_snippets(storage.clone());
let fn_snip = dir.join(format!("{}-snip.json", now.format("%Y%m%d-%H%M%S")));
let mut big_json_snip = json!({
"records": records,
"ts_end": now.timestamp(),
"teletype": "snippets",
"enduser_client_version": enduser_client_version,
});
{
let mut storage_locked = storage.write().unwrap();
storage_locked.tele_snippets.clear();
big_json_snip.as_object_mut().unwrap().insert("ts_start".to_string(), json!(storage_locked.last_flushed_ts));
storage_locked.last_flushed_ts = now.timestamp();
}
let io_result = file_save(fn_snip, big_json_snip).await;
if io_result.is_err() {
error!("cannot save telemetry file: {}", io_result.err().unwrap());
}
}

50 changes: 32 additions & 18 deletions src/telemetry_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::sync::RwLock as StdRwLock;
use std::path::PathBuf;
use tokio::sync::RwLock as ARwLock;
use serde_json::json;
use tracing::log::Level;
use tracing::log::Level::Error;

use crate::caps::CodeAssistantCaps;
use crate::global_context;
Expand Down Expand Up @@ -87,40 +89,51 @@ pub async fn send_telemetry_files_to_mothership(
dir_compressed: PathBuf,
dir_sent: PathBuf,
telemetry_basic_dest: String,
telemetry_corrected_snippets_dest: String,
api_key: String,
) {
// Send files found in dir_compressed, move to dir_sent if successful.
let files = _sorted_files(dir_compressed.clone()).await;
let http_client = reqwest::Client::new();
for path in files {
async fn send_telemetry_file(
path: &PathBuf,
telemetry_dest: &String,
api_key: &String,
) -> Result<(), String>{
let contents_maybe = _read_file(path.clone()).await;
if contents_maybe.is_err() {
error!("cannot read {}: {}", path.display(), contents_maybe.err().unwrap());
break;
return Err(format!("cannot read {}: {}", path.display(), contents_maybe.err().unwrap()));
}
let contents = contents_maybe.unwrap();
info!("sending telemetry file\n{}\nto url\n{}", path.to_str().unwrap(), telemetry_basic_dest);
let resp_maybe = http_client.post(telemetry_basic_dest.clone())
.body(contents)
.header(reqwest::header::AUTHORIZATION, format!("Bearer {}", api_key))
.header(reqwest::header::CONTENT_TYPE, format!("application/json"))
.send().await;
info!("sending telemetry file\n{}\nto url\n{}", path.to_str().unwrap(), telemetry_dest);
let resp_maybe = reqwest::Client::new().post(telemetry_dest.clone())
.body(contents)
.header(reqwest::header::AUTHORIZATION, format!("Bearer {}", api_key))
.header(reqwest::header::CONTENT_TYPE, format!("application/json"))
.send().await;
if resp_maybe.is_err() {
error!("telemetry send failed: {}\ndest url was\n{}", resp_maybe.err().unwrap(), telemetry_basic_dest);
break;
return Err(format!("telemetry send failed: {}\ndest url was\n{}", resp_maybe.err().unwrap(), telemetry_dest));
}
let resp = resp_maybe.unwrap();
if resp.status()!= reqwest::StatusCode::OK {
error!("telemetry send failed: {}\ndest url was\n{}", resp.status(), telemetry_basic_dest);
break;
return Err(format!("telemetry send failed: {}\ndest url was\n{}", resp.status(), telemetry_dest));
}
let resp_body = resp.text().await.unwrap_or_else(|_| "-empty-".to_string());
info!("telemetry send success, response:\n{}", resp_body);
let resp_json = serde_json::from_str::<serde_json::Value>(&resp_body).unwrap_or_else(|_| json!({}));
let retcode = resp_json["retcode"].as_str().unwrap_or("").to_string();
if retcode != "OK" {
error!("retcode is not OK");
break;
return Err("retcode is not OK".to_string());
}
Ok(())
}

// Send files found in dir_compressed, move to dir_sent if successful.
let files = _sorted_files(dir_compressed.clone()).await;
for path in files {
if path.to_str().unwrap().ends_with("-net.json") {
send_telemetry_file(&path, &telemetry_basic_dest, &api_key).await;
} else if path.to_str().unwrap().ends_with("-snip.json") {
send_telemetry_file(&path, &telemetry_corrected_snippets_dest, &api_key).await;
} else {
continue;
}
let new_path = dir_sent.join(path.file_name().unwrap());
info!("success, moving file to {}", new_path.to_str().unwrap());
Expand Down Expand Up @@ -154,6 +167,7 @@ pub async fn telemetry_full_cycle(
telemetry_basic_dest = caps.unwrap().read().unwrap().telemetry_basic_dest.clone();
}
telemetry_basic::compress_basic_telemetry_to_file(global_context.clone()).await;
telemetry_snippets::compress_telemetry_snippets_to_file(global_context.clone()).await;
let dir_compressed = cache_dir.join("telemetry").join("compressed");
let dir_sent = cache_dir.join("telemetry").join("sent");
if mothership_enabled && !telemetry_basic_dest.is_empty() && !skip_sending_part {
Expand Down

0 comments on commit 24a774d

Please sign in to comment.