Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Events API: Transaction dropped, sync status, and renames (#1919)
Browse files Browse the repository at this point in the history
  • Loading branch information
RatanRSur authored Sep 11, 2019
1 parent 6cdc090 commit b454627
Show file tree
Hide file tree
Showing 26 changed files with 281 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public void startNode(final PantheonNode node) {
PantheonEvents.class,
new PantheonEventsImpl(
pantheonController.getProtocolManager().getBlockBroadcaster(),
pantheonController.getTransactionPool()));
pantheonController.getTransactionPool(),
pantheonController.getSyncState()));
pantheonPluginContext.startPlugins();

final Runner runner =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import java.util.Objects;

public final class SyncStatus {
public final class SyncStatus implements tech.pegasys.pantheon.plugin.data.SyncStatus {

private final long startingBlock;
private final long currentBlock;
Expand All @@ -26,18 +26,22 @@ public SyncStatus(final long startingBlock, final long currentBlock, final long
this.highestBlock = highestBlock;
}

@Override
public long getStartingBlock() {
return startingBlock;
}

@Override
public long getCurrentBlock() {
return currentBlock;
}

@Override
public long getHighestBlock() {
return highestBlock;
}

@Override
public boolean inSync() {
return currentBlock == highestBlock;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
*/
package tech.pegasys.pantheon.ethereum.core;

import tech.pegasys.pantheon.plugin.data.SyncStatus;
import tech.pegasys.pantheon.plugin.services.PantheonEvents;

import java.util.Optional;

/** Provides an interface to block synchronization processes. */
Expand All @@ -27,12 +30,7 @@ public interface Synchronizer {
*/
Optional<SyncStatus> getSyncStatus();

long observeSyncStatus(final SyncStatusListener listener);
long observeSyncStatus(final PantheonEvents.SyncStatusListener listener);

boolean removeObserver(long observerId);

@FunctionalInterface
interface SyncStatusListener {
void onSyncStatus(final SyncStatus status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastDownloaderFactory;
Expand All @@ -29,7 +28,9 @@
import tech.pegasys.pantheon.ethereum.worldstate.Pruner;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.PantheonMetricCategory;
import tech.pegasys.pantheon.plugin.data.SyncStatus;
import tech.pegasys.pantheon.plugin.services.MetricsSystem;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.Subscribers;

Expand Down Expand Up @@ -176,10 +177,11 @@ public Optional<SyncStatus> getSyncStatus() {
if (!running.get()) {
return Optional.empty();
}
if (syncState.syncStatus().getCurrentBlock() == syncState.syncStatus().getHighestBlock()) {
final SyncStatus syncStatus = syncState.syncStatus();
if (syncStatus.inSync()) {
return Optional.empty();
}
return Optional.of(syncState.syncStatus());
return Optional.of(syncStatus);
}

@Override
Expand All @@ -194,6 +196,6 @@ public boolean removeObserver(final long observerId) {
}

private void syncStatusCallback(final SyncStatus status) {
syncStatusListeners.forEach(c -> c.onSyncStatus(status));
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(status));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
import tech.pegasys.pantheon.ethereum.chain.ChainHead;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener;
import tech.pegasys.pantheon.util.Subscribers;

import java.util.Optional;

import com.google.common.annotations.VisibleForTesting;

public class SyncState {
private static final long SYNC_TOLERANCE = 5;
private final Blockchain blockchain;
Expand All @@ -49,17 +51,22 @@ public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
});
}

private void publishSyncStatus() {
@VisibleForTesting
public void publishSyncStatus() {
final SyncStatus syncStatus = syncStatus();
syncStatusListeners.forEach(c -> c.onSyncStatus(syncStatus));
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus));
}

public void addInSyncListener(final InSyncListener observer) {
inSyncListeners.subscribe(observer);
}

public void addSyncStatusListener(final SyncStatusListener observer) {
syncStatusListeners.subscribe(observer);
public long addSyncStatusListener(final SyncStatusListener observer) {
return syncStatusListeners.subscribe(observer);
}

public void removeSyncStatusListener(final long listenerId) {
syncStatusListeners.unsubscribe(listenerId);
}

public SyncStatus syncStatus() {
Expand Down Expand Up @@ -141,10 +148,10 @@ public long bestChainHeight(final long localChainHeight) {
}

private synchronized void checkInSync() {
final boolean currentSyncStatus = isInSync();
if (lastInSync != currentSyncStatus) {
lastInSync = currentSyncStatus;
inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentSyncStatus));
final boolean currentInSync = isInSync();
if (lastInSync != currentInSync) {
lastInSync = currentInSync;
inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentInSync));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.util.Collections;
Expand Down Expand Up @@ -237,7 +237,7 @@ public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() {
new BlockBody(Collections.emptyList(), Collections.emptyList()))),
blockchain);

verify(syncStatusListener).onSyncStatus(eq(syncState.syncStatus()));
verify(syncStatusListener).onSyncStatusChanged(eq(syncState.syncStatus()));
}

private void setupOutOfSyncState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import tech.pegasys.pantheon.ethereum.core.Account;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.core.WorldState;
Expand All @@ -38,6 +37,7 @@
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.Capability;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.plugin.data.SyncStatus;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.graphql.internal.pojoadapter;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
*/
package tech.pegasys.pantheon.ethereum.jsonrpc.health;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.jsonrpc.health.HealthService.HealthCheck;
import tech.pegasys.pantheon.ethereum.jsonrpc.health.HealthService.ParamSource;
import tech.pegasys.pantheon.ethereum.p2p.network.P2PNetwork;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.results;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

/**
* The SubscriptionManager is responsible for managing subscriptions and sending messages to the
* clients that have an active subscription subscription.
* clients that have an active subscription.
*/
public class SubscriptionManager extends AbstractVerticle {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
*/
package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

public class SyncingSubscriptionService {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.jsonrpc.health.HealthService.ParamSource;
import tech.pegasys.pantheon.ethereum.p2p.network.P2PNetwork;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -141,6 +141,7 @@ public void shouldNotBeReadyWhenCustomMaxBlocksBehindIsInvalid() {
}

private Optional<SyncStatus> createSyncStatus(final int currentBlock, final int highestBlock) {
return Optional.of(new SyncStatus(0, currentBlock, highestBlock));
return Optional.of(
new tech.pegasys.pantheon.ethereum.core.SyncStatus(0, currentBlock, highestBlock));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.JsonRpcRequest;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcResponse;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcSuccessResponse;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

import java.util.Optional;

Expand Down Expand Up @@ -66,7 +66,8 @@ public void shouldReturnFalseWhenSyncStatusIsEmpty() {
@Test
public void shouldReturnExpectedValueWhenSyncStatusIsNotEmpty() {
final JsonRpcRequest request = requestWithParams();
final SyncStatus expectedSyncStatus = new SyncStatus(0, 1, 2);
final SyncStatus expectedSyncStatus =
new tech.pegasys.pantheon.ethereum.core.SyncStatus(0, 1, 2);
final JsonRpcResponse expectedResponse =
new JsonRpcSuccessResponse(request.getId(), new SyncingResult(expectedSyncStatus));
final Optional<SyncStatus> optionalSyncStatus = Optional.of(expectedSyncStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void shouldSendSyncStatusWhenReceiveSyncStatus() {
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatus(syncStatus);
syncStatusListener.onSyncStatusChanged(syncStatus);

verify(subscriptionManager)
.sendMessage(eq(subscription.getSubscriptionId()), eq(expectedSyncingResult));
Expand All @@ -91,7 +91,7 @@ public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() {
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatus(syncStatus);
syncStatusListener.onSyncStatusChanged(syncStatus);

verify(subscriptionManager)
.sendMessage(eq(subscription.getSubscriptionId()), any(NotSynchronisingResult.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.p2p.peers.EnodeURL;
import tech.pegasys.pantheon.ethereum.permissioning.node.NodePermissioningProvider;
import tech.pegasys.pantheon.metrics.PantheonMetricCategory;
import tech.pegasys.pantheon.plugin.data.SyncStatus;
import tech.pegasys.pantheon.plugin.services.MetricsSystem;
import tech.pegasys.pantheon.plugin.services.metrics.Counter;

Expand Down
Loading

0 comments on commit b454627

Please sign in to comment.