chore: Internal cleanup and optimizations (#490)
dariusc93 authored Apr 7, 2024
1 parent 171eebc commit 17337c7
Showing 14 changed files with 625 additions and 1,812 deletions.
283 changes: 154 additions & 129 deletions extensions/warp-ipfs/src/lib.rs

Large diffs are not rendered by default.

60 changes: 28 additions & 32 deletions extensions/warp-ipfs/src/store/discovery.rs
@@ -44,7 +44,6 @@ impl Discovery

/// Start discovery task
/// Note: This starting will only work across a provided namespace
#[allow(clippy::collapsible_if)]
pub async fn start(&self) -> Result<(), Error> {
match &self.config {
DiscoveryConfig::Namespace {
@@ -77,20 +76,18 @@ impl Discovery
.await
.unwrap_or_default()
&& cached.insert(peer_id)
&& !discovery.contains(peer_id).await
{
if !discovery.contains(peer_id).await {
let entry = DiscoveryEntry::new(
&discovery.ipfs,
peer_id,
discovery.config.clone(),
discovery.events.clone(),
discovery.relays.clone(),
)
.await;
if discovery.entries.write().await.insert(entry.clone())
{
entry.start().await;
}
let entry = DiscoveryEntry::new(
&discovery.ipfs,
peer_id,
discovery.config.clone(),
discovery.events.clone(),
discovery.relays.clone(),
)
.await;
if discovery.entries.write().await.insert(entry.clone()) {
entry.start().await;
}
}
}
@@ -179,25 +176,24 @@ impl Discovery
.await
.unwrap_or_default()
&& discovery.ipfs.connect(peer_id).await.is_ok()
&& !discovery.contains(peer_id).await
{
if !discovery.contains(peer_id).await {
let entry = DiscoveryEntry::new(
&discovery.ipfs,
peer_id,
discovery.config.clone(),
discovery.events.clone(),
discovery.relays.clone(),
)
.await;

if discovery
.entries
.write()
.await
.insert(entry.clone())
{
entry.start().await;
}
let entry = DiscoveryEntry::new(
&discovery.ipfs,
peer_id,
discovery.config.clone(),
discovery.events.clone(),
discovery.relays.clone(),
)
.await;

if discovery
.entries
.write()
.await
.insert(entry.clone())
{
entry.start().await;
}
}
}
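Both discovery.rs hunks make the same change: the inner `if !discovery.contains(peer_id).await` check is folded into the surrounding condition with `&&`, which is why the `#[allow(clippy::collapsible_if)]` attribute on `start()` can be dropped. A minimal sketch of the before/after shape; `Discovery` here is a simplified stand-in for the crate's type, and the real condition first awaits another check that is elided here:

```rust
// Sketch only: simplified stand-in types, not the warp-ipfs Discovery/DiscoveryEntry.
use std::collections::HashSet;

struct Discovery;

impl Discovery {
    async fn contains(&self, _peer_id: u64) -> bool {
        false
    }

    // Before: nested `if`, previously silenced with #[allow(clippy::collapsible_if)].
    async fn track_before(&self, cached: &mut HashSet<u64>, peer_id: u64) {
        if cached.insert(peer_id) {
            if !self.contains(peer_id).await {
                // build the DiscoveryEntry and start it here
            }
        }
    }

    // After: the inner check is chained with `&&`, removing one nesting level
    // and the need for the clippy allow.
    async fn track_after(&self, cached: &mut HashSet<u64>, peer_id: u64) {
        if cached.insert(peer_id) && !self.contains(peer_id).await {
            // build the DiscoveryEntry and start it here
        }
    }
}

fn main() {
    futures::executor::block_on(async {
        let discovery = Discovery;
        let mut cached = HashSet::new();
        discovery.track_before(&mut cached, 1).await;
        discovery.track_after(&mut cached, 2).await;
    });
}
```

Because `&&` short-circuits, `contains` is still only awaited when `cached.insert` returns true, so the collapsed form keeps the original ordering of checks.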
1 change: 0 additions & 1 deletion extensions/warp-ipfs/src/store/document.rs
@@ -121,7 +121,6 @@ impl RootDocument
#[tracing::instrument(skip(self, ipfs))]
pub async fn verify(&self, ipfs: &Ipfs) -> Result<(), Error> {
let identity: IdentityDocument = ipfs
.dag()
.get_dag(self.identity)
.local()
.deserialized()
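In document.rs, `RootDocument::verify` drops a redundant `.dag()` hop and calls `get_dag` directly on the `Ipfs` handle, matching how other code in this diff reads DAG nodes. A toy sketch of the call shape; `MockIpfs` and `DagGetter` are hypothetical stand-ins that only imitate the chain seen in the diff (`get_dag(cid).local().deserialized()`), not the real rust-ipfs API, and the example runs with the futures crate for `block_on`:

```rust
// Hypothetical stand-ins: these types only mirror the call chain from the diff.
struct DagGetter {
    cid: u64,
}

impl DagGetter {
    // Restrict the lookup to local storage (mirrors `.local()` in the diff).
    fn local(self) -> Self {
        self
    }

    // Pretend to decode the DAG node into `T` (mirrors `.deserialized()`).
    async fn deserialized<T: Default>(self) -> Result<T, String> {
        let _ = self.cid;
        Ok(T::default())
    }
}

struct MockIpfs;

impl MockIpfs {
    // The old code reached the same builder through an extra `.dag()` hop.
    fn dag(&self) -> &Self {
        self
    }

    // `get_dag` is available on the handle itself, so the hop was redundant.
    fn get_dag(&self, cid: u64) -> DagGetter {
        DagGetter { cid }
    }
}

fn main() {
    futures::executor::block_on(async {
        let ipfs = MockIpfs;
        // Before: ipfs.dag().get_dag(cid).local().deserialized()
        let _old: u32 = ipfs.dag().get_dag(7).local().deserialized().await.unwrap();
        // After: ipfs.get_dag(cid).local().deserialized()
        let _new: u32 = ipfs.get_dag(7).local().deserialized().await.unwrap();
    });
}
```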
228 changes: 23 additions & 205 deletions extensions/warp-ipfs/src/store/document/cache.rs
@@ -1,48 +1,23 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};

use futures::{
channel::{
mpsc::{Receiver, Sender},
oneshot::Sender as OneshotSender,
},
stream::{BoxStream, FuturesUnordered},
SinkExt, StreamExt, TryFutureExt,
StreamExt, TryFutureExt,
};
use libipld::Cid;
use rust_ipfs::{Ipfs, IpfsPath};
use tokio::select;
use tokio_util::sync::{CancellationToken, DropGuard};
use tokio::sync::RwLock;
use warp::{crypto::DID, error::Error};

use super::identity::IdentityDocument;

#[allow(clippy::large_enum_variant)]
enum IdentityCacheCommand {
Insert {
document: IdentityDocument,
response: OneshotSender<Result<Option<IdentityDocument>, Error>>,
},
Get {
did: DID,
response: OneshotSender<Result<IdentityDocument, Error>>,
},
Remove {
did: DID,
response: OneshotSender<Result<(), Error>>,
},
List {
response: OneshotSender<BoxStream<'static, IdentityDocument>>,
},
}

#[derive(Debug, Clone)]
pub struct IdentityCache {
tx: Sender<IdentityCacheCommand>,
_task_cancellation: Arc<DropGuard>,
inner: Arc<RwLock<IdentityCacheInner>>,
}

impl IdentityCache {
pub async fn new(ipfs: &Ipfs, path: Option<PathBuf>) -> Self {
pub async fn new(ipfs: &Ipfs, path: Option<&PathBuf>) -> Self {
let list = match path.as_ref() {
Some(path) => tokio::fs::read(path.join(".cache_id_v0"))
.await
@@ -52,209 +27,52 @@ impl IdentityCache
None => None,
};

let (tx, rx) = futures::channel::mpsc::channel(0);

let mut task = IdentityCacheTask {
let inner = IdentityCacheInner {
ipfs: ipfs.clone(),
path,
path: path.cloned(),
list,
rx,
};

let token = CancellationToken::new();
let drop_guard = token.clone().drop_guard();
tokio::spawn(async move {
select! {
_ = token.cancelled() => {}
_ = task.run() => {}
}
});

Self {
tx,
_task_cancellation: Arc::new(drop_guard),
inner: Arc::new(RwLock::new(inner)),
}
}

pub async fn insert(
&self,
document: &IdentityDocument,
) -> Result<Option<IdentityDocument>, Error> {
let (tx, rx) = futures::channel::oneshot::channel();

let _ = self
.tx
.clone()
.send(IdentityCacheCommand::Insert {
document: document.clone(),
response: tx,
})
.await;

rx.await.map_err(anyhow::Error::from)?
let inner = &mut *self.inner.write().await;
inner.insert(document).await
}

pub async fn get(&self, did: &DID) -> Result<IdentityDocument, Error> {
let (tx, rx) = futures::channel::oneshot::channel();

let _ = self
.tx
.clone()
.send(IdentityCacheCommand::Get {
did: did.clone(),
response: tx,
})
.await;

rx.await.map_err(anyhow::Error::from)?
let inner = &*self.inner.read().await;
inner.get(did.clone()).await
}

pub async fn remove(&self, did: &DID) -> Result<(), Error> {
let (tx, rx) = futures::channel::oneshot::channel();

let _ = self
.tx
.clone()
.send(IdentityCacheCommand::Remove {
did: did.clone(),
response: tx,
})
.await;

rx.await.map_err(anyhow::Error::from)?
let inner = &mut *self.inner.write().await;
inner.remove(did).await
}

pub async fn list(&self) -> Result<BoxStream<'static, IdentityDocument>, Error> {
let (tx, rx) = futures::channel::oneshot::channel();

let _ = self
.tx
.clone()
.send(IdentityCacheCommand::List { response: tx })
.await;

rx.await.map_err(anyhow::Error::from).map_err(Error::from)
pub async fn list(&self) -> BoxStream<'static, IdentityDocument> {
let inner = &*self.inner.read().await;
inner.list().await
}
}

struct IdentityCacheTask {
#[derive(Debug)]
struct IdentityCacheInner {
pub ipfs: Ipfs,
pub path: Option<PathBuf>,
pub list: Option<Cid>,
rx: Receiver<IdentityCacheCommand>,
}

impl IdentityCacheTask {
pub async fn run(&mut self) {
// migrate old identity to new
self.migrate().await;
// repin map
self.repin_map().await;

while let Some(command) = self.rx.next().await {
match command {
IdentityCacheCommand::Insert { document, response } => {
_ = response.send(self.insert(document).await)
}
IdentityCacheCommand::Get { did, response } => {
let _ = response.send(self.get(did).await);
}
IdentityCacheCommand::Remove { did, response } => {
let _ = response.send(self.remove(did).await);
}
IdentityCacheCommand::List { response } => {
let _ = response.send(self.list().await);
}
}
}
}

async fn migrate(&mut self) {
if self.list.is_some() {
return;
}

let Some(path) = self.path.clone() else {
return;
};

let Some(cid) = tokio::fs::read(path.join(".cache_id"))
.await
.map(|bytes| String::from_utf8_lossy(&bytes).to_string())
.ok()
.and_then(|cid_str| cid_str.parse::<Cid>().ok())
else {
return;
};

let Ok(list) = self
.ipfs
.get_dag(cid)
.local()
.deserialized::<std::collections::HashSet<IdentityDocument>>()
.await
else {
return;
};

for identity in list {
let id = identity.did.clone();
if let Err(e) = self.insert(identity).await {
tracing::warn!(name = "migration", id = %id, "Failed to migrate identity: {e}");
}
}

if self.ipfs.is_pinned(&cid).await.unwrap_or_default() {
_ = self.ipfs.remove_pin(&cid).await;
}

_ = tokio::fs::remove_file(path.join(".cache_id")).await;
}

async fn repin_map(&mut self) {
let cid = match self.list {
Some(cid) => cid,
None => return,
};

let Ok(list) = self
.ipfs
.get_dag(cid)
.local()
.deserialized::<std::collections::HashMap<String, Cid>>()
.await
else {
return;
};

for cid in list.values() {
if self.ipfs.is_pinned(cid).await.unwrap_or_default() {
//We can ignore if its pinned indirectly via a recursive pin root
_ = self.ipfs.remove_pin(cid).await;
}
}

if self.ipfs.is_pinned(&cid).await.unwrap_or_default() {
if self
.ipfs
.list_pins(Some(rust_ipfs::PinMode::Recursive))
.await
.filter_map(|res| async move { res.ok() })
.any(|(root_cid, _)| async move { root_cid == cid })
.await
{
return;
}
if self.ipfs.remove_pin(&cid).await.is_err() {
return;
}
_ = self.ipfs.insert_pin(&cid).recursive().local().await;
}
}

impl IdentityCacheInner {
async fn insert(
&mut self,
document: IdentityDocument,
document: &IdentityDocument,
) -> Result<Option<IdentityDocument>, Error> {
document.verify()?;

@@ -291,7 +109,7 @@ impl IdentityCacheTask

match old_document {
Some(old_document) => {
if !old_document.different(&document) {
if !old_document.different(document) {
return Ok(None);
}

@@ -381,7 +199,7 @@ impl IdentityCacheTask
Ok(id)
}

async fn remove(&mut self, did: DID) -> Result<(), Error> {
async fn remove(&mut self, did: &DID) -> Result<(), Error> {
let mut list: HashMap<String, Cid> = match self.list {
Some(cid) => self
.ipfs
@@ -555,7 +373,7 @@ mod test
let mut rng = rand::thread_rng();
let cache = pregenerated_cache::<10>().await;

let list = cache.list().await?.collect::<Vec<_>>().await;
let list = cache.list().await.collect::<Vec<_>>().await;

let random_doc = list.choose(&mut rng).expect("exist");

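The bulk of the cache.rs rewrite removes the command-channel actor: the `IdentityCacheCommand` enum, the `mpsc`/`oneshot` plumbing, and the `CancellationToken`/`DropGuard` shutdown all go away, and the state moves into an `IdentityCacheInner` shared behind `Arc<tokio::sync::RwLock<_>>`. Each public method now takes the lock and delegates, and `list()` returns its stream directly instead of round-tripping through a oneshot, which is why the test drops the `?` on `cache.list().await`. A minimal sketch of that pattern, with a toy `Inner` backed by a `HashMap` rather than IPFS; the names and string payload are illustrative, and the example assumes tokio's rt and macros features:

```rust
// Sketch only: `Inner` stands in for IdentityCacheInner; the real type persists
// documents through IPFS rather than a HashMap.
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

#[derive(Default)]
struct Inner {
    docs: HashMap<String, String>,
}

impl Inner {
    async fn insert(&mut self, key: &str, value: &str) -> Option<String> {
        self.docs.insert(key.to_string(), value.to_string())
    }

    async fn get(&self, key: &str) -> Option<String> {
        self.docs.get(key).cloned()
    }

    async fn remove(&mut self, key: &str) -> Option<String> {
        self.docs.remove(key)
    }
}

// The handle stays cheaply cloneable, like the old Sender-based one, but each
// method locks the shared state directly instead of sending a command to a
// spawned task and waiting on a oneshot response.
#[derive(Clone, Default)]
struct Cache {
    inner: Arc<RwLock<Inner>>,
}

impl Cache {
    async fn insert(&self, key: &str, value: &str) -> Option<String> {
        self.inner.write().await.insert(key, value).await
    }

    async fn get(&self, key: &str) -> Option<String> {
        self.inner.read().await.get(key).await
    }

    async fn remove(&self, key: &str) -> Option<String> {
        self.inner.write().await.remove(key).await
    }
}

#[tokio::main]
async fn main() {
    let cache = Cache::default();
    cache.insert("did:key:alice", "profile-cid").await;
    assert_eq!(cache.get("did:key:alice").await.as_deref(), Some("profile-cid"));
    cache.remove("did:key:alice").await;
    assert!(cache.get("did:key:alice").await.is_none());
}
```

With no background task left to cancel, the `DropGuard` is unnecessary; dropping the last `Arc` simply frees the shared state.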