Skip to content

Commit

Permalink
Create NewContractReader in RegisterTrigger flow of the trigger capab…
Browse files Browse the repository at this point in the history
…ility
  • Loading branch information
kidambisrinivas committed Sep 9, 2024
1 parent 0ff783b commit 1b617a6
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package log_event_trigger
package logeventtrigger

import (
"context"
"errors"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
)

const ID = "cron-[email protected]"
const ID = "log-event-trigger-%s-%d@1.0.0"

const defaultSendChannelBufferSize = 1000

Expand All @@ -20,8 +24,15 @@ var logEventTriggerInfo = capabilities.MustNewCapabilityInfo(
"A trigger that listens for specific contract log events and starts a workflow run.",
)

// Log Event Trigger Capability Config
type Config struct {
// Log Event Trigger Capability RequestConfig
type RequestConfig struct {
ContractName string `json:"contractName"`
ContractAddress common.Address `json:"contractAddress"`
ContractReaderConfig evmtypes.ChainReaderConfig `json:"contractReaderConfig"`
}

// Log Event Trigger Capability Input
type Input struct {
}

// Log Event Trigger Capability Payload
Expand All @@ -37,21 +48,29 @@ type Response struct {
Payload Payload
}

type logEventTrigger struct {
ch chan<- capabilities.TriggerResponse
}

// Log Event Trigger Capabilities Manager
// Manages different log event triggers using an underlying triggerStore
type LogEventTriggerService struct {
capabilities.CapabilityInfo
capabilities.Validator[Config, struct{}, capabilities.TriggerResponse]
lggr logger.Logger
triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]
capabilities.Validator[RequestConfig, Input, capabilities.TriggerResponse]
lggr logger.Logger
triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]
relayer core.Relayer
logEventConfig LogEventConfig
}

// Common capability level config across all workflows
type LogEventConfig struct {
ChainId uint64 `json:"chainId"`
Network string `json:"network"`
LookbackBlocks uint64 `json:"lookbakBlocks"`
PollPeriod uint64 `json:"pollPeriod"`
}

type Params struct {
Logger logger.Logger
Logger logger.Logger
Relayer core.Relayer
LogEventConfig LogEventConfig
}

var _ capabilities.TriggerCapability = (*LogEventTriggerService)(nil)
Expand All @@ -68,24 +87,32 @@ func NewLogEventTriggerService(p Params) *LogEventTriggerService {
CapabilityInfo: logEventTriggerInfo,
lggr: l,
triggers: logEventStore,
relayer: p.Relayer,
logEventConfig: p.LogEventConfig,
}
}

func (s *LogEventTriggerService) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
return capabilities.NewCapabilityInfo(
fmt.Sprintf(ID, s.logEventConfig.Network, s.logEventConfig.ChainId),
capabilities.CapabilityTypeTrigger,
"A trigger that listens for specific contract log events and starts a workflow run.",
)
}

// Register a new trigger
// Can register triggers before the service is actively scheduling
func (s *LogEventTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
if req.Config == nil {
return nil, errors.New("config is required to register a log event trigger")
}
_, err := s.ValidateConfig(req.Config)
reqConfig, err := s.ValidateConfig(req.Config)
if err != nil {
return nil, err
}
respCh, err := s.triggers.InsertIfNotExists(req.TriggerID, func() (logEventTrigger, chan capabilities.TriggerResponse) {
callbackCh := make(chan capabilities.TriggerResponse, defaultSendChannelBufferSize)
return logEventTrigger{
ch: callbackCh,
}, callbackCh
// Add log event trigger with Contract details to CapabilitiesStore
respCh, err := s.triggers.InsertIfNotExists(req.TriggerID, func() (*logEventTrigger, chan capabilities.TriggerResponse, error) {
return newLogEventTrigger(ctx, reqConfig, s.logEventConfig, s.relayer)
})
if err != nil {
return nil, fmt.Errorf("log_event_trigger %v", err)
Expand All @@ -109,6 +136,10 @@ func (s *LogEventTriggerService) UnregisterTrigger(ctx context.Context, req capa

// Start the service.
func (s *LogEventTriggerService) Start(ctx context.Context) error {
if s.relayer == nil {
return errors.New("service has shutdown, it must be built again to restart")
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,62 +1,65 @@
package log_event_trigger
package logeventtrigger

import (
"fmt"
"sync"
)

type NewCapabilityFn[T any, Resp any] func() (T, chan Resp)
type RegisterCapabilityFn[T any, Resp any] func() (*T, chan Resp, error)

// Interface of the capabilities store
type CapabilitiesStore[T any, Resp any] interface {
Read(capabilityID string) (value T, ok bool)
ReadAll() (values map[string]T)
Write(capabilityID string, value T)
InsertIfNotExists(capabilityID string, fn NewCapabilityFn[T, Resp]) (chan Resp, error)
Read(capabilityID string) (value *T, ok bool)
ReadAll() (values map[string]*T)
Write(capabilityID string, value *T)
InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error)
Delete(capabilityID string)
}

// Implementation for the CapabilitiesStore interface
type capabilitiesStore[T any, Resp any] struct {
mu sync.RWMutex
capabilities map[string]T
capabilities map[string]*T
}

var _ CapabilitiesStore[string, string] = (CapabilitiesStore[string, string])(nil)

// Constructor for capabilitiesStore struct implementing CapabilitiesStore interface
func NewCapabilitiesStore[T any, Resp any]() CapabilitiesStore[T, Resp] {
return &capabilitiesStore[T, Resp]{
capabilities: map[string]T{},
capabilities: map[string]*T{},
}
}

func (cs *capabilitiesStore[T, Resp]) Read(capabilityID string) (value T, ok bool) {
func (cs *capabilitiesStore[T, Resp]) Read(capabilityID string) (value *T, ok bool) {
cs.mu.RLock()
defer cs.mu.RUnlock()
trigger, ok := cs.capabilities[capabilityID]
return trigger, ok
}

func (cs *capabilitiesStore[T, Resp]) ReadAll() (values map[string]T) {
func (cs *capabilitiesStore[T, Resp]) ReadAll() (values map[string]*T) {
cs.mu.RLock()
defer cs.mu.RUnlock()
return cs.capabilities
}

func (cs *capabilitiesStore[T, Resp]) Write(capabilityID string, value T) {
func (cs *capabilitiesStore[T, Resp]) Write(capabilityID string, value *T) {
cs.mu.Lock()
defer cs.mu.Unlock()
cs.capabilities[capabilityID] = value
}

func (cs *capabilitiesStore[T, Resp]) InsertIfNotExists(capabilityID string, fn NewCapabilityFn[T, Resp]) (chan Resp, error) {
func (cs *capabilitiesStore[T, Resp]) InsertIfNotExists(capabilityID string, fn RegisterCapabilityFn[T, Resp]) (chan Resp, error) {
cs.mu.Lock()
defer cs.mu.Unlock()
if _, ok := cs.capabilities[capabilityID]; ok {
return nil, fmt.Errorf("capabilityID %v already exists", capabilityID)
}
value, respCh := fn()
value, respCh, err := fn()
if err != nil {
return nil, fmt.Errorf("error registering capability: %v", err)
}
cs.capabilities[capabilityID] = value
return respCh, nil
}
Expand Down
93 changes: 93 additions & 0 deletions core/capabilities/logeventtrigger/trigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package logeventtrigger

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
)

type logEventTrigger struct {
ch chan<- capabilities.TriggerResponse

// Contract address and Event Signature to monitor for
contractName string
contractAddress common.Address
contractReaderConfig evmtypes.ChainReaderConfig
contractReader types.ContractReader

// Log Event Trigger config with pollPeriod and lookbackBlocks
logEventConfig LogEventConfig
ticker *time.Ticker
done chan bool
}

// Construct for logEventTrigger struct
func newLogEventTrigger(ctx context.Context,
reqConfig *RequestConfig,
logEventConfig LogEventConfig,
relayer core.Relayer) (*logEventTrigger, chan capabilities.TriggerResponse, error) {
jsonBytes, err := json.Marshal(reqConfig.ContractReaderConfig)
if err != nil {
return nil, nil, err
}

// Create a New Contract Reader client, which brings a corresponding ContractReader gRPC service
// in Chainlink Core service
contractReader, err := relayer.NewContractReader(ctx, jsonBytes)
if err != nil {
return nil, nil,
fmt.Errorf("error fetching contractReader for chainID %d from relayerSet: %v", logEventConfig.ChainId, err)
}

// Bind Contract in ContractReader
boundContracts := []types.BoundContract{{Name: reqConfig.ContractName, Address: reqConfig.ContractAddress.Hex()}}
err = contractReader.Bind(ctx, boundContracts)
if err != nil {
return nil, nil, err
}

callbackCh := make(chan capabilities.TriggerResponse, defaultSendChannelBufferSize)
ticker := time.NewTicker(time.Duration(logEventConfig.PollPeriod) * time.Millisecond)
done := make(chan bool)

// Initialise a Log Event Trigger
l := &logEventTrigger{
ch: callbackCh,
contractName: reqConfig.ContractName,
contractAddress: reqConfig.ContractAddress,
contractReaderConfig: reqConfig.ContractReaderConfig,
contractReader: contractReader,
logEventConfig: logEventConfig,
ticker: ticker,
done: done,
}
go l.Listen()

return l, callbackCh, nil
}

// Listen to contract events and trigger workflow runs
func (l *logEventTrigger) Listen() {
// Listen for events from lookbackPeriod
for {
select {
case <-l.done:
return
case t := <-l.ticker.C:
fmt.Println("Tick at", t)
}
}
}

// Stop contract event listener for the current contract
func (l *logEventTrigger) Stop() {
l.done <- true
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.21
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240902144105-70b5719fd098
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240903184200-6488292a85e3
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240905131601-1128f33dc70b
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240904093355-e40169857652
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240710170203-5b41615da827
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1147,8 +1147,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240902144105-70b5719fd098 h1:gZsXQ//TbsaD9bcvR2wOdao7AgNDIS/Uml0FEF0vJuI=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240902144105-70b5719fd098/go.mod h1:Z9lQ5t20kRk28pzRLnqAJZUVOw8E6/siA3P3MLyKqoM=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240903184200-6488292a85e3 h1:fkfOoAPviqO2rN8ngvejsDa7WKcw4paGEFA4/Znu0L0=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240903184200-6488292a85e3/go.mod h1:D/qaCoq0SxXzg5NRN5FtBRv98VBf+D2NOC++RbvvuOc=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240905131601-1128f33dc70b h1:TV7fPyY/4hGaehG2XowJ8elJCg+bIB0WN8YOqT/MRc4=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240905131601-1128f33dc70b/go.mod h1:D/qaCoq0SxXzg5NRN5FtBRv98VBf+D2NOC++RbvvuOc=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240904093355-e40169857652 h1:0aZ3HiEz2bMM5ywHAyKlFMN95qTzpNDn7uvnHLrFX6s=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package main

import (
"context"
"encoding/json"
"fmt"

"github.com/hashicorp/go-plugin"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/log_event_trigger"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/logeventtrigger"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
)

Expand All @@ -20,6 +22,7 @@ const (
type LogEventTriggerGRPCService struct {
trigger capabilities.TriggerCapability
s *loop.Server
config logeventtrigger.LogEventConfig
}

func main() {
Expand Down Expand Up @@ -87,8 +90,25 @@ func (cs *LogEventTriggerGRPCService) Initialise(
relayerSet core.RelayerSet,
) error {
cs.s.Logger.Debugf("Initialising %s", serviceName)
cs.trigger = log_event_trigger.NewLogEventTriggerService(log_event_trigger.Params{
Logger: cs.s.Logger,

var logEventConfig logeventtrigger.LogEventConfig
err := json.Unmarshal([]byte(config), &logEventConfig)
if err != nil {
return fmt.Errorf("error decoding log_event_trigger config: %v", err)
}

relayID := types.NewRelayID(logEventConfig.Network, fmt.Sprintf("%d", logEventConfig.ChainId))
relayer, err := relayerSet.Get(ctx, relayID)
if err != nil {
return fmt.Errorf("error fetching relayer for chainID %d from relayerSet: %v", logEventConfig.ChainId, err)
}

// Set relayer and trigger in LogEventTriggerGRPCService
cs.config = logEventConfig
cs.trigger = logeventtrigger.NewLogEventTriggerService(logeventtrigger.Params{
Logger: cs.s.Logger,
Relayer: relayer,
LogEventConfig: logEventConfig,
})

if err := capabilityRegistry.Add(ctx, cs.trigger); err != nil {
Expand Down

0 comments on commit 1b617a6

Please sign in to comment.