Skip to content

Commit

Permalink
Merge pull request hyperledger-archives#1505 from Artemkaaas/bugfix/n…
Browse files Browse the repository at this point in the history
…ode-timeout

Restart catchup in case of outdated cache
  • Loading branch information
jovfer authored Mar 13, 2019
2 parents b8ae96d + 2e9e053 commit 8bb0f1c
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Jenkinsfile.cd
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,7 @@ def shell(command) {
}

def setupRust() {
shell("rustup default 1.31.0")
shell("rustup default 1.32.0")
}

def androidPublishing() {
Expand Down
2 changes: 1 addition & 1 deletion Jenkinsfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ def shell(command) {
}

def setupRust() {
shell("rustup default 1.31.0")
shell("rustup default 1.32.0")
}

def setupBrewPackages() {
Expand Down
4 changes: 1 addition & 3 deletions libindy/ci/amazon.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ RUN wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-mav
RUN sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
RUN yum install -y apache-maven

ENV RUST_ARCHIVE=rust-1.31.0-x86_64-unknown-linux-gnu.tar.gz
ENV RUST_ARCHIVE=rust-1.32.0-x86_64-unknown-linux-gnu.tar.gz
ENV RUST_DOWNLOAD_URL=https://static.rust-lang.org/dist/$RUST_ARCHIVE

RUN mkdir -p /rust
Expand Down Expand Up @@ -74,6 +74,4 @@ RUN cd /tmp && \
RUN useradd -ms /bin/bash -u $uid indy
USER indy

RUN cargo install --git https://github.com/DSRCorporation/cargo-test-xunit

WORKDIR /home/indy
2 changes: 1 addition & 1 deletion libindy/ci/ubuntu.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
RUN useradd -ms /bin/bash -u $uid indy
USER indy

RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain 1.31.0
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain 1.32.0
ENV PATH /home/indy/.cargo/bin:$PATH

# Install clippy to the Rust toolchain
Expand Down
67 changes: 51 additions & 16 deletions libindy/src/services/pool/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub enum CatchupProgress {
MerkleTree,
),
NotNeeded(MerkleTree),
Restart(MerkleTree),
InProgress,
}

Expand Down Expand Up @@ -51,34 +52,68 @@ pub fn check_nodes_responses_on_status(nodes_votes: &HashMap<(String, usize, Opt
node_cnt: usize,
f: usize,
pool_name: &str) -> IndyResult<CatchupProgress> {
if let Some((most_popular_vote, votes_cnt)) = nodes_votes.iter().map(|(key, val)| (key, val.len())).max_by_key(|entry| entry.1) {
let is_consensus_reached = votes_cnt == node_cnt - f;
if is_consensus_reached {
if most_popular_vote.0.eq("timeout") {
return Err(err_msg(IndyErrorKind::PoolTimeout, "Pool timeout"));
}

return _try_to_catch_up(most_popular_vote, merkle_tree).or_else(|err| {
let (votes, timeout_votes): (HashMap<&(String, usize, Option<Vec<String>>), usize>, HashMap<&(String, usize, Option<Vec<String>>), usize>) =
nodes_votes
.iter()
.map(|(key, val)| (key, val.len()))
.partition(|((key, _, _), _)| key != "timeout");

let most_popular_not_timeout =
votes
.iter()
.max_by_key(|entry| entry.1);

let timeout_votes = timeout_votes.iter().last();

if let Some((most_popular_not_timeout_vote, votes_cnt)) = most_popular_not_timeout {
if *votes_cnt == f + 1 {
return _try_to_catch_up(most_popular_not_timeout_vote, merkle_tree).or_else(|err| {
if merkle_tree_factory::drop_cache(pool_name).is_ok() {
let merkle_tree = merkle_tree_factory::create(pool_name)?;
_try_to_catch_up(most_popular_vote, &merkle_tree)
_try_to_catch_up(most_popular_not_timeout_vote, &merkle_tree)
} else {
Err(err)
}
});
} else {
let reps_cnt: usize = nodes_votes.values().map(|set| set.len()).sum();
let positive_votes_cnt = votes_cnt + (node_cnt - reps_cnt);
let is_consensus_reachable = positive_votes_cnt < node_cnt - f;
if is_consensus_reachable {
//TODO: maybe we should change the error, but it was made to escape changing of ErrorCode returned to client
return Err(err_msg(IndyErrorKind::PoolTimeout, "No consensus possible"));
}
return _if_consensus_reachable(nodes_votes, node_cnt, *votes_cnt, f, pool_name);
}
} else if let Some((_, votes_cnt)) = timeout_votes {
if *votes_cnt == node_cnt - f {
return _try_to_restart_catch_up(pool_name, err_msg(IndyErrorKind::PoolTimeout, "Pool timeout"));
} else {
return _if_consensus_reachable(nodes_votes, node_cnt, *votes_cnt, f, pool_name);
}
}
Ok(CatchupProgress::InProgress)
}

fn _if_consensus_reachable(nodes_votes: &HashMap<(String, usize, Option<Vec<String>>), HashSet<String>>,
node_cnt: usize,
votes_cnt: usize,
f: usize,
pool_name: &str) -> IndyResult<CatchupProgress> {
let reps_cnt: usize = nodes_votes.values().map(HashSet::len).sum();
let positive_votes_cnt = votes_cnt + (node_cnt - reps_cnt);
let is_consensus_not_reachable = positive_votes_cnt < node_cnt - f;
if is_consensus_not_reachable {
//TODO: maybe we should change the error, but it was made to escape changing of ErrorCode returned to client
_try_to_restart_catch_up(pool_name, err_msg(IndyErrorKind::PoolTimeout, "No consensus possible"))
} else {
Ok(CatchupProgress::InProgress)
}
}


fn _try_to_restart_catch_up(pool_name: &str, err: IndyError) -> IndyResult<CatchupProgress> {
if merkle_tree_factory::drop_cache(pool_name).is_ok() {
let merkle_tree = merkle_tree_factory::create(pool_name)?;
return Ok(CatchupProgress::Restart(merkle_tree));
} else {
return Err(err);
}
}

fn _try_to_catch_up(ledger_status: &(String, usize, Option<Vec<String>>), merkle_tree: &MerkleTree) -> IndyResult<CatchupProgress> {
let &(ref target_mt_root, target_mt_size, ref hashes) = ledger_status;
let cur_mt_size = merkle_tree.count();
Expand Down
3 changes: 3 additions & 0 deletions libindy/src/services/pool/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ pub enum PoolEvent {
usize, //target_mt_size
MerkleTree,
),
CatchupRestart(
MerkleTree,
),
CatchupTargetNotFound(IndyError),
#[allow(dead_code)] //FIXME
PoolOutdated,
Expand Down
24 changes: 19 additions & 5 deletions libindy/src/services/pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,17 @@ impl<T: Networker, R: RequestHandler<T>> PoolSM<T, R> {
CommandExecutor::instance().send(Command::Pool(pc)).unwrap();
PoolState::Terminated(state.into())
}
PoolEvent::CatchupRestart(merkle_tree) => {
if let Ok((nodes, remotes)) = _get_nodes_and_remotes(&merkle_tree) {
state.networker.borrow_mut().process_event(Some(NetworkerEvent::NodesStateUpdated(remotes)));
state.request_handler = R::new(state.networker.clone(), _get_f(nodes.len()), &vec![], &nodes, None, &pool_name, timeout, extended_timeout);
let ls = _ledger_status(&merkle_tree);
state.request_handler.process_event(Some(RequestEvent::LedgerStatus(ls, None, Some(merkle_tree))));
PoolState::GettingCatchupTarget(state)
} else {
PoolState::Terminated(state.into())
}
}
PoolEvent::CatchupTargetFound(target_mt_root, target_mt_size, merkle_tree) => {
if let Ok((nodes, remotes)) = _get_nodes_and_remotes(&merkle_tree) {
state.networker.borrow_mut().process_event(Some(NetworkerEvent::NodesStateUpdated(remotes)));
Expand Down Expand Up @@ -605,19 +616,22 @@ fn _get_request_handler_with_ledger_status_sent<T: Networker, R: RequestHandler<
};
networker.borrow_mut().process_event(Some(NetworkerEvent::NodesStateUpdated(remotes)));
let mut request_handler = R::new(networker.clone(), _get_f(nodes.len()), &vec![], &nodes, None, pool_name, timeout, extended_timeout);
let ls = _ledger_status(&merkle);
request_handler.process_event(Some(RequestEvent::LedgerStatus(ls, None, Some(merkle))));
Ok(request_handler)
}

fn _ledger_status(merkle: &MerkleTree) -> LedgerStatus{
let protocol_version = ProtocolVersion::get();

let ls = LedgerStatus {
LedgerStatus {
txnSeqNo: merkle.count(),
merkleRoot: merkle.root_hash().as_slice().to_base58(),
ledgerId: 0,
ppSeqNo: None,
viewNo: None,
protocolVersion: if protocol_version > 1 { Some(protocol_version) } else { None },
};

request_handler.process_event(Some(RequestEvent::LedgerStatus(ls, None, Some(merkle))));
Ok(request_handler)
}
}

fn _get_nodes_and_remotes(merkle: &MerkleTree) -> IndyResult<(HashMap<String, Option<VerKey>>, Vec<RemoteNode>)> {
Expand Down
23 changes: 16 additions & 7 deletions libindy/src/services/pool/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,12 +546,20 @@ impl<T: Networker> RequestSM<T> {
pool_name: &str) -> (RequestState<T>, Option<PoolEvent>) {
let (finished, result) = RequestSM::_process_catchup_target(mt_root, sz, cons_proof,
&node_alias, &mut state, f, nodes, pool_name);
if finished {
state.networker.borrow_mut().process_event(Some(NetworkerEvent::CleanTimeout(req_id, None)));
(RequestState::finish(), result)
} else {
state.networker.borrow_mut().process_event(Some(NetworkerEvent::CleanTimeout(req_id, Some(node_alias))));
(RequestState::CatchupConsensus(state), result)

match (finished, result) {
(true, result) => {
state.networker.borrow_mut().process_event(Some(NetworkerEvent::CleanTimeout(req_id, None)));
(RequestState::finish(), result)
},
(false, Some(PoolEvent::CatchupRestart(merkle_tree))) => {
state.networker.borrow_mut().process_event(Some(NetworkerEvent::CleanTimeout(req_id, None)));
(RequestState::CatchupConsensus(state), Some(PoolEvent::CatchupRestart(merkle_tree)))
},
(false, result) => {
state.networker.borrow_mut().process_event(Some(NetworkerEvent::CleanTimeout(req_id, Some(node_alias))));
(RequestState::CatchupConsensus(state), result)
}
}
}

Expand Down Expand Up @@ -579,6 +587,7 @@ impl<T: Networker> RequestSM<T> {
&pool_name) {
Ok(CatchupProgress::InProgress) => (false, None),
Ok(CatchupProgress::NotNeeded(merkle_tree)) => (true, Some(PoolEvent::Synced(merkle_tree))),
Ok(CatchupProgress::Restart(merkle_tree)) => (false, Some(PoolEvent::CatchupRestart(merkle_tree))),
Ok(CatchupProgress::ShouldBeStarted(target_mt_root, target_mt_size, merkle_tree)) =>
(true, Some(PoolEvent::CatchupTargetFound(target_mt_root, target_mt_size, merkle_tree))),
Err(err) => (true, Some(PoolEvent::CatchupTargetNotFound(err))),
Expand Down Expand Up @@ -1501,7 +1510,7 @@ pub mod tests {
assert_match!(&RequestState::CatchupConsensus(_), &request_handler.request_wrapper.as_ref().unwrap().state);

request_handler.process_event(Some(RequestEvent::LedgerStatus(LedgerStatus::default(), Some("n3".to_string()), Some(MerkleTree::default()))));
assert_match!(&RequestState::CatchupConsensus(_), &request_handler.request_wrapper.as_ref().unwrap().state);
assert_match!(&RequestState::Finish(_), &request_handler.request_wrapper.as_ref().unwrap().state);
request_handler.process_event(Some(RequestEvent::LedgerStatus(LedgerStatus::default(), Some("n4".to_string()), Some(MerkleTree::default()))));
assert_match!(RequestState::Finish(_), request_handler.request_wrapper.unwrap().state);
}
Expand Down
2 changes: 1 addition & 1 deletion vcx/ci/libindy.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ RUN curl -sL https://deb.nodesource.com/setup_8.x | bash - \
&& apt-get install -y nodejs

# Install Rust
ARG RUST_VER="1.31.0"
ARG RUST_VER="1.32.0"
ENV RUST_ARCHIVE=rust-${RUST_VER}-x86_64-unknown-linux-gnu.tar.gz
ENV RUST_DOWNLOAD_URL=https://static.rust-lang.org/dist/$RUST_ARCHIVE

Expand Down
2 changes: 1 addition & 1 deletion vcx/ci/libvcx.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ARG uid=1000
RUN useradd -ms /bin/bash -u $uid vcx
USER vcx

RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain 1.31.0
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain 1.32.0
ENV PATH /home/vcx/.cargo/bin:$PATH
WORKDIR /home/vcx
ENV PATH /home/vcx:$PATH
Expand Down
2 changes: 1 addition & 1 deletion vcx/libvcx/build_scripts/ios/mac/mac.01.libindy.setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fi

if [[ $RUSTUP_VERSION =~ ^'rustup ' ]]; then
rustup update
rustup default 1.31.0
rustup default 1.32.0
rustup component add rls-preview rust-analysis rust-src
echo "Using rustc version $(rustc --version)"
rustup target remove aarch64-linux-android armv7-linux-androideabi arm-linux-androideabi i686-linux-android x86_64-linux-android
Expand Down
2 changes: 1 addition & 1 deletion vcx/wrappers/java/ci/android.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ RUN yes | .//home/android/android-sdk-linux/tools/bin/sdkmanager "ndk-bundle"
RUN echo "android ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers

USER android
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain 1.31.0
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain 1.32.0
ENV PATH /home/android/.cargo/bin:$PATH

0 comments on commit 8bb0f1c

Please sign in to comment.