Skip to content

Commit

Permalink
[MySQL] Retrieve all tables in a particular schema (#627)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 20, 2024
1 parent 440e093 commit 8fb5664
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
24 changes: 24 additions & 0 deletions lib/mysql/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ func QuoteIdentifier(s string) string {
return fmt.Sprintf("`%s`", strings.ReplaceAll(s, "`", "``"))
}

func ListTables(db *sql.DB, dbName string) ([]string, error) {
rows, err := db.Query(fmt.Sprintf("SHOW FULL TABLES IN %s WHERE table_type = 'BASE TABLE'", QuoteIdentifier(dbName)))
if err != nil {
return nil, fmt.Errorf("failed to list tables: %w", err)
}

defer rows.Close()
var tables []string
for rows.Next() {
var table string
var unused string
if err = rows.Scan(&table, &unused); err != nil {
return nil, fmt.Errorf("failed to scan: %w", err)
}
tables = append(tables, table)
}

if err = rows.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate over rows: %w", err)
}

return tables, nil
}

func GetCreateTableDDL(db *sql.DB, table string) (string, error) {
row := db.QueryRow("SHOW CREATE TABLE " + QuoteIdentifier(table))
var unused string
Expand Down
14 changes: 10 additions & 4 deletions sources/mysql/streaming/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ func buildSchemaAdapter(db *sql.DB, cfg config.MySQL, schemaHistoryList persiste
return ddl.SchemaAdapter{}, fmt.Errorf("latest schema timestamp %d is greater than the current position's timestamp %d", latestSchemaUnixTs, pos.UnixTs)
}

// Check if there are any additional tables that we should be tracking that don't exist in our schema adapter
for _, tbl := range cfg.Tables {
if _, ok := schemaAdapter.GetTableAdapter(tbl.Name); !ok {
// Find all the tables in the schema, check if they are already in the schema adapter
// If not, then call [GetCreateTableDDL] to get the DDL and apply it to the schema adapter
tables, err := schema.ListTables(db, cfg.Database)
if err != nil {
return ddl.SchemaAdapter{}, fmt.Errorf("failed to list tables: %w", err)
}

for _, tbl := range tables {
if _, ok := schemaAdapter.GetTableAdapter(tbl); !ok {
now := time.Now().Unix()
ddlQuery, err := schema.GetCreateTableDDL(db, tbl.Name)
ddlQuery, err := schema.GetCreateTableDDL(db, tbl)
if err != nil {
return ddl.SchemaAdapter{}, fmt.Errorf("failed to get columns: %w", err)
}
Expand Down

0 comments on commit 8fb5664

Please sign in to comment.