Skip to content

Commit

Permalink
[MySQL] GTID scaffold (#606)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 17, 2024
1 parent 2b077be commit 02cee52
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 5 deletions.
5 changes: 4 additions & 1 deletion config/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
)

type MySQLStreamingSettings struct {
Enabled bool `yaml:"enabled,omitempty"`
Enabled bool `yaml:"enabled,omitempty"`
// TODO: Remove TODO once GTID is fully functional.
EnableGTID bool `yaml:"enableGTID,omitempty"`

OffsetFile string `yaml:"offsetFile,omitempty"`
SchemaHistoryFile string `yaml:"schemaHistoryFile,omitempty"`
// ServerID - Unique ID in the cluster.
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func buildSource(ctx context.Context, cfg *config.Settings) (sources.Source, boo
case config.SourceMongoDB:
return mongo.Load(ctx, *cfg.MongoDB)
case config.SourceMySQL:
return mysql.Load(*cfg.MySQL)
return mysql.Load(ctx, *cfg.MySQL)
case config.SourceMSSQL:
source, err = mssql.Load(*cfg.MSSQL)
case config.SourcePostgreSQL:
Expand Down
5 changes: 3 additions & 2 deletions sources/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mysql

import (
"context"
"database/sql"
"fmt"
"log/slog"
Expand All @@ -9,7 +10,7 @@ import (
"github.com/artie-labs/reader/sources"
)

func Load(cfg config.MySQL) (sources.Source, bool, error) {
func Load(ctx context.Context, cfg config.MySQL) (sources.Source, bool, error) {
db, err := sql.Open("mysql", cfg.ToDSN())
if err != nil {
return nil, false, fmt.Errorf("failed to connect to MySQL: %w", err)
Expand All @@ -25,7 +26,7 @@ func Load(cfg config.MySQL) (sources.Source, bool, error) {
slog.Any("sqlMode", settings.SQLMode),
)
if cfg.StreamingSettings.Enabled {
stream, err := buildStreamingConfig(db, cfg, settings.SQLMode)
stream, err := buildStreamingConfig(ctx, db, cfg, settings.SQLMode)
if err != nil {
return nil, false, fmt.Errorf("failed to build streaming config: %w", err)
}
Expand Down
7 changes: 6 additions & 1 deletion sources/mysql/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ type Streaming struct {
db *sql.DB
}

func buildStreamingConfig(db *sql.DB, cfg config.MySQL, sqlMode []string) (Streaming, error) {
func buildStreamingConfig(ctx context.Context, db *sql.DB, cfg config.MySQL, sqlMode []string) (Streaming, error) {
// Validate to ensure that we can use streaming.
if err := ValidateMySQL(ctx, db, true, cfg.StreamingSettings.EnableGTID); err != nil {
return Streaming{}, fmt.Errorf("failed validation: %w", err)
}

iter, err := streaming.BuildStreamingIterator(db, cfg, sqlMode)
if err != nil {
return Streaming{}, err
Expand Down
50 changes: 50 additions & 0 deletions sources/mysql/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package mysql

import (
"context"
"database/sql"
"fmt"
"strings"
)

func fetchVariable(ctx context.Context, db *sql.DB, name string) (string, error) {
row := db.QueryRowContext(ctx, "SHOW VARIABLES WHERE variable_name = ?", name)
if row.Err() != nil {
return "", fmt.Errorf("failed to query for %q variable: %w", name, row.Err())
}

var variableName string
var value string
if err := row.Scan(&variableName, &value); err != nil {
return "", fmt.Errorf("failed to scan row: %w", err)
} else if variableName != name {
return "", fmt.Errorf("the variable %q was returned instead of %q", variableName, name)
}

return value, nil
}

func ValidateMySQL(ctx context.Context, db *sql.DB, validateStreaming bool, validateGTID bool) error {
requiredVariableToValueMap := make(map[string]string)
if validateStreaming {
requiredVariableToValueMap["binlog_format"] = "ROW"
}

if validateGTID {
requiredVariableToValueMap["gtid_mode"] = "ON"
requiredVariableToValueMap["enforce_gtid_consistency"] = "ON"
}

for requiredVariable, requiredValue := range requiredVariableToValueMap {
value, err := fetchVariable(ctx, db, requiredVariable)
if err != nil {
return err
}

if strings.ToUpper(value) != requiredValue {
return fmt.Errorf("%s must be set to %q, current value is %q", requiredVariable, requiredValue, value)
}
}

return nil
}

0 comments on commit 02cee52

Please sign in to comment.