Skip to content

Commit

Permalink
Merge branch 'main' into feat/shuttle-gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Oct 22, 2023
2 parents f2411b8 + 4247e1b commit 34c278a
Show file tree
Hide file tree
Showing 15 changed files with 996 additions and 308 deletions.
8 changes: 4 additions & 4 deletions extensions/warp-ipfs/examples/identity-interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ async fn main() -> anyhow::Result<()> {
}
};

if let Err(e) = account.update_identity(IdentityUpdate::Picture(picture.to_string())).await {
if let Err(e) = account.update_identity(IdentityUpdate::Picture(picture.as_bytes().to_vec())).await {
writeln!(stdout, "Error updating picture: {e}")?;
continue;
}
Expand All @@ -705,7 +705,7 @@ async fn main() -> anyhow::Result<()> {
}
};

if let Err(e) = account.update_identity(IdentityUpdate::Banner(banner.to_string())).await {
if let Err(e) = account.update_identity(IdentityUpdate::Banner(banner.as_bytes().to_vec())).await {
writeln!(stdout, "Error updating banner: {e}")?;
continue;
}
Expand Down Expand Up @@ -780,8 +780,8 @@ async fn main() -> anyhow::Result<()> {
created.to_string(),
modified.to_string(),
identity.status_message().unwrap_or_default(),
(!profile_banner.is_empty()).to_string(),
(!profile_picture.is_empty()).to_string(),
(!profile_banner.data().is_empty()).to_string(),
(!profile_picture.data().is_empty()).to_string(),
platform.to_string(),
format!("{status:?}"),
]);
Expand Down
6 changes: 4 additions & 2 deletions extensions/warp-ipfs/examples/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,17 @@ async fn main() -> anyhow::Result<()> {
}
Some("/list-conversations") => {
let mut table = Table::new();
table.set_header(vec!["Name", "ID", "Recipients"]);
table.set_header(vec!["Name", "ID", "Created", "Updated", "Recipients"]);
let list = chat.list_conversations().await?;
for convo in list.iter() {
let mut recipients = vec![];
for recipient in convo.recipients() {
let username = get_username(new_account.clone(), recipient.clone()).await.unwrap_or_else(|_| recipient.to_string());
recipients.push(username);
}
table.add_row(vec![convo.name().unwrap_or_default(), convo.id().to_string(), recipients.join(",").to_string()]);
let created = convo.created();
let modified = convo.modified();
table.add_row(vec![convo.name().unwrap_or_default(), convo.id().to_string(), created.to_string(), modified.to_string(), recipients.join(",").to_string()]);
}
writeln!(stdout, "{table}")?;
},
Expand Down
4 changes: 2 additions & 2 deletions extensions/warp-ipfs/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
str::FromStr,
time::Duration,
};
use warp::multipass::identity::Identity;
use warp::{multipass::identity::Identity, constellation::file::FileType};

#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
Expand Down Expand Up @@ -250,7 +250,7 @@ pub enum UpdateEvents {
}

pub type DefaultPfpFn =
std::sync::Arc<dyn Fn(&Identity) -> Result<Vec<u8>, std::io::Error> + Send + Sync + 'static>;
std::sync::Arc<dyn Fn(&Identity) -> Result<(Vec<u8>, FileType), std::io::Error> + Send + Sync + 'static>;

#[derive(Default, Clone, Serialize, Deserialize)]
pub enum StoreOffline {
Expand Down
110 changes: 81 additions & 29 deletions extensions/warp-ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use ipfs::{DhtMode, Ipfs, Keypair, PeerId, Protocol, UninitializedIpfs};
use warp::crypto::{KeyMaterial, DID};
use warp::error::Error;
use warp::multipass::identity::{
Identifier, Identity, IdentityProfile, IdentityUpdate, Relationship,
Identifier, Identity, IdentityImage, IdentityProfile, IdentityUpdate, Relationship,
};
use warp::multipass::{
identity, Friends, FriendsEvent, IdentityImportOption, IdentityInformation, ImportLocation,
Expand Down Expand Up @@ -718,15 +718,31 @@ impl MultiPass for WarpIpfs {

trace!("image size = {}", len);

let cid = store
.store_photo(
futures::stream::iter(Ok::<_, std::io::Error>(Ok(serde_json::to_vec(
&data,
)?)))
.boxed(),
Some(2 * 1024 * 1024),
)
.await?;
let (data, format) = tokio::task::spawn_blocking(move || {
let cursor = std::io::Cursor::new(data);

let image = image::io::Reader::new(cursor).with_guessed_format()?;

let format = image
.format()
.and_then(|format| ExtensionType::try_from(format).ok())
.unwrap_or(ExtensionType::Other);

let inner = image.into_inner();

let data = inner.into_inner();
Ok::<_, Error>((data, format))
})
.await
.map_err(anyhow::Error::from)??;

let cid = store::document::image_dag::store_photo(
&self.ipfs()?,
futures::stream::iter(Ok::<_, std::io::Error>(Ok(data))).boxed(),
format.into(),
Some(2 * 1024 * 1024),
)
.await?;

debug!("Image cid: {cid}");

Expand All @@ -753,7 +769,7 @@ impl MultiPass for WarpIpfs {
)));
}

let file = tokio::fs::File::open(path).await?;
let file = tokio::fs::File::open(&path).await?;

let metadata = file.metadata().await?;

Expand All @@ -768,6 +784,12 @@ impl MultiPass for WarpIpfs {
});
}

let extension = path
.extension()
.and_then(OsStr::to_str)
.map(ExtensionType::from)
.unwrap_or(ExtensionType::Other);

trace!("image size = {}", len);

let stream = async_stream::stream! {
Expand All @@ -789,9 +811,13 @@ impl MultiPass for WarpIpfs {
}
};

let cid = store
.store_photo(stream.boxed(), Some(2 * 1024 * 1024))
.await?;
let cid = store::document::image_dag::store_photo(
&self.ipfs()?,
stream.boxed(),
extension.into(),
Some(2 * 1024 * 1024),
)
.await?;

debug!("Image cid: {cid}");

Expand Down Expand Up @@ -831,15 +857,31 @@ impl MultiPass for WarpIpfs {

trace!("image size = {}", len);

let cid = store
.store_photo(
futures::stream::iter(Ok::<_, std::io::Error>(Ok(serde_json::to_vec(
&data,
)?)))
.boxed(),
Some(2 * 1024 * 1024),
)
.await?;
let (data, format) = tokio::task::spawn_blocking(move || {
let cursor = std::io::Cursor::new(data);

let image = image::io::Reader::new(cursor).with_guessed_format()?;

let format = image
.format()
.and_then(|format| ExtensionType::try_from(format).ok())
.unwrap_or(ExtensionType::Other);

let inner = image.into_inner();

let data = inner.into_inner();
Ok::<_, Error>((data, format))
})
.await
.map_err(anyhow::Error::from)??;

let cid = store::document::image_dag::store_photo(
&self.ipfs()?,
futures::stream::iter(Ok::<_, std::io::Error>(Ok(data))).boxed(),
format.into(),
Some(2 * 1024 * 1024),
)
.await?;

debug!("Image cid: {cid}");

Expand All @@ -866,7 +908,7 @@ impl MultiPass for WarpIpfs {
)));
}

let file = tokio::fs::File::open(path).await?;
let file = tokio::fs::File::open(&path).await?;

let metadata = file.metadata().await?;

Expand All @@ -881,6 +923,12 @@ impl MultiPass for WarpIpfs {
});
}

let extension = path
.extension()
.and_then(OsStr::to_str)
.map(ExtensionType::from)
.unwrap_or(ExtensionType::Other);

trace!("image size = {}", len);

let stream = async_stream::stream! {
Expand All @@ -902,9 +950,13 @@ impl MultiPass for WarpIpfs {
}
};

let cid = store
.store_photo(stream.boxed(), Some(2 * 1024 * 1024))
.await?;
let cid = store::document::image_dag::store_photo(
&self.ipfs()?,
stream.boxed(),
extension.into(),
Some(2 * 1024 * 1024),
)
.await?;

debug!("Image cid: {cid}");

Expand Down Expand Up @@ -1205,12 +1257,12 @@ impl FriendsEvent for WarpIpfs {

#[async_trait::async_trait]
impl IdentityInformation for WarpIpfs {
async fn identity_picture(&self, did: &DID) -> Result<String, Error> {
async fn identity_picture(&self, did: &DID) -> Result<IdentityImage, Error> {
let store = self.identity_store(true).await?;
store.identity_picture(did).await
}

async fn identity_banner(&self, did: &DID) -> Result<String, Error> {
async fn identity_banner(&self, did: &DID) -> Result<IdentityImage, Error> {
let store = self.identity_store(true).await?;
store.identity_banner(did).await
}
Expand Down
28 changes: 20 additions & 8 deletions extensions/warp-ipfs/src/store/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct ConversationDocument {
pub name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub creator: Option<DID>,
pub created: DateTime<Utc>,
pub modified: DateTime<Utc>,
pub conversation_type: ConversationType,
pub recipients: Vec<DID>,
pub excluded: HashMap<DID, String>,
Expand All @@ -57,7 +59,8 @@ impl From<&Conversation> for ConversationDocument {
id: conversation.id(),
name: conversation.name(),
creator: conversation.creator(),

created: conversation.created(),
modified: conversation.modified(),
conversation_type: conversation.conversation_type(),
recipients: conversation.recipients(),
excluded: Default::default(),
Expand Down Expand Up @@ -130,12 +133,15 @@ impl ConversationDocument {
}

impl ConversationDocument {
#[allow(clippy::too_many_arguments)]
pub fn new(
did: &DID,
name: Option<String>,
mut recipients: Vec<DID>,
id: Option<Uuid>,
conversation_type: ConversationType,
created: Option<DateTime<Utc>>,
modified: Option<DateTime<Utc>>,
creator: Option<DID>,
signature: Option<String>,
) -> Result<Self, Error> {
Expand All @@ -152,11 +158,16 @@ impl ConversationDocument {
let messages = None;
let excluded = Default::default();

let created = created.unwrap_or(Utc::now());
let modified = modified.unwrap_or(created);

let mut document = Self {
id,
name,
recipients,
creator,
created,
modified,
conversation_type,
excluded,
messages,
Expand Down Expand Up @@ -196,6 +207,8 @@ impl ConversationDocument {
ConversationType::Direct,
None,
None,
None,
None,
)
}

Expand All @@ -207,6 +220,8 @@ impl ConversationDocument {
recipients.to_vec(),
conversation_id,
ConversationType::Group,
None,
None,
Some(did.clone()),
None,
)
Expand Down Expand Up @@ -301,6 +316,7 @@ impl ConversationDocument {
ipfs: &Ipfs,
list: BTreeSet<MessageDocument>,
) -> Result<(), Error> {
self.modified = Utc::now();
let cid = list.to_cid(ipfs).await?;
self.messages = Some(cid);
Ok(())
Expand Down Expand Up @@ -534,13 +550,7 @@ impl ConversationDocument {

impl From<ConversationDocument> for Conversation {
fn from(document: ConversationDocument) -> Self {
let mut conversation = Conversation::default();
conversation.set_id(document.id);
conversation.set_name(document.name());
conversation.set_creator(document.creator.clone());
conversation.set_conversation_type(document.conversation_type);
conversation.set_recipients(document.recipients());
conversation
Conversation::from(&document)
}
}

Expand All @@ -552,6 +562,8 @@ impl From<&ConversationDocument> for Conversation {
conversation.set_creator(document.creator.clone());
conversation.set_conversation_type(document.conversation_type);
conversation.set_recipients(document.recipients());
conversation.set_created(document.created);
conversation.set_modified(document.modified);
conversation
}
}
Expand Down
1 change: 1 addition & 0 deletions extensions/warp-ipfs/src/store/document.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod cache;
pub mod conversation;
pub mod identity;
pub mod image_dag;
pub mod root;
pub mod utils;

Expand Down
5 changes: 3 additions & 2 deletions extensions/warp-ipfs/src/store/document/identity.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use chrono::{DateTime, Utc};
use futures::{StreamExt, TryStreamExt};
use libipld::Cid;
use rust_ipfs::{Ipfs, IpfsPath};
use rust_ipfs::{Ipfs, IpfsPath, PeerId};
use serde::{Deserialize, Serialize};
use std::{hash::Hash, time::Duration};
use warp::{
Expand Down Expand Up @@ -232,14 +232,15 @@ pub async fn unixfs_fetch(
ipfs: &Ipfs,
cid: Cid,
timeout: Option<Duration>,
peers: &[PeerId],
local: bool,
limit: Option<usize>,
) -> Result<Vec<u8>, Error> {
let timeout = timeout.or(Some(std::time::Duration::from_secs(15)));

let mut stream = ipfs
.unixfs()
.cat(IpfsPath::from(cid), None, &[], local, timeout)
.cat(IpfsPath::from(cid), None, peers, local, timeout)
.await
.map_err(anyhow::Error::from)?
.boxed();
Expand Down
Loading

0 comments on commit 34c278a

Please sign in to comment.