Skip to content

Commit

Permalink
Merge branch 'main' into t8
Browse files Browse the repository at this point in the history
  • Loading branch information
huby2358 authored Jan 6, 2025
2 parents bad04d7 + 8ab49c2 commit 90b5c34
Show file tree
Hide file tree
Showing 37 changed files with 10,396 additions and 9,671 deletions.
2 changes: 1 addition & 1 deletion pkg/bootstrap/versions/upgrade_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func CheckTableColumn(txn executor.TxnExecutor,
}

// CheckViewDefinition Check if the view exists, if so, return true and return the view definition
func CheckViewDefinition(txn executor.TxnExecutor, accountId uint32, schema string, viewName string) (bool, string, error) {
var CheckViewDefinition = func(txn executor.TxnExecutor, accountId uint32, schema string, viewName string) (bool, string, error) {
sql := fmt.Sprintf("SELECT tbl.rel_createsql AS `VIEW_DEFINITION` FROM mo_catalog.mo_tables tbl LEFT JOIN mo_catalog.mo_user usr ON tbl.creator = usr.user_id WHERE tbl.relkind = 'v' AND tbl.reldatabase = '%s' AND tbl.relname = '%s'", schema, viewName)
if accountId == catalog.System_Account {
sql = fmt.Sprintf("SELECT tbl.rel_createsql AS `VIEW_DEFINITION` FROM mo_catalog.mo_tables tbl LEFT JOIN mo_catalog.mo_user usr ON tbl.creator = usr.user_id WHERE tbl.relkind = 'v' AND account_id = 0 AND tbl.reldatabase = '%s' AND tbl.relname = '%s'", schema, viewName)
Expand Down
12 changes: 12 additions & 0 deletions pkg/bootstrap/versions/v2_0_2/cluster_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package v2_0_2
import (
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/frontend"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

Expand All @@ -25,6 +26,7 @@ var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_pubs_add_account_name,
upg_mo_subs_add_sub_account_name,
upg_mo_subs_add_pub_account_id,
upg_mo_account_lock,
}

var upg_mo_cdc_watermark = versions.UpgradeEntry{
Expand Down Expand Up @@ -85,3 +87,13 @@ var upg_mo_subs_add_pub_account_id = versions.UpgradeEntry{
},
PostSql: "UPDATE mo_catalog.mo_subs t1 INNER JOIN mo_catalog.mo_account t2 ON t1.pub_account_name = t2.account_name SET t1.pub_account_id = t2.account_id",
}

var upg_mo_account_lock = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_ACCOUNT_LOCK,
UpgType: versions.CREATE_NEW_TABLE,
UpgSql: frontend.MoCatalogMoAccountLockDDL,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
return versions.CheckTableDefinition(txn, accountId, catalog.MO_CATALOG, catalog.MO_ACCOUNT_LOCK)
},
}
47 changes: 46 additions & 1 deletion pkg/bootstrap/versions/v2_0_2/tenant_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,52 @@
package v2_0_2

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/util/sysview"
)

var tenantUpgEntries = []versions.UpgradeEntry{}
var tenantUpgEntries = []versions.UpgradeEntry{
upg_information_schema_tables,
upg_information_schema_columns,
}

var upg_information_schema_tables = versions.UpgradeEntry{
Schema: sysview.InformationDBConst,
TableName: "TABLES",
UpgType: versions.MODIFY_VIEW,
UpgSql: sysview.InformationSchemaTablesDDL,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
exists, viewDef, err := versions.CheckViewDefinition(txn, accountId, sysview.InformationDBConst, "TABLES")
if err != nil {
return false, err
}

if exists && viewDef == sysview.InformationSchemaTablesDDL {
return true, nil
}
return false, nil
},
PreSql: fmt.Sprintf("DROP VIEW IF EXISTS %s.%s;", sysview.InformationDBConst, "TABLES"),
}

var upg_information_schema_columns = versions.UpgradeEntry{
Schema: sysview.InformationDBConst,
TableName: "COLUMNS",
UpgType: versions.MODIFY_VIEW,
UpgSql: sysview.InformationSchemaColumnsDDL,
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
exists, viewDef, err := versions.CheckViewDefinition(txn, accountId, sysview.InformationDBConst, "COLUMNS")
if err != nil {
return false, err
}

if exists && viewDef == sysview.InformationSchemaColumnsDDL {
return true, nil
}
return false, nil
},
PreSql: fmt.Sprintf("DROP VIEW IF EXISTS %s.%s;", sysview.InformationDBConst, "COLUMNS"),
}
2 changes: 1 addition & 1 deletion pkg/bootstrap/versions/v2_0_2/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
Version: "2.0.2",
MinUpgradeVersion: "2.0.1",
UpgradeCluster: versions.Yes,
UpgradeTenant: versions.No,
UpgradeTenant: versions.Yes,
VersionOffset: uint32(len(clusterUpgEntries) + len(tenantUpgEntries)),
},
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/bootstrap/versions/v2_0_2/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/golang/mock/gomock"
"github.com/prashantv/gostub"
"github.com/stretchr/testify/assert"

"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
Expand Down Expand Up @@ -197,3 +198,32 @@ func Test_UpgEntry(t *testing.T) {
},
)
}

func Test_upgrade_view(t *testing.T) {
stubs := gostub.Stub(&versions.CheckViewDefinition, func(txn executor.TxnExecutor, accountId uint32, schema string, viewName string) (bool, string, error) {
return true, "", moerr.NewInternalErrorNoCtx("return error")
})
defer stubs.Reset()
_, err := upg_information_schema_tables.CheckFunc(nil, 0)
assert.Error(t, err)
_, err = upg_information_schema_columns.CheckFunc(nil, 0)
assert.Error(t, err)
}

func Test_upgrade_view2(t *testing.T) {
stubs := gostub.Stub(&versions.CheckViewDefinition, func(txn executor.TxnExecutor, accountId uint32, schema string, viewName string) (bool, string, error) {
return true, upg_information_schema_tables.UpgSql, nil
})
defer stubs.Reset()
_, err := upg_information_schema_tables.CheckFunc(nil, 0)
assert.NoError(t, err)
}

func Test_upgrade_view3(t *testing.T) {
stubs := gostub.Stub(&versions.CheckViewDefinition, func(txn executor.TxnExecutor, accountId uint32, schema string, viewName string) (bool, string, error) {
return true, upg_information_schema_columns.UpgSql, nil
})
defer stubs.Reset()
_, err := upg_information_schema_columns.CheckFunc(nil, 0)
assert.NoError(t, err)
}
2 changes: 2 additions & 0 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ const (
MO_DATA_KEY = "mo_data_key"

MO_TABLE_STATS = "mo_table_stats_alpha"

MO_ACCOUNT_LOCK = "__mo_account_lock"
)

func IsSystemTable(id uint64) bool {
Expand Down
68 changes: 67 additions & 1 deletion pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,7 @@ var (
"mo_cdc_task": 0,
"mo_cdc_watermark": 0,
catalog.MO_TABLE_STATS: 0,
catalog.MO_ACCOUNT_LOCK: 0,
}
createDbInformationSchemaSql = "create database information_schema;"
createAutoTableSql = MoCatalogMoAutoIncrTableDDL
Expand Down Expand Up @@ -1007,6 +1008,7 @@ var (
MoCatalogMoCdcWatermarkDDL,
MoCatalogMoDataKeyDDL,
MoCatalogMoTableStatsDDL,
MoCatalogMoAccountLockDDL,
}

//drop tables for the tenant
Expand Down Expand Up @@ -1186,6 +1188,8 @@ const (

deleteAccountFromMoAccountFormat = `delete from mo_catalog.mo_account where account_name = "%s" order by account_id;`

lockMoAccountNameFormat = `select account_name from mo_catalog.__mo_account_lock where account_name = "%s" for update;`

deletePitrFromMoPitrFormat = `delete from mo_catalog.mo_pitr where create_account = %d;`

getPasswordOfUserFormat = `select user_id, authentication_string, default_role from mo_catalog.mo_user where user_name = "%s" order by user_id;`
Expand Down Expand Up @@ -1637,6 +1641,14 @@ func getSqlForDeleteAccountFromMoAccount(ctx context.Context, account string) (s
return fmt.Sprintf(deleteAccountFromMoAccountFormat, account), nil
}

func getSqlForLockMoAccountNameFormat(ctx context.Context, account string) (string, error) {
err := inputNameIsInvalid(ctx, account)
if err != nil {
return "", err
}
return fmt.Sprintf(lockMoAccountNameFormat, account), nil
}

func getSqlForDeletePitrFromMoPitr(accountId uint64) string {
return fmt.Sprintf(deletePitrFromMoPitrFormat, accountId)
}
Expand Down Expand Up @@ -2950,6 +2962,17 @@ func doAlterAccount(ctx context.Context, ses *Session, aa *alterAccount) (err er
return rtnErr
}

//step 0: lock account name first
sql, rtnErr = getSqlForLockMoAccountNameFormat(ctx, aa.Name)
if rtnErr != nil {
return rtnErr
}
bh.ClearExecResultSet()
rtnErr = bh.Exec(ctx, sql)
if rtnErr != nil {
return rtnErr
}

//step 1: check account exists or not
//get accountID
sql, rtnErr = getSqlForCheckTenant(ctx, aa.Name)
Expand Down Expand Up @@ -3614,7 +3637,7 @@ func doDropAccount(ctx context.Context, bh BackgroundExec, ses *Session, da *dro
}

checkAccount := func(accountName string) (accountId int64, version uint64, ok bool, err error) {
if sql, err = getSqlForCheckTenant(ctx, da.Name); err != nil {
if sql, err = getSqlForCheckTenant(ctx, accountName); err != nil {
return
}

Expand All @@ -3641,6 +3664,26 @@ func doDropAccount(ctx context.Context, bh BackgroundExec, ses *Session, da *dro
}

dropAccountFunc := func() (rtnErr error) {
rtnErr = bh.Exec(ctx, "begin;")
defer func() {
rtnErr = finishTxn(ctx, bh, rtnErr)
}()
if rtnErr != nil {
return rtnErr
}

//step 0: lock account name first
sql, rtnErr = getSqlForLockMoAccountNameFormat(ctx, da.Name)
ses.Infof(ctx, "dropAccount %s sql: %s", da.Name, sql)
if rtnErr != nil {
return rtnErr
}
bh.ClearExecResultSet()
rtnErr = bh.Exec(ctx, sql)
if rtnErr != nil {
return rtnErr
}

ses.Infof(ctx, "dropAccount %s sql: %s", da.Name, getAccountIdNamesSql)
if accountId, version, hasAccount, rtnErr = checkAccount(da.Name); rtnErr != nil {
return
Expand Down Expand Up @@ -7265,6 +7308,7 @@ func InitGeneralTenant(ctx context.Context, bh BackgroundExec, ses *Session, ca
var newTenant *TenantInfo
var newTenantCtx context.Context
var mp *mpool.MPool
var sql string
ctx, span := trace.Debug(ctx, "InitGeneralTenant")
defer span.End()
tenant := ses.GetTenantInfo()
Expand Down Expand Up @@ -7307,6 +7351,25 @@ func InitGeneralTenant(ctx context.Context, bh BackgroundExec, ses *Session, ca
defer mpool.DeleteMPool(mp)

createNewAccount := func() (rtnErr error) {
rtnErr = bh.Exec(ctx, "begin;")
defer func() {
rtnErr = finishTxn(ctx, bh, rtnErr)
}()
if rtnErr != nil {
return rtnErr
}

//step 0: lock account name first
sql, rtnErr = getSqlForLockMoAccountNameFormat(ctx, ca.Name)
if rtnErr != nil {
return rtnErr
}
bh.ClearExecResultSet()
rtnErr = bh.Exec(ctx, sql)
if rtnErr != nil {
return rtnErr
}

start1 := time.Now()
//USE the mo_catalog
// MOVE into txn, make sure only create ONE txn.
Expand Down Expand Up @@ -7527,6 +7590,9 @@ func createTablesInMoCatalogOfGeneralTenant2(bh BackgroundExec, ca *createAccoun
if strings.HasPrefix(sql, fmt.Sprintf("create table mo_catalog.%s", catalog.MO_TABLE_STATS)) {
return true
}
if strings.HasPrefix(sql, fmt.Sprintf("create table mo_catalog.%s", catalog.MO_ACCOUNT_LOCK)) {
return true
}
return false
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3212,7 +3212,7 @@ func ExecRequest(ses *Session, execCtx *ExecCtx, req *Request) (resp *Response,
logStatementStatus(execCtx.reqCtx, ses, execCtx.stmt, fail, err)
}
}()
_, _, _ = fault.TriggerFault("exec_request_panic")
_, _, _ = fault.TriggerFaultInDomain(fault.DomainFrontend, "exec_request_panic")

ses.EnterFPrint(FPExecRequest)
defer ses.ExitFPrint(FPExecRequest)
Expand Down
16 changes: 8 additions & 8 deletions pkg/frontend/mysql_cmd_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1560,15 +1560,15 @@ func Benchmark_RecordStatement_IsTrue(b *testing.B) {
}

func Test_panic(t *testing.T) {
fault.Enable()
defer fault.Disable()
fault.EnableDomain(fault.DomainFrontend)
defer fault.DisableDomain(fault.DomainFrontend)

ctrl := gomock.NewController(t)
defer ctrl.Finish()

runPanic := func(panicChoice int64) {
fault.AddFaultPoint(context.Background(), "exec_request_panic", ":::", "panic", panicChoice, "has panic", false)
defer fault.RemoveFaultPoint(context.Background(), "exec_request_panic")
fault.AddFaultPointInDomain(context.Background(), fault.DomainFrontend, "exec_request_panic", ":::", "panic", panicChoice, "has panic", false)
defer fault.RemoveFaultPointFromDomain(context.Background(), fault.DomainFrontend, "exec_request_panic")

ses := newTestSession(t, ctrl)
execCtx := &ExecCtx{
Expand All @@ -1588,15 +1588,15 @@ func Test_panic(t *testing.T) {
}

func Test_run_panic(t *testing.T) {
fault.Enable()
defer fault.Disable()
fault.EnableDomain(fault.DomainFrontend)
defer fault.DisableDomain(fault.DomainFrontend)

ctrl := gomock.NewController(t)
defer ctrl.Finish()

runPanic := func(panicChoice int64) {
fault.AddFaultPoint(context.Background(), "executeStmtWithWorkspace_panic", ":::", "panic", panicChoice, "has panic", false)
defer fault.RemoveFaultPoint(context.Background(), "executeStmtWithWorkspace_panic")
fault.AddFaultPointInDomain(context.Background(), fault.DomainFrontend, "executeStmtWithWorkspace_panic", ":::", "panic", panicChoice, "has panic", false)
defer fault.RemoveFaultPointFromDomain(context.Background(), fault.DomainFrontend, "executeStmtWithWorkspace_panic")

ses := newTestSession(t, ctrl)
execCtx := &ExecCtx{
Expand Down
4 changes: 4 additions & 0 deletions pkg/frontend/predefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ var (
takes bigint unsigned,
primary key(account_id, database_id, table_id)
)`, catalog.MO_TABLE_STATS)

MoCatalogMoAccountLockDDL = fmt.Sprintf(`create table mo_catalog.%s(
account_name varchar(300) primary key
)`, catalog.MO_ACCOUNT_LOCK)
)

// `mo_catalog` database system tables
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/compile/sql_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,17 @@ func (exec *txnExecutor) Exec(
exec.s.us,
nil,
)
accId, err := defines.GetAccountId(proc.Ctx)
if err != nil {
return executor.Result{}, err
}
useId := defines.GetUserId(proc.Ctx)
roleId := defines.GetRoleId(proc.Ctx)

proc.Base.WaitPolicy = statementOption.WaitPolicy()
proc.Base.SessionInfo.AccountId = accId
proc.Base.SessionInfo.UserId = useId
proc.Base.SessionInfo.RoleId = roleId
proc.Base.SessionInfo.TimeZone = exec.opts.GetTimeZone()
proc.Base.SessionInfo.Buf = exec.s.buf
proc.Base.SessionInfo.StorageEngine = exec.s.eng
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/parsers/dialect/mysql/keywords.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,5 +649,6 @@ func init() {
"apply": APPLY,
"dedup": DEDUP,
"savepoint": SAVEPOINT,
"recovery_window": RECOVERY_WINDOW,
}
}
Loading

0 comments on commit 90b5c34

Please sign in to comment.