Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: close and open readers #34

Merged
merged 1 commit into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Types of changes
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [0.4.0]

- `Added` `Open` and `Close` methods to column/value reader inferfaces.

## [0.3.0]

- `Added` moved `min` and `max` to the main metric.
Expand Down
6 changes: 6 additions & 0 deletions internal/infra/filesReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func NewJSONLFolderReader(folderpath string) (*JSONLFolderReader, error) {
}, nil
}

func (r *JSONLFolderReader) Open() error { return nil }
func (r *JSONLFolderReader) Close() error { return nil }

func (r *JSONLFolderReader) BaseName() string {
return r.basename
}
Expand Down Expand Up @@ -155,6 +158,9 @@ func NewJSONLColReader(table, column string, decoder *json.Decoder) *JSONLColRea
}
}

func (cr *JSONLColReader) Open() error { return nil }
func (cr *JSONLColReader) Close() error { return nil }

func (cr *JSONLColReader) ColName() string {
return cr.column
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/rimo/driven.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@ package rimo

import "github.com/cgi-fr/rimo/pkg/model"

type Resource interface {
Open() error
Close() error
}

type ColReader interface {
Resource
ColName() string
TableName() string
Next() bool
Value() (any, error)
}

type Reader interface {
Resource
BaseName() string
Next() bool
Col() (ColReader, error)
Expand Down
168 changes: 98 additions & 70 deletions pkg/rimo/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,102 +32,130 @@ type Driver struct {
Distinct bool
}

//nolint:funlen,cyclop,gocognit
func (d Driver) AnalyseBase(reader Reader, writer Writer) error {
func (d Driver) AnalyseBase(reader Reader, writer Writer) (err error) {
if err := reader.Open(); err != nil {
return fmt.Errorf("failed to open column reader : %w", err)
}

defer func() {
if localerr := reader.Close(); err != nil {
err = localerr
}
}()

baseName := reader.BaseName()

base := model.NewBase(baseName)
tables := map[string]model.Table{}

for reader.Next() { // itère colonne par colonne
valreader, err := reader.Col()
if err != nil {
return fmt.Errorf("failed to get column reader : %w", err)
if err := d.analyse(reader, tables); err != nil {
return err
}
}

nilcount := 0
for _, table := range tables {
sort.SliceStable(table.Columns, func(i, j int) bool {
return table.Columns[i].Name < table.Columns[j].Name
})

for valreader.Next() {
val, err := valreader.Value()
if err != nil {
return fmt.Errorf("failed to read value : %w", err)
}
base.Tables = append(base.Tables, table)
}

log.Debug().Msgf("Processing [%s base][%s table][%s column]", baseName, valreader.TableName(), valreader.ColName())
sort.SliceStable(base.Tables, func(i, j int) bool {
return base.Tables[i].Name < base.Tables[j].Name
})

switch valtyped := val.(type) {
case string:
col, err := d.AnalyseString(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}
err = writer.Export(base)
if err != nil {
return fmt.Errorf("failed to export base : %w", err)
}

table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
}
return nil
}

table.Columns = append(table.Columns, col)
//nolint:funlen,cyclop
func (d Driver) analyse(reader Reader, tables map[string]model.Table) (err error) {
valreader, err := reader.Col()
if err != nil {
return fmt.Errorf("failed to get column reader : %w", err)
}

tables[valreader.TableName()] = table
case float64, float32, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
col, err := d.AnalyseNumeric(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}
if err := valreader.Open(); err != nil {
return fmt.Errorf("failed to open value reader : %w", err)
}

table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
}
defer func() {
if localerr := reader.Close(); err != nil {
err = localerr
}
}()

table.Columns = append(table.Columns, col)
nilcount := 0

tables[valreader.TableName()] = table
case bool:
col, err := d.AnalyseBool(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}
for valreader.Next() {
val, err := valreader.Value()
if err != nil {
return fmt.Errorf("failed to read value : %w", err)
}

table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
log.Debug().Msgf("Processing [%s][%s][%s]", reader.BaseName(), valreader.TableName(), valreader.ColName())

switch valtyped := val.(type) {
case string:
col, err := d.AnalyseString(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}

table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
}

table.Columns = append(table.Columns, col)
table.Columns = append(table.Columns, col)

tables[valreader.TableName()] = table
case nil:
nilcount++
tables[valreader.TableName()] = table
case float64, float32, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
col, err := d.AnalyseNumeric(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}
}
}

for _, table := range tables {
sort.SliceStable(table.Columns, func(i, j int) bool {
return table.Columns[i].Name < table.Columns[j].Name
})
table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
}

base.Tables = append(base.Tables, table)
}
table.Columns = append(table.Columns, col)

sort.SliceStable(base.Tables, func(i, j int) bool {
return base.Tables[i].Name < base.Tables[j].Name
})
tables[valreader.TableName()] = table
case bool:
col, err := d.AnalyseBool(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}

err := writer.Export(base)
if err != nil {
return fmt.Errorf("failed to export base : %w", err)
table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
}

table.Columns = append(table.Columns, col)

tables[valreader.TableName()] = table
case nil:
nilcount++
}
}

return nil
Expand Down
Loading
Loading