Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into refactor/discovery-ta…
Browse files Browse the repository at this point in the history
…sks-r0
  • Loading branch information
dariusc93 committed Dec 7, 2024
2 parents ea87613 + 8280796 commit afbc02a
Show file tree
Hide file tree
Showing 6 changed files with 468 additions and 65 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
version = "0.1.0"
edition = "2021"
license = "MIT"
rust-version = "1.67"
rust-version = "1.83"
repository = "https://github.com/Satellite-im/Warp"


Expand Down Expand Up @@ -96,7 +96,7 @@ void = "1"
indexmap = { version = "2.4.0", features = ["serde"] }

# ipfs dependency
rust-ipfs = "0.13.0"
rust-ipfs = "0.14.0"

# wasm crates
wasm-bindgen = "0.2"
Expand Down
6 changes: 6 additions & 0 deletions extensions/warp-ipfs/src/store/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ pub struct CommunityDocument {
#[serde(default)]
pub deleted: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub icon: Option<Cid>,
#[serde(skip_serializing_if = "Option::is_none")]
pub banner: Option<Cid>,
#[serde(skip_serializing_if = "Option::is_none")]
pub signature: Option<String>,
}

Expand Down Expand Up @@ -226,6 +230,8 @@ impl CommunityDocument {
permissions,
invites: IndexMap::new(),
deleted: false,
icon: None,
banner: None,
signature: None,
};
document.sign(keypair)?;
Expand Down
244 changes: 217 additions & 27 deletions extensions/warp-ipfs/src/store/message/community_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use indexmap::{IndexMap, IndexSet};
use ipld_core::cid::Cid;
use rust_ipfs::libp2p::gossipsub::Message;
use rust_ipfs::p2p::MultiaddrExt;
use rust_ipfs::Ipfs;
use rust_ipfs::{Ipfs, IpfsPath};
use rust_ipfs::{PeerId, SubscriptionStream};
use serde::{Deserialize, Serialize};
use std::borrow::BorrowMut;
Expand Down Expand Up @@ -47,13 +47,17 @@ use crate::store::community::{
};
use crate::store::conversation::message::MessageDocument;
use crate::store::discovery::Discovery;
use crate::store::document::files::FileDocument;
use crate::store::document::image_dag::ImageDag;
use crate::store::ds_key::DataStoreKey;
use crate::store::event_subscription::EventSubscription;
use crate::store::topics::PeerTopic;
use crate::store::{
CommunityUpdateKind, ConversationEvents, MAX_COMMUNITY_CHANNELS, MAX_COMMUNITY_DESCRIPTION,
CommunityUpdateKind, ConversationEvents, ConversationImageType, MAX_COMMUNITY_CHANNELS,
MAX_COMMUNITY_DESCRIPTION, MAX_CONVERSATION_BANNER_SIZE, MAX_CONVERSATION_ICON_SIZE,
MAX_MESSAGE_SIZE, MAX_REACTIONS, MIN_MESSAGE_SIZE, SHUTTLE_TIMEOUT,
};
use crate::utils::{ByteCollection, ExtensionType};
use crate::{
// rt::LocalExecutor,
store::{
Expand Down Expand Up @@ -702,19 +706,25 @@ impl CommunityTask {
let _ = response.send(result);
}
CommunityTaskCommand::GetCommunityIcon { response } => {
let result = self.get_community_icon().await;
let result = self.get_community_image(ConversationImageType::Icon).await;
let _ = response.send(result);
}
CommunityTaskCommand::GetCommunityBanner { response } => {
let result = self.get_community_banner().await;
let result = self
.get_community_image(ConversationImageType::Banner)
.await;
let _ = response.send(result);
}
CommunityTaskCommand::EditCommunityIcon { response, location } => {
let result = self.edit_community_icon(location).await;
let result = self
.edit_community_image(location, ConversationImageType::Icon)
.await;
let _ = response.send(result);
}
CommunityTaskCommand::EditCommunityBanner { response, location } => {
let result = self.edit_community_banner(location).await;
let result = self
.edit_community_image(location, ConversationImageType::Banner)
.await;
let _ = response.send(result);
}
CommunityTaskCommand::CreateCommunityInvite {
Expand Down Expand Up @@ -1379,32 +1389,194 @@ impl CommunityTask {
)
.await
}
async fn get_community_image(
&self,
image_type: ConversationImageType,
) -> Result<ConversationImage, Error> {
let (cid, max_size) = match image_type {
ConversationImageType::Icon => {
let cid = self.document.icon.ok_or(Error::Other)?;
(cid, MAX_CONVERSATION_ICON_SIZE)
}
ConversationImageType::Banner => {
let cid = self.document.banner.ok_or(Error::Other)?;
(cid, MAX_CONVERSATION_BANNER_SIZE)
}
};

pub async fn get_community_icon(&self) -> Result<ConversationImage, Error> {
Err(Error::Unimplemented)
}
pub async fn get_community_banner(&self) -> Result<ConversationImage, Error> {
Err(Error::Unimplemented)
}
pub async fn edit_community_icon(&mut self, _location: Location) -> Result<(), Error> {
let own_did = &self.identity.did_key();
if !self
.document
.has_permission(own_did, &CommunityPermission::EditIcon)
{
return Err(Error::Unauthorized);
let dag: ImageDag = self.ipfs.get_dag(cid).deserialized().await?;

if dag.size > max_size as _ {
return Err(Error::InvalidLength {
context: "image".into(),
current: dag.size as _,
minimum: None,
maximum: Some(max_size),
});
}
Err(Error::Unimplemented)

let image = self
.ipfs
.cat_unixfs(dag.link)
.max_length(dag.size as _)
.await
.map_err(anyhow::Error::from)?;

let mut img = ConversationImage::default();
img.set_image_type(dag.mime);
img.set_data(image.into());
Ok(img)
}
pub async fn edit_community_banner(&mut self, _location: Location) -> Result<(), Error> {
async fn edit_community_image(
&mut self,
location: Location,
image_type: ConversationImageType,
) -> Result<(), Error> {
let max_size = match image_type {
ConversationImageType::Banner => MAX_CONVERSATION_BANNER_SIZE,
ConversationImageType::Icon => MAX_CONVERSATION_ICON_SIZE,
};
let own_did = &self.identity.did_key();
if !self
.document
.has_permission(own_did, &CommunityPermission::EditBanner)
{
return Err(Error::Unauthorized);
match image_type {
ConversationImageType::Icon => {
if !self
.document
.has_permission(own_did, &CommunityPermission::EditIcon)
{
return Err(Error::Unauthorized);
}
}
ConversationImageType::Banner => {
if !self
.document
.has_permission(own_did, &CommunityPermission::EditBanner)
{
return Err(Error::Unauthorized);
}
}
}
Err(Error::Unimplemented)
let (cid, size, ext) = match location {
Location::Constellation { path } => {
let file = self
.file
.root_directory()
.get_item_by_path(&path)
.and_then(|item| item.get_file())?;

let extension = file.file_type();

if file.size() > max_size {
return Err(Error::InvalidLength {
context: "image".into(),
current: file.size(),
minimum: Some(1),
maximum: Some(max_size),
});
}

let document = FileDocument::new(&self.ipfs, &file).await?;
let cid = document
.reference
.as_ref()
.and_then(|reference| IpfsPath::from_str(reference).ok())
.and_then(|path| path.root().cid().copied())
.ok_or(Error::OtherWithContext("invalid reference".into()))?;

(cid, document.size, extension)
}
Location::Disk { path } => {
#[cfg(target_arch = "wasm32")]
{
_ = path;
unreachable!()
}
#[cfg(not(target_arch = "wasm32"))]
{
use crate::utils::ReaderStream;
use tokio_util::compat::TokioAsyncReadCompatExt;

let extension = path
.extension()
.and_then(std::ffi::OsStr::to_str)
.map(ExtensionType::from)
.unwrap_or(ExtensionType::Other)
.into();

let file = tokio::fs::File::open(path).await?;
let size = file.metadata().await?.len() as _;
let stream =
ReaderStream::from_reader_with_cap(file.compat(), 512, Some(max_size))
.boxed();
let path = self.ipfs.add_unixfs(stream).pin(false).await?;
let cid = path.root().cid().copied().expect("valid cid in path");
(cid, size, extension)
}
}
Location::Stream {
// NOTE: `name` and `size` would not be used here as we are only storing the data. If we are to store in constellation too, we would make use of these fields
name: _,
size: _,
stream,
} => {
let bytes = ByteCollection::new_with_max_capacity(stream, max_size).await?;

let bytes_len = bytes.len();

let path = self.ipfs.add_unixfs(bytes.clone()).pin(false).await?;
let cid = path.root().cid().copied().expect("valid cid in path");

let cursor = std::io::Cursor::new(bytes);

let image = image::ImageReader::new(cursor).with_guessed_format()?;

let format = image
.format()
.and_then(|format| ExtensionType::try_from(format).ok())
.unwrap_or(ExtensionType::Other)
.into();

(cid, bytes_len, format)
}
};

let dag = ImageDag {
link: cid,
size: size as _,
mime: ext,
};

let cid = self.ipfs.put_dag(dag).await?;

let kind = match image_type {
ConversationImageType::Icon => {
self.document.icon.replace(cid);
CommunityUpdateKind::EditIcon
}
ConversationImageType::Banner => {
self.document.banner.replace(cid);
CommunityUpdateKind::EditBanner
}
};

self.set_document().await?;

let event = CommunityMessagingEvents::UpdateCommunity {
community: self.document.clone(),
kind,
};

let message_event = match image_type {
ConversationImageType::Icon => MessageEventKind::EditedCommunityIcon {
community_id: self.community_id,
},
ConversationImageType::Banner => MessageEventKind::EditedCommunityBanner {
community_id: self.community_id,
},
};

let _ = self.event_broadcast.send(message_event);

self.publish(None, event, true, vec![]).await
}

pub async fn create_community_invite(
Expand Down Expand Up @@ -4159,6 +4331,24 @@ async fn message_event(
tracing::warn!(%community_id, error = %e, "Error broadcasting event");
}
}
CommunityUpdateKind::EditIcon => {
this.replace_document(community).await?;
if let Err(e) = this
.event_broadcast
.send(MessageEventKind::EditedCommunityIcon { community_id })
{
tracing::warn!(%community_id, error = %e, "Error broadcasting event");
}
}
CommunityUpdateKind::EditBanner => {
this.replace_document(community).await?;
if let Err(e) = this
.event_broadcast
.send(MessageEventKind::EditedCommunityBanner { community_id })
{
tracing::warn!(%community_id, error = %e, "Error broadcasting event");
}
}
CommunityUpdateKind::GrantCommunityPermission {
permission,
role_id,
Expand Down
2 changes: 2 additions & 0 deletions extensions/warp-ipfs/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ pub enum CommunityUpdateKind {
EditCommunityDescription {
description: Option<String>,
},
EditIcon,
EditBanner,
GrantCommunityPermission {
permission: CommunityPermission,
role_id: RoleId,
Expand Down
Loading

0 comments on commit afbc02a

Please sign in to comment.