Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
Fix version creation taking forever (#878)
Browse files Browse the repository at this point in the history
* Fix version creation taking forever

* run fmt + prep

* fix tests?
  • Loading branch information
Geometrically authored Feb 5, 2024
1 parent ce3b024 commit 33b2a94
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 140 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 39 additions & 1 deletion src/database/models/ids.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::DatabaseError;
use crate::models::ids::base62_impl::to_base62;
use crate::models::ids::random_base62_rng;
use crate::models::ids::{random_base62_rng, random_base62_rng_range};
use censor::Censor;
use serde::{Deserialize, Serialize};
use sqlx::sqlx_macros::Type;
Expand Down Expand Up @@ -41,6 +41,37 @@ macro_rules! generate_ids {
};
}

macro_rules! generate_bulk_ids {
($vis:vis $function_name:ident, $return_type:ty, $select_stmnt:literal, $id_function:expr) => {
$vis async fn $function_name(
count: usize,
con: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Vec<$return_type>, DatabaseError> {
let mut rng = rand::thread_rng();
let mut retry_count = 0;

// Check if ID is unique
loop {
let base = random_base62_rng_range(&mut rng, 1, 10) as i64;
let ids = (0..count).map(|x| base + x as i64).collect::<Vec<_>>();

let results = sqlx::query!($select_stmnt, &ids)
.fetch_one(&mut **con)
.await?;

if !results.exists.unwrap_or(true) {
return Ok(ids.into_iter().map(|x| $id_function(x)).collect());
}

retry_count += 1;
if retry_count > ID_RETRY_COUNT {
return Err(DatabaseError::RandomId);
}
}
}
};
}

generate_ids!(
pub generate_project_id,
ProjectId,
Expand Down Expand Up @@ -121,6 +152,13 @@ generate_ids!(
NotificationId
);

generate_bulk_ids!(
pub generate_many_notification_ids,
NotificationId,
"SELECT EXISTS(SELECT 1 FROM notifications WHERE id = ANY($1))",
NotificationId
);

generate_ids!(
pub generate_thread_id,
ThreadId,
Expand Down
52 changes: 15 additions & 37 deletions src/database/models/notification_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::database::{models::DatabaseError, redis::RedisPool};
use crate::models::notifications::NotificationBody;
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

const USER_NOTIFICATIONS_NAMESPACE: &str = "user_notifications";
Expand Down Expand Up @@ -46,60 +45,39 @@ impl NotificationBuilder {
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
redis: &RedisPool,
) -> Result<(), DatabaseError> {
let mut notifications = Vec::new();
for user in users {
let id = generate_notification_id(&mut *transaction).await?;

notifications.push(Notification {
id,
user_id: user,
body: self.body.clone(),
read: false,
created: Utc::now(),
});
}

Notification::insert_many(&notifications, transaction, redis).await?;

Ok(())
}
}
let notification_ids =
generate_many_notification_ids(users.len(), &mut *transaction).await?;

impl Notification {
pub async fn insert_many(
notifications: &[Notification],
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
redis: &RedisPool,
) -> Result<(), DatabaseError> {
let notification_ids = notifications.iter().map(|n| n.id.0).collect_vec();
let user_ids = notifications.iter().map(|n| n.user_id.0).collect_vec();
let bodies = notifications
let body = serde_json::value::to_value(&self.body)?;
let bodies = notification_ids
.iter()
.map(|n| Ok(serde_json::value::to_value(n.body.clone())?))
.collect::<Result<Vec<_>, DatabaseError>>()?;
.map(|_| body.clone())
.collect::<Vec<_>>();

sqlx::query!(
"
INSERT INTO notifications (
id, user_id, body
)
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::jsonb[])
",
&notification_ids[..],
&user_ids[..],
&notification_ids
.into_iter()
.map(|x| x.0)
.collect::<Vec<_>>()[..],
&users.iter().map(|x| x.0).collect::<Vec<_>>()[..],
&bodies[..],
)
.execute(&mut **transaction)
.await?;

Notification::clear_user_notifications_cache(
notifications.iter().map(|n| &n.user_id),
redis,
)
.await?;
Notification::clear_user_notifications_cache(&users, redis).await?;

Ok(())
}
}

impl Notification {
pub async fn get<'a, 'b, E>(
id: NotificationId,
executor: E,
Expand Down
8 changes: 6 additions & 2 deletions src/models/v3/ids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ pub fn random_base62(n: usize) -> u64 {
/// This method panics if `n` is 0 or greater than 11, since a `u64`
/// can only represent up to 11 character base62 strings
pub fn random_base62_rng<R: rand::RngCore>(rng: &mut R, n: usize) -> u64 {
random_base62_rng_range(rng, n, n)
}

pub fn random_base62_rng_range<R: rand::RngCore>(rng: &mut R, n_min: usize, n_max: usize) -> u64 {
use rand::Rng;
assert!(n > 0 && n <= 11);
assert!(n_min > 0 && n_max <= 11 && n_min <= n_max);
// gen_range is [low, high): max value is `MULTIPLES[n] - 1`,
// which is n characters long when encoded
rng.gen_range(MULTIPLES[n - 1]..MULTIPLES[n])
rng.gen_range(MULTIPLES[n_min - 1]..MULTIPLES[n_max])
}

const MULTIPLES: [u64; 12] = [
Expand Down
100 changes: 0 additions & 100 deletions src/ratelimit/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,103 +141,3 @@ impl Handler<ActorMessage> for MemoryStoreActor {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[actix_rt::test]
async fn test_set() {
let store = MemoryStore::new();
let addr = MemoryStoreActor::from(store.clone()).start();
let res = addr
.send(ActorMessage::Set {
key: "hello".to_string(),
value: 30usize,
expiry: Duration::from_secs(5),
})
.await;
let res = res.expect("Failed to send msg");
match res {
ActorResponse::Set(c) => match c.await {
Ok(()) => {}
Err(e) => panic!("Shouldn't happen {}", &e),
},
_ => panic!("Shouldn't happen!"),
}
}

#[actix_rt::test]
async fn test_get() {
let store = MemoryStore::new();
let addr = MemoryStoreActor::from(store.clone()).start();
let expiry = Duration::from_secs(5);
let res = addr
.send(ActorMessage::Set {
key: "hello".to_string(),
value: 30usize,
expiry,
})
.await;
let res = res.expect("Failed to send msg");
match res {
ActorResponse::Set(c) => match c.await {
Ok(()) => {}
Err(e) => panic!("Shouldn't happen {}", &e),
},
_ => panic!("Shouldn't happen!"),
}
let res2 = addr.send(ActorMessage::Get("hello".to_string())).await;
let res2 = res2.expect("Failed to send msg");
match res2 {
ActorResponse::Get(c) => match c.await {
Ok(d) => {
let d = d.unwrap();
assert_eq!(d, 30usize);
}
Err(e) => panic!("Shouldn't happen {}", &e),
},
_ => panic!("Shouldn't happen!"),
};
}

#[actix_rt::test]
async fn test_expiry() {
let store = MemoryStore::new();
let addr = MemoryStoreActor::from(store.clone()).start();
let expiry = Duration::from_secs(3);
let res = addr
.send(ActorMessage::Set {
key: "hello".to_string(),
value: 30usize,
expiry,
})
.await;
let res = res.expect("Failed to send msg");
match res {
ActorResponse::Set(c) => match c.await {
Ok(()) => {}
Err(e) => panic!("Shouldn't happen {}", &e),
},
_ => panic!("Shouldn't happen!"),
}
assert!(addr.connected());

let res3 = addr.send(ActorMessage::Expire("hello".to_string())).await;
let res3 = res3.expect("Failed to send msg");
match res3 {
ActorResponse::Expire(c) => match c.await {
Ok(dur) => {
let now = Duration::from_secs(3);
if dur > now || dur > now + Duration::from_secs(4) {
panic!("Expiry is invalid!");
}
}
Err(e) => {
panic!("Shouldn't happen: {}", &e);
}
},
_ => panic!("Shouldn't happen!"),
};
}
}

0 comments on commit 33b2a94

Please sign in to comment.