diff --git a/cmd/service.go b/cmd/service.go index 6c797b5d..e80278e7 100644 --- a/cmd/service.go +++ b/cmd/service.go @@ -81,9 +81,9 @@ func startCmd(ctx *config.Context) *cobra.Command { } cmd.Flags().Duration(flagRelayInterval, defaultRelayInterval, "time interval to perform relays") cmd.Flags().String(flagPrometheusAddr, defaultPrometheusAddr, "host address to which the prometheus exporter listens") - cmd.Flags().Duration(flagSrcRelayOptimizeInterval, defaultRelayOptimizeInterval, "time interval to perform relays optimization on source chain") - cmd.Flags().Int64(flagSrcRelayOptimizeCount, defaultRelayOptimizeCount, "number of packets to relays optimization on source chain") - cmd.Flags().Duration(flagDstRelayOptimizeInterval, defaultRelayOptimizeInterval, "time interval to perform relays optimization on destination chain") - cmd.Flags().Int64(flagDstRelayOptimizeCount, defaultRelayOptimizeCount, "number of packets to relays optimization on destination chain") + cmd.Flags().Duration(flagSrcRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization") + cmd.Flags().Int64(flagSrcRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization") + cmd.Flags().Duration(flagDstRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization") + cmd.Flags().Int64(flagDstRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization") return cmd } diff --git a/cmd/tx.go b/cmd/tx.go index 9f9ad72b..f16cf02d 100644 --- a/cmd/tx.go +++ b/cmd/tx.go @@ -197,13 +197,18 @@ func relayMsgsCmd(ctx *config.Context) *cobra.Command { msgs := core.NewRelayMsgs() - if m, err := st.UpdateClients(c[src], c[dst], sp, &core.RelayPackets{}, sh, viper.GetBool(flagDoRefresh)); err != nil { + doExecuteRelaySrc := len(sp.Src) > 0 + doExecuteRelayDst := len(sp.Dst) > 0 + doExecuteAckSrc := false + doExecuteAckDst := false + + if m, err := st.UpdateClients(c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil { return err } else { msgs.Merge(m) } - if m, err := st.RelayPackets(c[src], c[dst], sp, sh, true, true); err != nil { + if m, err := st.RelayPackets(c[src], c[dst], sp, sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil { return err } else { msgs.Merge(m) @@ -258,13 +263,18 @@ func relayAcksCmd(ctx *config.Context) *cobra.Command { msgs := core.NewRelayMsgs() - if m, err := st.UpdateClients(c[src], c[dst], &core.RelayPackets{}, sp, sh, viper.GetBool(flagDoRefresh)); err != nil { + doExecuteRelaySrc := false + doExecuteRelayDst := false + doExecuteAckSrc := len(sp.Src) > 0 + doExecuteAckDst := len(sp.Dst) > 0 + + if m, err := st.UpdateClients(c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil { return err } else { msgs.Merge(m) } - if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh, true, true); err != nil { + if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh, doExecuteAckSrc, doExecuteAckDst); err != nil { return err } else { msgs.Merge(m) diff --git a/core/naive-strategy.go b/core/naive-strategy.go index 0091c60f..f87fe347 100644 --- a/core/naive-strategy.go +++ b/core/naive-strategy.go @@ -199,7 +199,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader }, nil } -func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error) { +func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) { logger := GetChannelPairLogger(src, dst) defer logger.TimeTrack(time.Now(), "RelayPackets", "num_src", len(rp.Src), "num_dst", len(rp.Dst)) @@ -225,7 +225,7 @@ func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, return nil, err } - if srcOptimizeRelay { + if doExecuteRelaySrc { msgs.Dst, err = collectPackets(srcCtx, src, rp.Src, dstAddress) if err != nil { logger.Error( @@ -236,7 +236,7 @@ func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, } } - if dstOptimizeRelay { + if doExecuteRelayDst { msgs.Src, err = collectPackets(dstCtx, dst, rp.Dst, srcAddress) if err != nil { logger.Error( @@ -420,7 +420,7 @@ func logPacketsRelayed(src, dst Chain, num int, obj string, dir string) { ) } -func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error) { +func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) { logger := GetChannelPairLogger(src, dst) defer logger.TimeTrack(time.Now(), "RelayAcknowledgements", "num_src", len(rp.Src), "num_dst", len(rp.Dst)) @@ -445,13 +445,13 @@ func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *Rela return nil, err } - if !st.dstNoAck && srcOptimizeRelay { + if !st.dstNoAck && doExecuteRelaySrc { msgs.Dst, err = collectAcks(srcCtx, src, rp.Src, dstAddress) if err != nil { return nil, err } } - if !st.srcNoAck && dstOptimizeRelay { + if !st.srcNoAck && doExecuteRelayDst { msgs.Src, err = collectAcks(dstCtx, dst, rp.Dst, srcAddress) if err != nil { return nil, err @@ -495,16 +495,13 @@ func collectAcks(ctx QueryContext, chain *ProvableChain, packets PacketInfoList, return msgs, nil } -func (st *NaiveStrategy) UpdateClients(src, dst *ProvableChain, rpForRecv, rpForAck *RelayPackets, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) { +func (st *NaiveStrategy) UpdateClients(src, dst *ProvableChain, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst bool, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) { logger := GetChannelPairLogger(src, dst) msgs := NewRelayMsgs() - // check if unrelayed packets or acks exist - needsUpdateForSrc := len(rpForRecv.Dst) > 0 || - !st.srcNoAck && len(rpForAck.Dst) > 0 - needsUpdateForDst := len(rpForRecv.Src) > 0 || - !st.dstNoAck && len(rpForAck.Src) > 0 + needsUpdateForSrc := doExecuteRelaySrc || (doExecuteAckSrc && !st.srcNoAck) + needsUpdateForDst := doExecuteRelayDst || (doExecuteAckDst && !st.dstNoAck) // check if LC refresh is needed if !needsUpdateForSrc && doRefresh { diff --git a/core/service.go b/core/service.go index 811b5719..c0425927 100644 --- a/core/service.go +++ b/core/service.go @@ -85,19 +85,19 @@ func NewRelayService( dstOptimizeInterval: dstOptimizeInterval, dstOptimizeCount: dstOptimizeCount, srcRelayPacketStartTime: WatchStartTime{ - AlreadySet: true, + AlreadySet: false, StartTime: time.Now(), }, dstRelayPacketStartTime: WatchStartTime{ - AlreadySet: true, + AlreadySet: false, StartTime: time.Now(), }, srcRelayAckStartTime: WatchStartTime{ - AlreadySet: true, + AlreadySet: false, StartTime: time.Now(), }, dstRelayAckStartTime: WatchStartTime{ - AlreadySet: true, + AlreadySet: false, StartTime: time.Now(), }, }, @@ -157,14 +157,6 @@ func (srv *RelayService) Serve(ctx context.Context) error { msgs := NewRelayMsgs() - // update clients - if m, err := srv.st.UpdateClients(srv.src, srv.dst, pseqs, aseqs, srv.sh, true); err != nil { - logger.Error("failed to update clients", err) - return err - } else { - msgs.Merge(m) - } - // reset watch start time for packets if len(pseqs.Src) > 0 { resetWatchStartTime(&srv.optimizeRelay.srcRelayPacketStartTime) @@ -173,43 +165,53 @@ func (srv *RelayService) Serve(ctx context.Context) error { resetWatchStartTime(&srv.optimizeRelay.dstRelayPacketStartTime) } + // reset watch start time for acks + if len(aseqs.Src) > 0 { + resetWatchStartTime(&srv.optimizeRelay.srcRelayAckStartTime) + } + if len(aseqs.Dst) > 0 { + resetWatchStartTime(&srv.optimizeRelay.dstRelayAckStartTime) + } + + doExecuteRelaySrc, doExecuteRelayDst := srv.shouldExecuteRelay(pseqs, srv.optimizeRelay.srcRelayPacketStartTime, srv.optimizeRelay.dstRelayPacketStartTime) + doExecuteAckSrc, doExecuteAckDst := srv.shouldExecuteRelay(aseqs, srv.optimizeRelay.srcRelayAckStartTime, srv.optimizeRelay.dstRelayAckStartTime) + + // update clients + if m, err := srv.st.UpdateClients(srv.src, srv.dst, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, srv.sh, true); err != nil { + logger.Error("failed to update clients", err) + return err + } else { + msgs.Merge(m) + } + // relay packets if unrelayed seqs exist - srcRelayPackets, dstRelayPackets := srv.shouldExecuteRelay(pseqs, srv.optimizeRelay.srcRelayPacketStartTime, srv.optimizeRelay.dstRelayPacketStartTime) - if m, err := srv.st.RelayPackets(srv.src, srv.dst, pseqs, srv.sh, srcRelayPackets, dstRelayPackets); err != nil { + if m, err := srv.st.RelayPackets(srv.src, srv.dst, pseqs, srv.sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil { logger.Error("failed to relay packets", err) return err } else { - if srcRelayPackets { - srv.optimizeRelay.srcRelayPacketStartTime.AlreadySet = false - } - if dstRelayPackets { - srv.optimizeRelay.dstRelayPacketStartTime.AlreadySet = false - } msgs.Merge(m) } - // reset watch start time for acks - if len(aseqs.Src) > 0 { - resetWatchStartTime(&srv.optimizeRelay.srcRelayAckStartTime) + if doExecuteRelaySrc { + srv.optimizeRelay.srcRelayPacketStartTime.AlreadySet = false } - if len(aseqs.Dst) > 0 { - resetWatchStartTime(&srv.optimizeRelay.dstRelayAckStartTime) + if doExecuteRelayDst { + srv.optimizeRelay.dstRelayPacketStartTime.AlreadySet = false } // relay acks if unrelayed seqs exist - srcRelayAcks, dstRelayAcks := srv.shouldExecuteRelay(aseqs, srv.optimizeRelay.srcRelayAckStartTime, srv.optimizeRelay.dstRelayAckStartTime) - if m, err := srv.st.RelayAcknowledgements(srv.src, srv.dst, aseqs, srv.sh, srcRelayAcks, dstRelayAcks); err != nil { + if m, err := srv.st.RelayAcknowledgements(srv.src, srv.dst, aseqs, srv.sh, doExecuteAckSrc, doExecuteAckDst); err != nil { logger.Error("failed to relay acknowledgements", err) return err } else { - if srcRelayAcks { - srv.optimizeRelay.srcRelayAckStartTime.AlreadySet = false - } - if dstRelayAcks { - srv.optimizeRelay.dstRelayAckStartTime.AlreadySet = false - } msgs.Merge(m) } + if doExecuteAckSrc { + srv.optimizeRelay.srcRelayAckStartTime.AlreadySet = false + } + if doExecuteAckDst { + srv.optimizeRelay.dstRelayAckStartTime.AlreadySet = false + } // send all msgs to src/dst chains srv.st.Send(srv.src, srv.dst, msgs) diff --git a/core/strategies.go b/core/strategies.go index 5985749c..f344ff2f 100644 --- a/core/strategies.go +++ b/core/strategies.go @@ -18,17 +18,17 @@ type StrategyI interface { UnrelayedPackets(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error) // RelayPackets executes RecvPacket to the packets contained in `rp` on both chains (`src` and `dst`). - RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error) + RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) // UnrelayedAcknowledgements returns packets to execute AcknowledgePacket to on `src` and `dst`. // `includeRelayedButUnfinalized` decides if the result includes packets of which acknowledgePacket has been executed but not finalized UnrelayedAcknowledgements(src, dst *ProvableChain, sh SyncHeaders, includeRelayedButUnfinalized bool) (*RelayPackets, error) // RelayAcknowledgements executes AcknowledgePacket to the packets contained in `rp` on both chains (`src` and `dst`). - RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, srcOptimizeRelay, dstOptimizeRelay bool) (*RelayMsgs, error) + RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) // UpdateClients executes UpdateClient only if needed - UpdateClients(src, dst *ProvableChain, rpForRecv, rpForAck *RelayPackets, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) + UpdateClients(src, dst *ProvableChain, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst bool, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) // Send executes submission of msgs to src/dst chains Send(src, dst Chain, msgs *RelayMsgs) diff --git a/tests/cases/tm2tm/scripts/test-service b/tests/cases/tm2tm/scripts/test-service index 92b4abca..21aa3910 100755 --- a/tests/cases/tm2tm/scripts/test-service +++ b/tests/cases/tm2tm/scripts/test-service @@ -2,8 +2,10 @@ : <<'END_COMMENT' * relay-interval = 10s -* relay-optimize-interval = 15s -* relay-optimize-count = 3 +* src-relay-optimize-interval = 15s +* src-relay-optimize-count = 3 +* dst-relay-optimize-interval = 15s +* dst-relay-optimize-count = 3 - relay service [query = 0, time = 0] -> skip - transfer @@ -15,7 +17,7 @@ - - - -- relay service [query = 1, time = 10] -> skip +- relay service [query = 1, time = 0] -> skip - transfer - - @@ -25,6 +27,16 @@ - - - +- relay service [query = 2, time = 10] -> skip + - + - + - + - + - + - + - + - + - - relay service [query = 2, time = 20] -> exec (time) - transfer x 3 - @@ -34,7 +46,7 @@ - - - - - + - - relay service [query = 3, time = 10] -> exec (count) END_COMMENT @@ -80,7 +92,7 @@ if [ $unrelayed_packets -ne 2 ]; then exit 1 fi -sleep 7 # wait for relay-interval +sleep 17 # wait for relay-interval echo "xxxxxxx relay service -> exec - time xxxxxx" # transfer a token x 3 diff --git a/tests/cases/tmmock2tmmock/scripts/test-service b/tests/cases/tmmock2tmmock/scripts/test-service index 92b4abca..21aa3910 100755 --- a/tests/cases/tmmock2tmmock/scripts/test-service +++ b/tests/cases/tmmock2tmmock/scripts/test-service @@ -2,8 +2,10 @@ : <<'END_COMMENT' * relay-interval = 10s -* relay-optimize-interval = 15s -* relay-optimize-count = 3 +* src-relay-optimize-interval = 15s +* src-relay-optimize-count = 3 +* dst-relay-optimize-interval = 15s +* dst-relay-optimize-count = 3 - relay service [query = 0, time = 0] -> skip - transfer @@ -15,7 +17,7 @@ - - - -- relay service [query = 1, time = 10] -> skip +- relay service [query = 1, time = 0] -> skip - transfer - - @@ -25,6 +27,16 @@ - - - +- relay service [query = 2, time = 10] -> skip + - + - + - + - + - + - + - + - + - - relay service [query = 2, time = 20] -> exec (time) - transfer x 3 - @@ -34,7 +46,7 @@ - - - - - + - - relay service [query = 3, time = 10] -> exec (count) END_COMMENT @@ -80,7 +92,7 @@ if [ $unrelayed_packets -ne 2 ]; then exit 1 fi -sleep 7 # wait for relay-interval +sleep 17 # wait for relay-interval echo "xxxxxxx relay service -> exec - time xxxxxx" # transfer a token x 3