From 8f862a7369930dea58da947d7bf9fc3a420785f1 Mon Sep 17 00:00:00 2001 From: Bruno Calza Date: Thu, 9 May 2024 17:19:43 -0300 Subject: [PATCH] simplify the export by using reflection and dynamic parquet schema generation Signed-off-by: Bruno Calza --- .gitignore | 2 - .golangci.yml | 63 ++++++++++++++ Makefile | 14 +++ README.md | 19 +---- cmd.go | 121 +++++--------------------- cmd/main.go | 38 --------- database.go | 138 ++++++++++-------------------- exporter.go | 217 ++++++++++++++++++++++++++++++++++++----------- exporter_test.go | 157 ++++++++++++++++++++++++++++++++++ go.mod | 6 +- go.sum | 9 +- helpers.go | 21 ----- main.go | 3 +- pool.go | 50 ----------- schemas.tmpl | 26 ------ template.go | 11 --- 16 files changed, 482 insertions(+), 413 deletions(-) create mode 100644 .golangci.yml create mode 100644 Makefile delete mode 100644 cmd/main.go create mode 100644 exporter_test.go delete mode 100644 helpers.go delete mode 100644 pool.go delete mode 100644 schemas.tmpl delete mode 100644 template.go diff --git a/.gitignore b/.gitignore index dae8a5e..5ea1e8e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,3 @@ *.db* -schemas.go -schemas.so sqlite-exporter output \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..4656768 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,63 @@ +linters-settings: + revive: + ignore-generated-header: false + severity: warning + confidence: 0.8 + errorCode: 0 + warningCode: 0 + rules: + - name: blank-imports + - name: context-as-argument + - name: context-keys-type + - name: dot-imports + - name: error-return + - name: error-strings + - name: error-naming + - name: exported + - name: if-return + - name: increment-decrement + - name: var-naming + - name: var-declaration + - name: package-comments + disabled: true + - name: range + - name: receiver-naming + - name: time-naming + - name: unexported-return + - name: indent-error-flow + - name: errorf + - name: empty-block + - name: 
superfluous-else + - name: unused-parameter + - name: unreachable-code + - name: redefines-builtin-id + misspell: + locale: US + +linters: + enable: + - revive + - misspell + - bodyclose + - unconvert + - goconst + - goimports + - unparam + - whitespace + - godot + - lll + - sqlclosecheck + - gofumpt + +issues: + exclude-use-default: false + + exclude: + - stutters + +run: + timeout: 30m + + skip-dirs: + - "pkg/database/db" + - "internal/router/controllers/apiv1" diff --git a/Makefile new file mode 100644 index 0000000..251aa18 --- /dev/null +++ b/Makefile @@ -0,0 +1,14 @@ +# Lint +lint: + go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.54.2 run +.PHONY: lint + +# Test +test: + go test ./... -short -race -timeout 1m +.PHONY: test + +# Build +build: + go build -o sqlite-exporter . +.PHONY: build \ No newline at end of file diff --git a/README.md index fdc97be..755a908 100644 --- a/README.md +++ b/README.md @@ -5,23 +5,17 @@ Exports SQLite tables to Basin. This tool is used to export Tableland tables to ## How it works? Basin Exporter scans a SQLite database, export the tables to Parquet files, and upload them to Basin. -In order to export the tables to Parquet files, you must first generate the Go structs for each one of the tables. You do that by running -```bash -go run . generate [DB_PATH] -``` - -This will genenerate the structs in `schemas.go`. -Then you build a shared object out of that file by running +Build it by running ```bash -go build -buildmode=plugin -o schemas.so schemas.go +make build ``` -Now, you can export the all tables to Parquet by running +Now, export all tables to Parquet by running ```bash -go run . export [DB_PATH] +./sqlite-exporter export -o ./output [DB_PATH] ``` This command does not upload the files to Basin. It exports Parquet files inside the `output` directory. You can also, choose specific tables to export by using the `tables` flag. 
@@ -31,8 +25,3 @@ To export and push the files to Basin you must the machine identifier, together ```bash go run . export --upload --machine=[HASH] [DB_PATH] ``` - -## Issues - -- You have to generate the struct schemas. Gotta figure out a way of exporting without doing that. -- How to deal with tables that has GENERATED COLUMNS? \ No newline at end of file diff --git a/cmd.go b/cmd.go index 68967ce..ef05a1d 100644 --- a/cmd.go +++ b/cmd.go @@ -1,94 +1,15 @@ package main import ( - "context" "log" - "os" - "text/template" - "unicode" "github.com/urfave/cli/v2" ) -func newGenerateCommand() *cli.Command { - return &cli.Command{ - Name: "generate", - Usage: "Generate structs", - ArgsUsage: "", - Description: "Generate structs for each one of the SQLite table", - Flags: []cli.Flag{ - &cli.StringSliceFlag{ - Name: "tables", - Category: "OPTIONAL:", - Usage: "The tables you want to export", - DefaultText: "empty", - }, - }, - Action: func(cCtx *cli.Context) error { - dbPath := cCtx.Args().First() - - db, err := NewSQLite(dbPath, cCtx.StringSlice("tables")) - if err != nil { - log.Fatal(err) - } - - templateData := TemplateData{ - Structs: make([]Struct, 0), - Tables: make([]string, 0), - } - - iter, err := db.GetTables(context.Background()) - if err != nil { - log.Fatal(err) - } - defer iter.Close() - for { - table, hasNext := iter.Next() - if !hasNext { - break - } - - columns, err := db.GetColumnsByTable(context.Background(), table) - if err != nil { - log.Print(err) - } - - if !unicode.IsLetter(rune(table[0])) { - table = "X" + table - } - - templateData.Tables = append(templateData.Tables, table) - templateData.Structs = append(templateData.Structs, Struct{ - Table: table, - Schema: columns, - }) - } - - funcMap := template.FuncMap{ - "CamelCase": camelCaseString, - } - - tmpl, _ := template.New("schemas.tmpl").Funcs(funcMap).ParseFiles("schemas.tmpl") - - f, err := os.OpenFile("schemas.go", os.O_CREATE|os.O_RDWR, 0o777) - if err != nil { - log.Fatal(err) - } - defer 
f.Close() - - if err := tmpl.Execute(f, templateData); err != nil { - log.Fatal(err) - } - - return nil - }, - } -} - func newExportCommand() *cli.Command { var upload bool var tables *cli.StringSlice - var machine string + var machine, output string return &cli.Command{ Name: "export", Usage: "Export tables", @@ -111,6 +32,15 @@ func newExportCommand() *cli.Command { Destination: tables, Value: nil, }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Category: "REQUIRED:", + Usage: "The path of the exported Parquet file on disk", + DefaultText: ".", + Destination: &output, + Value: ".", + }, &cli.StringFlag{ Name: "machine", Category: "REQUIRED:", @@ -123,7 +53,7 @@ func newExportCommand() *cli.Command { Action: func(cCtx *cli.Context) error { dbPath := cCtx.Args().First() - db, err := NewSQLite(dbPath, cCtx.StringSlice("tables")) + db, err := NewSQLite(dbPath) if err != nil { log.Fatal(err) } @@ -136,30 +66,19 @@ func newExportCommand() *cli.Command { } } - // TODO: make them configurable - pool := NewPool(10, 1000) - pool.Start(context.Background()) - - iter, err := db.GetTables(context.Background()) - if err != nil { - log.Fatal(err) - } - defer iter.Close() - - for { - table, hasNext := iter.Next() - if !hasNext { - break - } + tables := cCtx.StringSlice("tables") - task, err := NewTableExporter(db, table, sink) - if err != nil { - log.Print(err) + exporter := NewDatabaseExporter(db, sink, output) + if len(tables) > 0 { + if err := exporter.ExportTables(cCtx.Context, tables); err != nil { + log.Fatal(err) } - pool.AddTask(task) + return nil } - pool.Close() + if err := exporter.ExportAll(cCtx.Context); err != nil { + log.Fatal(err) + } return nil }, diff --git a/cmd/main.go b/cmd/main.go deleted file mode 100644 index 4d165e3..0000000 --- a/cmd/main.go +++ /dev/null @@ -1,38 +0,0 @@ -package main - -import ( - "context" - "database/sql" - "fmt" - "log" - - _ "github.com/marcboeker/go-duckdb" -) - -func main() { - db, err := sql.Open("duckdb", 
"") - if err != nil { - log.Fatal(err) - } - defer db.Close() - - _, err = db.Exec("INSTALL https; LOAD https;") - if err != nil { - log.Fatal(err) - } - - rows, err := db.QueryContext(context.Background(), "select *from read_parquet(['http://34.106.97.87:8002/v1/os/t2gh7m2iaqvwv2oexsaetncmdm6hhac7k6ne3teda/pilot_sessions_80001_7137'])") - if err != nil { - log.Fatal(err) - } - defer rows.Close() - - for rows.Next() { - var link, name string - if err := rows.Scan(&link, &name); err != nil { - log.Fatal(err) - } - - fmt.Println(link, name) - } -} diff --git a/database.go b/database.go index da0c297..9d82a73 100644 --- a/database.go +++ b/database.go @@ -6,41 +6,41 @@ import ( "fmt" "log" "strings" - "unicode" "github.com/jmoiron/sqlx" - "golang.org/x/exp/slices" ) +// SQLite represents a SQLite Database. type SQLite struct { - db *sqlx.DB - tables []string + db *sqlx.DB } -func NewSQLite(path string, tables []string) (*SQLite, error) { +// NewSQLite creates a new SQLite object. +func NewSQLite(path string) (*SQLite, error) { db, err := sqlx.Open("sqlite3", path) if err != nil { return nil, err } - return &SQLite{db, tables}, nil + return &SQLite{db}, nil } -func (s *SQLite) GetTables(ctx context.Context) (TablesIter, error) { +// GetTablesIterator returns an iterator of tables. +func (s *SQLite) GetTablesIterator(ctx context.Context, tables []string) (TablesIter, error) { sql := `SELECT name AS table_name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite?_%' escape '?' 
AND name NOT LIKE 'system?_%' escape '?'` - if len(s.tables) > 0 { - tables := make([]string, len(s.tables)) - for i, t := range s.tables { - tables[i] = fmt.Sprintf("'%s'", t) + if len(tables) > 0 { + t := make([]string, len(tables)) + for i, table := range tables { + t[i] = fmt.Sprintf("'%s'", table) } - sql = sql + fmt.Sprintf(" AND name IN (%s)", strings.Join(tables, ",")) + sql = sql + fmt.Sprintf(" AND name IN (%s)", strings.Join(t, ",")) } - rows, err := s.db.QueryxContext(ctx, sql) + rows, err := s.db.QueryContext(ctx, sql) if err != nil { return TablesIter{}, fmt.Errorf("query tables: %s", err) } @@ -48,8 +48,9 @@ func (s *SQLite) GetTables(ctx context.Context) (TablesIter, error) { return TablesIter{rows}, nil } -func (s *SQLite) GetRowsByTable(ctx context.Context, table string) (*sqlx.Rows, error) { - rows, err := s.db.QueryxContext(ctx, fmt.Sprintf("SELECT * FROM %s", table)) +// GetRowsByTable returns an iterator of all rows of a specific table. +func (s *SQLite) GetRowsByTable(ctx context.Context, table string) (*sql.Rows, error) { + rows, err := s.db.QueryContext(ctx, fmt.Sprintf("SELECT * FROM %s", table)) if err != nil { return nil, fmt.Errorf("query tables: %s", err) } @@ -57,6 +58,7 @@ func (s *SQLite) GetRowsByTable(ctx context.Context, table string) (*sqlx.Rows, return rows, nil } +// GetColumnsByTable returns the columns of a table. 
func (s *SQLite) GetColumnsByTable(ctx context.Context, table string) ([]Column, error) { type column struct { CID int `db:"cid"` @@ -65,13 +67,18 @@ func (s *SQLite) GetColumnsByTable(ctx context.Context, table string) ([]Column, NotNull int `db:"notnull"` DefaultValue sql.NullString `db:"dflt_value"` PrimaryKey int `db:"pk"` + Hidden int `db:"hidden"` } - rows, err := s.db.QueryxContext(ctx, fmt.Sprintf("SELECT * FROM PRAGMA_TABLE_INFO('%s')", table)) + rows, err := s.db.QueryxContext(ctx, fmt.Sprintf("SELECT * FROM PRAGMA_TABLE_XINFO('%s')", table)) if err != nil { return nil, fmt.Errorf("query tables: %s", err) } - defer rows.Close() + defer func() { + if err := rows.Close(); err != nil { + log.Println(err) + } + }() columns := []Column{} for rows.Next() { @@ -103,10 +110,12 @@ func (s *SQLite) GetColumnsByTable(ctx context.Context, table string) ([]Column, return columns, nil } +// TablesIter represents an iterator of tables. type TablesIter struct { - rows *sqlx.Rows + rows *sql.Rows } +// Next returns the next table of the iterator. func (i *TablesIter) Next() (string, bool) { hasNext := i.rows.Next() if hasNext { @@ -120,10 +129,12 @@ func (i *TablesIter) Next() (string, bool) { return "", false } +// Close closes the table's iterator. func (i *TablesIter) Close() error { return i.rows.Close() } +// Column represents information of a column of a database's table. 
type Column struct { OrdinalPosition int `db:"ordinal_position"` Name string `db:"column_name"` @@ -133,88 +144,25 @@ type Column struct { ColumnKey string `db:"column_key"` // mysql specific } -func (c *Column) IsInteger() bool { - return slices.Contains([]string{ - "integer", - "int", - }, strings.ToLower(c.DataType)) -} - -func (c *Column) IsFloat() bool { - return slices.Contains([]string{ - "real", - "numeric", - }, strings.ToLower(c.DataType)) -} - -func (s *Column) IsTemporal() bool { - return false -} - -type Formatted struct { - text string - isNullable bool - isTemporal bool - tags string -} - -func (f Formatted) Text() string { - return f.text -} - -func (f Formatted) Tags() string { - return f.tags +// GoName returns the name of the column in Golang's format. +func (c *Column) GoName() string { + return camelCaseString(c.Name) } -func (c *Column) Format() Formatted { - var isNullable, isTemporal bool - var goType string - columnName := camelCaseString(c.Name) - if !unicode.IsLetter(rune(columnName[0])) { - columnName = "X" + columnName +func camelCaseString(s string) string { + if s == "" { + return s } - if c.IsInteger() { - goType = "int" - if c.IsNullable { - goType = "*int" - isNullable = true - } - } else if c.IsFloat() { - goType = "float64" - if c.IsNullable { - goType = "*float64" - isNullable = true - } - } else if c.IsTemporal() { - isTemporal = true - if !c.IsNullable { - goType = "time.Time" - } else { - goType = "*time.Time" - isNullable = true - } - } else { - switch c.DataType { - case "boolean": - goType = "bool" - if c.IsNullable { - goType = "*bool" - isNullable = true - } - default: - goType = "string" - if c.IsNullable { - goType = "*string" - isNullable = true - } - } + splitted := strings.Split(s, "_") + + if len(splitted) == 1 { + return caser.String(s) } - return Formatted{ - text: fmt.Sprintf("%s %s", columnName, goType), - isNullable: isNullable, - isTemporal: isTemporal, - tags: "`parquet:\"" + c.Name + "\" db:\"" + c.Name + 
"\"`", + var cc string + for _, part := range splitted { + cc += caser.String(strings.ToLower(part)) } + return cc } diff --git a/exporter.go b/exporter.go index baf3e1f..83089f2 100644 --- a/exporter.go +++ b/exporter.go @@ -2,52 +2,43 @@ package main import ( "context" + "database/sql" "fmt" "log" "net/http" "os" "path" - "plugin" + "reflect" + "strconv" "strings" - "github.com/jmoiron/sqlx" "github.com/parquet-go/parquet-go" ) +// Sink represents a destination of the exported file. type Sink interface { Send(context.Context, string) error } +// Exporter represents a kind of exporter (e.g. parquet, ...). type Exporter interface { - Export(context.Context, string, *sqlx.Rows, Sink) error + Export(context.Context, string, []Column, *sql.Rows, Sink) error } +// ParquetExporter is an exporter that exports Parquet files. type ParquetExporter struct { - GetRowInstance func(table string) (interface{}, error) + outputDir string } -func NewParquetExporter() (*ParquetExporter, error) { - plugin, err := plugin.Open("schemas.so") - if err != nil { - return nil, err - } - - symbol, err := plugin.Lookup("GetRowInstance") - if err != nil { - return nil, err - } - - getRowInstance, ok := symbol.(func(table string) (interface{}, error)) - if !ok { - return nil, err - } - +// NewParquetExporter creates new ParquetExporter. +func NewParquetExporter(outputDir string) *ParquetExporter { return &ParquetExporter{ - GetRowInstance: getRowInstance, - }, nil + outputDir: outputDir, + } } -func (e *ParquetExporter) Export(ctx context.Context, table string, rows *sqlx.Rows, sink Sink) error { +// Export exports a table to Parquet. 
+func (e *ParquetExporter) Export(ctx context.Context, table string, columns []Column, rows *sql.Rows, sink Sink) error { hasNext := false if rows.Next() { hasNext = true @@ -57,31 +48,91 @@ func (e *ParquetExporter) Export(ctx context.Context, table string, rows *sqlx.R return nil } - filename := fmt.Sprintf("./output/%s.parquet", table) + filename := fmt.Sprintf("%s/%s.parquet", e.outputDir, table) f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o755) if err != nil { return fmt.Errorf("open file: %s", err) } - defer f.Close() + defer func() { + if err := f.Close(); err != nil { + log.Println(err) + } + }() - row, err := e.GetRowInstance(table) - if err != nil { - return fmt.Errorf("get row instance: %s", err) + rawBuffer := make([]sql.RawBytes, len(columns)) + scanCallArgs := make([]interface{}, len(rawBuffer)) + for i := range rawBuffer { + scanCallArgs[i] = &rawBuffer[i] } - writer := parquet.NewWriter(f) - for { - fmt.Println(table) + fields := make([]reflect.StructField, len(columns)) + for i, column := range columns { + var typ reflect.Type + switch column.DataType { + case "INT", "INTEGER": + typ = reflect.TypeOf(int64(0)) + case "TEXT": + typ = reflect.TypeOf("") + case "BLOB": + typ = reflect.TypeOf([]byte{}) + default: + return fmt.Errorf("unknown column type %s from table %s of column %s", column.DataType, table, column.Name) + } + + tag := fmt.Sprintf(`parquet:"%v"`, column.Name) + if column.IsNullable { + tag = fmt.Sprintf(`parquet:"%v,optional"`, column.Name) + } - if err := rows.StructScan(row); err != nil { - log.Print(err) + fields[i] = reflect.StructField{ + Name: column.GoName(), + Type: typ, + Tag: reflect.StructTag(tag), } + } + + v := reflect.New(reflect.StructOf(fields)) + writer := parquet.NewGenericWriter[any](f, &parquet.WriterConfig{ + Schema: parquet.SchemaOf(v.Interface()), + }) + + for { + if err := rows.Scan(scanCallArgs...); err != nil { + return fmt.Errorf("failed to scan row: %s", err) + } + + structValue := v.Elem() + 
for i, arg := range scanCallArgs { + field := structValue.FieldByName(columns[i].GoName()) + + bytes, ok := arg.(*sql.RawBytes) + if !ok { + return fmt.Errorf("converting arg %s type to *sql.RawBytes", field.Kind()) + } + // handle NULL + if len(*bytes) == 0 { + structValue.FieldByName(columns[i].GoName()).SetZero() + continue + } - if err == nil { - if err := writer.Write(row); err != nil { - log.Print(err) + switch field.Kind() { + case reflect.Int, reflect.Int64: + i64, err := strconv.ParseInt(string(*bytes), 10, field.Type().Bits()) + if err != nil { + return fmt.Errorf("converting arg type %T to a %s: %v", arg, field.Kind(), err) + } + structValue.FieldByName(columns[i].GoName()).SetInt(i64) + case reflect.String: + structValue.FieldByName(columns[i].GoName()).SetString(string(*bytes)) + case reflect.Slice: + structValue.FieldByName(columns[i].GoName()).SetBytes(*bytes) + default: + return fmt.Errorf("unknown destination type %s", field.Kind()) } } + if _, err := writer.Write([]any{structValue.Interface()}); err != nil { + return fmt.Errorf("failed to write row: %s", err) + } hasNext = rows.Next() if !hasNext { @@ -91,6 +142,7 @@ func (e *ParquetExporter) Export(ctx context.Context, table string, rows *sqlx.R if err := writer.Close(); err != nil { return fmt.Errorf("close writer: %s", err) } + if err := sink.Send(ctx, filename); err != nil { return fmt.Errorf("send to sink: %s", err) } @@ -98,6 +150,59 @@ func (e *ParquetExporter) Export(ctx context.Context, table string, rows *sqlx.R return nil } +// DatabaseExporter represents an exports of the entire database. +type DatabaseExporter struct { + db *SQLite + sink Sink + output string +} + +// NewDatabaseExporter creates a new DatabaseExporter. +func NewDatabaseExporter(db *SQLite, sink Sink, output string) *DatabaseExporter { + return &DatabaseExporter{ + db: db, + sink: sink, + output: output, + } +} + +// ExportTables exports specific tables of a database. 
+func (de *DatabaseExporter) ExportTables(ctx context.Context, tables []string) error { + return de.export(ctx, tables) +} + +// ExportAll exports all tables of a database. +func (de *DatabaseExporter) ExportAll(ctx context.Context) error { + return de.export(ctx, []string{}) +} + +func (de *DatabaseExporter) export(ctx context.Context, tables []string) error { + iter, err := de.db.GetTablesIterator(ctx, tables) + if err != nil { + return fmt.Errorf("get tables iterator: %s", err) + } + defer func() { + if err := iter.Close(); err != nil { + log.Println(err) + } + }() + + for { + table, hasNext := iter.Next() + if !hasNext { + break + } + + tableExporter := NewTableExporter(de.db, table, de.output, de.sink) + if err := tableExporter.Execute(ctx); err != nil { + log.Println(err) + } + } + + return nil +} + +// TableExporter represents an exporter of a single table. type TableExporter struct { db *SQLite table string @@ -105,51 +210,67 @@ type TableExporter struct { sink Sink } -func NewTableExporter(db *SQLite, table string, sink Sink) (*TableExporter, error) { - parquet, err := NewParquetExporter() - if err != nil { - return nil, err - } +// NewTableExporter creates new TableExporter. +func NewTableExporter(db *SQLite, table string, output string, sink Sink) *TableExporter { + parquet := NewParquetExporter(output) return &TableExporter{ db: db, table: table, exporter: parquet, sink: sink, - }, nil + } } -func (te *TableExporter) Execute(ctx context.Context, worker int) error { +// Execute executes the exportation process. 
+func (te *TableExporter) Execute(ctx context.Context) error { + columns, err := te.db.GetColumnsByTable(ctx, te.table) + if err != nil { + return fmt.Errorf("get columns by table: %s", err) + } + rows, err := te.db.GetRowsByTable(ctx, te.table) if err != nil { - return fmt.Errorf("query: %s", err) + return fmt.Errorf("get rows by table: %s", err) } - defer rows.Close() + defer func() { + if err := rows.Close(); err != nil { + log.Println(err) + } + }() - if err := te.exporter.Export(ctx, te.table, rows, te.sink); err != nil { + if err := te.exporter.Export(ctx, te.table, columns, rows, te.sink); err != nil { return fmt.Errorf("export: %s", err) } return nil } +// MockSink represents a mocked sink. type MockSink struct{} +// Send does nothing. func (mock *MockSink) Send(_ context.Context, _ string) error { return nil } +// BasinSink represents a sink that sends file to Basin. type BasinSink struct { provider string machine string } +// Send sends the exported file to Basin. func (s *BasinSink) Send(ctx context.Context, filepath string) error { f, err := os.Open(filepath) if err != nil { return err } - defer f.Close() + defer func() { + if err := f.Close(); err != nil { + log.Println(err) + } + }() fi, err := f.Stat() if err != nil { @@ -183,7 +304,5 @@ func (s *BasinSink) Send(ctx context.Context, filepath string) error { _ = resp.Body.Close() }() - fmt.Println(resp.StatusCode) - return nil } diff --git a/exporter_test.go b/exporter_test.go new file mode 100644 index 0000000..dcc6441 --- /dev/null +++ b/exporter_test.go @@ -0,0 +1,157 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "os" + "testing" + + "github.com/jmoiron/sqlx" + _ "github.com/marcboeker/go-duckdb" + "github.com/stretchr/testify/require" +) + +func TestTableExporter(t *testing.T) { + // setup database initial state + setup := &DatabaseSetup{ + t: t, + } + setup.Open() + setup.Exec("CREATE TABLE test (a_id INT, b TEXT, c BLOB)") + setup.Exec("INSERT INTO test (a_id, b, c) VALUES 
(1, 'Hello 1', X'010203'), (2, 'Hello 2', null)") + defer setup.Close() + + assertion := &sinkAssertion{ + t, + []struct { + aid int64 + b string + c []byte + }{ + { + 1, "Hello 1", []byte{0x01, 0x02, 0x03}, + }, + { + 2, "Hello 2", nil, + }, + }, + false, + } + // create table exporter + exporter := NewTableExporter(setup.sqlite, "test", t.TempDir(), assertion) + + err := exporter.Execute(context.Background()) + require.NoError(t, err) + require.True(t, assertion.isCalled) +} + +func TestDatabaseExporter(t *testing.T) { + // setup database initial state + setup := &DatabaseSetup{ + t: t, + } + setup.Open() + setup.Exec("CREATE TABLE test (a_id INT, b TEXT, c BLOB)") + setup.Exec("INSERT INTO test (a_id, b, c) VALUES (1, 'Hello 1', X'010203'), (2, 'Hello 2', null)") + defer setup.Close() + + assertion := &sinkAssertion{ + t, + []struct { + aid int64 + b string + c []byte + }{ + { + 1, "Hello 1", []byte{0x01, 0x02, 0x03}, + }, + { + 2, "Hello 2", nil, + }, + }, + false, + } + // create database exporter + exporter := NewDatabaseExporter(setup.sqlite, assertion, t.TempDir()) + + err := exporter.ExportAll(context.Background()) + require.NoError(t, err) + require.True(t, assertion.isCalled) +} + +// This sink implementation opens the exported file from disk using DuckDB +// and read it using read_parquet and assert the data inside the file. 
+type sinkAssertion struct { + t *testing.T + data []struct { + aid int64 + b string + c []byte + } + isCalled bool +} + +func (s *sinkAssertion) Send(ctx context.Context, filepath string) error { + s.isCalled = true + + db, err := sql.Open("duckdb", "") + require.NoError(s.t, err) + defer func() { + require.NoError(s.t, db.Close()) + }() + + rows, err := db.QueryContext(ctx, fmt.Sprintf("select * from read_parquet(['%s'])", filepath)) + require.NoError(s.t, err) + defer func() { + require.NoError(s.t, rows.Close()) + }() + + i := 0 + for rows.Next() { + var aid int64 + var b string + var c []byte + err := rows.Scan(&aid, &b, &c) + + require.NoError(s.t, err) + require.Equal(s.t, aid, s.data[i].aid) + require.Equal(s.t, b, s.data[i].b) + require.Equal(s.t, c, s.data[i].c) + i++ + } + + return nil +} + +type DatabaseSetup struct { + t *testing.T + sqlite *SQLite +} + +func (db *DatabaseSetup) Open() { + f, err := os.CreateTemp("", "") + if err != nil { + db.t.Fatal(err) + } + database, err := sqlx.Open("sqlite3", f.Name()) + if err != nil { + db.t.Fatal(err) + } + + db.sqlite = &SQLite{ + db: database, + } +} + +func (db *DatabaseSetup) Exec(sql string) { + if _, err := db.sqlite.db.Exec(sql); err != nil { + db.t.Fatal(err) + } +} + +func (db *DatabaseSetup) Close() { + if err := db.sqlite.db.Close(); err != nil { + db.t.Fatal(err) + } +} diff --git a/go.mod b/go.mod index 5110ae9..58dd901 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,8 @@ require ( github.com/marcboeker/go-duckdb v1.6.1 github.com/mattn/go-sqlite3 v1.14.22 github.com/parquet-go/parquet-go v0.20.1 + github.com/stretchr/testify v1.8.4 github.com/urfave/cli/v2 v2.27.1 - golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f golang.org/x/text v0.14.0 ) @@ -16,6 +16,7 @@ require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/apache/arrow/go/v14 v14.0.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/goccy/go-json 
v0.10.2 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect github.com/google/uuid v1.3.1 // indirect @@ -25,14 +26,17 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/segmentio/encoding v0.3.6 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.19.0 // indirect golang.org/x/tools v0.20.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index a27aac1..f66f2fc 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,10 @@ github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGC github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/marcboeker/go-duckdb v1.6.1 h1:PIlVNHAU+wu0xRnshEdA9p6RTOz5dWiJk57ntMuV1bM= @@ -59,8 +63,6 @@ github.com/zeebo/assert 
v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= -golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 h1:ESSUROHIBHg7USnszlcdmjBEwdMj9VUvU+OPk4yl2mc= -golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY= golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= @@ -81,5 +83,8 @@ gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helpers.go b/helpers.go deleted file mode 100644 index 0854e6f..0000000 --- a/helpers.go +++ /dev/null @@ -1,21 +0,0 @@ -package main - -import "strings" - -func camelCaseString(s string) string { - if s == "" { - return s - } - - splitted := strings.Split(s, "_") - - if len(splitted) == 1 { - return caser.String(s) - } - - var cc string - for _, part := range splitted { - 
cc += caser.String(strings.ToLower(part)) - } - return cc -} diff --git a/main.go b/main.go index 63985f7..670da79 100644 --- a/main.go +++ b/main.go @@ -16,11 +16,10 @@ var version = "dev" func main() { cliApp := &cli.App{ - Name: "parquet-exporter", + Name: "sqlite-exporter", Usage: "Export SQLite tables to Parquet", Version: version, Commands: []*cli.Command{ - newGenerateCommand(), newExportCommand(), }, } diff --git a/pool.go b/pool.go deleted file mode 100644 index c3578d4..0000000 --- a/pool.go +++ /dev/null @@ -1,50 +0,0 @@ -package main - -import ( - "context" - "log" - "sync" -) - -type Task interface { - Execute(context.Context, int) error -} - -type Pool struct { - size int - wg sync.WaitGroup - tasks chan Task -} - -func NewPool(sz int, maxTasks int) *Pool { - return &Pool{ - size: sz, - tasks: make(chan Task, maxTasks), - } -} - -func (p *Pool) AddTask(task Task) { - p.tasks <- task -} - -func (p *Pool) Start(ctx context.Context) { - for w := 1; w <= p.size; w++ { - p.wg.Add(1) - go p.run(ctx, w) - } -} - -func (p *Pool) Close() { - close(p.tasks) - p.wg.Wait() -} - -func (p *Pool) run(ctx context.Context, worker int) { - defer p.wg.Done() - - for task := range p.tasks { - if err := task.Execute(ctx, worker); err != nil { - log.Print(err) - } - } -} diff --git a/schemas.tmpl b/schemas.tmpl deleted file mode 100644 index e5ea577..0000000 --- a/schemas.tmpl +++ /dev/null @@ -1,26 +0,0 @@ -//go:build exclude - -package main - -import ( - "errors" -) - -{{ range .Structs }} -type {{ .Table | CamelCase}} struct { - {{- range .Schema }} - {{ .Format.Text }} {{ .Format.Tags }} - {{- end }} -} -{{ end }} - -func GetRowInstance(table string) (interface{}, error) { - switch table { - {{- range .Tables }} - case "{{ . }}": - return new({{ . 
| CamelCase }}), nil - {{- end }} - default: - return nil, errors.New("unknown table") - } -} diff --git a/template.go b/template.go deleted file mode 100644 index 39d5e67..0000000 --- a/template.go +++ /dev/null @@ -1,11 +0,0 @@ -package main - -type TemplateData struct { - Structs []Struct - Tables []string -} - -type Struct struct { - Table string - Schema []Column -}