Skip to content

Commit

Permalink
Add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
InversionSpaces committed Oct 16, 2023
1 parent 01dfda1 commit 987f850
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 1 deletion.
5 changes: 5 additions & 0 deletions gateway/aqua/balancer.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ ability Balancer:
nextProvider() -> Provider
next() -> Worker, Provider

-- Create balancer that returns
-- workers and providers in random order
func randomBalancer{Random}(workers: []Worker, providers: []Provider) -> Balancer:
-- closures do not capture topology here
nextWorker = func () -> Worker:
rand <- Random.next()
idx = rand % workers.length
Expand All @@ -33,6 +36,8 @@ func randomBalancer{Random}(workers: []Worker, providers: []Provider) -> Balance

<- Balancer(next=next, nextWorker=nextWorker, nextProvider=nextProvider)

-- Create balancer that returns
-- workers and providers in cycle order
func cycleBalancer{Counter}(workers: []Worker, providers: []Provider) -> Balancer:
next = func () -> Worker, Provider:
n <- Counter.incrementAndReturn()
Expand Down
4 changes: 4 additions & 0 deletions gateway/aqua/eth_rpc.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import "services.aqua"

import Provider from "provider.aqua"

-- Ability to call Ethereum JSON RPC methods
ability RPCEth:
call(method: string, jsonArgs: []string) -> JsonString

-- Create RPCEth ability from Worker and Provider
func fromWorkerProvider(worker: Worker, provider: Provider) -> RPCEth:
-- closure does not capture topology here
call = func (method: string, jsonArgs: []string) -> JsonString:
-- TODO: Handle worker_id == nil?
on worker.worker_id! via worker.host_id:
res <- EthRpc.eth_call(provider, method, jsonArgs)
<- res
Expand Down
4 changes: 4 additions & 0 deletions gateway/aqua/quorum.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ data QuorumResult:
results: []JsonString
error: string

-- Ability to check if a quorum on results is reached
ability QuorumChecker:
check(results: []JsonString, minResults: u32) -> QuorumResult

service QuorumCheckerSrv("quorum"):
check(results: []JsonString, minResults: u32) -> QuorumResult

-- Create a QuorumChecker ability
-- that checks quorum on peer through QuorumCheckerSrv(id)
func onPeerQuorumChecker(peer: string, id: string) -> QuorumChecker:
-- closure does not capture topology here
check = func (results: []JsonString, minResults: u32) -> QuorumResult:
on peer:
QuorumCheckerSrv id
Expand Down
2 changes: 2 additions & 0 deletions gateway/aqua/random.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import NumOp from "utils.aqua"
ability Random:
next() -> i64

-- Create random from timestamp
func timeRandom() -> Random:
-- closure does not capture topology here
next = func () -> i64:
t <- Peer.timestamp_sec()
n <- NumOp.identity(t)
Expand Down
9 changes: 8 additions & 1 deletion gateway/aqua/rpc.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func errorQuorumResult(msg: string) -> QuorumResult:
func errorJsonString(msg: string) -> JsonString:
<- JsonString(value = "", success = false, error = msg)

-- Get workers participating in deal
func getWorkers() -> []Worker, ?string:
on INIT_PEER_ID via HOST_PEER_ID:
deals <- Deals.get()
Expand All @@ -31,19 +32,22 @@ func getWorkers() -> []Worker, ?string:
result <- Subnet.resolve(dealId)
<- result.workers, result.error

-- Call RPC method through ability
func rpcCall{RPCEth}(method: string, jsonArgs: []string) -> JsonString:
<- RPCEth.call(method, jsonArgs)

-- Call RPC method with load balancing
func balancedEthCall{Logger, Balancer}(method: string, jsonArgs: []string) -> JsonString:
on HOST_PEER_ID:
worker, provider <- Balancer.next()
Logger.logWorker(worker)
Logger.logCall(provider)
Op.noop() -- dirty hack
Op.noop() -- dirty hack for topology to converge
rpc <- fromWorkerProvider(worker, provider)
result <- rpcCall{rpc}(method, jsonArgs)
<- result

-- Call RPC method with random load balancing
func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) -> JsonString:
result: *JsonString

Expand All @@ -58,6 +62,7 @@ func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string)

<- result!

-- Call RPC method with round-robin load balancing
func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string) -> JsonString:
result: *JsonString

Expand All @@ -72,6 +77,7 @@ func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterSe

<- result!

-- Call RPC method with workers quorum and provider load balancing
func quorum{ProviderBalancer, QuorumChecker}(workers: []Worker, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string) -> QuorumResult:
results: *JsonString
on HOST_PEER_ID:
Expand All @@ -86,6 +92,7 @@ func quorum{ProviderBalancer, QuorumChecker}(workers: []Worker, quorumNumber: u3

<- QuorumChecker.check(results, quorumNumber)

-- Call RPC method with workers quorum and provider load balancing
func quorumEth(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string) -> QuorumResult:
result: *QuorumResult

Expand Down
1 change: 1 addition & 0 deletions gateway/aqua/utils.aqua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
module Utils declares NumOp

-- Used to coerce types
service NumOp("op"):
identity(n: u64) -> i64

0 comments on commit 987f850

Please sign in to comment.