Skip to content

Commit

Permalink
Merge branch 'main' into feat/shuttle-gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 committed Oct 6, 2023
2 parents e4ad296 + 15c125a commit 24ed259
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 142 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ either = "1"
void = "1"

#ipfs dependency
rust-ipfs = "0.4.5"
rust-ipfs = "0.5.0"


# Blink related crates
# av-data is needed to use libaom. need to ensure that Warp and libaom use the same version of av-data
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.aarch64-android
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | bash -s -- -y
# RUN . $HOME/.cargo/env && rustup target add aarch64-unknown-linux-gnu
# RUN . $HOME/.cargo/env && rustup toolchain install stable-aarch64-unknown-linux-gnu
RUN . $HOME/.cargo/env && rustup target add aarch64-linux-android
RUN . $HOME/.cargo/env && cargo install --git https://github.com/tauri-apps/tauri-mobile
RUN . $HOME/.cargo/env && cargo install --git https://github.com/tauri-apps/cargo-mobile2

WORKDIR /root/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.23.1/cmake-3.23.1.tar.gz
Expand Down
19 changes: 12 additions & 7 deletions extensions/warp-ipfs/src/store/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,11 @@ impl ConversationDocument {

let ipfs = ipfs.clone();
let stream = async_stream::stream! {

let mut remaining = option.limit();
for (index, document) in messages.iter().enumerate() {
if remaining.as_ref().map(|x| *x == 0).unwrap_or_default() {
break;
}
if let Some(range) = option.range() {
if range.start > index || range.end < index {
continue
Expand All @@ -388,20 +391,22 @@ impl ConversationDocument {
continue
}
}

if let Ok(message) = document.resolve(&ipfs, &did, keystore.as_ref()).await {
if option.pinned() && !message.pinned() {
continue;
}
if let Some(keyword) = option.keyword() {
if message
let should_yield = if let Some(keyword) = option.keyword() {
message
.value()
.iter()
.any(|line| line.to_lowercase().contains(&keyword.to_lowercase()))
{
yield message;
}
} else {
true
};
if should_yield {
if let Some(remaining) = remaining.as_mut() {
*remaining = remaining.saturating_sub(1);
}
yield message;
}
}
Expand Down
2 changes: 1 addition & 1 deletion extensions/warp-ipfs/src/store/document/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub async fn unixfs_fetch(
let fut = async {
let stream = ipfs
.unixfs()
.cat(IpfsPath::from(cid), None, &[], local)
.cat(IpfsPath::from(cid), None, &[], local, None)
.await
.map_err(anyhow::Error::from)?;

Expand Down
2 changes: 1 addition & 1 deletion extensions/warp-ipfs/src/store/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl FileStore {
let mut index_stream = self
.ipfs
.unixfs()
.cat(IpfsPath::from(cid), None, &[], true)
.cat(IpfsPath::from(cid), None, &[], true, None)
.await
.map_err(anyhow::Error::from)?
.boxed();
Expand Down
6 changes: 4 additions & 2 deletions extensions/warp-ipfs/src/store/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,7 @@ impl IdentityStore {
None,
&[],
false,
None,
)
.await?
.boxed();
Expand Down Expand Up @@ -1222,6 +1223,7 @@ impl IdentityStore {
None,
&[],
false,
None,
)
.await?
.boxed();
Expand Down Expand Up @@ -1322,7 +1324,7 @@ impl IdentityStore {
async move {
let mut stream = ipfs
.unixfs()
.cat(picture, None, &[], false)
.cat(picture, None, &[], false, None)
.await?
.boxed();

Expand Down Expand Up @@ -1350,7 +1352,7 @@ impl IdentityStore {
async move {
let mut stream = ipfs
.unixfs()
.cat(banner, None, &[], false)
.cat(banner, None, &[], false, None)
.await?
.boxed();
while let Some(_d) = stream.next().await {
Expand Down
174 changes: 48 additions & 126 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ use futures::channel::mpsc::{unbounded, Sender, UnboundedSender};
use futures::channel::oneshot::{self, Sender as OneshotSender};
use futures::stream::{FuturesUnordered, SelectAll};
use futures::{SinkExt, Stream, StreamExt};
use rust_ipfs::libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use rust_ipfs::{Ipfs, PeerId, SubscriptionStream};
use rust_ipfs::{Ipfs, IpfsPath, PeerId, SubscriptionStream};

use libipld::Cid;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::sync::broadcast::{self, Receiver as BroadcastReceiver, Sender as BroadcastSender};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_stream::wrappers::ReadDirStream;
Expand Down Expand Up @@ -47,7 +45,7 @@ use super::document::utils::{GetLocalDag, ToCid};
use super::friends::FriendsStore;
use super::identity::IdentityStore;
use super::keystore::Keystore;
use super::{did_to_libp2p_pub, verify_serde_sig, ConversationEvents, DidExt, MessagingEvents};
use super::{did_to_libp2p_pub, verify_serde_sig, ConversationEvents, MessagingEvents};

const PERMIT_AMOUNT: usize = 1;

Expand Down Expand Up @@ -926,32 +924,6 @@ impl MessageStore {
spam_check(&mut message, self.spam_filter.clone())?;
let conversation_id = message.conversation_id();

if message.message_type() == MessageType::Attachment
&& direction == MessageDirection::In
{
if let Some(fs) = self.filesystem.clone() {
let dir = fs.root_directory();
for file in message.attachments() {
let original = file.name();
let mut inc = 0;
loop {
if dir.has_item(&original) {
if inc >= 20 {
break;
}
inc += 1;
file.set_name(&format!("{original}-{inc}"));
continue;
}
break;
}
if let Err(e) = dir.add_file(file) {
error!("Error adding file to constellation: {e}");
}
}
}
}

let message_id = message.id();

let message_document =
Expand Down Expand Up @@ -3506,111 +3478,61 @@ impl MessageStore {
.cloned()
.ok_or(Error::FileNotFound)?;

let root = constellation.root_directory();
if !root.has_item(&attachment.name()) {
root.add_file(attachment.clone())?;
}
let _root = constellation.root_directory();

let ipfs = self.ipfs.clone();
let constellation = constellation.clone();
let own_did = self.did.clone();
let reference = attachment
.reference()
.and_then(|reference| IpfsPath::from_str(&reference).ok())
.ok_or(Error::FileNotFound)?;

let ipfs = self.ipfs.clone();
let _constellation = constellation.clone();
let progress_stream = async_stream::stream! {
yield Progression::CurrentProgress {
name: attachment.name(),
current: 0,
total: Some(attachment.size()),
};
yield Progression::CurrentProgress {
name: attachment.name(),
current: 0,
total: Some(attachment.size()),
};

let did = message.sender();
if !did.eq(&own_did) {
if let Ok(peer_id) = did.to_peer_id() {
//This is done to insure we can successfully exchange blocks
let opt = DialOpts::peer_id(peer_id).condition(PeerCondition::NotDialing).build();
if let Err(e) = ipfs.connect(opt).await {
warn!("Issue performing a connection to peer: {e}");
}
}
}
let stream = match ipfs.get_unixfs(reference, &path).await {
Ok(stream) => stream,
Err(e) => {
yield Progression::ProgressFailed {
name: attachment.name(),
last_size: None,
error: Some(e.to_string()),
};
return;
},
};

let mut file = match tokio::fs::File::create(&path).await {
Ok(file) => file,
Err(e) => {
error!("Error creating file: {e}");
yield Progression::ProgressFailed {
name: attachment.name(),
last_size: None,
error: Some(e.to_string()),
for await event in stream {
match event {
rust_ipfs::unixfs::UnixfsStatus::ProgressStatus { written, total_size } => {
yield Progression::CurrentProgress {
name: attachment.name(),
current: written,
total: total_size
};
return;
}
};

let stream = match constellation.get_stream(&attachment.name()).await {
Ok(s) => s,
Err(e) => {
error!("Error creating stream: {e}");
yield Progression::ProgressFailed {
name: attachment.name(),
last_size: None,
error: Some(e.to_string()),
},
rust_ipfs::unixfs::UnixfsStatus::CompletedStatus { total_size, .. } => {
yield Progression::ProgressComplete {
name: attachment.name(),
total: total_size,
};
return;
}
};

let mut written = 0;
let mut failed = false;
for await res in stream {
match res {
Ok(bytes) => match file.write_all(&bytes).await {
Ok(_) => {
written += bytes.len();
yield Progression::CurrentProgress {
name: attachment.name(),
current: written,
total: Some(attachment.size()),
};
}
Err(e) => {
error!("Error writing to disk: {e}");
yield Progression::ProgressFailed {
name: attachment.name(),
last_size: Some(written),
error: Some(e.to_string()),
};
failed = true;
break;
}
},
Err(e) => {
error!("Error reading from stream: {e}");
yield Progression::ProgressFailed {
name: attachment.name(),
last_size: Some(written),
error: Some(e.to_string()),
};
failed = true;
break;
},
rust_ipfs::unixfs::UnixfsStatus::FailedStatus { written, error, .. } => {
if let Err(e) = tokio::fs::remove_file(&path).await {
error!("Error removing file: {e}");
}
}
}

if failed {
if let Err(e) = tokio::fs::remove_file(&path).await {
error!("Error removing file: {e}");
}
}

if !failed {
if let Err(e) = file.flush().await {
error!("Error flushing stream: {e}");
}
yield Progression::ProgressComplete {
name: attachment.name(),
total: Some(written),
};
yield Progression::ProgressFailed {
name: attachment.name(),
last_size: Some(written),
error: error.map(|e| e.to_string()),
};
},
}
}
};

Ok(ConstellationProgressStream(progress_stream.boxed()))
Expand Down
6 changes: 3 additions & 3 deletions warp/src/raygun/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub struct MessageOptions {
keyword: Option<String>,
pinned: bool,
range: Option<Range<usize>>,
limit: Option<i64>,
limit: Option<u8>,
skip: Option<i64>,
}

Expand All @@ -204,7 +204,7 @@ impl MessageOptions {
self
}

pub fn set_limit(mut self, limit: i64) -> MessageOptions {
pub fn set_limit(mut self, limit: u8) -> MessageOptions {
self.limit = Some(limit);
self
}
Expand Down Expand Up @@ -253,7 +253,7 @@ impl MessageOptions {
}

impl MessageOptions {
pub fn limit(&self) -> Option<i64> {
pub fn limit(&self) -> Option<u8> {
self.limit
}

Expand Down

0 comments on commit 24ed259

Please sign in to comment.