Skip to content

Commit

Permalink
Merge branch 'develop' into exec/support-more-rocksdb-compression-type
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad authored Oct 24, 2024
2 parents 4fdce3f + 25e3736 commit 21e0802
Show file tree
Hide file tree
Showing 84 changed files with 2,400 additions and 525 deletions.
618 changes: 406 additions & 212 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ ckb-build-info = { path = "util/build-info", version = "= 0.119.0-pre" }
[dependencies]
ckb-build-info = { path = "util/build-info", version = "= 0.119.0-pre" }
ckb-bin = { path = "ckb-bin", version = "= 0.119.0-pre" }
console-subscriber = { version = "0.2.0", optional = true }
console-subscriber = { version = "0.4.0", optional = true }

[dev-dependencies]

Expand Down Expand Up @@ -60,6 +60,7 @@ members = [
"util/dao/utils",
"traits",
"spec",
"util/fee-estimator",
"util/proposal-table",
"script",
"util/app-config",
Expand Down
1 change: 0 additions & 1 deletion benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ ckb-test-chain-utils = { path = "../util/test-chain-utils", version = "= 0.119.0
ckb-dao-utils = { path = "../util/dao/utils", version = "= 0.119.0-pre" }
ckb-dao = { path = "../util/dao", version = "= 0.119.0-pre" }
ckb-system-scripts = { version = "= 0.5.4" }
lazy_static = "1.3.0"
ckb-crypto = { path = "../util/crypto", version = "= 0.119.0-pre" }
ckb-jsonrpc-types = { path = "../util/jsonrpc-types", version = "= 0.119.0-pre" }
ckb-verification = { path = "../verification", version = "= 0.119.0-pre" }
Expand Down
28 changes: 13 additions & 15 deletions benches/benches/benchmarks/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use ckb_types::{
utilities::difficulty_to_compact,
H160, H256, U256,
};
use lazy_static::lazy_static;
use rand::random;
use std::collections::HashSet;

Expand Down Expand Up @@ -200,19 +199,19 @@ pub fn gen_always_success_block(
const PRIVKEY: H256 = h256!("0xb2b3324cece882bca684eaf202667bb56ed8e8c2fd4b4dc71f615ebd6d9055a5");
const PUBKEY_HASH: H160 = h160!("0x779e5930892a0a9bf2fedfe048f685466c7d0396");

lazy_static! {
static ref SECP_DATA_CELL: (CellOutput, Bytes) = {
let raw_data = BUNDLED_CELL
.get("specs/cells/secp256k1_data")
.expect("load secp256k1_blake160_sighash_all");
let data: Bytes = raw_data.to_vec().into();
static SECP_DATA_CELL: std::sync::LazyLock<(CellOutput, Bytes)> = std::sync::LazyLock::new(|| {
let raw_data = BUNDLED_CELL
.get("specs/cells/secp256k1_data")
.expect("load secp256k1_blake160_sighash_all");
let data: Bytes = raw_data.to_vec().into();

let cell = CellOutput::new_builder()
.capacity(Capacity::bytes(data.len()).unwrap().pack())
.build();
(cell, data)
};
static ref SECP_CELL: (CellOutput, Bytes, Script) = {
let cell = CellOutput::new_builder()
.capacity(Capacity::bytes(data.len()).unwrap().pack())
.build();
(cell, data)
});
static SECP_CELL: std::sync::LazyLock<(CellOutput, Bytes, Script)> =
std::sync::LazyLock::new(|| {
let raw_data = BUNDLED_CELL
.get("specs/cells/secp256k1_blake160_sighash_all")
.expect("load secp256k1_blake160_sighash_all");
Expand All @@ -229,8 +228,7 @@ lazy_static! {
.build();

(cell, data, script)
};
}
});

pub fn secp_cell() -> &'static (CellOutput, Bytes, Script) {
&SECP_CELL
Expand Down
9 changes: 6 additions & 3 deletions chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ dashmap = "4.0"
ckb-test-chain-utils = { path = "../util/test-chain-utils", version = "= 0.119.0-pre" }
ckb-dao-utils = { path = "../util/dao/utils", version = "= 0.119.0-pre" }
ckb-reward-calculator = { path = "../util/reward-calculator", version = "= 0.119.0-pre" }
ckb-tx-pool = { path = "../tx-pool", version = "= 0.119.0-pre", features = ["internal"] }
ckb-tx-pool = { path = "../tx-pool", version = "= 0.119.0-pre", features = [
"internal",
] }
ckb-jsonrpc-types = { path = "../util/jsonrpc-types", version = "= 0.119.0-pre" }
ckb-network = { path = "../network", version = "= 0.119.0-pre" }
lazy_static = "1.4"
tempfile.workspace = true
ckb-systemtime = { path = "../util/systemtime", version = "= 0.119.0-pre" ,features = ["enable_faketime"]}
ckb-systemtime = { path = "../util/systemtime", version = "= 0.119.0-pre", features = [
"enable_faketime",
] }
ckb-logger-service = { path = "../util/logger-service", version = "= 0.119.0-pre" }

[features]
Expand Down
15 changes: 0 additions & 15 deletions chain/src/tests/block_assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use ckb_types::{
};
use ckb_verification::{BlockVerifier, HeaderVerifier};
use ckb_verification_traits::{Switch, Verifier};
use lazy_static::lazy_static;
use std::sync::Arc;

fn start_chain(consensus: Option<Consensus>) -> (ChainController, Shared) {
Expand Down Expand Up @@ -57,20 +56,6 @@ fn start_chain(consensus: Option<Consensus>) -> (ChainController, Shared) {
(chain_controller, shared)
}

lazy_static! {
static ref BASIC_BLOCK_SIZE: u64 = {
let (_chain_controller, shared) = start_chain(None);

let block_template = shared
.get_block_template(None, None, None)
.unwrap()
.unwrap();

let block: Block = block_template.into();
block.serialized_size_without_uncle_proposals() as u64
};
}

#[test]
fn test_get_block_template() {
let (_chain_controller, shared) = start_chain(None);
Expand Down
8 changes: 8 additions & 0 deletions chain/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ impl ConsumeUnverifiedBlockProcessor {
db_txn.insert_epoch_ext(&epoch.last_block_hash_in_previous_epoch(), &epoch)?;
}

let in_ibd = self.shared.is_initial_block_download();

if new_best_block {
info!(
"[verify block] new best block found: {} => {:#x}, difficulty diff = {:#x}, unverified_tip: {}",
Expand Down Expand Up @@ -368,6 +370,9 @@ impl ConsumeUnverifiedBlockProcessor {
) {
error!("[verify block] notify update_tx_pool_for_reorg error {}", e);
}
if let Err(e) = tx_pool_controller.update_ibd_state(in_ibd) {
error!("Notify update_ibd_state error {}", e);
}
}

self.shared
Expand Down Expand Up @@ -395,6 +400,9 @@ impl ConsumeUnverifiedBlockProcessor {
if let Err(e) = tx_pool_controller.notify_new_uncle(block.as_uncle()) {
error!("[verify block] notify new_uncle error {}", e);
}
if let Err(e) = tx_pool_controller.update_ibd_state(in_ibd) {
error!("Notify update_ibd_state error {}", e);
}
}
}
Ok(true)
Expand Down
4 changes: 2 additions & 2 deletions ckb-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ ckb-constant = { path = "../util/constant", version = "= 0.119.0-pre" }
base64 = "0.21.0"
tempfile.workspace = true
rayon = "1.0"
sentry = { version = "0.26.0", optional = true }
sentry = { version = "0.34.0", optional = true }
is-terminal = "0.4.7"
fdlimit = "0.2.1"
ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.119.0-pre" }
tokio = { version = "1", features = ["sync"] }

[target.'cfg(not(target_os="windows"))'.dependencies]
daemonize = { version = "0.5.0" }
nix = { version = "0.24.0", default-features = false, features = ["signal"] }
nix = { version = "0.29.0", default-features = false, features = ["signal"] }
colored = "2.0"

[features]
Expand Down
1 change: 0 additions & 1 deletion db-migration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ ckb-error = { path = "../error", version = "= 0.119.0-pre" }
ckb-db-schema = { path = "../db-schema", version = "= 0.119.0-pre" }
ckb-channel = { path = "../util/channel", version = "= 0.119.0-pre" }
ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.119.0-pre" }
once_cell = "1.8.0"
indicatif = "0.16"
console = ">=0.9.1, <1.0.0"

Expand Down
4 changes: 2 additions & 2 deletions db-migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ use ckb_logger::{debug, error, info};
use ckb_stop_handler::register_thread;
use console::Term;
pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
use once_cell::sync::OnceCell;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::thread;
use std::thread::JoinHandle;

/// Shutdown flag for background migration.
pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceCell<bool> = OnceCell::new();
pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceLock<bool> = OnceLock::new();

#[cfg(test)]
mod tests;
Expand Down
4 changes: 3 additions & 1 deletion error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ repository = "https://github.com/nervosnetwork/ckb"
thiserror = "1.0.22"
anyhow = "1.0.34"
ckb-occupied-capacity = { path = "../util/occupied-capacity", version = "= 0.119.0-pre" }
derive_more = { version = "0.99.0", default-features = false, features = ["display"] }
derive_more = { version = "1", default-features = false, features = [
"display",
] }
2 changes: 1 addition & 1 deletion error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl fmt::Display for AnyError {

impl fmt::Debug for AnyError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt(f)
derive_more::Display::fmt(self, f)
}
}
/// Return whether the error's kind is `InternalErrorKind::Database`
Expand Down
10 changes: 8 additions & 2 deletions miner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@ rand_distr = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
ckb-jsonrpc-types = { path = "../util/jsonrpc-types", version = "= 0.119.0-pre" }
hyper = { version = "0.14", features = ["client", "http2", "http1", "server"] }
hyper-tls = "0.5"
hyper = { version = "1", features = ["client", "http2", "http1", "server"] }
hyper-util = { version = "0.1", features = [
"server-auto",
"server-graceful",
"client-legacy",
] }
http-body-util = "0.1"
hyper-tls = "0.6"
futures = "0.3"
lru = "0.7.1"
ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.119.0-pre" }
Expand Down
94 changes: 62 additions & 32 deletions miner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,39 @@ use ckb_types::{
H256,
};
use futures::prelude::*;
use http_body_util::{BodyExt, Empty, Full};
use hyper::{
body::{to_bytes, Buf, Bytes},
body::{Buf, Bytes},
header::{HeaderValue, CONTENT_TYPE},
service::{make_service_fn, service_fn},
Body, Client as HttpClient, Error as HyperError, Method, Request, Response, Server, Uri,
service::service_fn,
Error as HyperError, Request, Response, Uri,
};
use hyper_util::{
client::legacy::{Client as HttpClient, Error as ClientError},
rt::TokioExecutor,
server::{conn::auto, graceful::GracefulShutdown},
};
use jsonrpc_core::{
error::Error as RpcFail, error::ErrorCode as RpcFailCode, id::Id, params::Params,
request::MethodCall, response::Output, version::Version,
};
use serde_json::error::Error as JsonError;
use serde_json::{self, json, Value};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::{convert::Into, time};
use tokio::sync::{mpsc, oneshot};
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
};

type RpcRequest = (oneshot::Sender<Result<Bytes, RpcError>>, MethodCall);

#[derive(Debug)]
pub enum RpcError {
Http(HyperError),
Client(ClientError),
Canceled, //oneshot canceled
Json(JsonError),
Fail(RpcFail),
Expand All @@ -53,7 +62,7 @@ impl Rpc {
let stop_rx: CancellationToken = new_tokio_exit_rx();

let https = hyper_tls::HttpsConnector::new();
let client = HttpClient::builder().build(https);
let client = HttpClient::builder(TokioExecutor::new()).build::<_, Full<Bytes>>(https);
let loop_handle = handle.clone();
handle.spawn(async move {
loop {
Expand All @@ -62,24 +71,23 @@ impl Rpc {
let (sender, call): RpcRequest = item;
let req_url = url.clone();
let request_json = serde_json::to_vec(&call).expect("valid rpc call");
let mut req = Request::new(Body::from(request_json));
*req.method_mut() = Method::POST;
*req.uri_mut() = req_url;
req.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));

let mut req = Request::builder().uri(req_url).method("POST").header(CONTENT_TYPE, "application/json");

if let Some(value) = parse_authorization(&url) {
req.headers_mut()
.append(hyper::header::AUTHORIZATION, value);
req = req
.header(hyper::header::AUTHORIZATION, value);
}
let req = req.body(Full::new(Bytes::from(request_json))).unwrap();
let client = client.clone();
loop_handle.spawn(async move {
let request = match client
.request(req)
.await
.map(|res|res.into_body())
{
Ok(body) => to_bytes(body).await.map_err(RpcError::Http),
Err(err) => Err(RpcError::Http(err)),
Ok(body) => BodyExt::collect(body).await.map_err(RpcError::Http).map(|t| t.to_bytes()),
Err(err) => Err(RpcError::Client(err)),
};
if sender.send(request).is_err() {
error!("rpc response send back error")
Expand Down Expand Up @@ -224,23 +232,42 @@ Otherwise ckb-miner will malfunction and stop submitting valid blocks after a ce
}

async fn listen_block_template_notify(&self, addr: SocketAddr) {
let client = self.clone();
let make_service = make_service_fn(move |_conn| {
let client = client.clone();
let service = service_fn(move |req| handle(client.clone(), req));
async move { Ok::<_, Infallible>(service) }
});

let server = Server::bind(&addr).serve(make_service);
let listener = TcpListener::bind(addr).await.unwrap();
let server = auto::Builder::new(TokioExecutor::new());
let graceful = GracefulShutdown::new();
let stop_rx: CancellationToken = new_tokio_exit_rx();
let graceful = server.with_graceful_shutdown(async move {
stop_rx.cancelled().await;
info!("Miner client received exit signal. Exit now");
});

if let Err(e) = graceful.await {
error!("server error: {}", e);
loop {
let client = self.clone();
let handle = service_fn(move |req| handle(client.clone(), req));
tokio::select! {
conn = listener.accept() => {
let (stream, _) = match conn {
Ok(conn) => conn,
Err(e) => {
info!("accept error: {}", e);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
};
let stream = hyper_util::rt::TokioIo::new(Box::pin(stream));
let conn = server.serve_connection_with_upgrades(stream, handle);

let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
if let Err(err) = conn.await {
info!("connection error: {}", err);
}
});
},
_ = stop_rx.cancelled() => {
info!("Miner client received exit signal. Exit now");
break;
}
}
}
drop(listener);
graceful.shutdown().await;
}

async fn poll_block_template(&self) {
Expand Down Expand Up @@ -327,14 +354,17 @@ Otherwise ckb-miner will malfunction and stop submitting valid blocks after a ce

type Error = Box<dyn std::error::Error + Send + Sync>;

async fn handle(client: Client, req: Request<Body>) -> Result<Response<Body>, Error> {
let body = hyper::body::aggregate(req).await?;
async fn handle(
client: Client,
req: Request<hyper::body::Incoming>,
) -> Result<Response<Empty<Bytes>>, Error> {
let body = BodyExt::collect(req).await?.aggregate();

if let Ok(template) = serde_json::from_reader(body.reader()) {
client.update_block_template(template);
}

Ok(Response::new(Body::empty()))
Ok(Response::new(Empty::new()))
}

async fn parse_response<T: serde::de::DeserializeOwned>(output: Output) -> Result<T, RpcError> {
Expand Down
Loading

0 comments on commit 21e0802

Please sign in to comment.