Skip to content

Commit

Permalink
mo_account add column version_offset
Browse files Browse the repository at this point in the history
  • Loading branch information
qingxinhome committed Jan 9, 2025
1 parent 4af809a commit 24ddae8
Show file tree
Hide file tree
Showing 16 changed files with 236 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/bootstrap/custom_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *service) UpgradeOneTenant(ctx context.Context, tenantID int32) error {
if err := v.HandleTenantUpgrade(ctx, tenantID, txn); err != nil {
return err
}
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, txn); err != nil {
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, v.Metadata().VersionOffset, txn); err != nil {
return err
}
from = v.Metadata().Version
Expand Down
2 changes: 1 addition & 1 deletion pkg/bootstrap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (s *service) execBootstrap(ctx context.Context) error {
if err := initPreprocessSQL(ctx, txn, s.GetFinalVersion(), s.GetFinalVersionOffset()); err != nil {
return err
}
if err := frontend.InitSysTenant(ctx, txn, s.GetFinalVersion()); err != nil {
if err := frontend.InitSysTenant(ctx, txn, s.GetFinalVersion(), s.GetFinalVersionOffset()); err != nil {
return err
}
if err := sysview.InitSchema(ctx, txn); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bootstrap/service_upgrade_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *service) MaybeUpgradeTenant(
if err := v.HandleTenantUpgrade(ctx, tenantID, txn); err != nil {
return err
}
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, txn); err != nil {
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, v.Metadata().VersionOffset, txn); err != nil {
return err
}
from = v.Metadata().Version
Expand Down Expand Up @@ -213,7 +213,7 @@ func (s *service) asyncUpgradeTenantTask(ctx context.Context) {
return err
}

if err = versions.UpgradeTenantVersion(id, h.Metadata().Version, txn); err != nil {
if err = versions.UpgradeTenantVersion(id, h.Metadata().Version, h.Metadata().VersionOffset, txn); err != nil {
s.logger.Error("failed to update upgrade tenant create version",
zap.Int32("tenant", id),
zap.String("upgrade", upgrade.String()),
Expand Down
5 changes: 2 additions & 3 deletions pkg/bootstrap/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,20 @@ import (
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_0_0"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_0_1"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_0_2"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_1_0"
)

// initUpgrade all versions need create a upgrade handle in pkg/bootstrap/versions
// package. And register the upgrade logic into handles.
func (s *service) initUpgrade() {
s.handles = append(s.handles, v1_2_0.Handler)
// TODO: When v1.2.0 release, open the commented code as follows, Enable v1.2.1 upgrade package
s.handles = append(s.handles, v1_2_1.Handler)
// TODO: When v1.2.1 release, open the commented code as follows, Enable v1.2.2 upgrade package
s.handles = append(s.handles, v1_2_2.Handler)
s.handles = append(s.handles, v1_2_3.Handler)
s.handles = append(s.handles, v2_0_0.Handler)
s.handles = append(s.handles, v2_0_1.Handler)
// TODO: When v2.0.1 release, open the commented code as follows, Enable v2.0.2 upgrade package
s.handles = append(s.handles, v2_0_2.Handler)
s.handles = append(s.handles, v2_1_0.Handler)
}

func (s *service) getFinalVersionHandle() VersionHandle {
Expand Down
4 changes: 3 additions & 1 deletion pkg/bootstrap/versions/upgrade_tenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,11 @@ func GetTenantCreateVersionForUpdate(
func UpgradeTenantVersion(
tenantID int32,
version string,
versionOffset uint32,
txn executor.TxnExecutor) error {
sql := fmt.Sprintf("update mo_account set create_version = '%s' where account_id = %d",
sql := fmt.Sprintf("update mo_account set create_version = '%s', version_offset = %d where account_id = %d",
version,
versionOffset,
tenantID)
res, err := txn.Exec(sql, executor.StatementOption{})
if err != nil {
Expand Down
39 changes: 39 additions & 0 deletions pkg/bootstrap/versions/v2_1_0/cluster_upgrade_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2_1_0

import (
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_account_version_offset,
}

var upg_mo_account_version_offset = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MOAccountTable,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_account add column version_offset int unsigned default 0 after create_version",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MOAccountTable, "version_offset")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
}
24 changes: 24 additions & 0 deletions pkg/bootstrap/versions/v2_1_0/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2_1_0

import (
"github.com/matrixorigin/matrixone/pkg/common/log"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
)

func getLogger(sid string) *log.MOLogger {
return runtime.ServiceRuntime(sid).Logger()
}
21 changes: 21 additions & 0 deletions pkg/bootstrap/versions/v2_1_0/tenant_upgrade_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2_1_0

import (
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
)

var tenantUpgEntries = []versions.UpgradeEntry{}
105 changes: 105 additions & 0 deletions pkg/bootstrap/versions/v2_1_0/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2_1_0

import (
"context"
"time"

"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

var (
Handler = &versionHandle{
metadata: versions.Version{
Version: "2.1.0",
MinUpgradeVersion: "2.0.2",
UpgradeCluster: versions.Yes,
UpgradeTenant: versions.Yes,
VersionOffset: uint32(len(clusterUpgEntries) + len(tenantUpgEntries)),
},
}
)

type versionHandle struct {
metadata versions.Version
}

func (v *versionHandle) Metadata() versions.Version {
return v.metadata
}

func (v *versionHandle) Prepare(
ctx context.Context,
txn executor.TxnExecutor,
final bool) error {
txn.Use(catalog.MO_CATALOG)
return nil
}

func (v *versionHandle) HandleTenantUpgrade(
ctx context.Context,
tenantID int32,
txn executor.TxnExecutor) error {

for _, upgEntry := range tenantUpgEntries {
start := time.Now()

err := upgEntry.Upgrade(txn, uint32(tenantID))
if err != nil {
getLogger(txn.Txn().TxnOptions().CN).Error("tenant upgrade entry execute error", zap.Error(err), zap.Int32("tenantId", tenantID), zap.String("version", v.Metadata().Version), zap.String("upgrade entry", upgEntry.String()))
return err
}

duration := time.Since(start)
getLogger(txn.Txn().TxnOptions().CN).Info("tenant upgrade entry complete",
zap.String("upgrade entry", upgEntry.String()),
zap.Int64("time cost(ms)", duration.Milliseconds()),
zap.Int32("tenantId", tenantID),
zap.String("toVersion", v.Metadata().Version))
}

return nil
}

func (v *versionHandle) HandleClusterUpgrade(
ctx context.Context,
txn executor.TxnExecutor) error {
for _, upgEntry := range clusterUpgEntries {
start := time.Now()

err := upgEntry.Upgrade(txn, catalog.System_Account)
if err != nil {
getLogger(txn.Txn().TxnOptions().CN).Error("cluster upgrade entry execute error", zap.Error(err), zap.String("version", v.Metadata().Version), zap.String("upgrade entry", upgEntry.String()))
return err
}

duration := time.Since(start)
getLogger(txn.Txn().TxnOptions().CN).Info("cluster upgrade entry complete",
zap.String("upgrade entry", upgEntry.String()),
zap.Int64("time cost(ms)", duration.Milliseconds()),
zap.String("toVersion", v.Metadata().Version))
}
return nil
}

func (v *versionHandle) HandleCreateFrameworkDeps(txn executor.TxnExecutor) error {
return moerr.NewInternalErrorNoCtxf("Only v1.2.0 can initialize upgrade framework, current version is:%s", Handler.metadata.Version)
}
4 changes: 4 additions & 0 deletions pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ func (s *service) GetFinalVersion() string {
return s.bootstrapService.GetFinalVersion()
}

func (s *service) GetFinalVersionOffset() int32 {
return s.bootstrapService.GetFinalVersionOffset()
}

func (s *service) stopFrontend() error {
defer logutil.LogClose(s.logger, "cnservice/frontend")()

Expand Down
13 changes: 8 additions & 5 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,14 +1107,16 @@ var (
status,
created_time,
comments,
create_version) values (%d,"%s","%s","%s","%s","%s","%s");`
create_version,
version_offset) values (%d,"%s","%s","%s","%s","%s","%s",%d);`
initMoAccountWithoutIDFormat = `insert into mo_catalog.mo_account(
account_name,
admin_name,
status,
created_time,
comments,
create_version) values ("%s","%s","%s","%s","%s","%s");`
create_version,
version_offset) values ("%s","%s","%s","%s","%s","%s",%d);`
initMoRoleFormat = `insert into mo_catalog.mo_role(
role_id,
role_name,
Expand Down Expand Up @@ -7313,6 +7315,7 @@ func InitGeneralTenant(ctx context.Context, bh BackgroundExec, ses *Session, ca
defer span.End()
tenant := ses.GetTenantInfo()
finalVersion := ses.rm.baseService.GetFinalVersion()
finalVersionOffset := ses.rm.baseService.GetFinalVersionOffset()

if !(tenant.IsSysTenant() && tenant.IsMoAdminRole()) {
return moerr.NewInternalErrorf(ctx, "tenant %s user %s role %s do not have the privilege to create the new account", tenant.GetTenant(), tenant.GetUser(), tenant.GetDefaultRole())
Expand Down Expand Up @@ -7390,7 +7393,7 @@ func InitGeneralTenant(ctx context.Context, bh BackgroundExec, ses *Session, ca
}
return rtnErr
} else {
newTenant, newTenantCtx, rtnErr = createTablesInMoCatalogOfGeneralTenant(ctx, bh, finalVersion, ca)
newTenant, newTenantCtx, rtnErr = createTablesInMoCatalogOfGeneralTenant(ctx, bh, finalVersion, finalVersionOffset, ca)
if rtnErr != nil {
return rtnErr
}
Expand Down Expand Up @@ -7472,7 +7475,7 @@ func InitGeneralTenant(ctx context.Context, bh BackgroundExec, ses *Session, ca
}

// createTablesInMoCatalogOfGeneralTenant creates catalog tables in the database mo_catalog.
func createTablesInMoCatalogOfGeneralTenant(ctx context.Context, bh BackgroundExec, finalVersion string, ca *createAccount) (*TenantInfo, context.Context, error) {
func createTablesInMoCatalogOfGeneralTenant(ctx context.Context, bh BackgroundExec, finalVersion string, finalVersionOffset int32, ca *createAccount) (*TenantInfo, context.Context, error) {
var err error
var initMoAccount string
var erArray []ExecResult
Expand Down Expand Up @@ -7510,7 +7513,7 @@ func createTablesInMoCatalogOfGeneralTenant(ctx context.Context, bh BackgroundEx
}
}

initMoAccount = fmt.Sprintf(initMoAccountWithoutIDFormat, ca.Name, ca.AdminName, status, types.CurrentTimestamp().String2(time.UTC, 0), comment, finalVersion)
initMoAccount = fmt.Sprintf(initMoAccountWithoutIDFormat, ca.Name, ca.AdminName, status, types.CurrentTimestamp().String2(time.UTC, 0), comment, finalVersion, finalVersionOffset)
//execute the insert
err = bh.Exec(ctx, initMoAccount)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/frontend/authenticate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7490,6 +7490,10 @@ func (m *MockBaseService) GetFinalVersion() string {
return "1.2.0"
}

func (m *MockBaseService) GetFinalVersionOffset() int32 {
return 8
}

func (s *MockBaseService) UpgradeTenant(ctx context.Context, tenantName string, retryCount uint32, isALLAccount bool) error {
//TODO implement me
panic("implement me")
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/predefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ var (
comments varchar(256),
version bigint unsigned auto_increment,
suspended_time timestamp default NULL,
create_version varchar(50) default '1.2.0'
create_version varchar(50) default '1.2.0',
version_offset int unsigned default 0
)`

MoCatalogMoRoleDDL = `create table mo_catalog.mo_role (
Expand Down
2 changes: 2 additions & 0 deletions pkg/frontend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type BaseService interface {
CheckTenantUpgrade(ctx context.Context, tenantID int64) error
// GetFinalVersion Get mo final version, which is based on the current code
GetFinalVersion() string
// GetFinalVersionOffset Get mo final version offset, which is based on the current code
GetFinalVersionOffset() int32
// UpgradeTenant used to upgrade tenant
UpgradeTenant(ctx context.Context, tenantName string, retryCount uint32, isALLAccount bool) error
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/frontend/system_initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

// InitSysTenant initializes the tenant SYS before any tenants and accepting any requests
// during the system is booting.
func InitSysTenant(ctx context.Context, txn executor.TxnExecutor, finalVersion string) (err error) {
func InitSysTenant(ctx context.Context, txn executor.TxnExecutor, finalVersion string, finalVersonOffset int32) (err error) {
txn.Use(catalog.MO_CATALOG)
res, err := txn.Exec(createDbInformationSchemaSql, executor.StatementOption{})
if err != nil {
Expand All @@ -45,15 +45,15 @@ func InitSysTenant(ctx context.Context, txn executor.TxnExecutor, finalVersion s
}

if !exists {
if err = createTablesInMoCatalog(ctx, txn, finalVersion); err != nil {
if err = createTablesInMoCatalog(ctx, txn, finalVersion, finalVersonOffset); err != nil {
return err
}
}
return err
}

// createTablesInMoCatalog creates catalog tables in the database mo_catalog.
func createTablesInMoCatalog(ctx context.Context, txn executor.TxnExecutor, finalVersion string) error {
func createTablesInMoCatalog(ctx context.Context, txn executor.TxnExecutor, finalVersion string, finalVersonOffset int32) error {
var initMoAccount string
var initDataSqls []string
var err error
Expand All @@ -69,7 +69,7 @@ func createTablesInMoCatalog(ctx context.Context, txn executor.TxnExecutor, fina

//initialize the default data of tables for the tenant
//step 1: add new tenant entry to the mo_account
initMoAccount = fmt.Sprintf(initMoAccountFormat, sysAccountID, sysAccountName, rootName, sysAccountStatus, types.CurrentTimestamp().String2(time.UTC, 0), sysAccountComments, finalVersion)
initMoAccount = fmt.Sprintf(initMoAccountFormat, sysAccountID, sysAccountName, rootName, sysAccountStatus, types.CurrentTimestamp().String2(time.UTC, 0), sysAccountComments, finalVersion, finalVersonOffset)
addSqlIntoSet(initMoAccount)

//step 2:add new role entries to the mo_role
Expand Down
Loading

0 comments on commit 24ddae8

Please sign in to comment.