Skip to content

Commit

Permalink
fix: close and open readers
Browse files Browse the repository at this point in the history
  • Loading branch information
adrienaury committed Dec 11, 2023
1 parent 58c3e27 commit 3f4ff63
Show file tree
Hide file tree
Showing 6 changed files with 466 additions and 70 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Types of changes
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [0.4.0]

- `Added` `Open` and `Close` methods to column/value reader inferfaces.

## [0.3.0]

- `Added` moved `min` and `max` to the main metric.
Expand Down
6 changes: 6 additions & 0 deletions internal/infra/filesReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func NewJSONLFolderReader(folderpath string) (*JSONLFolderReader, error) {
}, nil
}

func (r *JSONLFolderReader) Open() error { return nil }
func (r *JSONLFolderReader) Close() error { return nil }

func (r *JSONLFolderReader) BaseName() string {
return r.basename
}
Expand Down Expand Up @@ -155,6 +158,9 @@ func NewJSONLColReader(table, column string, decoder *json.Decoder) *JSONLColRea
}
}

func (cr *JSONLColReader) Open() error { return nil }
func (cr *JSONLColReader) Close() error { return nil }

func (cr *JSONLColReader) ColName() string {
return cr.column
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/rimo/driven.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@ package rimo

import "github.com/cgi-fr/rimo/pkg/model"

type Resource interface {
Open() error
Close() error
}

type ColReader interface {
Resource
ColName() string
TableName() string
Next() bool
Value() (any, error)
}

type Reader interface {
Resource
BaseName() string
Next() bool
Col() (ColReader, error)
Expand Down
168 changes: 98 additions & 70 deletions pkg/rimo/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,102 +32,130 @@ type Driver struct {
Distinct bool
}

//nolint:funlen,cyclop,gocognit
func (d Driver) AnalyseBase(reader Reader, writer Writer) error {
func (d Driver) AnalyseBase(reader Reader, writer Writer) (err error) {
if err := reader.Open(); err != nil {
return fmt.Errorf("failed to open column reader : %w", err)
}

defer func() {
if localerr := reader.Close(); err != nil {
err = localerr
}
}()

baseName := reader.BaseName()

base := model.NewBase(baseName)
tables := map[string]model.Table{}

for reader.Next() { // itère colonne par colonne
valreader, err := reader.Col()
if err != nil {
return fmt.Errorf("failed to get column reader : %w", err)
if err := d.analyse(reader, tables); err != nil {
return err
}
}

nilcount := 0
for _, table := range tables {
sort.SliceStable(table.Columns, func(i, j int) bool {
return table.Columns[i].Name < table.Columns[j].Name
})

for valreader.Next() {
val, err := valreader.Value()
if err != nil {
return fmt.Errorf("failed to read value : %w", err)
}
base.Tables = append(base.Tables, table)
}

log.Debug().Msgf("Processing [%s base][%s table][%s column]", baseName, valreader.TableName(), valreader.ColName())
sort.SliceStable(base.Tables, func(i, j int) bool {
return base.Tables[i].Name < base.Tables[j].Name
})

switch valtyped := val.(type) {
case string:
col, err := d.AnalyseString(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}
err = writer.Export(base)
if err != nil {
return fmt.Errorf("failed to export base : %w", err)
}

table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
}
return nil
}

table.Columns = append(table.Columns, col)
//nolint:funlen,cyclop
func (d Driver) analyse(reader Reader, tables map[string]model.Table) (err error) {
valreader, err := reader.Col()
if err != nil {
return fmt.Errorf("failed to get column reader : %w", err)
}

tables[valreader.TableName()] = table
case float64, float32, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
col, err := d.AnalyseNumeric(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}
if err := valreader.Open(); err != nil {
return fmt.Errorf("failed to open value reader : %w", err)
}

table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
}
defer func() {
if localerr := reader.Close(); err != nil {
err = localerr
}
}()

table.Columns = append(table.Columns, col)
nilcount := 0

tables[valreader.TableName()] = table
case bool:
col, err := d.AnalyseBool(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}
for valreader.Next() {
val, err := valreader.Value()
if err != nil {
return fmt.Errorf("failed to read value : %w", err)
}

table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
log.Debug().Msgf("Processing [%s][%s][%s]", reader.BaseName(), valreader.TableName(), valreader.ColName())

switch valtyped := val.(type) {
case string:
col, err := d.AnalyseString(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}

table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
}

table.Columns = append(table.Columns, col)
table.Columns = append(table.Columns, col)

tables[valreader.TableName()] = table
case nil:
nilcount++
tables[valreader.TableName()] = table
case float64, float32, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
col, err := d.AnalyseNumeric(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}
}
}

for _, table := range tables {
sort.SliceStable(table.Columns, func(i, j int) bool {
return table.Columns[i].Name < table.Columns[j].Name
})
table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
}

base.Tables = append(base.Tables, table)
}
table.Columns = append(table.Columns, col)

sort.SliceStable(base.Tables, func(i, j int) bool {
return base.Tables[i].Name < base.Tables[j].Name
})
tables[valreader.TableName()] = table
case bool:
col, err := d.AnalyseBool(nilcount, valtyped, valreader)
if err != nil {
return fmt.Errorf("failed to analyse column : %w", err)
}

err := writer.Export(base)
if err != nil {
return fmt.Errorf("failed to export base : %w", err)
table, exists := tables[valreader.TableName()]
if !exists {
table = model.Table{
Name: valreader.TableName(),
Columns: []model.Column{},
}
}

table.Columns = append(table.Columns, col)

tables[valreader.TableName()] = table
case nil:
nilcount++
}
}

return nil
Expand Down
Loading

0 comments on commit 3f4ff63

Please sign in to comment.