Skip to content

Commit

Permalink
Merge pull request #306 from nervosnetwork/fix-session-bloking-detection
Browse files Browse the repository at this point in the history
fix: Fix session bloking detection
  • Loading branch information
driftluo authored Jan 27, 2021
2 parents 5cf596f + 32323b3 commit 18068e5
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 17 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.3.7

### Bug Fix
- Blocking session detection(#306)

## 0.3.6

### Bug Fix
Expand Down
2 changes: 1 addition & 1 deletion tentacle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tentacle"
version = "0.3.6"
version = "0.3.7"
license = "MIT"
description = "Minimal implementation for a multiplexed p2p network framework."
authors = ["piaoliu <[email protected]>", "Nervos Core Dev <[email protected]>"]
Expand Down
39 changes: 25 additions & 14 deletions tentacle/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::{
prelude::*,
stream::{FusedStream, StreamExt},
};
use log::{debug, error, log_enabled, trace};
use log::{debug, error, log_enabled, trace, warn};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -378,19 +378,30 @@ where
.values_mut()
.filter(|control| !control.buffer.is_empty())
{
control.try_send(cx);
if control.inner.pending_data_size() > self.config.session_config.send_buffer_size {
self.handle.handle_error(
&mut self.service_context,
ServiceError::SessionBlocked {
session_context: control.inner.clone(),
},
);
// clean message and try to close this session
control.buffer.clear();
let id = control.inner.id;
control.push(Priority::High, SessionEvent::SessionClose { id });
control.try_send(cx);
if let SendResult::Pending = control.try_send(cx) {
if control.inner.pending_data_size() > self.config.session_config.send_buffer_size {
self.handle.handle_error(
&mut self.service_context,
ServiceError::SessionBlocked {
session_context: control.inner.clone(),
},
);

warn!(
"session {:?} unable to send message, \
user allow buffer size: {}, \
current buffer size: {}, so kill it",
control.inner,
self.config.session_config.send_buffer_size,
control.inner.pending_data_size()
);

// clean message and try to close this session
control.buffer.clear();
let id = control.inner.id;
control.push(Priority::High, SessionEvent::SessionClose { id });
control.try_send(cx);
}
}
}
}
Expand Down
18 changes: 16 additions & 2 deletions tentacle/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::{channel::mpsc, prelude::*, stream::iter, SinkExt};
use log::{debug, error, log_enabled, trace};
use log::{debug, error, log_enabled, trace, warn};
use std::collections::{HashMap, HashSet};
use std::{
io::{self, ErrorKind},
Expand Down Expand Up @@ -354,7 +354,21 @@ impl Session {
.values_mut()
.filter(|buffer| !buffer.is_empty())
{
buffer.try_send(cx);
if let SendResult::Pending = buffer.try_send(cx) {
if self.context.pending_data_size() > self.config.send_buffer_size {
self.state = SessionState::Abnormal;
warn!(
"session {:?} unable to send message, \
user allow buffer size: {}, \
current buffer size: {}, so kill it",
self.context,
self.config.send_buffer_size,
self.context.pending_data_size()
);
buffer.clear();
}
break;
}
}
}

Expand Down
123 changes: 123 additions & 0 deletions tentacle/tests/test_large_pending_messge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use bytes::Bytes;
use futures::{channel, StreamExt};
use std::thread;
use tentacle::{
builder::{MetaBuilder, ServiceBuilder},
context::{ProtocolContext, ProtocolContextMutRef, ServiceContext},
multiaddr::Multiaddr,
secio::SecioKeyPair,
service::{ProtocolHandle, ProtocolMeta, Service, ServiceError, TargetProtocol},
traits::{ServiceHandle, ServiceProtocol},
ProtocolId,
};

pub fn create<F>(secio: bool, meta: ProtocolMeta, shandle: F) -> Service<F>
where
F: ServiceHandle + Unpin,
{
let builder = ServiceBuilder::default()
.insert_protocol(meta)
.set_send_buffer_size(1)
.set_recv_buffer_size(1);

if secio {
builder
.key_pair(SecioKeyPair::secp256k1_generated())
.build(shandle)
} else {
builder.build(shandle)
}
}

struct PHandle;

impl ServiceProtocol for PHandle {
fn init(&mut self, _context: &mut ProtocolContext) {}

fn connected(&mut self, context: ProtocolContextMutRef, _version: &str) {
if context.session.ty.is_inbound() {
let data = Bytes::from(vec![0; 1024 * 1024 * 8]);
loop {
let _res = context.send_message(data.clone());
}
}
}

fn received(&mut self, _context: ProtocolContextMutRef, _data: bytes::Bytes) {
thread::sleep(::std::time::Duration::from_secs(10));
}
}

fn create_meta(id: ProtocolId) -> ProtocolMeta {
MetaBuilder::new()
.id(id)
.service_handle(move || {
if id == 0.into() {
ProtocolHandle::Neither
} else {
let handle = Box::new(PHandle);
ProtocolHandle::Callback(handle)
}
})
.build()
}

#[derive(Clone)]
pub struct SHandle;

impl ServiceHandle for SHandle {
fn handle_error(&mut self, _control: &mut ServiceContext, error: ServiceError) {
match error {
ServiceError::SessionBlocked { .. } => (),
e => panic!("Unexpected error: {:?}", e),
}
}
}

fn test_large_message(secio: bool) {
let meta_1 = create_meta(1.into());
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();

thread::spawn(move || {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create(secio, meta_1, SHandle);
rt.block_on(async move {
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.await
.unwrap();
let _res = addr_sender.send(listen_addr);
loop {
if service.next().await.is_none() {
break;
}
}
});
});

let meta = create_meta(1.into());
let mut rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create(secio, meta, SHandle);
rt.block_on(async move {
let listen_addr = addr_receiver.await.unwrap();
service
.dial(listen_addr, TargetProtocol::All)
.await
.unwrap();
loop {
if service.next().await.is_none() {
break;
}
}
});
}

#[test]
fn test_large_message_with_secio() {
test_large_message(true)
}

#[test]
fn test_large_message_with_no_secio() {
test_large_message(false)
}

0 comments on commit 18068e5

Please sign in to comment.