we-tx-observer provides a convenient way of handling and tracking transactions in the Waves Enterprise blockchain network. It uses a persistent queue to store transactions from WE blocks. The transactions are further passed to handlers configured in the application.
Note: Only PostgreSQL is supported as the underlying database.
The module consists of submodules:
- api - contains abstractions to be used in client code;
- core-spring - provides default implementations for abstractions from the api module;
- domain - observer domain entity models;
- jpa - Spring configurations and repositories for working with the database;
- starter - Spring starter for we-tx-observer; includes the configuration of all main beans required for correct operation.
To start using we-tx-observer you should:
we-tx-observer uses we-node-client to interact with the node, so a NodeBlockingServiceFactory bean must be present in the context.
For more information, see the documentation of the we-node-client and we-sdk-spring (we-starter-node-client) projects:
If you are interested in handling only specific transactions, you should provide a TxEnqueuePredicate.
This is an interface with a single isEnqueued(Tx) method. It is designed to filter the transactions that are put into the persistent queue for future handling.
Note: You can omit this bean and filter transactions later in your handlers, but in that case redundant transactions would be put into the persistent queue, leading to unnecessary INSERTs and DELETEs in the DB.
An example of TxEnqueuePredicate:
Kotlin:
class TxEnqueuePredicateImpl(
    val txService: TxService,
    val contractId: String,
) : TxEnqueuePredicate {

    // overridden method
    override fun isEnqueued(tx: Tx): Boolean = when (tx) {
        // a policy data hash transaction is filtered by the name of its policy
        is PolicyDataHashTx -> filterForPolicyDataHashTx(tx)
        // filtering transactions for a specific contract by id
        is ExecutedContractTx -> tx.contractId().base58ContractId == contractId.base58ContractId
        else -> false
    }

    private fun filterForPolicyDataHashTx(tx: PolicyDataHashTx): Boolean {
        val policyTx = txService.txInfo(tx.policyId.txId).get().tx as CreatePolicyTx
        return filterForPolicyTx(policyTx)
    }

    private fun filterForPolicyTx(policyTx: CreatePolicyTx) =
        policyTx.policyName.value.startsWith("POLICY_NAME")
}
Java:
class TxEnqueuePredicateImpl implements TxEnqueuePredicate {
    private TxService txService;
    private String contractId;

    // overridden method
    @Override
    public boolean isEnqueued(Tx tx) {
        return switch (tx) {
            // a policy data hash transaction is filtered by the name of its policy
            case PolicyDataHashTx policyDataHashTx -> filterForPolicyDataHashTx(policyDataHashTx);
            // filtering transactions for a specific contract by id
            case ExecutedContractTx executedContractTx ->
                executedContractTx.contractId().base58ContractId.equals(contractId.base58ContractId);
            default -> false;
        };
    }

    private boolean filterForPolicyDataHashTx(PolicyDataHashTx tx) {
        CreatePolicyTx policyTx = (CreatePolicyTx) txService.txInfo(tx.getPolicyId().getTxId()).get().getTx();
        return filterForPolicyTx(policyTx);
    }

    private boolean filterForPolicyTx(CreatePolicyTx policyTx) {
        return policyTx.getPolicyName().getValue().startsWith("POLICY_NAME");
    }
}
Specifies the partition for the received transaction (defaultPartitionId by default).
Used to efficiently distribute transactions across partitions.
By default, all transactions fall into one partition: defaultPartitionId.
An implementation example of TxQueuePartitionResolver:
Kotlin:
class UserPartitionResolver(
    private val txService: TxService,
) : TxQueuePartitionResolver {

    // overridden method
    override fun resolvePartitionId(tx: Tx): String? = when (tx) {
        is PolicyDataHashTx -> resolveForPolicyDataHashTx(tx)
        is ExecutedContractTx -> resolveForContractTx(tx)
        else -> null
    }

    // Looks for the required key in the state and removes the mapping prefix. The remaining value is the partition ID.
    // If no such key is found, the partition for this transaction is defaultPartitionId.
    private fun resolveForContractTx(executedContractTx: ExecutedContractTx): String? =
        executedContractTx.results.map { param -> param.key.value }
            .find { paramKey -> paramKey.startsWith("OBJECT_") }
            ?.replace("OBJECT_", "")

    // Searches for the policy creation transaction to obtain the policy name.
    // If the name has the expected prefix, the remaining value is the partition ID.
    private fun resolveForPolicyDataHashTx(policyDataHashTx: PolicyDataHashTx): String? {
        val createPolicyTx = txService.txInfo(policyDataHashTx.policyId.txId).get().tx as CreatePolicyTx
        return createPolicyTx.policyName.value
            .takeIf { policyName -> policyName.startsWith("OBJECT_") }
            ?.replace("OBJECT_", "")
    }
}
Java:
class UserPartitionResolver implements TxQueuePartitionResolver {
    private TxService txService;

    // overridden method
    @Override
    public String resolvePartitionId(Tx tx) {
        return switch (tx) {
            case PolicyDataHashTx policyDataHashTx -> resolveForPolicyDataHashTx(policyDataHashTx);
            case ExecutedContractTx executedContractTx -> resolveForContractTx(executedContractTx);
            default -> null;
        };
    }

    // Looks for the required key in the state and removes the mapping prefix. The remaining value is the partition ID.
    // If no such key is found, the partition for this transaction is defaultPartitionId.
    private String resolveForContractTx(ExecutedContractTx executedContractTx) {
        List<String> list = executedContractTx.getResults().stream()
            .filter(it -> it.getKey().getValue().startsWith("OBJECT_"))
            .map(it -> it.getKey().getValue())
            .toList();
        if (!list.isEmpty()) {
            return list.get(0).replaceAll("OBJECT_", "");
        } else {
            return null;
        }
    }

    // Searches for the policy creation transaction to obtain the policy name.
    // If the name has the expected prefix, the remaining value is the partition ID.
    private String resolveForPolicyDataHashTx(PolicyDataHashTx policyDataHashTx) {
        CreatePolicyTx createPolicyTx = (CreatePolicyTx) txService.txInfo(policyDataHashTx.getPolicyId().getTxId()).get().getTx();
        String policyName = createPolicyTx.getPolicyName().getValue();
        if (policyName.startsWith("OBJECT_")) {
            return policyName.replaceAll("OBJECT_", "");
        } else {
            return null;
        }
    }
}
Configurer for the basic observer components. It lets you define all component beans through a single bean:
- TxQueuePartitionResolver;
- TxEnqueuePredicate (default implementation is );
- ObjectMapper;
- PrivateContentResolver.
It can be set up using TxObserverConfigurerBuilder or the Kotlin DSL.
fun txObserverConfigurer(): TxObserverConfigurer =
.types(TX_TYPE_3, TX_TYPE_4)
fun txObserverConfigurer(): TxObserverConfigurer = observerConfigurer {
    partitionResolver = customPartitionResolver
    privateContentResolver = customPrivateContentResolver
    predicates {
        types(TX_TYPE_3, TX_TYPE_4)
    }
}
TxObserverConfigurer txObserverConfigurer() {
return new TxObserverConfigurerBuilder()
After defining the transaction filters, you need to write the listener that will receive the filtered transactions.
KeyFilter - annotation that filters KeyEvent by the keys from the state. It has two optional parameters for filtering:
- the key in the form of a regular expression;
- keyPrefix - the key in the form of a prefix or a full string.
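To make the two KeyFilter matching modes concrete, here is a minimal sketch in plain Java. It is not the library's implementation: the class KeyFilterSketch, the parameter name keyRegexp, and the convention that an empty string means "not set" are all assumptions made for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical helper: illustrates prefix and regular-expression key matching only.
final class KeyFilterSketch {

    // An empty string is treated as "parameter not set" (an assumption of this sketch).
    static boolean matches(String stateKey, String keyPrefix, String keyRegexp) {
        boolean prefixOk = keyPrefix.isEmpty() || stateKey.startsWith(keyPrefix);
        // Pattern.matches requires the whole key to match the expression.
        boolean regexpOk = keyRegexp.isEmpty() || Pattern.matches(keyRegexp, stateKey);
        return prefixOk && regexpOk;
    }
}
```

Under these assumptions, a state key such as EXAMPLES_42 passes a filter with keyPrefix = "EXAMPLES_", while OTHER_1 does not.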
PolicyFilter - annotation that filters privacy data by policy name. It has two optional parameters for filtering:
- the policy name in the form of a regular expression;
- namePrefix - the policy name in the form of a prefix or a full string.
MessageFilter - annotation that filters privacy data by the comment field, which is passed in JSON format when sending data via privacy/sendData.
It has three optional parameters for filtering:
- metaKey - the key of a JSON field in the comment;
- metaKeyValue - the expected value of that JSON field;
- metaKeyValueRegExp - a regular expression that the value of that JSON field must match.
Note: Message meta is derived from the privacy data comment parsed as JSON (SendDataRequest.info.comment).
If the comment can't be parsed as JSON, then this filter will fail.
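The way a MessageFilter could be evaluated against the parsed comment can be sketched as follows. This is an illustration under stated assumptions, not the library's code: the parsed comment is modelled as a flat Map<String, String>, an empty string stands for an unset parameter, and the class MessageFilterSketch is hypothetical.

```java
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper: matches already-parsed comment metadata against the filter parameters.
final class MessageFilterSketch {

    static boolean matches(Map<String, String> meta,
                           String metaKey,
                           String metaKeyValue,
                           String metaKeyValueRegExp) {
        if (metaKey.isEmpty()) {
            return true; // no key requested, nothing to check (assumption)
        }
        String actual = meta.get(metaKey);
        if (actual == null) {
            return false; // the requested key is absent from the comment
        }
        if (!metaKeyValue.isEmpty() && !metaKeyValue.equals(actual)) {
            return false; // exact value requested but not equal
        }
        // The regular expression, when set, must match the whole value.
        return metaKeyValueRegExp.isEmpty() || Pattern.matches(metaKeyValueRegExp, actual);
    }
}
```

With a comment of {"key": "value"}, the filter metaKey = "key", metaKeyValue = "value" matches in this sketch, while metaKeyValue = "other" does not.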
MessageFilters - annotation containing an array of MessageFilter.
It has a single field:
- an array of MessageFilter.
Kotlin:
class ExampleListener {

    fun keyEventMyContract(
        @KeyFilter(keyPrefix = "EXAMPLES_") keyEvent: KeyEvent<String>, // filter for state keys
    ) {
        // do something with the received data
    }

    fun onPrivacyContainer(
        @PolicyFilter(namePrefix = "POLICY_NAME_") // filter by policy name
        @MessageFilter(metaKey = "key", metaKeyValue = "value") // additional filtering by the comment field
        privateDataEvent: PrivateDataEvent<String>,
    ) {
        // the payload is typed by the PrivateDataEvent type parameter
        // do something with the received private data
    }

    fun onUpdatePolicyTx(
        updatePolicyTx: UpdatePolicyTx,
    ) {
        // transaction is filtered by type
        // do something with the received UpdatePolicyTx
    }
}
Java:
class ExampleListener {

    void keyEventMyContract(
        @KeyFilter(keyPrefix = "EXAMPLES_") KeyEvent<String> keyEvent // filter for state keys
    ) {
        // do something with the received data
    }

    void onPrivacyContainer(
        @PolicyFilter(namePrefix = "POLICY_NAME_") // filter by policy name
        @MessageFilter(metaKey = "key", metaKeyValue = "value") // additional filtering by the comment field
        PrivateDataEvent<String> privateDataEvent
    ) {
        // the payload is typed by the PrivateDataEvent type parameter
        // do something with the received private data
    }

    void onUpdatePolicyTx(UpdatePolicyTx updatePolicyTx) {
        // transaction is filtered by type
        // do something with the received UpdatePolicyTx
    }
}
The method keyEventMyContract(KeyEvent<String> keyEvent) will receive data from the 105 tx after it has been mined.
In this transaction, the contract method call(string) is called and a value with a key prefixed with EXAMPLES_ is added to the state.
{
  "tx": {
    "senderPublicKey": "4qJpHUz8Y6vV5N21JZgUxAB3nNk5GU19SQZbC44fhSNbWJAb3afkD6ARAYAvkHgnb8CJWkmTuVDuc2NpximYVcva",
    "fee": 0,
    "type": 104,
    "params": [
      {
        "type": "string",
        "value": "call",
        "key": "action"
      },
      {
        "type": "string",
        "value": "test",
        "key": "string"
      }
    ],
    "version": 4,
    "contractVersion": 1,
    "sender": "3NpkC1FSW9xNfmAMuhRSRArLgnfyGyEry7w",
    "feeAssetId": null,
    "proofs": [],
    "contractId": "Dgk1hR7xRnDT1KJreaXCVtZLrnd5LJ8uUYtoZyQrV1LJ",
    "id": "7GExnxgASNMrveEmpHFVAkiHmjbwRj4ediwbE6imJyCo",
    "timestamp": 1644263008599
  },
  "sender": "3QQAXZAnJ8ppqekcgkoLqVNJEvJ4D8kjbVK",
  "proofs": [],
  "fee": 0,
  "id": "HerQyfkH4ui2K6RbHdxG2tFGnjLjiK4RveMu6CXXpp6E",
  "type": 105,
  "version": 1,
  "results": [
    {
      "type": "string",
      "value": "{\"example\":\"value\"}",
      "key": "EXAMPLES_7GExnxgASNMrveEmpHFVAkiHmjbwRj4ediwbE6imJyCo"
    }
  ],
  "timestamp": 1644263019011,
  "height": 9198547
}
The method onPrivacyContainer(PrivateDataEvent<String> privateDataEvent) can handle private data that has been sent using privacy/sendData:
{
  "sender": "3HYW75PpAeVukmbYo9PQ3mzSHdKUgEytUUz",
  "password": "apgJP9atQccdBPA",
  "policyId": "4gZnJvbSBvdGhlciBhbmltYWxzLCB3aGljaC",
  "info": {
    "filename": "Service contract #100/5.doc",
    "size": 2048,
    "timestamp": 1000000000,
    "author": "[email protected]",
    "comment": {
      "key": "value"
    }
  },
  "data": "TWFuIGlzIGRpc3Rpbmd1aXNoZWQsIG5vdCBvbmx5IGJ5IGhpcyByZWFzb24sIGJ1dCBieSB0aGlzIHNpbmd1bGFyIHBhc3Npb24gZnJvbSBvdGhlciBhbmltYWxzLCB3aGljaCBpcyBhIGx1c3Qgb2YgdGhlIG1pbmQsIHRoYXQgYnkgYSBwZXJzZXZlcmFuY2Ugb2YgZGVsaWdodCBpbiB0aGUgY29udGludWVkIGFuZCBpbmRlZmF0aWdhYmxlIGdlbmVyYXRpb24gb2Yga25vd2xlZGdlLCBleGNlZWRzIHRoZSBzaG9ydCB2ZWhlbWVuY2Ugb2YgYW55IGNhcm5hbCBwbGVhc3VyZS4=",
  "hash": "FRog42mnzTA292ukng6PHoEK9Mpx9GZNrEHecfvpwmta",
  "broadcast": false
}
we-tx-observer-starter comes with sane defaults that perform well in most deployments without additional tweaking. However, if necessary, it has flexible settings:
Parameter responsible for enabling the observer logic. Default: true
Queue implementation (currently only JPA is available). Default: JPA
Delay for the job that extracts transactions to the queue (syncs the blocks). Default: 2000
Maximum total size of the blocks read at once, in DataUnits (the sum of the BlockSize parameters in the node's response). Default: 2MB
Height of the blockchain from which the blocks will be synced.
It is useful if there is no need to re-read the entire blockchain, e.g. when starting a new app on the existing network.
It is usually set to the height of the block containing the 103 transaction that creates the application's smart contract. Default: 1
Node alias (name) written to the block_height_info table. Default: node.
Step for the upper bound of the requested block range. It is used only if the total size of the blocks is less than block-size-window.
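One way to read the interplay of the size window and the height window is sketched below. The rule itself is an interpretation of the two descriptions above, not the library's algorithm, and the class BlockWindowSketch with its blocksToRead method is hypothetical.

```java
// Hypothetical helper: decides how many consecutive blocks to read in one sync pass.
final class BlockWindowSketch {

    // blockSizes holds the sizes (in bytes) of the next blocks, in height order.
    static int blocksToRead(long[] blockSizes, long sizeWindowBytes, int heightWindow) {
        long total = 0;
        int count = 0;
        for (long size : blockSizes) {
            if (count >= heightWindow) {
                break; // the height step limits the range
            }
            if (count > 0 && total + size > sizeWindowBytes) {
                break; // the size window limits the range; one block is always read
            }
            total += size;
            count++;
        }
        return count;
    }
}
```

In this reading, small blocks are bounded by the height step, while large blocks are bounded by the size window before the height step is reached.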
Property that enables using the activation-height. If set to false, then blocks will be synced from the node's current block height. Default: true.
How many previous blocks should be saved to the persistent buffer for resolving a potential fork.
How many blocks to go back if no common block is found in the history when handling a fork.
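The two fork-related settings can be pictured with the following sketch. It assumes that the buffer stores one block signature per height and that a fork is resolved by finding the newest stored block the node still agrees with; the class ForkResolutionSketch and all parameter names are hypothetical, and the actual logic may differ.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.LongFunction;

// Hypothetical helper: picks the height from which syncing resumes after a fork.
final class ForkResolutionSketch {

    static long resumeHeight(NavigableMap<Long, String> storedHistory,
                             LongFunction<String> nodeSignatureAt,
                             long currentHeight,
                             long forkNotResolvedHeightDrop) {
        // Walk the stored history from the newest block to the oldest one.
        for (Map.Entry<Long, String> entry : storedHistory.descendingMap().entrySet()) {
            String onNode = nodeSignatureAt.apply(entry.getKey());
            if (entry.getValue().equals(onNode)) {
                return entry.getKey() + 1; // first block after the common one
            }
        }
        // No common block in the buffer: step back by the configured number of blocks.
        return Math.max(1, currentHeight - forkNotResolvedHeightDrop);
    }
}
```

A deeper history buffer makes it more likely that a common block is found, so the fixed step back is needed less often.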
Time (in milliseconds) after which the block_history table will be cleaned. Default: 1800000
Delay (in milliseconds) to wait for the mining of a key block. Default: 200
Enables a background check that compares the height stored in the table with the height on the node.
If the height in the table is greater, it will be reset to the height of the node. Default: false.
Priority offset for partitions after which transactions with the NEW status can become POSTPONED from EnqueuedTxController.postponeErrors(). Default: 100
Default String ID value for a partition. It is set as TxQueuePartition.id if no bean of type TxQueuePartitionResolver is provided or if its resolve method returns null. Default: defaultPartitionId
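The fallback rule for the default partition can be sketched in a few lines. This is an illustration only: the class PartitionFallbackSketch is hypothetical, and a java.util.function.Function over transaction IDs stands in for a TxQueuePartitionResolver bean.

```java
import java.util.function.Function;

// Hypothetical helper: applies the default partition when nothing else is resolved.
final class PartitionFallbackSketch {

    static final String DEFAULT_PARTITION_ID = "defaultPartitionId";

    // resolver may be null (no resolver bean) or may return null for a given transaction.
    static String partitionIdFor(String txId, Function<String, String> resolver) {
        if (resolver == null) {
            return DEFAULT_PARTITION_ID;
        }
        String resolved = resolver.apply(txId);
        return resolved != null ? resolved : DEFAULT_PARTITION_ID;
    }
}
```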
Enables ShedLock so that each job runs on only a single instance when running in a multi-tenant environment.
The implementation used is net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider. Default: true
The minimum time for which the method will be blocked for other instances (in milliseconds). This is a ShedLock-specific property. Default: 0.
The maximum time for which the method will be blocked for other instances (in milliseconds). This is a ShedLock-specific property. Default: 10000.
Settings for cleaning up the enqueued_tx table.
Default: true.
Offset for the height of cleaned transactions. Default: 100.
Batch size used when cleaning the table. Default: 100.
Cron expression to configure the cleaning job. Default: 0 0/5 * * * ? (EVERY 5 MINUTES)
Settings for the job that checks privacy availability.
Default: true.
Rate for checking (in milliseconds). Default: 500ms.
Number of threads used to perform the checks. Default: 3
Limit for old EnqueuedTx records in the checked batch. Default: 25
Limit for recent EnqueuedTx records in the checked batch. Default: 10
An example of the observer configuration in application.yml looks like this:
enabled: true
queueMode: JPA
fixedDelay: 2000
blockSizeWindow: 10MB
activation-height: 1
blockHeightWindow: 99
syncHistory: true
blockHistoryDepth: 100
forkNotResolvedHeightDrop: 10
blockHistoryCleanDelay: 1800000
liquidBlockPollingDelay: 200
autoResetHeight: false
errorPriorityOffset: 10
defaultPartitionId: defaultPartitionId
lockEnabled: true
lockAtLeast: 0
lockAtMost: 10000
enabled: true
archiveHeightWindow: 100
deleteBatchSize: 100
cleanCronExpression: 0 0/5 * * * ?
lockAtLeast: 0
lockAtMost: 10000
enabled: true
fixedDelay: 500ms
threadCount: 3
limitForOld: 25
limitForRecent: 10
If you don't specify anything, the default values will be used.