-
Notifications
You must be signed in to change notification settings - Fork 87
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add blockfrost mode to hydra-chain-observer (#1631)
<!-- Describe your change here --> 🥶 Added **Blockfrost Mode** to `hydra-chain-observer`. 🥶 The *network id* and *block time* are derived from the configured `BLOCKFROST_TOKEN_PATH`. 🥶 Implemented a naive roll-forward approach: - 🧊 We start following the chain from a given block hash or the tip (latest block). - 🧊 We check if the current block is within the safe zone to be processed, using the "number of block confirmations" > Based on some [reference](https://cardano.stackexchange.com/questions/8760/what-is-your-comfort-level-for-number-of-confirmations-and-why) from a not-so-stranger on the internet. - 🧊 From the transaction hashes of the block, we fetch the transactions in CBOR representations. - 🧊 We then deserialise them into Cardano API transactions, allowing us to collect head observations by reusing existing code. - 🧊 Finally, using the next block hash information from the block, we repeat the process. 🥶 Note: If any "retriable error" occurs during roll-forward, we wait based on the known *block time* before restarting using latest known fetched block and UTxO view (collected observations). --- <!-- Consider each and tick it off one way or the other --> * [x] CHANGELOG updated or not needed * [x] Documentation updated or not needed * [x] Haddocks updated or not needed * [x] No new TODOs introduced or explained herafter --------- Co-authored-by: Sebastian Nagel <[email protected]> Co-authored-by: Noon <[email protected]>
- Loading branch information
1 parent
d51977e
commit 3bd4fc3
Showing
21 changed files
with
742 additions
and
280 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,37 @@ | ||
# Hydra Chain Observer | ||
|
||
A small executable which connects to a chain like the `hydra-node`, but puts any | ||
observations as traces onto `stdout`. | ||
A lightweight executable designed to connect to a blockchain, such as the `hydra-node`, and streams chain observations as traces to `stdout`. | ||
It supports two modes of operation: **Direct** connection to a node via socket, and connection through **Blockfrost** API. | ||
|
||
To run, pass a `--node-socket`, corresponding network id and optionally | ||
`--start-chain-from`. For example: | ||
## Direct Mode | ||
|
||
To run the observer in Direct Mode, provide the following arguments: | ||
- `--node-socket`: path to the node socket file. | ||
- network id: `--testnet-magic` (with magic number) for the testnet or `--mainnet` for the mainnet. | ||
- (optional) `--start-chain-from`: specify a chain point (SLOT.HEADER_HASH) to start observing from. | ||
|
||
For example: | ||
|
||
``` shell | ||
hydra-chain-observer \ | ||
hydra-chain-observer direct \ | ||
--node-socket testnets/preprod/node.socket \ | ||
--testnet-magic 1 \ | ||
--start-chain-from "41948777.5d34af0f42be9823ebd35c2d83d5d879c5615ac17f7158bb9aa4ef89072455a7" | ||
``` | ||
|
||
|
||
## Blockfrost Mode | ||
|
||
To run the observer in Blockfrost Mode, provide the following arguments: | ||
- `--project-path`: file path to your Blockfrost project API token hash. | ||
> expected to be prefixed with environment (e.g. testnetA3C2E...) | ||
- (optional) `--start-chain-from`: specify a chain point (SLOT.HEADER_HASH) to start observing from. | ||
|
||
For example: | ||
|
||
``` shell | ||
hydra-chain-observer blockfrost \ | ||
--project-path $PROJECT_TOKEN_HASH_PATH \ | ||
--start-chain-from "41948777.5d34af0f42be9823ebd35c2d83d5d879c5615ac17f7158bb9aa4ef89072455a7" | ||
``` | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
227 changes: 227 additions & 0 deletions
227
hydra-chain-observer/src/Hydra/Blockfrost/ChainObserver.hs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,227 @@ | ||
{-# LANGUAGE DuplicateRecordFields #-} | ||
|
||
module Hydra.Blockfrost.ChainObserver where | ||
|
||
import Hydra.Prelude | ||
|
||
import Blockfrost.Client ( | ||
BlockfrostClientT, | ||
runBlockfrost, | ||
) | ||
import Blockfrost.Client qualified as Blockfrost | ||
import Control.Concurrent.Class.MonadSTM ( | ||
MonadSTM (readTVarIO), | ||
newTVarIO, | ||
writeTVar, | ||
) | ||
import Control.Retry (constantDelay, retrying) | ||
import Data.ByteString.Base16 qualified as Base16 | ||
import Hydra.Cardano.Api ( | ||
ChainPoint (..), | ||
HasTypeProxy (..), | ||
Hash, | ||
NetworkId (..), | ||
NetworkMagic (..), | ||
SerialiseAsCBOR (..), | ||
SlotNo (..), | ||
Tx, | ||
UTxO, | ||
serialiseToRawBytes, | ||
) | ||
import Hydra.Cardano.Api.Prelude ( | ||
BlockHeader (..), | ||
) | ||
import Hydra.Chain.Direct.Handlers (convertObservation) | ||
import Hydra.ChainObserver.NodeClient ( | ||
ChainObservation (..), | ||
ChainObserverLog (..), | ||
NodeClient (..), | ||
ObserverHandler, | ||
logOnChainTx, | ||
observeAll, | ||
) | ||
import Hydra.Logging (Tracer, traceWith) | ||
import Hydra.Tx (IsTx (..)) | ||
|
||
data APIBlockfrostError | ||
= BlockfrostError Text | ||
| DecodeError Text | ||
| NotEnoughBlockConfirmations Blockfrost.BlockHash | ||
| MissingBlockNo Blockfrost.BlockHash | ||
| MissingNextBlockHash Blockfrost.BlockHash | ||
deriving (Show, Exception) | ||
|
||
runBlockfrostM :: | ||
(MonadIO m, MonadThrow m) => | ||
Blockfrost.Project -> | ||
BlockfrostClientT IO a -> | ||
m a | ||
runBlockfrostM prj action = do | ||
result <- liftIO $ runBlockfrost prj action | ||
case result of | ||
Left err -> throwIO (BlockfrostError $ show err) | ||
Right val -> pure val | ||
|
||
blockfrostClient :: | ||
Tracer IO ChainObserverLog -> | ||
FilePath -> | ||
Integer -> | ||
NodeClient IO | ||
blockfrostClient tracer projectPath blockConfirmations = do | ||
NodeClient | ||
{ follow = \startChainFrom observerHandler -> do | ||
prj <- Blockfrost.projectFromFile projectPath | ||
|
||
Blockfrost.Block{_blockHash = (Blockfrost.BlockHash genesisBlockHash)} <- | ||
runBlockfrostM prj (Blockfrost.getBlock (Left 0)) | ||
|
||
Blockfrost.Genesis | ||
{ _genesisActiveSlotsCoefficient | ||
, _genesisSlotLength | ||
, _genesisNetworkMagic | ||
} <- | ||
runBlockfrostM prj Blockfrost.getLedgerGenesis | ||
|
||
let networkId = fromNetworkMagic _genesisNetworkMagic | ||
traceWith tracer ConnectingToExternalNode{networkId} | ||
|
||
chainPoint <- | ||
case startChainFrom of | ||
Just point -> pure point | ||
Nothing -> do | ||
toChainPoint <$> runBlockfrostM prj Blockfrost.getLatestBlock | ||
|
||
traceWith tracer StartObservingFrom{chainPoint} | ||
|
||
let blockTime = realToFrac _genesisSlotLength / realToFrac _genesisActiveSlotsCoefficient | ||
|
||
let blockHash = fromChainPoint chainPoint genesisBlockHash | ||
|
||
stateTVar <- newTVarIO (blockHash, mempty) | ||
void $ | ||
retrying (retryPolicy blockTime) shouldRetry $ \_ -> do | ||
loop tracer prj networkId blockTime observerHandler blockConfirmations stateTVar | ||
`catch` \(ex :: APIBlockfrostError) -> | ||
pure $ Left ex | ||
} | ||
where | ||
shouldRetry _ = \case | ||
Right{} -> pure False | ||
Left err -> pure $ isRetryable err | ||
|
||
retryPolicy blockTime = constantDelay (truncate blockTime * 1000 * 1000) | ||
|
||
-- | Iterative process that follows the chain using a naive roll-forward approach, | ||
-- keeping track of the latest known current block and UTxO view. | ||
-- This process operates at full speed without waiting between calls, | ||
-- favoring the catch-up process. | ||
loop :: | ||
(MonadIO m, MonadThrow m, MonadSTM m) => | ||
Tracer m ChainObserverLog -> | ||
Blockfrost.Project -> | ||
NetworkId -> | ||
DiffTime -> | ||
ObserverHandler m -> | ||
Integer -> | ||
TVar m (Blockfrost.BlockHash, UTxO) -> | ||
m a | ||
loop tracer prj networkId blockTime observerHandler blockConfirmations stateTVar = do | ||
current <- readTVarIO stateTVar | ||
next <- rollForward tracer prj networkId observerHandler blockConfirmations current | ||
atomically $ writeTVar stateTVar next | ||
loop tracer prj networkId blockTime observerHandler blockConfirmations stateTVar | ||
|
||
-- | From the current block and UTxO view, we collect Hydra observations | ||
-- and yield the next block and adjusted UTxO view. | ||
rollForward :: | ||
(MonadIO m, MonadThrow m) => | ||
Tracer m ChainObserverLog -> | ||
Blockfrost.Project -> | ||
NetworkId -> | ||
ObserverHandler m -> | ||
Integer -> | ||
(Blockfrost.BlockHash, UTxO) -> | ||
m (Blockfrost.BlockHash, UTxO) | ||
rollForward tracer prj networkId observerHandler blockConfirmations (blockHash, utxo) = do | ||
block@Blockfrost.Block | ||
{ _blockHash | ||
, _blockConfirmations | ||
, _blockNextBlock | ||
, _blockHeight | ||
} <- | ||
runBlockfrostM prj $ Blockfrost.getBlock (Right blockHash) | ||
|
||
-- Check if block within the safe zone to be processes | ||
when (_blockConfirmations < blockConfirmations) $ | ||
throwIO (NotEnoughBlockConfirmations _blockHash) | ||
|
||
-- Check if block contains a reference to its next | ||
nextBlockHash <- maybe (throwIO $ MissingNextBlockHash _blockHash) pure _blockNextBlock | ||
|
||
-- Search block transactions | ||
txHashes <- runBlockfrostM prj . Blockfrost.allPages $ \p -> | ||
Blockfrost.getBlockTxs' (Right _blockHash) p Blockfrost.def | ||
|
||
-- Collect CBOR representations | ||
cborTxs <- traverse (runBlockfrostM prj . Blockfrost.getTxCBOR) txHashes | ||
|
||
-- Convert to cardano-api Tx | ||
receivedTxs <- mapM toTx cborTxs | ||
let receivedTxIds = txId <$> receivedTxs | ||
let point = toChainPoint block | ||
traceWith tracer RollForward{point, receivedTxIds} | ||
|
||
-- Collect head observations | ||
let (adjustedUTxO, observations) = observeAll networkId utxo receivedTxs | ||
let onChainTxs = mapMaybe convertObservation observations | ||
forM_ onChainTxs (traceWith tracer . logOnChainTx) | ||
|
||
blockNo <- maybe (throwIO $ MissingBlockNo _blockHash) (pure . fromInteger) _blockHeight | ||
let observationsAt = HeadObservation point blockNo <$> onChainTxs | ||
|
||
-- Call observer handler | ||
observerHandler $ | ||
if null observationsAt | ||
then [Tick point blockNo] | ||
else observationsAt | ||
|
||
-- Next | ||
pure (nextBlockHash, adjustedUTxO) | ||
|
||
-- * Helpers | ||
|
||
isRetryable :: APIBlockfrostError -> Bool | ||
isRetryable (BlockfrostError _) = True | ||
isRetryable (DecodeError _) = False | ||
isRetryable (NotEnoughBlockConfirmations _) = True | ||
isRetryable (MissingBlockNo _) = True | ||
isRetryable (MissingNextBlockHash _) = True | ||
|
||
toChainPoint :: Blockfrost.Block -> ChainPoint | ||
toChainPoint Blockfrost.Block{_blockSlot, _blockHash} = | ||
ChainPoint slotNo headerHash | ||
where | ||
slotNo :: SlotNo | ||
slotNo = maybe 0 (fromInteger . Blockfrost.unSlot) _blockSlot | ||
|
||
headerHash :: Hash BlockHeader | ||
headerHash = fromString . toString $ Blockfrost.unBlockHash _blockHash | ||
|
||
fromNetworkMagic :: Integer -> NetworkId | ||
fromNetworkMagic = \case | ||
0 -> Mainnet | ||
magicNbr -> Testnet (NetworkMagic (fromInteger magicNbr)) | ||
|
||
toTx :: MonadThrow m => Blockfrost.TransactionCBOR -> m Tx | ||
toTx (Blockfrost.TransactionCBOR txCbor) = | ||
case decodeBase16 txCbor of | ||
Left decodeErr -> throwIO . DecodeError $ "Bad Base16 Tx CBOR: " <> decodeErr | ||
Right bytes -> | ||
case deserialiseFromCBOR (proxyToAsType (Proxy @Tx)) bytes of | ||
Left deserializeErr -> throwIO . DecodeError $ "Bad Tx CBOR: " <> show deserializeErr | ||
Right tx -> pure tx | ||
|
||
fromChainPoint :: ChainPoint -> Text -> Blockfrost.BlockHash | ||
fromChainPoint chainPoint genesisBlockHash = case chainPoint of | ||
ChainPoint _ headerHash -> Blockfrost.BlockHash (decodeUtf8 . Base16.encode . serialiseToRawBytes $ headerHash) | ||
ChainPointAtGenesis -> Blockfrost.BlockHash genesisBlockHash |
Oops, something went wrong.