diff --git a/api/handler/v2/job.go b/api/handler/v2/job.go index 6c2cfaeb5..2cfdc0ce7 100644 --- a/api/handler/v2/job.go +++ b/api/handler/v2/job.go @@ -3,6 +3,7 @@ package v2 import ( "archive/tar" "compress/gzip" + "context" "encoding/json" "errors" "fmt" @@ -20,6 +21,7 @@ import ( "syscall" "time" + usql "github.com/actiontech/dtle/driver/mysql/sql" nomadApi "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" "github.com/labstack/echo/v4" @@ -29,6 +31,7 @@ import ( "github.com/actiontech/dtle/driver/common" "github.com/actiontech/dtle/driver/kafka" mysql "github.com/actiontech/dtle/driver/mysql" + "github.com/actiontech/dtle/driver/mysql/mysqlconfig" "github.com/actiontech/dtle/g" ) @@ -2162,6 +2165,14 @@ func DiagnosisJobAndTarFile(logger g.LoggerType, jobId, src, dst string) (err er tw := tar.NewWriter(gw) defer tw.Close() + // schema and table info + jobSchemasName := path.Join(src, "dtle-"+jobId+"-schemas") + err = createSchemaInfoFile(logger, jobId, jobSchemasName) + if err != nil { + return err + } + defer os.Remove(jobSchemasName) + // create job info file for tar jobInfoFileName := path.Join(src, "dtle-"+jobId+"-info") err = createJobInfoFile(logger, jobId, jobInfoFileName) @@ -2194,7 +2205,7 @@ func DiagnosisJobAndTarFile(logger g.LoggerType, jobId, src, dst string) (err er if err != nil { return err } - hdr.Name = strings.TrimPrefix(filePath, string(filepath.Separator)) + hdr.Name = path.Join("dump", hdr.Name) if err := tw.WriteHeader(hdr); err != nil { return err @@ -2262,7 +2273,7 @@ func recordProcessStatus(logger g.LoggerType, w io.Writer) error { } func createJobInfoFile(logger g.LoggerType, jobId, fileName string) error { - logger.Info("create job %v info file", jobId) + logger.Info("create job info file", "jobId", jobId) f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) if nil != err { return fmt.Errorf("capture job info error(%v)", err) @@ -2273,7 +2284,6 @@ func createJobInfoFile(logger g.LoggerType, jobId, fileName string) error { if err != nil { return fmt.Errorf("get job detail failed") } - jobByts, err := json.Marshal(jobInfo) if err != nil { return err @@ -2291,3 +2301,58 @@ func createJobInfoFile(logger g.LoggerType, jobId, fileName string) error { } return nil } +func createSchemaInfoFile(logger g.LoggerType, jobId, fileName string) error { + logger.Info("create job schema file", "jobId", jobId) + f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) + if nil != err { + return fmt.Errorf("capture job info error(%v)", err) + } + defer f.Close() + var fileInfo []byte + // get job detail + resp, err := getJobDetail(logger, jobId, GetJobTypeFromJobId(jobId)) + if err != nil { + return err + } + connectionConfig := mysqlconfig.ConnectionConfig{ + Host: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.Host, + Port: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.Port, + Password: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.Password, + User: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.User, + } + db, err := usql.CreateDB(connectionConfig.GetDBUri()) + if err != nil { + return err + } + + // query create db/table sql + databases, err := usql.ShowDatabases(db) + if err != nil { + return err + } + for _, database := range databases { + fileInfo = append(fileInfo, []byte(" schemaName : "+database+" \n")...) + createSchemas, err := usql.ShowCreateSchema(context.TODO(), db, database) + if err != nil { + return err + } + fileInfo = append(fileInfo, []byte(createSchemas+" \n")...) + tables, err := usql.ShowTables(db, database, true) + if err != nil { + return err + } + for _, table := range tables { + fileInfo = append(fileInfo, []byte(" tableName : "+table.TableName+" \n")...) + createTable, err := usql.ShowCreateTable(context.TODO(), db, database, table.TableName) + if err != nil { + return err + } + fileInfo = append(fileInfo, []byte(createTable+" \n")...) + } + } + err = os.WriteFile(fileName, fileInfo, 0644) + if err != nil { + return fmt.Errorf("write info to file err: %v", err) + } + return nil +} diff --git a/driver/mysql/sql/sqlutils.go b/driver/mysql/sql/sqlutils.go index 3882e4846..ce97cd743 100644 --- a/driver/mysql/sql/sqlutils.go +++ b/driver/mysql/sql/sqlutils.go @@ -426,3 +426,16 @@ func GetSetWaitTimeout(db QueryAble) (originVal int, err error) { } return originVal, nil } + +func ShowCreateTable(ctx context.Context, db QueryAble, dbName, tbName string) (r string, err error) { + query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", mysqlconfig.EscapeName(dbName), mysqlconfig.EscapeName(tbName)) + g.Logger.Debug("ShowCreateTable", "query", query) + row := db.QueryRowContext(ctx, query) + var dummy interface{} + // | Table | Create Table | + err = row.Scan(&dummy, &r) + if err != nil { + return "", errors.Wrap(err, "ShowCreateTable") + } + return r, nil +}