Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

analytics #695

Merged
merged 5 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ BEEHIIV_API_KEY=none

ANALYTICS_ALLOWED_ORIGINS='["http://127.0.0.1:3000", "http://localhost:3000", "https://modrinth.com", "https://www.modrinth.com", "*"]'

CLICKHOUSE_URL=http:/localhost:8123
CLICKHOUSE_URL=http://localhost:8123
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=
CLICKHOUSE_DATABASE=staging_ariadne
Expand Down
189 changes: 189 additions & 0 deletions src/clickhouse/fetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::sync::Arc;

use crate::{
models::ids::{ProjectId, VersionId},
routes::ApiError,
};
use chrono::NaiveDate;
use serde::{Deserialize, Serialize};

/// One aggregated playtime row, as selected by the query in `fetch_playtimes`.
///
/// The fields appear in the same order as the columns of that query's SELECT list.
// NOTE(review): rows are presumably decoded positionally by the clickhouse crate,
// so the field order should stay in sync with the SELECT list — confirm.
#[derive(clickhouse::Row, Serialize, Deserialize, Clone, Debug)]
pub struct ReturnPlaytimes {
// Start of the time bucket, as the numeric output of ClickHouse `toYYYYMMDDhhmmss`.
pub time: u64,
// Value of the filter column the query grouped by (`project_id` or `version_id`).
pub id: u64,
// `SUM(seconds)` over the rows in this bucket for this id.
pub total_seconds: u64,
}

/// One per-country row, as selected by the query in `fetch_countries`.
///
/// The fields appear in the same order as the columns of that query's final SELECT list.
// NOTE(review): rows are presumably decoded positionally by the clickhouse crate,
// so the field order should stay in sync with the SELECT list — confirm.
#[derive(clickhouse::Row, Serialize, Deserialize, Clone, Debug)]
pub struct ReturnCountry {
// The `country` column of the grouped `views` rows.
pub country: String,
// Value of the filter column the query grouped by (`project_id` or `version_id`).
pub id: u64,
// `count(id)` over the matching rows of the `views` table.
pub total_views: u64,
// `count(id)` over the matching rows of the `downloads` table, attached via LEFT JOIN.
// NOTE(review): for a country/id pair with views but no downloads this is presumably
// the column default (0) — confirm against the server's join settings.
pub total_downloads: u64,
}

/// One aggregated page-view row, as selected by the query in `fetch_views`.
///
/// The fields appear in the same order as the columns of that query's SELECT list.
// NOTE(review): rows are presumably decoded positionally by the clickhouse crate,
// so the field order should stay in sync with the SELECT list — confirm.
#[derive(clickhouse::Row, Serialize, Deserialize, Clone, Debug)]
pub struct ReturnViews {
// Start of the time bucket, as the numeric output of ClickHouse `toYYYYMMDDhhmmss`.
pub time: u64,
// Value of the filter column the query grouped by (`project_id` or `version_id`).
pub id: u64,
// `count(id)` over the rows in this bucket for this id.
pub total_views: u64,
}

/// Fetches aggregated playtime from the `playtime` table as a `Vec` of [`ReturnPlaytimes`].
///
/// Rows are bucketed by `toStartOfInterval(recorded, toIntervalMinute(resolution_minute))`,
/// restricted to buckets between `toDate(start_date)` and `toDate(end_date)`, and filtered to
/// the supplied IDs. Each returned row carries the bucket time, the ID and `SUM(seconds)`.
///
/// Only one of `projects` or `versions` should be supplied:
/// * `projects` only: filters and groups on `project_id`.
/// * `versions` (with or without `projects`): filters and groups on `version_id`.
///
/// # Errors
///
/// Returns `ApiError::InvalidInput` when neither `projects` nor `versions` is supplied.
/// Any error produced while running the query is propagated through `?`.
pub async fn fetch_playtimes(
    projects: Option<Vec<ProjectId>>,
    versions: Option<Vec<VersionId>>,
    start_date: NaiveDate,
    end_date: NaiveDate,
    resolution_minute: u32,
    client: Arc<clickhouse::Client>,
) -> Result<Vec<ReturnPlaytimes>, ApiError> {
    // Pick the filter column and the IDs bound to it in one place so they can never
    // disagree. Previously, passing both arguments selected `version_id` as the column
    // but bound the *project* IDs to it.
    let (project_or_version, ids) = match (projects, versions) {
        (_, Some(versions)) => (
            "version_id",
            versions.into_iter().map(|x| x.0).collect::<Vec<_>>(),
        ),
        (Some(projects), None) => (
            "project_id",
            projects.into_iter().map(|x| x.0).collect::<Vec<_>>(),
        ),
        (None, None) => {
            return Err(ApiError::InvalidInput(
                "Only one of 'project_id' or 'version_id' should be used.".to_string(),
            ));
        }
    };

    // Placeholders are bound in order of appearance:
    // interval minutes, start date, end date, ID list.
    let query = client
        .query(&format!(
            "
SELECT
toYYYYMMDDhhmmss(toStartOfInterval(recorded, toIntervalMinute(?)) AS time),
{project_or_version},
SUM(seconds) AS total_seconds
FROM playtime
WHERE time >= toDate(?) AND time <= toDate(?)
AND {project_or_version} IN ?
GROUP BY
time,
project_id,
{project_or_version}
"
        ))
        .bind(resolution_minute)
        .bind(start_date)
        .bind(end_date)
        .bind(ids);

    Ok(query.fetch_all().await?)
}

/// Fetches aggregated page views from the `views` table as a `Vec` of [`ReturnViews`].
///
/// Rows are bucketed by `toStartOfInterval(recorded, toIntervalMinute(resolution_minutes))`,
/// restricted to buckets between `toDate(start_date)` and `toDate(end_date)`, and filtered to
/// the supplied IDs. Each returned row carries the bucket time, the ID and `count(id)`.
///
/// Only one of `projects` or `versions` should be supplied:
/// * `projects` only: filters and groups on `project_id`.
/// * `versions` (with or without `projects`): filters and groups on `version_id`.
///
/// # Errors
///
/// Returns `ApiError::InvalidInput` when neither `projects` nor `versions` is supplied.
/// Any error produced while running the query is propagated through `?`.
pub async fn fetch_views(
    projects: Option<Vec<ProjectId>>,
    versions: Option<Vec<VersionId>>,
    start_date: NaiveDate,
    end_date: NaiveDate,
    resolution_minutes: u32,
    client: Arc<clickhouse::Client>,
) -> Result<Vec<ReturnViews>, ApiError> {
    // Pick the filter column and the IDs bound to it in one place so they can never
    // disagree. Previously, passing both arguments selected `version_id` as the column
    // but bound the *project* IDs to it.
    let (project_or_version, ids) = match (projects, versions) {
        (_, Some(versions)) => (
            "version_id",
            versions.into_iter().map(|x| x.0).collect::<Vec<_>>(),
        ),
        (Some(projects), None) => (
            "project_id",
            projects.into_iter().map(|x| x.0).collect::<Vec<_>>(),
        ),
        (None, None) => {
            return Err(ApiError::InvalidInput(
                "Only one of 'project_id' or 'version_id' should be used.".to_string(),
            ));
        }
    };

    // Placeholders are bound in order of appearance:
    // interval minutes, start date, end date, ID list.
    let query = client
        .query(&format!(
            "
SELECT
toYYYYMMDDhhmmss((toStartOfInterval(recorded, toIntervalMinute(?)) AS time)),
{project_or_version},
count(id) AS total_views
FROM views
WHERE time >= toDate(?) AND time <= toDate(?)
AND {project_or_version} IN ?
GROUP BY
time,
{project_or_version}
"
        ))
        .bind(resolution_minutes)
        .bind(start_date)
        .bind(end_date)
        .bind(ids);

    Ok(query.fetch_all().await?)
}

/// Fetches per-country view and download counts as a `Vec` of [`ReturnCountry`].
///
/// The query counts rows of `views` and of `downloads` recorded between `toDate(start_date)`
/// and `toDate(end_date)`, each grouped by country and ID. It then LEFT JOINs the download
/// counts onto the view counts and keeps only the supplied IDs.
///
/// Only one of `projects` or `versions` should be supplied:
/// * `projects` only: filters and groups on `project_id`.
/// * `versions` (with or without `projects`): filters and groups on `version_id`.
///
/// # Errors
///
/// Returns `ApiError::InvalidInput` when neither `projects` nor `versions` is supplied.
/// Any error produced while running the query is propagated through `?`.
pub async fn fetch_countries(
    projects: Option<Vec<ProjectId>>,
    versions: Option<Vec<VersionId>>,
    start_date: NaiveDate,
    end_date: NaiveDate,
    client: Arc<clickhouse::Client>,
) -> Result<Vec<ReturnCountry>, ApiError> {
    // Pick the filter column and the IDs bound to it in one place so they can never
    // disagree. Previously, passing both arguments selected `version_id` as the column
    // but bound the *project* IDs to it.
    let (project_or_version, ids) = match (projects, versions) {
        (_, Some(versions)) => (
            "version_id",
            versions.into_iter().map(|x| x.0).collect::<Vec<_>>(),
        ),
        (Some(projects), None) => (
            "project_id",
            projects.into_iter().map(|x| x.0).collect::<Vec<_>>(),
        ),
        (None, None) => {
            return Err(ApiError::InvalidInput(
                "Only one of 'project_id' or 'version_id' should be used.".to_string(),
            ));
        }
    };

    // Placeholders are bound in order of appearance: the date range is bound twice
    // (once per CTE), followed by the ID list for the outer WHERE clause.
    let query = client.query(&format!(
        "
WITH view_grouping AS (
SELECT
country,
{project_or_version},
count(id) AS total_views
FROM views
WHERE toYYYYMMDDhhmmss(recorded) >= toYYYYMMDDhhmmss(toDate(?)) AND toYYYYMMDDhhmmss(recorded) <= toYYYYMMDDhhmmss(toDate(?))
GROUP BY
country,
{project_or_version}
),
download_grouping AS (
SELECT
country,
{project_or_version},
count(id) AS total_downloads
FROM downloads
WHERE toYYYYMMDDhhmmss(recorded) >= toYYYYMMDDhhmmss(toDate(?)) AND toYYYYMMDDhhmmss(recorded) <= toYYYYMMDDhhmmss(toDate(?))
GROUP BY
country,
{project_or_version}
)

SELECT
v.country,
v.{project_or_version},
v.total_views,
d.total_downloads
FROM view_grouping AS v
LEFT JOIN download_grouping AS d ON (v.country = d.country) AND (v.{project_or_version} = d.{project_or_version})
WHERE {project_or_version} IN ?
"
    ))
    .bind(start_date)
    .bind(end_date)
    .bind(start_date)
    .bind(end_date)
    .bind(ids);

    Ok(query.fetch_all().await?)
}
4 changes: 4 additions & 0 deletions src/clickhouse/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use hyper::client::HttpConnector;
use hyper_tls::{native_tls, HttpsConnector};

mod fetch;

pub use fetch::*;

pub async fn init_client() -> clickhouse::error::Result<clickhouse::Client> {
let database = dotenvy::var("CLICKHOUSE_DATABASE").unwrap();

Expand Down
10 changes: 5 additions & 5 deletions src/models/analytics.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use clickhouse::Row;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use std::hash::{Hash, Hasher};
use std::net::Ipv6Addr;
use uuid::Uuid;

#[derive(Row, Serialize, Clone)]
#[derive(Row, Serialize, Deserialize, Clone)]
pub struct Download {
#[serde(with = "uuid::serde::compact")]
pub id: Uuid,
Expand Down Expand Up @@ -41,7 +41,7 @@ impl Hash for Download {
}
}

#[derive(Row, Serialize, Clone)]
#[derive(Row, Serialize, Deserialize, Clone)]
pub struct PageView {
#[serde(with = "uuid::serde::compact")]
pub id: Uuid,
Expand Down Expand Up @@ -76,12 +76,12 @@ impl Hash for PageView {
}
}

#[derive(Row, Serialize, Clone)]
#[derive(Row, Serialize, Deserialize, Clone, Debug)]
pub struct Playtime {
#[serde(with = "uuid::serde::compact")]
pub id: Uuid,
pub recorded: i64,
pub seconds: u16,
pub seconds: u64,

// Modrinth User ID for logged in users (unused atm)
pub user_id: u64,
Expand Down
4 changes: 2 additions & 2 deletions src/models/projects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use serde::{Deserialize, Serialize};
use validator::Validate;

/// The ID of a specific project, encoded as base62 for usage in the API
#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Debug)]
#[serde(from = "Base62Id")]
#[serde(into = "Base62Id")]
pub struct ProjectId(pub u64);

/// The ID of a specific version of a project
#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Hash, Debug)]
#[serde(from = "Base62Id")]
#[serde(into = "Base62Id")]
pub struct VersionId(pub u64);
Expand Down
8 changes: 4 additions & 4 deletions src/routes/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub async fn page_view_ingest(
Ok(HttpResponse::NoContent().body(""))
}

#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
pub struct PlaytimeInput {
seconds: u16,
loader: String,
Expand Down Expand Up @@ -205,10 +205,10 @@ pub async fn playtime_ingest(
.add_playtime(Playtime {
id: Default::default(),
recorded: Utc::now().timestamp_nanos() / 100_000,
seconds: playtime.seconds,
seconds: playtime.seconds as u64,
user_id: user.id.0,
project_id: version.inner.id.0 as u64,
version_id: version.inner.project_id.0 as u64,
project_id: version.inner.project_id.0 as u64,
version_id: version.inner.id.0 as u64,
loader: playtime.loader,
game_version: playtime.game_version,
parent: playtime.parent.map(|x| x.0).unwrap_or(0),
Expand Down
Loading
Loading