Skip to content

Commit

Permalink
delete unsubscribed account in mo_subs (#20676)
Browse files Browse the repository at this point in the history
delete unsubscribed account in mo_subs

Approved by: @daviszhen, @LeftHandCold, @qingxinhome, @zhangxu19830126, @sukki37
  • Loading branch information
ck89119 authored Dec 9, 2024
1 parent e483a82 commit 2c325a4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 2 deletions.
5 changes: 5 additions & 0 deletions pkg/bootstrap/versions/v2_0_1/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func migrateMoPubs(txn executor.TxnExecutor) (err error) {
if _, err = txn.Exec(insertSql, executor.StatementOption{}.WithAccountID(0)); err != nil {
return
}

deleteSql := fmt.Sprintf("delete from mo_catalog.mo_subs where pub_account_name = '%s' and pub_name = '%s' and sub_name is null", info.PubAccountName, info.PubName)
if _, err = txn.Exec(deleteSql, executor.StatementOption{}.WithAccountID(0)); err != nil {
return
}
}
return
}
54 changes: 52 additions & 2 deletions pkg/bootstrap/versions/v2_0_1/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package v2_0_1

import (
"strings"
"testing"

"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
Expand All @@ -27,7 +28,9 @@ import (
"github.com/stretchr/testify/assert"
)

type MockTxnExecutor struct{}
type MockTxnExecutor struct {
flag bool
}

func (MockTxnExecutor) Use(db string) {
//TODO implement me
Expand All @@ -39,7 +42,11 @@ func (MockTxnExecutor) LockTable(table string) error {
panic("implement me")
}

func (MockTxnExecutor) Exec(sql string, options executor.StatementOption) (executor.Result, error) {
func (e MockTxnExecutor) Exec(sql string, options executor.StatementOption) (executor.Result, error) {
if strings.HasPrefix(sql, "delete from mo_catalog.mo_subs") && e.flag {
return executor.Result{}, assert.AnError
}

bat := batch.New([]string{"a"})
bat.Vecs[0] = testutil.MakeInt32Vector([]int32{1}, nil)
bat.SetRowCount(1)
Expand Down Expand Up @@ -106,3 +113,46 @@ func Test_migrateMoPubs(t *testing.T) {
err := migrateMoPubs(txn)
assert.NoError(t, err)
}

func Test_migrateMoPubs_deleteFailed(t *testing.T) {
getAccountsStub := gostub.Stub(
&pubsub.GetAccounts,
func(_ executor.TxnExecutor) (map[string]*pubsub.AccountInfo, map[int32]*pubsub.AccountInfo, error) {
return map[string]*pubsub.AccountInfo{
"acc1": {Id: 1, Name: "acc1"},
}, nil, nil
},
)
defer getAccountsStub.Reset()

getAllPubInfosStub := gostub.Stub(
&versions.GetAllPubInfos,
func(_ executor.TxnExecutor, _ map[string]*pubsub.AccountInfo) (map[string]*pubsub.PubInfo, error) {
return map[string]*pubsub.PubInfo{
"sys#pubName": {
PubAccountName: "sys",
PubName: "pubName",
SubAccountsStr: pubsub.AccountAll,
},
"acc1#pubName": {
PubAccountName: "acc1",
PubName: "pubName",
SubAccountsStr: pubsub.AccountAll,
},
}, nil
},
)
defer getAllPubInfosStub.Reset()

getSubbedAccNamesStub := gostub.Stub(
&getSubbedAccNames,
func(_ executor.TxnExecutor, _, _ string, _ map[int32]*pubsub.AccountInfo) ([]string, error) {
return []string{"acc2"}, nil
},
)
defer getSubbedAccNamesStub.Reset()

txn := &MockTxnExecutor{flag: true}
err := migrateMoPubs(txn)
assert.Error(t, err)
}

0 comments on commit 2c325a4

Please sign in to comment.