Skip to content

Commit

Permalink
diagnosis job return create db/table sql
Browse files Browse the repository at this point in the history
  • Loading branch information
LordofAvernus committed Aug 17, 2023
1 parent 7fdd31d commit 01af6e6
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 3 deletions.
71 changes: 68 additions & 3 deletions api/handler/v2/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v2
import (
"archive/tar"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -20,6 +21,7 @@ import (
"syscall"
"time"

usql "github.com/actiontech/dtle/driver/mysql/sql"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/labstack/echo/v4"
Expand All @@ -29,6 +31,7 @@ import (
"github.com/actiontech/dtle/driver/common"
"github.com/actiontech/dtle/driver/kafka"
mysql "github.com/actiontech/dtle/driver/mysql"
"github.com/actiontech/dtle/driver/mysql/mysqlconfig"
"github.com/actiontech/dtle/g"
)

Expand Down Expand Up @@ -2162,6 +2165,14 @@ func DiagnosisJobAndTarFile(logger g.LoggerType, jobId, src, dst string) (err er
tw := tar.NewWriter(gw)
defer tw.Close()

// schema and table info
jobSchemasName := path.Join(src, "dtle-"+jobId+"-schemas")
err = createSchemaInfoFile(logger, jobId, jobSchemasName)
if err != nil {
return err
}
defer os.Remove(jobSchemasName)

// create job info file for tar
jobInfoFileName := path.Join(src, "dtle-"+jobId+"-info")
err = createJobInfoFile(logger, jobId, jobInfoFileName)
Expand Down Expand Up @@ -2194,7 +2205,7 @@ func DiagnosisJobAndTarFile(logger g.LoggerType, jobId, src, dst string) (err er
if err != nil {
return err
}
hdr.Name = strings.TrimPrefix(filePath, string(filepath.Separator))
hdr.Name = path.Join("dump", hdr.Name)

if err := tw.WriteHeader(hdr); err != nil {
return err
Expand Down Expand Up @@ -2262,7 +2273,7 @@ func recordProcessStatus(logger g.LoggerType, w io.Writer) error {
}

func createJobInfoFile(logger g.LoggerType, jobId, fileName string) error {
logger.Info("create job %v info file", jobId)
logger.Info("create job info file", "jobId", jobId)
f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if nil != err {
return fmt.Errorf("capture job info error(%v)", err)
Expand All @@ -2273,7 +2284,6 @@ func createJobInfoFile(logger g.LoggerType, jobId, fileName string) error {
if err != nil {
return fmt.Errorf("get job detail failed")
}

jobByts, err := json.Marshal(jobInfo)
if err != nil {
return err
Expand All @@ -2291,3 +2301,58 @@ func createJobInfoFile(logger g.LoggerType, jobId, fileName string) error {
}
return nil
}
func createSchemaInfoFile(logger g.LoggerType, jobId, fileName string) error {
logger.Info("create job schema file", "jobId", jobId)
f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if nil != err {
return fmt.Errorf("capture job info error(%v)", err)
}
defer f.Close()
var fileInfo []byte
// get job detail
resp, err := getJobDetail(logger, jobId, GetJobTypeFromJobId(jobId))
if err != nil {
return err
}
connectionConfig := mysqlconfig.ConnectionConfig{
Host: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.Host,
Port: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.Port,
Password: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.Password,
User: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.User,
}
db, err := usql.CreateDB(connectionConfig.GetDBUri())
if err != nil {
return err
}

// query create db/table sql
databases, err := usql.ShowDatabases(db)
if err != nil {
return err
}
for _, database := range databases {
fileInfo = append(fileInfo, []byte(" schemaName : "+database+" \n")...)
createSchemas, err := usql.ShowCreateSchema(context.TODO(), db, database)
if err != nil {
return err
}
fileInfo = append(fileInfo, []byte(createSchemas+" \n")...)
tables, err := usql.ShowTables(db, database, true)
if err != nil {
return err
}
for _, table := range tables {
fileInfo = append(fileInfo, []byte(" tableName : "+table.TableName+" \n")...)
createTable, err := usql.ShowCreateTable(context.TODO(), db, database, table.TableName)
if err != nil {
return err
}
fileInfo = append(fileInfo, []byte(createTable+" \n")...)
}
}
err = os.WriteFile(fileName, fileInfo, 0644)
if err != nil {
return fmt.Errorf("write info to file err: %v", err)
}
return nil
}
13 changes: 13 additions & 0 deletions driver/mysql/sql/sqlutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,3 +426,16 @@ func GetSetWaitTimeout(db QueryAble) (originVal int, err error) {
}
return originVal, nil
}

func ShowCreateTable(ctx context.Context, db QueryAble, dbName, tbName string) (r string, err error) {
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", mysqlconfig.EscapeName(dbName), mysqlconfig.EscapeName(tbName))
g.Logger.Debug("ShowCreateTable", "query", query)
row := db.QueryRowContext(ctx, query)
var dummy interface{}
// | Table | Create Table |
err = row.Scan(&dummy, &r)
if err != nil {
return "", errors.Wrap(err, "ShowCreateTable")
}
return r, nil
}

0 comments on commit 01af6e6

Please sign in to comment.