Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
aptend committed Jan 9, 2025
1 parent 694b7c0 commit 9201096
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 14 deletions.
14 changes: 13 additions & 1 deletion pkg/objectio/injects.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ const (

FJ_Debug19524 = "fj/debug/19524"

FJ_CNRecvErr = "fj/cn/recv/err"
FJ_CNRecvErr = "fj/cn/recv/err"
FJ_CNSubSysErr = "fj/cn/recv/subsyserr"
FJ_CNReplayCacheErr = "fj/cn/recv/rcacheerr"

FJ_LogReader = "fj/log/reader"
FJ_LogWorkspace = "fj/log/workspace"
Expand Down Expand Up @@ -343,6 +345,16 @@ func CNRecvErrInjected() (bool, int) {
return injected, int(p)
}

func CNSubSysErrInjected() (bool, int) {
p, _, injected := fault.TriggerFault(FJ_CNSubSysErr)
return injected, int(p)
}

func CNReplayCacheErrInjected() (bool, int) {
p, _, injected := fault.TriggerFault(FJ_CNReplayCacheErr)
return injected, int(p)
}

func RangesLogInjected(dbName, tableName string) (bool, int) {
_, sarg, injected := fault.TriggerFault(FJ_TraceRanges)
if !injected {
Expand Down
25 changes: 21 additions & 4 deletions pkg/vm/engine/disttae/logtail_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,9 @@ func (c *PushClient) subscribeTable(
}

func (c *PushClient) subSysTables(ctx context.Context) error {
if enabled, p := objectio.CNSubSysErrInjected(); enabled && rand.Intn(100000) < p {
return moerr.NewInternalError(ctx, "FIND_TABLE sub sys error injected")
}
// push subscription to Table `mo_database`, `mo_table`, `mo_column` of mo_catalog.
databaseId := uint64(catalog.MO_CATALOG_ID)
tableIds := []uint64{catalog.MO_DATABASE_ID, catalog.MO_TABLES_ID, catalog.MO_COLUMNS_ID}
Expand All @@ -555,6 +558,10 @@ func (c *PushClient) pause(s bool) {
if c.mu.paused {
return
}
// Note
// If subSysTables fails to send a successful request, receiveLogtails will receive nothing until the context is done. In this case, we attempt to stop the receiveLogtails goroutine immediately.
// The break signal left in the channel will interrupt the normal receiving process, but this is not an issue because reconnecting will create a new channel.
c.subscriber.logTailClient.BreakoutReceive()
select {
case c.pauseC <- s:
c.mu.paused = true
Expand Down Expand Up @@ -733,6 +740,9 @@ func (c *PushClient) waitTimestamp() {
}

func (c *PushClient) replayCatalogCache(ctx context.Context, e *Engine) (err error) {
if enabled, p := objectio.CNReplayCacheErrInjected(); enabled && rand.Intn(100000) < p {
return moerr.NewInternalError(ctx, "FIND_TABLE replay catalog cache error injected")
}
// replay mo_catalog cache
var op client.TxnOperator
var result executor.Result
Expand Down Expand Up @@ -873,6 +883,7 @@ func (c *PushClient) connect(ctx context.Context, e *Engine) {

e.setPushClientStatus(false)

// the consumer goroutine is supposed to be stopped.
c.stopConsumers()

logutil.Infof("%s %s: clean finished, start to reconnect to tn log tail service", logTag, c.serviceID)
Expand Down Expand Up @@ -912,17 +923,23 @@ func (c *PushClient) connect(ctx context.Context, e *Engine) {
c.dcaReset()
err = c.subSysTables(ctx)
if err != nil {
c.pause(false)
// send on closed channel error:
// receive logtail error -> pause -> reconnect -------------------------> stop
// |-> forced subscribe table timeout -> continue ----> resume
// Any errors related to the logtail consumer should not be retried within the inner connect loop; they should be handled by the outer caller.
// So we break the loop here.

c.pause(true)
logutil.Errorf("%s subscribe system tables failed, err %v", logTag, err)
continue
break
}

c.waitTimestamp()

if err := c.replayCatalogCache(ctx, e); err != nil {
c.pause(false)
c.pause(true)
logutil.Errorf("%s replay catalog cache failed, err %v", logTag, err)
continue
break
}

e.setPushClientStatus(true)
Expand Down
6 changes: 6 additions & 0 deletions pkg/vm/engine/tae/logtail/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,12 @@ func LoadCheckpointEntries(
key := locationsAndVersions[i]
version, err := strconv.ParseUint(locationsAndVersions[i+1], 10, 32)
if err != nil {
logutil.Error(
"Parse-CKP-Name-Error",
zap.String("loc", metaLoc),
zap.Int("i", i),
zap.Error(err),
)
return nil, nil, err
}
location, err := objectio.StringToLocation(key)
Expand Down
27 changes: 18 additions & 9 deletions pkg/vm/engine/tae/logtail/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ type LogtailClient struct {
// There is another worker send the items in the chan to stream.
requestC chan *LogtailRequest

stream morpc.Stream
recvChan chan morpc.Message
broken chan struct{} // mark morpc stream as broken when necessary
once sync.Once
stream morpc.Stream
recvChan chan morpc.Message
breakChan chan struct{}
broken chan struct{} // mark morpc stream as broken when necessary
once sync.Once

options struct {
rps int
Expand All @@ -69,11 +70,12 @@ type LogtailClient struct {
func NewLogtailClient(ctx context.Context, stream morpc.Stream, opts ...ClientOption) (*LogtailClient, error) {
ctx, cancel := context.WithCancel(ctx)
client := &LogtailClient{
ctx: ctx,
cancel: cancel,
requestC: make(chan *LogtailRequest, defaultRequestChanSize),
stream: stream,
broken: make(chan struct{}),
ctx: ctx,
cancel: cancel,
requestC: make(chan *LogtailRequest, defaultRequestChanSize),
stream: stream,
broken: make(chan struct{}),
breakChan: make(chan struct{}, 10),
}

recvChan, err := stream.Receive()
Expand Down Expand Up @@ -150,6 +152,10 @@ func (c *LogtailClient) Unsubscribe(
return c.sendRequest(request)
}

func (c *LogtailClient) BreakoutReceive() {
c.breakChan <- struct{}{}
}

// Receive fetches logtail response.
//
// 1. response for error: *LogtailResponse.GetError() != nil
Expand All @@ -162,6 +168,9 @@ func (c *LogtailClient) Receive(ctx context.Context) (*LogtailResponse, error) {
case <-ctx.Done():
return nil, ctx.Err()

case <-c.breakChan:
return nil, moerr.NewInternalErrorNoCtx("logtail client: reconnect breakout")

case <-c.broken:
return nil, moerr.NewStreamClosedNoCtx()

Expand Down
8 changes: 8 additions & 0 deletions pkg/vm/engine/tae/logtail/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package logtail

import (
"context"
"testing"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTxnTable1(t *testing.T) {
Expand Down Expand Up @@ -108,3 +110,9 @@ func TestTxnTable1(t *testing.T) {
func Less(a int, b int) bool {
return a < b
}

func TestLoadCheckpointError(t *testing.T) {
locs := "01944064-7ce8-78be-b072-767fe85ea839_00000_1_2479_1029_9541_0_0;01944064-7ce8-78be-b072-767fe85ea838_00000_1_2479_1029_9541_0_0;"
_, _, err := LoadCheckpointEntries(context.Background(), "", locs, 213, "t1", 214, "d2", nil, nil)
require.Error(t, err)
}

0 comments on commit 9201096

Please sign in to comment.