Skip to content

Commit

Permalink
add portal support for substrate
Browse files Browse the repository at this point in the history
  • Loading branch information
belopash committed Dec 6, 2024
1 parent 05fd295 commit 50a3fde
Show file tree
Hide file tree
Showing 9 changed files with 341 additions and 134 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/substrate-processor",
"comment": "add portal api support",
"type": "minor"
}
],
"packageName": "@subsquid/substrate-processor"
}
26 changes: 14 additions & 12 deletions evm/evm-processor/src/ds-archive/portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,15 @@ export class EvmPortal implements DataSource<Block, DataRequest> {
): AsyncIterable<Batch<Block>> {
let height = new Throttler(() => this.client.getHeight(), 20_000)

let top = await height.get()
let top = await height.call()
for (let req of requests) {
let from = req.range.from
let to = req.range.to
if (top < from && stopOnHead) return
let lastBlock = req.range.from - 1
let endBlock = req.range.to || Infinity
let query = makeQuery(req)

let query = makeQuery({
...req,
range: {from, to},
})
for await (let batch of this.client.stream(query, stopOnHead)) {
assert(batch.length > 0, 'boundary blocks are expected to be included')
let lastBlock = last(batch).header.number
assert(lastBlock >= from)
from = lastBlock + 1
lastBlock = last(batch).header.number

let blocks = batch.map((b) => {
try {
Expand All @@ -108,11 +102,19 @@ export class EvmPortal implements DataSource<Block, DataRequest> {

yield {
blocks,
isHead: from > top,
isHead: lastBlock > top,
}

top = await height.get()
}

// stream ended before requested range,
// which means we reached the last available block
// should not happen if stopOnHead is set to false
if (lastBlock <= endBlock) {
assert(stopOnHead, 'unexpected end of stream')
break
}
}
}
}
6 changes: 2 additions & 4 deletions evm/evm-processor/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
* processor.setGateway('https://v2.archive.subsquid.io/network/ethereum-mainnet')
*/
setGateway(url: string | GatewaySettings): this {
assert(this.archive?.type !== 'gateway', 'setGateway() can not be used together with setPortal()')
assert(this.archive?.type !== 'gateway', '.setGateway() can not be used together with setPortal()')
this.assertNotRunning()
if (typeof url == 'string') {
this.archive = {type: 'gateway', url}
Expand All @@ -233,9 +233,8 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
return this
}


setPortal(url: string | PortalSettings): this {
assert(this.archive?.type !== 'gateway', 'setPortal() can not be used together with setGateway()')
assert(this.archive?.type !== 'gateway', '.setPortal() can not be used together with setGateway()')
this.assertNotRunning()
if (typeof url == 'string') {
this.archive = {type: 'portal', url}
Expand All @@ -245,7 +244,6 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
return this
}


/**
* Set chain RPC endpoint
*
Expand Down
1 change: 1 addition & 0 deletions substrate/substrate-processor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"@subsquid/http-client": "^1.5.0",
"@subsquid/logger": "^1.3.3",
"@subsquid/rpc-client": "^4.9.0",
"@subsquid/portal-client": "^0.0.0",
"@subsquid/substrate-data": "^4.2.1",
"@subsquid/substrate-data-raw": "^1.2.0",
"@subsquid/util-internal": "^3.2.0",
Expand Down
194 changes: 92 additions & 102 deletions substrate/substrate-processor/src/ds-archive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {DEFAULT_FIELDS, FieldSelection} from './interfaces/data'
import {ArchiveBlock, ArchiveBlockHeader} from './interfaces/data-partial'
import {DataRequest} from './interfaces/data-request'
import {Block, BlockHeader, Call, Event, Extrinsic, setUpItems} from './mapping'
import {mergeFields} from './selection'


interface ArchiveQuery extends DataRequest {
Expand Down Expand Up @@ -44,7 +45,6 @@ export class SubstrateArchive implements DataSource<Block, DataRequest> {
}

async *getFinalizedBlocks(requests: RangeRequestList<DataRequest>, stopOnHead?: boolean): AsyncIterable<Batch<Block>> {

let runtimeTracker = new RuntimeTracker<ArchiveBlockHeader & WithRuntime>(
this.rpc,
hdr => ({height: hdr.number, hash: hdr.hash, parentHash: hdr.parentHash}),
Expand Down Expand Up @@ -80,108 +80,8 @@ export class SubstrateArchive implements DataSource<Block, DataRequest> {

@annotateSyncError((src: ArchiveBlock) => ({blockHeight: src.header.number, blockHash: src.header.hash}))
private mapBlock(src: ArchiveBlock): Block {
let block = new Block(new BlockHeader(
assertNotNull(src.header.runtime),
assertNotNull(src.header.runtimeOfPrevBlock),
{
height: src.header.number,
...src.header
}
))

if (src.extrinsics) {
for (let s of src.extrinsics) {
let extrinsic = new Extrinsic(block.header, s.index)
if (s.version != null) {
extrinsic.version = s.version
}
if (s.signature != null) {
extrinsic.signature = s.signature
}
if (s.fee != null) {
extrinsic.fee = BigInt(s.fee)
}
if (s.tip != null) {
extrinsic.tip = BigInt(s.tip)
}
if (s.error != null) {
extrinsic.error = s.error
}
if (s.success != null) {
extrinsic.success = s.success
}
if (s.hash != null) {
extrinsic.hash = s.hash
}
block.extrinsics.push(extrinsic)
}
}

if (src.calls) {
for (let s of src.calls) {
let call = new Call(block.header, s.extrinsicIndex, s.address)
if (s.name) {
call.name = s.name
}
if (s.args != null) {
call.args = s.args
}
if (s.origin != null) {
call.origin = s.origin
}
if (s.error != null) {
call.error = s.error
}
if (s.success != null) {
call.success = s.success
}
block.calls.push(call)
}
}

if (src.events) {
for (let s of src.events) {
let event = new Event(block.header, s.index)
if (s.name != null) {
event.name = s.name
}
if (s.args != null) {
event.args = s.args
}
if (s.phase != null) {
event.phase = s.phase
}
if (s.extrinsicIndex != null) {
event.extrinsicIndex = s.extrinsicIndex
}
if (s.callAddress != null) {
event.callAddress = s.callAddress
}
if (s.topics != null) {
event.topics = s.topics
}
block.events.push(event)
}
}

setUpItems(block)
return block
}
}


type Selector<Keys extends string> = {
[K in Keys]?: boolean
}


function mergeFields<Keys extends string>(def: Selector<Keys>, requested?: Selector<Keys>, required?: Selector<Keys>): Selector<Keys> {
let fields: Selector<Keys> = {...def}
for (let key in requested) {
fields[key] = requested[key]
return mapBlock(src)
}
Object.assign(fields, required)
return fields
}


Expand All @@ -198,3 +98,93 @@ function getFields(fields: FieldSelection | undefined): FieldSelection {
extrinsic: mergeFields(DEFAULT_FIELDS.extrinsic, fields?.extrinsic)
}
}


export function mapBlock(src: ArchiveBlock): Block {
let block = new Block(new BlockHeader(
assertNotNull(src.header.runtime),
assertNotNull(src.header.runtimeOfPrevBlock),
{
height: src.header.number,
...src.header
}
))

if (src.extrinsics) {
for (let s of src.extrinsics) {
let extrinsic = new Extrinsic(block.header, s.index)
if (s.version != null) {
extrinsic.version = s.version
}
if (s.signature != null) {
extrinsic.signature = s.signature
}
if (s.fee != null) {
extrinsic.fee = BigInt(s.fee)
}
if (s.tip != null) {
extrinsic.tip = BigInt(s.tip)
}
if (s.error != null) {
extrinsic.error = s.error
}
if (s.success != null) {
extrinsic.success = s.success
}
if (s.hash != null) {
extrinsic.hash = s.hash
}
block.extrinsics.push(extrinsic)
}
}

if (src.calls) {
for (let s of src.calls) {
let call = new Call(block.header, s.extrinsicIndex, s.address)
if (s.name) {
call.name = s.name
}
if (s.args != null) {
call.args = s.args
}
if (s.origin != null) {
call.origin = s.origin
}
if (s.error != null) {
call.error = s.error
}
if (s.success != null) {
call.success = s.success
}
block.calls.push(call)
}
}

if (src.events) {
for (let s of src.events) {
let event = new Event(block.header, s.index)
if (s.name != null) {
event.name = s.name
}
if (s.args != null) {
event.args = s.args
}
if (s.phase != null) {
event.phase = s.phase
}
if (s.extrinsicIndex != null) {
event.extrinsicIndex = s.extrinsicIndex
}
if (s.callAddress != null) {
event.callAddress = s.callAddress
}
if (s.topics != null) {
event.topics = s.topics
}
block.events.push(event)
}
}

setUpItems(block)
return block
}
Loading

0 comments on commit 50a3fde

Please sign in to comment.