From 8aa93dd0788dd66fd417c3468a8807d4f6ba18df Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Thu, 14 Mar 2024 23:24:31 +0100 Subject: [PATCH] internal/server/ship_processor.go: break processBlock into smaller functions. --- internal/server/helpers.go | 25 ++++ internal/server/ship_processor.go | 232 +++++++++++++++--------------- 2 files changed, 139 insertions(+), 118 deletions(-) create mode 100644 internal/server/helpers.go diff --git a/internal/server/helpers.go b/internal/server/helpers.go new file mode 100644 index 0000000..f7ce531 --- /dev/null +++ b/internal/server/helpers.go @@ -0,0 +1,25 @@ +package server + +import "github.com/eoscanada/eos-go/ship" + +// convert a ActionTrace to ActionTraceV1 +func toActionTraceV1(trace *ship.ActionTrace) *ship.ActionTraceV1 { + if trace_v0, ok := trace.Impl.(*ship.ActionTraceV0); ok { + // convert to v1 + return &ship.ActionTraceV1{ + ActionOrdinal: trace_v0.ActionOrdinal, + CreatorActionOrdinal: trace_v0.CreatorActionOrdinal, + Receipt: trace_v0.Receipt, + Receiver: trace_v0.Receiver, + Act: trace_v0.Act, + ContextFree: trace_v0.ContextFree, + Elapsed: trace_v0.Elapsed, + Console: trace_v0.Console, + AccountRamDeltas: trace_v0.AccountRamDeltas, + Except: trace_v0.Except, + ErrorCode: trace_v0.ErrorCode, + ReturnValue: []byte{}, + } + } + return trace.Impl.(*ship.ActionTraceV1) +} diff --git a/internal/server/ship_processor.go b/internal/server/ship_processor.go index eac7a1f..aa4017a 100644 --- a/internal/server/ship_processor.go +++ b/internal/server/ship_processor.go @@ -142,6 +142,119 @@ func (processor *ShipProcessor) GetCurrentBlock() uint32 { return processor.state.CurrentBlock } +func (processor *ShipProcessor) broadcastAction(act *message.ActionTrace) { + payload, err := processor.encode(*act) + if err != nil { + log.WithField("act", act).Warn("failed to encode action") + return + } + + channels := []api.Channel{ + api.ActionChannel{}.Channel(), + api.ActionChannel{Name: act.Name}.Channel(), + api.ActionChannel{Contract: act.Contract}.Channel(), + api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(), + } + + for _, channel := range channels { + processor.queueMessage(channel, payload) + } +} + +func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, block *ship.SignedBlockBytes, trace *ship.TransactionTraceV0) { + logger := log.WithField("type", "trace").WithField("tx_id", trace.ID.String()).Dup() + + transaction := message.TransactionTrace{ + ID: trace.ID.String(), + BlockNum: block.BlockNumber(), + Timestamp: block.Timestamp.UTC(), + Status: trace.Status.String(), + CPUUsageUS: trace.CPUUsageUS, + NetUsage: trace.NetUsage, + NetUsageWords: uint32(trace.NetUsageWords), + Elapsed: int64(trace.Elapsed), + Scheduled: trace.Scheduled, + Except: trace.Except, + Error: trace.ErrorCode, + } + + // Actions + for _, actionTraceVar := range trace.ActionTraces { + + actionTrace := toActionTraceV1(actionTraceVar) + actMsg := processor.proccessActionTrace(logger, actionTrace) + if actMsg != nil { + actMsg.TxID = trace.ID.String() + actMsg.BlockNum = block.BlockNumber() + actMsg.Timestamp = block.Timestamp.UTC() + + processor.broadcastAction(actMsg) + + transaction.ActionTraces = append(transaction.ActionTraces, *actMsg) + } + } + + processor.encodeQueue(api.TransactionChannel, transaction) +} + +func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *ship.ActionTraceV1) *message.ActionTrace { + // Check if actions updates an abi. + if trace.Act.Account == processor.syscontract && trace.Act.Name == eos.ActionName("setabi") { + err := processor.updateAbiFromAction(trace.Act) + if err != nil { + logger.WithError(err).Warn("Failed to update abi") + } + } + + act := &message.ActionTrace{ + Name: trace.Act.Name.String(), + Contract: trace.Act.Account.String(), + Receiver: trace.Receiver.String(), + FirstReceiver: trace.Act.Account.String() == trace.Receiver.String(), + } + + if trace.Receipt != nil { + receipt := trace.Receipt.Impl.(*ship.ActionReceiptV0) + act.Receipt = &message.ActionReceipt{ + Receiver: receipt.Receiver.String(), + ActDigest: receipt.ActDigest.String(), + GlobalSequence: receipt.GlobalSequence, + RecvSequence: receipt.RecvSequence, + CodeSequence: uint32(receipt.CodeSequence), + ABISequence: uint32(receipt.ABISequence), + } + + for _, auth := range receipt.AuthSequence { + act.Receipt.AuthSequence = append(act.Receipt.AuthSequence, message.AccountAuthSequence{ + Account: auth.Account.String(), + Sequence: auth.Sequence, + }) + } + } + + for _, auth := range trace.Act.Authorization { + act.Authorization = append(act.Authorization, message.PermissionLevel{ + Actor: auth.Actor.String(), + Permission: auth.Permission.String(), + }) + } + + ABI, err := processor.abi.GetAbi(trace.Act.Account) + if err == nil { + if err = decode(ABI, trace.Act, &act.Data); err != nil { + logger.WithFields(log.Fields{ + "contract": trace.Act.Account, + "action": trace.Act.Name, + }).WithError(err).Warn("Failed to decode action") + } + } else { + logger.WithField("contract", trace.Act.Account). + WithError(err).Error("Failed to get abi for contract") + } + + return act +} + // Callback function called by shipclient.Stream when a new block arrives. func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { // Check to see if we have a microfork and post a message to @@ -178,124 +291,7 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { // Process traces if block.Traces != nil && len(block.Traces.Elem) > 0 { for _, trace := range block.Traces.AsTransactionTracesV0() { - - logger := mainLogger.WithField("type", "trace").WithField("tx_id", trace.ID.String()).Dup() - - transaction := message.TransactionTrace{ - ID: trace.ID.String(), - BlockNum: block.Block.BlockNumber(), - Timestamp: block.Block.Timestamp.UTC(), - Status: trace.Status.String(), - CPUUsageUS: trace.CPUUsageUS, - NetUsage: trace.NetUsage, - NetUsageWords: uint32(trace.NetUsageWords), - Elapsed: int64(trace.Elapsed), - Scheduled: trace.Scheduled, - Except: trace.Except, - Error: trace.ErrorCode, - } - - // Actions - for _, actionTraceVar := range trace.ActionTraces { - var act_trace *ship.ActionTraceV1 - - if trace_v0, ok := actionTraceVar.Impl.(*ship.ActionTraceV0); ok { - // convert to v1 - act_trace = &ship.ActionTraceV1{ - ActionOrdinal: trace_v0.ActionOrdinal, - CreatorActionOrdinal: trace_v0.CreatorActionOrdinal, - Receipt: trace_v0.Receipt, - Receiver: trace_v0.Receiver, - Act: trace_v0.Act, - ContextFree: trace_v0.ContextFree, - Elapsed: trace_v0.Elapsed, - Console: trace_v0.Console, - AccountRamDeltas: trace_v0.AccountRamDeltas, - Except: trace_v0.Except, - ErrorCode: trace_v0.ErrorCode, - ReturnValue: []byte{}, - } - } else { - act_trace = actionTraceVar.Impl.(*ship.ActionTraceV1) - } - - // Check if actions updates an abi. - if act_trace.Act.Account == processor.syscontract && act_trace.Act.Name == eos.ActionName("setabi") { - err := processor.updateAbiFromAction(act_trace.Act) - if err != nil { - logger.WithError(err).Warn("Failed to update abi") - } - } - - act := message.ActionTrace{ - TxID: trace.ID.String(), - BlockNum: block.Block.BlockNumber(), - Timestamp: block.Block.Timestamp.UTC(), - Name: act_trace.Act.Name.String(), - Contract: act_trace.Act.Account.String(), - Receiver: act_trace.Receiver.String(), - FirstReceiver: act_trace.Act.Account.String() == act_trace.Receiver.String(), - } - - if act_trace.Receipt != nil { - receipt := act_trace.Receipt.Impl.(*ship.ActionReceiptV0) - act.Receipt = &message.ActionReceipt{ - Receiver: receipt.Receiver.String(), - ActDigest: receipt.ActDigest.String(), - GlobalSequence: receipt.GlobalSequence, - RecvSequence: receipt.RecvSequence, - CodeSequence: uint32(receipt.CodeSequence), - ABISequence: uint32(receipt.ABISequence), - } - - for _, auth := range receipt.AuthSequence { - act.Receipt.AuthSequence = append(act.Receipt.AuthSequence, message.AccountAuthSequence{ - Account: auth.Account.String(), - Sequence: auth.Sequence, - }) - } - } - - for _, auth := range act_trace.Act.Authorization { - act.Authorization = append(act.Authorization, message.PermissionLevel{ - Actor: auth.Actor.String(), - Permission: auth.Permission.String(), - }) - } - - ABI, err := processor.abi.GetAbi(act_trace.Act.Account) - if err == nil { - if err = decode(ABI, act_trace.Act, &act.Data); err != nil { - logger.WithFields(log.Fields{ - "contract": act_trace.Act.Account, - "action": act_trace.Act.Name, - }).WithError(err).Warn("Failed to decode action") - } - } else { - logger.WithField("contract", act_trace.Act.Account). - WithError(err).Error("Failed to get abi for contract") - } - - payload, err := processor.encode(act) - if err != nil { - continue - } - - transaction.ActionTraces = append(transaction.ActionTraces, act) - - channels := []api.Channel{ - api.ActionChannel{}.Channel(), - api.ActionChannel{Name: act.Name}.Channel(), - api.ActionChannel{Contract: act.Contract}.Channel(), - api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(), - } - - for _, channel := range channels { - processor.queueMessage(channel, payload) - } - } - - processor.encodeQueue(api.TransactionChannel, transaction) + processor.processTransactionTrace(mainLogger, block.Block, trace) } }