Skip to content

Commit

Permalink
Merge branch 'main' into refactor/discovery-tasks-r0
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Dec 4, 2024
2 parents 82fce4f + c08b8bf commit 5f8539a
Show file tree
Hide file tree
Showing 7 changed files with 735 additions and 89 deletions.
22 changes: 5 additions & 17 deletions extensions/warp-ipfs/src/store/event_subscription.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::rt::{Executor, LocalExecutor};
use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor};
use futures::{
channel::{
mpsc::{channel, Receiver, Sender},
Expand All @@ -7,11 +7,9 @@ use futures::{
stream::BoxStream,
SinkExt, StreamExt,
};
use std::collections::VecDeque;
use std::fmt::Debug;
use std::task::{Poll, Waker};
use std::{collections::VecDeque, sync::Arc};
use tokio::select;
use tokio_util::sync::{CancellationToken, DropGuard};
use warp::error::Error;

#[allow(clippy::large_enum_variant)]
Expand All @@ -27,7 +25,7 @@ enum Command<T: Clone + Debug + Send + 'static> {
#[derive(Clone, Debug)]
pub struct EventSubscription<T: Clone + Debug + Send + 'static> {
tx: Sender<Command<T>>,
_task_cancellation: Arc<DropGuard>,
_handle: AbortableJoinHandle<()>,
}

impl<T: Clone + Debug + Send + 'static> EventSubscription<T> {
Expand All @@ -43,19 +41,9 @@ impl<T: Clone + Debug + Send + 'static> EventSubscription<T> {
rx,
};

let token = CancellationToken::new();
let drop_guard = token.clone().drop_guard();
executor.dispatch(async move {
select! {
_ = token.cancelled() => {}
_ = task.run() => {}
}
});
let _handle = executor.spawn_abortable(async move { task.run().await });

Self {
tx,
_task_cancellation: Arc::new(drop_guard),
}
Self { tx, _handle }
}

pub async fn subscribe<'a>(&self) -> Result<BoxStream<'a, T>, Error> {
Expand Down
18 changes: 5 additions & 13 deletions extensions/warp-ipfs/src/store/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use futures_finally::try_stream::FinallyTryStreamExt;

use rust_ipfs::{unixfs::UnixfsStatus, Ipfs, IpfsPath};

use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::{Instrument, Span};
use warp::{
constellation::{
Expand All @@ -31,7 +30,7 @@ use super::{
document::root::RootDocumentMap, event_subscription::EventSubscription,
MAX_THUMBNAIL_STREAM_SIZE,
};
use crate::rt::{Executor, LocalExecutor};
use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor};
use crate::{
config::{self, Config},
thumbnail::ThumbnailGenerator,
Expand All @@ -44,7 +43,7 @@ pub struct FileStore {
path: Arc<RwLock<PathBuf>>,
config: config::Config,
command_sender: mpsc::Sender<FileTaskCommand>,
_guard: Arc<DropGuard>,
_handle: AbortableJoinHandle<()>,
}

impl FileStore {
Expand All @@ -55,6 +54,7 @@ impl FileStore {
constellation_tx: EventSubscription<ConstellationEventKind>,
span: &Span,
) -> Self {
let executor = LocalExecutor;
let config = config.clone();

let index = Directory::new("root");
Expand Down Expand Up @@ -92,24 +92,16 @@ impl FileStore {
let signal = Some(task.signal_tx.clone());
index.rebuild_paths(&signal);

let token = CancellationToken::new();
let _guard = Arc::new(token.clone().drop_guard());

let span = span.clone();

LocalExecutor.dispatch(async move {
tokio::select! {
_ = task.run().instrument(span) => {}
_ = token.cancelled() => {}
}
});
let _handle = executor.spawn_abortable(async move { task.run().await }.instrument(span));

FileStore {
index,
config,
path,
command_sender,
_guard,
_handle,
}
}
}
Expand Down
20 changes: 3 additions & 17 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use ipld_core::cid::Cid;
use rust_ipfs::{Ipfs, PeerId};

use serde::{Deserialize, Serialize};
use tokio_util::sync::{CancellationToken, DropGuard};
use uuid::Uuid;

use super::{document::root::RootDocumentMap, ds_key::DataStoreKey, PeerIdExt};
Expand Down Expand Up @@ -79,7 +78,7 @@ pub type DownloadStream = BoxStream<'static, Result<Bytes, std::io::Error>>;
#[derive(Clone)]
pub struct MessageStore {
inner: Arc<tokio::sync::RwLock<ConversationInner>>,
_task_cancellation: Arc<DropGuard>,
_handle: AbortableJoinHandle<()>,
}

impl MessageStore {
Expand All @@ -94,9 +93,6 @@ impl MessageStore {
let executor = LocalExecutor;
tracing::info!("Initializing MessageStore");

let token = CancellationToken::new();
let drop_guard = token.clone().drop_guard();

let root = identity.root_document().clone();

let mut inner = ConversationInner {
Expand Down Expand Up @@ -127,19 +123,9 @@ impl MessageStore {
identity: identity.clone(),
};

executor.dispatch({
async move {
tokio::select! {
_ = token.cancelled() => {}
_ = task.run() => {}
}
}
});
let _handle = executor.spawn_abortable(task.run());

Self {
inner,
_task_cancellation: Arc::new(drop_guard),
}
Self { inner, _handle }
}
}

Expand Down
145 changes: 125 additions & 20 deletions extensions/warp-ipfs/src/store/message/community_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use warp::raygun::community::{
};
use warp::raygun::{
AttachmentEventStream, ConversationImage, Location, MessageEvent, MessageOptions,
MessageReference, MessageStatus, Messages, MessagesType, PinState, RayGunEventKind,
ReactionState,
MessageReference, MessageStatus, MessageType, Messages, MessagesType, PinState,
RayGunEventKind, ReactionState,
};
use warp::{
crypto::{cipher::Cipher, generate},
Expand Down Expand Up @@ -68,6 +68,8 @@ use crate::{
},
};

use super::attachment::AttachmentStream;

type AttachmentOneshot = (MessageDocument, oneshot::Sender<Result<(), Error>>);

#[allow(dead_code)]
Expand Down Expand Up @@ -321,7 +323,7 @@ pub struct CommunityTask {
community_id: Uuid,
ipfs: Ipfs,
root: RootDocumentMap,
_file: FileStore,
file: FileStore,
identity: IdentityStore,
discovery: Discovery,
pending_key_exchange: IndexMap<DID, Vec<(Vec<u8>, bool)>>,
Expand All @@ -332,7 +334,7 @@ pub struct CommunityTask {
event_stream: SubscriptionStream,
request_stream: SubscriptionStream,

_attachment_tx: futures::channel::mpsc::Sender<AttachmentOneshot>,
attachment_tx: futures::channel::mpsc::Sender<AttachmentOneshot>,
attachment_rx: futures::channel::mpsc::Receiver<AttachmentOneshot>,
message_command: futures::channel::mpsc::Sender<MessageCommand>,
event_broadcast: tokio::sync::broadcast::Sender<MessageEventKind>,
Expand Down Expand Up @@ -403,7 +405,7 @@ impl CommunityTask {
community_id,
ipfs: ipfs.clone(),
root: root.clone(),
_file: file.clone(),
file: file.clone(),
identity: identity.clone(),
discovery: discovery.clone(),
pending_key_exchange: Default::default(),
Expand All @@ -414,7 +416,7 @@ impl CommunityTask {
request_stream,
event_stream,

_attachment_tx: atx,
attachment_tx: atx,
attachment_rx: arx,
event_broadcast: btx,
_event_subscription,
Expand Down Expand Up @@ -3304,29 +3306,132 @@ impl CommunityTask {
}
pub async fn attach_to_community_channel_message(
&mut self,
_channel_id: Uuid,
_message_id: Option<Uuid>,
_locations: Vec<Location>,
_message: Vec<String>,
channel_id: Uuid,
message_id: Option<Uuid>,
locations: Vec<Location>,
messages: Vec<String>,
) -> Result<(Uuid, AttachmentEventStream), Error> {
Err(Error::Unimplemented)
let own_did = &self.identity.did_key();
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::SendMessages,
channel_id,
) {
return Err(Error::Unauthorized);
}
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::SendAttachments,
channel_id,
) {
return Err(Error::Unauthorized);
}

let keystore = pubkey_or_keystore(&*self)?;

let stream = AttachmentStream::new(
&self.ipfs,
self.root.keypair(),
&self.identity.did_key(),
&self.file,
channel_id,
keystore,
self.attachment_tx.clone(),
)
.set_reply(message_id)
.set_locations(locations)?
.set_lines(messages)?;

let message_id = stream.message_id();

Ok((message_id, stream.boxed()))
}
pub async fn download_from_community_channel_message(
&self,
_channel_id: Uuid,
_message_id: Uuid,
_file: String,
_path: PathBuf,
channel_id: Uuid,
message_id: Uuid,
file: String,
path: PathBuf,
) -> Result<ConstellationProgressStream, Error> {
Err(Error::Unimplemented)
let own_did = &self.identity.did_key();
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::ViewChannel,
channel_id,
) {
return Err(Error::Unauthorized);
}

let channel = match self.document.channels.get(&channel_id.to_string()) {
Some(c) => c,
None => return Err(Error::CommunityChannelDoesntExist),
};

let members = self
.document
.participants()
.iter()
.filter_map(|did| did.to_peer_id().ok())
.collect::<Vec<_>>();

let message = channel.get_message_document(&self.ipfs, message_id).await?;

if message.message_type != MessageType::Attachment {
return Err(Error::InvalidMessage);
}

let attachment = message
.attachments()
.iter()
.find(|attachment| attachment.name == file)
.ok_or(Error::FileNotFound)?;

let stream = attachment.download(&self.ipfs, path, &members, None);

Ok(stream)
}
pub async fn download_stream_from_community_channel_message(
&self,
_channel_id: Uuid,
_message_id: Uuid,
_file: String,
channel_id: Uuid,
message_id: Uuid,
file: String,
) -> Result<BoxStream<'static, Result<Bytes, std::io::Error>>, Error> {
Err(Error::Unimplemented)
let own_did = &self.identity.did_key();
if !self.document.has_channel_permission(
own_did,
&CommunityChannelPermission::ViewChannel,
channel_id,
) {
return Err(Error::Unauthorized);
}

let channel = match self.document.channels.get(&channel_id.to_string()) {
Some(c) => c,
None => return Err(Error::CommunityChannelDoesntExist),
};

let members = self
.document
.participants()
.iter()
.filter_map(|did| did.to_peer_id().ok())
.collect::<Vec<_>>();

let message = channel.get_message_document(&self.ipfs, message_id).await?;

if message.message_type != MessageType::Attachment {
return Err(Error::InvalidMessage);
}

let attachment = message
.attachments()
.iter()
.find(|attachment| attachment.name == file)
.ok_or(Error::FileNotFound)?;

let stream = attachment.download_stream(&self.ipfs, &members, None);

Ok(stream)
}

async fn store_direct_for_attachment(&mut self, message: MessageDocument) -> Result<(), Error> {
Expand Down
Loading

0 comments on commit 5f8539a

Please sign in to comment.