diff --git a/sources/mysql/adapter/adapter.go b/sources/mysql/adapter/adapter.go index ccd129c1..55963b96 100644 --- a/sources/mysql/adapter/adapter.go +++ b/sources/mysql/adapter/adapter.go @@ -20,13 +20,14 @@ const defaultErrorRetries = 10 type mysqlAdapter struct { db *sql.DB + dbName string table mysql.Table columns []schema.Column fieldConverters []transformer.FieldConverter scannerCfg scan.ScannerConfig } -func NewMySQLAdapter(db *sql.DB, tableCfg config.MySQLTable) (mysqlAdapter, error) { +func NewMySQLAdapter(db *sql.DB, dbName string, tableCfg config.MySQLTable) (mysqlAdapter, error) { slog.Info("Loading metadata for table") table, err := mysql.LoadTable(db, tableCfg.Name) if err != nil { @@ -38,10 +39,10 @@ func NewMySQLAdapter(db *sql.DB, tableCfg config.MySQLTable) (mysqlAdapter, erro return mysqlAdapter{}, err } - return newMySQLAdapter(db, *table, columns, tableCfg.ToScannerConfig(defaultErrorRetries)) + return newMySQLAdapter(db, dbName, *table, columns, tableCfg.ToScannerConfig(defaultErrorRetries)) } -func newMySQLAdapter(db *sql.DB, table mysql.Table, columns []schema.Column, scannerCfg scan.ScannerConfig) (mysqlAdapter, error) { +func newMySQLAdapter(db *sql.DB, dbName string, table mysql.Table, columns []schema.Column, scannerCfg scan.ScannerConfig) (mysqlAdapter, error) { fieldConverters := make([]transformer.FieldConverter, len(columns)) for i, col := range columns { converter, err := valueConverterForType(col.Type, col.Opts) @@ -53,6 +54,7 @@ func newMySQLAdapter(db *sql.DB, table mysql.Table, columns []schema.Column, sca return mysqlAdapter{ db: db, + dbName: dbName, table: table, columns: columns, fieldConverters: fieldConverters, @@ -65,7 +67,7 @@ func (m mysqlAdapter) TableName() string { } func (m mysqlAdapter) TopicSuffix() string { - return strings.ReplaceAll(m.table.Name, `"`, ``) + return fmt.Sprintf("%s.%s", m.dbName, strings.ReplaceAll(m.table.Name, `"`, ``)) } func (m mysqlAdapter) FieldConverters() []transformer.FieldConverter { diff --git a/sources/mysql/adapter/adapter_test.go b/sources/mysql/adapter/adapter_test.go index 7249789a..5cefa00d 100644 --- a/sources/mysql/adapter/adapter_test.go +++ b/sources/mysql/adapter/adapter_test.go @@ -16,7 +16,7 @@ func TestMySQLAdapter_TableName(t *testing.T) { table := mysql.Table{ Name: "table1", } - adapter, err := newMySQLAdapter(nil, table, []schema.Column{}, scan.ScannerConfig{}) + adapter, err := newMySQLAdapter(nil, "foo", table, []schema.Column{}, scan.ScannerConfig{}) assert.NoError(t, err) assert.Equal(t, "table1", adapter.TableName()) } @@ -32,18 +32,18 @@ func TestMySQLAdapter_TopicSuffix(t *testing.T) { table: mysql.Table{ Name: "table1", }, - expected: "table1", + expected: "db.table1", }, { table: mysql.Table{ Name: `"PublicStatus"`, }, - expected: "PublicStatus", + expected: "db.PublicStatus", }, } for _, tc := range tcs { - adapter, err := newMySQLAdapter(nil, tc.table, []schema.Column{}, scan.ScannerConfig{}) + adapter, err := newMySQLAdapter(nil, "db", tc.table, []schema.Column{}, scan.ScannerConfig{}) assert.NoError(t, err) assert.Equal(t, tc.expected, adapter.TopicSuffix()) } diff --git a/sources/mysql/snapshot.go b/sources/mysql/snapshot.go index fd05d58c..98cb6ca0 100644 --- a/sources/mysql/snapshot.go +++ b/sources/mysql/snapshot.go @@ -18,8 +18,9 @@ import ( ) type Source struct { - cfg config.MySQL - db *sql.DB + cfg config.MySQL + db *sql.DB + dbName string } func Load(cfg config.MySQL) (*Source, error) { @@ -28,8 +29,9 @@ func Load(cfg config.MySQL) (*Source, error) { return nil, fmt.Errorf("failed to connect to MySQL: %w", err) } return &Source{ - cfg: cfg, - db: db, + cfg: cfg, + db: db, + dbName: cfg.Database, }, nil } @@ -50,7 +52,7 @@ func (s Source) snapshotTable(ctx context.Context, writer kafkalib.BatchWriter, logger := slog.With(slog.String("table", tableCfg.Name)) snapshotStartTime := time.Now() - adapter, err := adapter.NewMySQLAdapter(s.db, tableCfg) + adapter, err := adapter.NewMySQLAdapter(s.db, s.dbName, tableCfg) if err != nil { return fmt.Errorf("failed to create MySQL adapter: %w", err) }