Skip to content

Commit

Permalink
Fix update clients
Browse files Browse the repository at this point in the history
Signed-off-by: Dongri Jin <[email protected]>
  • Loading branch information
dongrie committed Nov 24, 2023
1 parent 2ef267a commit 90087f6
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 66 deletions.
8 changes: 4 additions & 4 deletions cmd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func startCmd(ctx *config.Context) *cobra.Command {
}
cmd.Flags().Duration(flagRelayInterval, defaultRelayInterval, "time interval to perform relays")
cmd.Flags().String(flagPrometheusAddr, defaultPrometheusAddr, "host address to which the prometheus exporter listens")
cmd.Flags().Duration(flagSrcRelayOptimizeInterval, defaultRelayOptimizeInterval, "time interval to perform relays optimization on source chain")
cmd.Flags().Int64(flagSrcRelayOptimizeCount, defaultRelayOptimizeCount, "number of packets to relays optimization on source chain")
cmd.Flags().Duration(flagDstRelayOptimizeInterval, defaultRelayOptimizeInterval, "time interval to perform relays optimization on destination chain")
cmd.Flags().Int64(flagDstRelayOptimizeCount, defaultRelayOptimizeCount, "number of packets to relays optimization on destination chain")
cmd.Flags().Duration(flagSrcRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization")
cmd.Flags().Int64(flagSrcRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization")
cmd.Flags().Duration(flagDstRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization")
cmd.Flags().Int64(flagDstRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization")
return cmd
}
18 changes: 14 additions & 4 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,18 @@ func relayMsgsCmd(ctx *config.Context) *cobra.Command {

msgs := core.NewRelayMsgs()

if m, err := st.UpdateClients(c[src], c[dst], sp, &core.RelayPackets{}, sh, viper.GetBool(flagDoRefresh)); err != nil {
doExecuteRelaySrc := len(sp.Src) > 0
doExecuteRelayDst := len(sp.Dst) > 0
doExecuteAckSrc := false
doExecuteAckDst := false

if m, err := st.UpdateClients(c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil {
return err
} else {
msgs.Merge(m)
}

if m, err := st.RelayPackets(c[src], c[dst], sp, sh, true, true); err != nil {
if m, err := st.RelayPackets(c[src], c[dst], sp, sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil {
return err
} else {
msgs.Merge(m)
Expand Down Expand Up @@ -258,13 +263,18 @@ func relayAcksCmd(ctx *config.Context) *cobra.Command {

msgs := core.NewRelayMsgs()

if m, err := st.UpdateClients(c[src], c[dst], &core.RelayPackets{}, sp, sh, viper.GetBool(flagDoRefresh)); err != nil {
doExecuteRelaySrc := false
doExecuteRelayDst := false
doExecuteAckSrc := len(sp.Src) > 0
doExecuteAckDst := len(sp.Dst) > 0

if m, err := st.UpdateClients(c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil {
return err
} else {
msgs.Merge(m)
}

if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh, true, true); err != nil {
if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh, doExecuteAckSrc, doExecuteAckDst); err != nil {
return err
} else {
msgs.Merge(m)
Expand Down
21 changes: 9 additions & 12 deletions core/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
}, nil
}

func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error) {
func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) {
logger := GetChannelPairLogger(src, dst)
defer logger.TimeTrack(time.Now(), "RelayPackets", "num_src", len(rp.Src), "num_dst", len(rp.Dst))

Expand All @@ -225,7 +225,7 @@ func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets,
return nil, err
}

if srcOptimizeRelay {
if doExecuteRelaySrc {
msgs.Dst, err = collectPackets(srcCtx, src, rp.Src, dstAddress)
if err != nil {
logger.Error(
Expand All @@ -236,7 +236,7 @@ func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets,
}
}

if dstOptimizeRelay {
if doExecuteRelayDst {
msgs.Src, err = collectPackets(dstCtx, dst, rp.Dst, srcAddress)
if err != nil {
logger.Error(
Expand Down Expand Up @@ -420,7 +420,7 @@ func logPacketsRelayed(src, dst Chain, num int, obj string, dir string) {
)
}

func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error) {
func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) {
logger := GetChannelPairLogger(src, dst)
defer logger.TimeTrack(time.Now(), "RelayAcknowledgements", "num_src", len(rp.Src), "num_dst", len(rp.Dst))

Expand All @@ -445,13 +445,13 @@ func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *Rela
return nil, err
}

if !st.dstNoAck && srcOptimizeRelay {
if !st.dstNoAck && doExecuteRelaySrc {
msgs.Dst, err = collectAcks(srcCtx, src, rp.Src, dstAddress)
if err != nil {
return nil, err
}
}
if !st.srcNoAck && dstOptimizeRelay {
if !st.srcNoAck && doExecuteRelayDst {
msgs.Src, err = collectAcks(dstCtx, dst, rp.Dst, srcAddress)
if err != nil {
return nil, err
Expand Down Expand Up @@ -495,16 +495,13 @@ func collectAcks(ctx QueryContext, chain *ProvableChain, packets PacketInfoList,
return msgs, nil
}

func (st *NaiveStrategy) UpdateClients(src, dst *ProvableChain, rpForRecv, rpForAck *RelayPackets, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) {
func (st *NaiveStrategy) UpdateClients(src, dst *ProvableChain, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst bool, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) {
logger := GetChannelPairLogger(src, dst)

msgs := NewRelayMsgs()

// check if unrelayed packets or acks exist
needsUpdateForSrc := len(rpForRecv.Dst) > 0 ||
!st.srcNoAck && len(rpForAck.Dst) > 0
needsUpdateForDst := len(rpForRecv.Src) > 0 ||
!st.dstNoAck && len(rpForAck.Src) > 0
needsUpdateForSrc := doExecuteRelaySrc || (doExecuteAckSrc && !st.srcNoAck)
needsUpdateForDst := doExecuteRelayDst || (doExecuteAckDst && !st.dstNoAck)

// check if LC refresh is needed
if !needsUpdateForSrc && doRefresh {
Expand Down
68 changes: 35 additions & 33 deletions core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,19 @@ func NewRelayService(
dstOptimizeInterval: dstOptimizeInterval,
dstOptimizeCount: dstOptimizeCount,
srcRelayPacketStartTime: WatchStartTime{
AlreadySet: true,
AlreadySet: false,
StartTime: time.Now(),
},
dstRelayPacketStartTime: WatchStartTime{
AlreadySet: true,
AlreadySet: false,
StartTime: time.Now(),
},
srcRelayAckStartTime: WatchStartTime{
AlreadySet: true,
AlreadySet: false,
StartTime: time.Now(),
},
dstRelayAckStartTime: WatchStartTime{
AlreadySet: true,
AlreadySet: false,
StartTime: time.Now(),
},
},
Expand Down Expand Up @@ -157,14 +157,6 @@ func (srv *RelayService) Serve(ctx context.Context) error {

msgs := NewRelayMsgs()

// update clients
if m, err := srv.st.UpdateClients(srv.src, srv.dst, pseqs, aseqs, srv.sh, true); err != nil {
logger.Error("failed to update clients", err)
return err
} else {
msgs.Merge(m)
}

// reset watch start time for packets
if len(pseqs.Src) > 0 {
resetWatchStartTime(&srv.optimizeRelay.srcRelayPacketStartTime)
Expand All @@ -173,43 +165,53 @@ func (srv *RelayService) Serve(ctx context.Context) error {
resetWatchStartTime(&srv.optimizeRelay.dstRelayPacketStartTime)
}

// reset watch start time for acks
if len(aseqs.Src) > 0 {
resetWatchStartTime(&srv.optimizeRelay.srcRelayAckStartTime)
}
if len(aseqs.Dst) > 0 {
resetWatchStartTime(&srv.optimizeRelay.dstRelayAckStartTime)
}

doExecuteRelaySrc, doExecuteRelayDst := srv.shouldExecuteRelay(pseqs, srv.optimizeRelay.srcRelayPacketStartTime, srv.optimizeRelay.dstRelayPacketStartTime)
doExecuteAckSrc, doExecuteAckDst := srv.shouldExecuteRelay(aseqs, srv.optimizeRelay.srcRelayAckStartTime, srv.optimizeRelay.dstRelayAckStartTime)

// update clients
if m, err := srv.st.UpdateClients(srv.src, srv.dst, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, srv.sh, true); err != nil {
logger.Error("failed to update clients", err)
return err
} else {
msgs.Merge(m)
}

// relay packets if unrelayed seqs exist
srcRelayPackets, dstRelayPackets := srv.shouldExecuteRelay(pseqs, srv.optimizeRelay.srcRelayPacketStartTime, srv.optimizeRelay.dstRelayPacketStartTime)
if m, err := srv.st.RelayPackets(srv.src, srv.dst, pseqs, srv.sh, srcRelayPackets, dstRelayPackets); err != nil {
if m, err := srv.st.RelayPackets(srv.src, srv.dst, pseqs, srv.sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil {
logger.Error("failed to relay packets", err)
return err
} else {
if srcRelayPackets {
srv.optimizeRelay.srcRelayPacketStartTime.AlreadySet = false
}
if dstRelayPackets {
srv.optimizeRelay.dstRelayPacketStartTime.AlreadySet = false
}
msgs.Merge(m)
}

// reset watch start time for acks
if len(aseqs.Src) > 0 {
resetWatchStartTime(&srv.optimizeRelay.srcRelayAckStartTime)
if doExecuteRelaySrc {
srv.optimizeRelay.srcRelayPacketStartTime.AlreadySet = false
}
if len(aseqs.Dst) > 0 {
resetWatchStartTime(&srv.optimizeRelay.dstRelayAckStartTime)
if doExecuteRelayDst {
srv.optimizeRelay.dstRelayPacketStartTime.AlreadySet = false
}

// relay acks if unrelayed seqs exist
srcRelayAcks, dstRelayAcks := srv.shouldExecuteRelay(aseqs, srv.optimizeRelay.srcRelayAckStartTime, srv.optimizeRelay.dstRelayAckStartTime)
if m, err := srv.st.RelayAcknowledgements(srv.src, srv.dst, aseqs, srv.sh, srcRelayAcks, dstRelayAcks); err != nil {
if m, err := srv.st.RelayAcknowledgements(srv.src, srv.dst, aseqs, srv.sh, doExecuteAckSrc, doExecuteAckDst); err != nil {
logger.Error("failed to relay acknowledgements", err)
return err
} else {
if srcRelayAcks {
srv.optimizeRelay.srcRelayAckStartTime.AlreadySet = false
}
if dstRelayAcks {
srv.optimizeRelay.dstRelayAckStartTime.AlreadySet = false
}
msgs.Merge(m)
}
if doExecuteAckSrc {
srv.optimizeRelay.srcRelayAckStartTime.AlreadySet = false
}
if doExecuteAckDst {
srv.optimizeRelay.dstRelayAckStartTime.AlreadySet = false
}

// send all msgs to src/dst chains
srv.st.Send(srv.src, srv.dst, msgs)
Expand Down
6 changes: 3 additions & 3 deletions core/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ type StrategyI interface {
UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error)

// RelayPackets executes RecvPacket to the packets contained in `rp` on both chains (`src` and `dst`).
RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error)
RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error)

// UnrelayedAcknowledgements returns packets to execute AcknowledgePacket to on `src` and `dst`.
// `includeRelayedButUnfinalized` decides if the result includes packets of which acknowledgePacket has been executed but not finalized
UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error)

// RelayAcknowledgements executes AcknowledgePacket to the packets contained in `rp` on both chains (`src` and `dst`).
RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error)
RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error)

// UpdateClients executes UpdateClient only if needed
UpdateClients(src, dst *ProvableChain, rpForRecv, rpForAck *RelayPackets, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error)
UpdateClients(src, dst *ProvableChain, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst bool, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error)

// Send executes submission of msgs to src/dst chains
Send(src, dst Chain, msgs *RelayMsgs)
Expand Down
22 changes: 17 additions & 5 deletions tests/cases/tm2tm/scripts/test-service
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

: <<'END_COMMENT'
* relay-interval = 10s
* relay-optimize-interval = 15s
* relay-optimize-count = 3
* src-relay-optimize-interval = 15s
* src-relay-optimize-count = 3
* dst-relay-optimize-interval = 15s
* dst-relay-optimize-count = 3
- relay service [query = 0, time = 0] -> skip
- transfer
Expand All @@ -15,7 +17,7 @@
-
-
-
- relay service [query = 1, time = 10] -> skip
- relay service [query = 1, time = 0] -> skip
- transfer
-
-
Expand All @@ -25,6 +27,16 @@
-
-
-
- relay service [query = 2, time = 10] -> skip
-
-
-
-
-
-
-
-
-
- relay service [query = 2, time = 20] -> exec (time)
- transfer x 3
-
Expand All @@ -34,7 +46,7 @@
-
-
-
-
-
- relay service [query = 3, time = 10] -> exec (count)
END_COMMENT

Expand Down Expand Up @@ -80,7 +92,7 @@ if [ $unrelayed_packets -ne 2 ]; then
exit 1
fi

sleep 7 # wait for relay-interval
sleep 17 # wait for relay-interval
echo "xxxxxxx relay service -> exec - time xxxxxx"

# transfer a token x 3
Expand Down
22 changes: 17 additions & 5 deletions tests/cases/tmmock2tmmock/scripts/test-service
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

: <<'END_COMMENT'
* relay-interval = 10s
* relay-optimize-interval = 15s
* relay-optimize-count = 3
* src-relay-optimize-interval = 15s
* src-relay-optimize-count = 3
* dst-relay-optimize-interval = 15s
* dst-relay-optimize-count = 3
- relay service [query = 0, time = 0] -> skip
- transfer
Expand All @@ -15,7 +17,7 @@
-
-
-
- relay service [query = 1, time = 10] -> skip
- relay service [query = 1, time = 0] -> skip
- transfer
-
-
Expand All @@ -25,6 +27,16 @@
-
-
-
- relay service [query = 2, time = 10] -> skip
-
-
-
-
-
-
-
-
-
- relay service [query = 2, time = 20] -> exec (time)
- transfer x 3
-
Expand All @@ -34,7 +46,7 @@
-
-
-
-
-
- relay service [query = 3, time = 10] -> exec (count)
END_COMMENT

Expand Down Expand Up @@ -80,7 +92,7 @@ if [ $unrelayed_packets -ne 2 ]; then
exit 1
fi

sleep 7 # wait for relay-interval
sleep 17 # wait for relay-interval
echo "xxxxxxx relay service -> exec - time xxxxxx"

# transfer a token x 3
Expand Down

0 comments on commit 90087f6

Please sign in to comment.