From 920109608bf51d507dca9b803a882e9c2c97df91 Mon Sep 17 00:00:00 2001 From: aptend Date: Thu, 9 Jan 2025 16:14:59 +0800 Subject: [PATCH] fix --- pkg/objectio/injects.go | 14 ++++++++++- pkg/vm/engine/disttae/logtail_consumer.go | 25 ++++++++++++++++--- pkg/vm/engine/tae/logtail/handle.go | 6 +++++ pkg/vm/engine/tae/logtail/service/client.go | 27 ++++++++++++++------- pkg/vm/engine/tae/logtail/table_test.go | 8 ++++++ 5 files changed, 66 insertions(+), 14 deletions(-) diff --git a/pkg/objectio/injects.go b/pkg/objectio/injects.go index 2d2e8c4134760..f905b69ecdc09 100644 --- a/pkg/objectio/injects.go +++ b/pkg/objectio/injects.go @@ -41,7 +41,9 @@ const ( FJ_Debug19524 = "fj/debug/19524" - FJ_CNRecvErr = "fj/cn/recv/err" + FJ_CNRecvErr = "fj/cn/recv/err" + FJ_CNSubSysErr = "fj/cn/recv/subsyserr" + FJ_CNReplayCacheErr = "fj/cn/recv/rcacheerr" FJ_LogReader = "fj/log/reader" FJ_LogWorkspace = "fj/log/workspace" @@ -343,6 +345,16 @@ func CNRecvErrInjected() (bool, int) { return injected, int(p) } +func CNSubSysErrInjected() (bool, int) { + p, _, injected := fault.TriggerFault(FJ_CNSubSysErr) + return injected, int(p) +} + +func CNReplayCacheErrInjected() (bool, int) { + p, _, injected := fault.TriggerFault(FJ_CNReplayCacheErr) + return injected, int(p) +} + func RangesLogInjected(dbName, tableName string) (bool, int) { _, sarg, injected := fault.TriggerFault(FJ_TraceRanges) if !injected { diff --git a/pkg/vm/engine/disttae/logtail_consumer.go b/pkg/vm/engine/disttae/logtail_consumer.go index 4c3836176f291..d8ae775e1d137 100644 --- a/pkg/vm/engine/disttae/logtail_consumer.go +++ b/pkg/vm/engine/disttae/logtail_consumer.go @@ -531,6 +531,9 @@ func (c *PushClient) subscribeTable( } func (c *PushClient) subSysTables(ctx context.Context) error { + if enabled, p := objectio.CNSubSysErrInjected(); enabled && rand.Intn(100000) < p { + return moerr.NewInternalError(ctx, "FIND_TABLE sub sys error injected") + } // push subscription to Table `mo_database`, `mo_table`, `mo_column` of mo_catalog. databaseId := uint64(catalog.MO_CATALOG_ID) tableIds := []uint64{catalog.MO_DATABASE_ID, catalog.MO_TABLES_ID, catalog.MO_COLUMNS_ID} @@ -555,6 +558,10 @@ func (c *PushClient) pause(s bool) { if c.mu.paused { return } + // Note + // If subSysTables fails to send a successful request, receiveLogtails will receive nothing until the context is done. In this case, we attempt to stop the receiveLogtails goroutine immediately. + // The break signal left in the channel will interrupt the normal receiving process, but this is not an issue because reconnecting will create a new channel. + c.subscriber.logTailClient.BreakoutReceive() select { case c.pauseC <- s: c.mu.paused = true @@ -733,6 +740,9 @@ func (c *PushClient) waitTimestamp() { } func (c *PushClient) replayCatalogCache(ctx context.Context, e *Engine) (err error) { + if enabled, p := objectio.CNReplayCacheErrInjected(); enabled && rand.Intn(100000) < p { + return moerr.NewInternalError(ctx, "FIND_TABLE replay catalog cache error injected") + } // replay mo_catalog cache var op client.TxnOperator var result executor.Result @@ -873,6 +883,7 @@ func (c *PushClient) connect(ctx context.Context, e *Engine) { e.setPushClientStatus(false) + // the consumer goroutine is supposed to be stopped. c.stopConsumers() logutil.Infof("%s %s: clean finished, start to reconnect to tn log tail service", logTag, c.serviceID) @@ -912,17 +923,23 @@ func (c *PushClient) connect(ctx context.Context, e *Engine) { c.dcaReset() err = c.subSysTables(ctx) if err != nil { - c.pause(false) + // send on closed channel error: + // receive logtail error -> pause -> reconnect -------------------------> stop + // |-> forced subscribe table timeout -> continue ----> resume + // Any errors related to the logtail consumer should not be retried within the inner connect loop; they should be handled by the outer caller. + // So we break the loop here. + + c.pause(true) logutil.Errorf("%s subscribe system tables failed, err %v", logTag, err) - continue + break } c.waitTimestamp() if err := c.replayCatalogCache(ctx, e); err != nil { - c.pause(false) + c.pause(true) logutil.Errorf("%s replay catalog cache failed, err %v", logTag, err) - continue + break } e.setPushClientStatus(true) diff --git a/pkg/vm/engine/tae/logtail/handle.go b/pkg/vm/engine/tae/logtail/handle.go index 41e5b122aaefd..29d91a8e9d385 100644 --- a/pkg/vm/engine/tae/logtail/handle.go +++ b/pkg/vm/engine/tae/logtail/handle.go @@ -450,6 +450,12 @@ func LoadCheckpointEntries( key := locationsAndVersions[i] version, err := strconv.ParseUint(locationsAndVersions[i+1], 10, 32) if err != nil { + logutil.Error( + "Parse-CKP-Name-Error", + zap.String("loc", metaLoc), + zap.Int("i", i), + zap.Error(err), + ) return nil, nil, err } location, err := objectio.StringToLocation(key) diff --git a/pkg/vm/engine/tae/logtail/service/client.go b/pkg/vm/engine/tae/logtail/service/client.go index 3532878c3c629..ef52e6954b0cb 100644 --- a/pkg/vm/engine/tae/logtail/service/client.go +++ b/pkg/vm/engine/tae/logtail/service/client.go @@ -53,10 +53,11 @@ type LogtailClient struct { // There is another worker send the items in the chan to stream. requestC chan *LogtailRequest - stream morpc.Stream - recvChan chan morpc.Message - broken chan struct{} // mark morpc stream as broken when necessary - once sync.Once + stream morpc.Stream + recvChan chan morpc.Message + breakChan chan struct{} + broken chan struct{} // mark morpc stream as broken when necessary + once sync.Once options struct { rps int @@ -69,11 +70,12 @@ type LogtailClient struct { func NewLogtailClient(ctx context.Context, stream morpc.Stream, opts ...ClientOption) (*LogtailClient, error) { ctx, cancel := context.WithCancel(ctx) client := &LogtailClient{ - ctx: ctx, - cancel: cancel, - requestC: make(chan *LogtailRequest, defaultRequestChanSize), - stream: stream, - broken: make(chan struct{}), + ctx: ctx, + cancel: cancel, + requestC: make(chan *LogtailRequest, defaultRequestChanSize), + stream: stream, + broken: make(chan struct{}), + breakChan: make(chan struct{}, 10), } recvChan, err := stream.Receive() @@ -150,6 +152,10 @@ func (c *LogtailClient) Unsubscribe( return c.sendRequest(request) } +func (c *LogtailClient) BreakoutReceive() { + c.breakChan <- struct{}{} +} + // Receive fetches logtail response. // // 1. response for error: *LogtailResponse.GetError() != nil @@ -162,6 +168,9 @@ func (c *LogtailClient) Receive(ctx context.Context) (*LogtailResponse, error) { case <-ctx.Done(): return nil, ctx.Err() + case <-c.breakChan: + return nil, moerr.NewInternalErrorNoCtx("logtail client: reconnect breakout") + case <-c.broken: return nil, moerr.NewStreamClosedNoCtx() diff --git a/pkg/vm/engine/tae/logtail/table_test.go b/pkg/vm/engine/tae/logtail/table_test.go index bc6d9695c4f6d..744793a39ee60 100644 --- a/pkg/vm/engine/tae/logtail/table_test.go +++ b/pkg/vm/engine/tae/logtail/table_test.go @@ -15,12 +15,14 @@ package logtail import ( + "context" "testing" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestTxnTable1(t *testing.T) { @@ -108,3 +110,9 @@ func TestTxnTable1(t *testing.T) { func Less(a int, b int) bool { return a < b } + +func TestLoadCheckpointError(t *testing.T) { + locs := "01944064-7ce8-78be-b072-767fe85ea839_00000_1_2479_1029_9541_0_0;01944064-7ce8-78be-b072-767fe85ea838_00000_1_2479_1029_9541_0_0;" + _, _, err := LoadCheckpointEntries(context.Background(), "", locs, 213, "t1", 214, "d2", nil, nil) + require.Error(t, err) +}