From 233797fc2ed93a9772b2b25a543fe6b03b55b745 Mon Sep 17 00:00:00 2001 From: Adrien Aury <44274230+adrienaury@users.noreply.github.com> Date: Wed, 27 Mar 2024 09:13:11 +0000 Subject: [PATCH 01/10] feat(scan): alias and filters --- CHANGELOG.md | 6 ++++ internal/app/cli/scan.go | 14 +++++--- pkg/silo/config.go | 50 ++++++++++++++++++++++++++ pkg/silo/driver.go | 43 ++++++++++++++++++----- pkg/silo/errors.go | 13 ++++++- pkg/silo/options.go | 76 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 188 insertions(+), 14 deletions(-) create mode 100644 pkg/silo/config.go create mode 100644 pkg/silo/options.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b276206..f6c359e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,12 @@ Types of changes - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [0.2.0] + +- `Added` flag `--only` (short `-o`) to only scan a specific list of fields +- `Added` flag `--alias` (short `-a`) to rename fields on the fly +- `Fixed` self reference link are no longer counted in the links counter while scanning + ## [0.1.0] - `Added` initial version diff --git a/internal/app/cli/scan.go b/internal/app/cli/scan.go index d3cebf1..2e35bdb 100644 --- a/internal/app/cli/scan.go +++ b/internal/app/cli/scan.go @@ -28,7 +28,11 @@ import ( ) func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.File) *cobra.Command { - var passthrough bool + var ( + passthrough bool + only []string + aliases map[string]string + ) cmd := &cobra.Command{ //nolint:exhaustruct Use: "scan path", @@ -36,13 +40,15 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F Example: " lino pull database --table client | " + parent + " scan clients", Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { - if err := scan(cmd, args[0], passthrough); err != nil { + if err := scan(cmd, args[0], passthrough, only, aliases); err != nil { log.Fatal().Err(err).Int("return", 1).Msg("end 
SILO") } }, } cmd.Flags().BoolVarP(&passthrough, "passthrough", "p", false, "pass stdin to stdout") + cmd.Flags().StringSliceVarP(&only, "only", "o", []string{}, "only scan these columns, exclude all others") + cmd.Flags().StringToStringVarP(&aliases, "alias", "a", map[string]string{}, "use given aliases for each columns") cmd.SetOut(stdout) cmd.SetErr(stderr) @@ -51,7 +57,7 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F return cmd } -func scan(cmd *cobra.Command, path string, passthrough bool) error { +func scan(cmd *cobra.Command, path string, passthrough bool, only []string, aliases map[string]string) error { backend, err := infra.NewBackend(path) if err != nil { return fmt.Errorf("%w", err) @@ -59,7 +65,7 @@ func scan(cmd *cobra.Command, path string, passthrough bool) error { defer backend.Close() - driver := silo.NewDriver(backend, nil) + driver := silo.NewDriver(backend, nil, silo.WithKeys(only), silo.WithAliases(aliases)) var reader silo.DataRowReader diff --git a/pkg/silo/config.go b/pkg/silo/config.go new file mode 100644 index 0000000..e63689d --- /dev/null +++ b/pkg/silo/config.go @@ -0,0 +1,50 @@ +// Copyright (C) 2024 CGI France +// +// This file is part of SILO. +// +// SILO is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// SILO is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with SILO. If not, see . 
+ +package silo + +import "errors" + +type Config struct { + Include map[string]bool + Aliases map[string]string +} + +func DefaultConfig() *Config { + config := Config{ + Include: map[string]bool{}, + Aliases: map[string]string{}, + } + + return &config +} + +func (cfg *Config) validate() error { + var errs []error + + for key := range cfg.Aliases { + if _, ok := cfg.Include[key]; !ok && len(cfg.Include) > 0 { + errs = append(errs, &ConfigScanAliasIsNotIncludedError{alias: key}) + } + } + + if len(errs) != 0 { + return errors.Join(errs...) + } + + return nil +} diff --git a/pkg/silo/driver.go b/pkg/silo/driver.go index b573519..96c2366 100644 --- a/pkg/silo/driver.go +++ b/pkg/silo/driver.go @@ -26,14 +26,33 @@ import ( ) type Driver struct { + *Config backend Backend writer DumpWriter } -func NewDriver(backend Backend, writer DumpWriter) *Driver { +func NewDriver(backend Backend, writer DumpWriter, options ...Option) *Driver { + errs := []error{} + config := DefaultConfig() + + for _, option := range options { + if err := option.apply(config); err != nil { + errs = append(errs, err) + } + } + + if err := config.validate(); err != nil { + errs = append(errs, err) + } + + if len(errs) > 0 { + panic(errs) + } + return &Driver{ backend: backend, writer: writer, + Config: config, } } @@ -102,7 +121,7 @@ func (d *Driver) Scan(input DataRowReader, observers ...ScanObserver) error { break } - links := Scan(datarow) + links := d.scan(datarow) log.Info().Int("links", len(links)).Interface("row", datarow).Msg("datarow scanned") @@ -120,12 +139,14 @@ func (d *Driver) ingest(datarow DataRow, links []DataLink, observers ...ScanObse return fmt.Errorf("%w: %w", ErrPersistingData, err) } - if err := d.backend.Store(link.E2, link.E1); err != nil { - return fmt.Errorf("%w: %w", ErrPersistingData, err) - } + if link.E1 != link.E2 { + if err := d.backend.Store(link.E2, link.E1); err != nil { + return fmt.Errorf("%w: %w", ErrPersistingData, err) + } - for _, observer := range 
observers { - observer.IngestedLink(link) + for _, observer := range observers { + observer.IngestedLink(link) + } } } @@ -136,12 +157,16 @@ func (d *Driver) ingest(datarow DataRow, links []DataLink, observers ...ScanObse return nil } -func Scan(datarow DataRow) []DataLink { +func (cfg *Config) scan(datarow DataRow) []DataLink { nodes := []DataNode{} links := []DataLink{} for key, value := range datarow { - if value != nil { + if _, included := cfg.Include[key]; value != nil && (included || len(cfg.Include) == 0) { + if alias, exist := cfg.Aliases[key]; exist { + key = alias + } + nodes = append(nodes, DataNode{Key: key, Data: value}) } } diff --git a/pkg/silo/errors.go b/pkg/silo/errors.go index cf477c6..4956ba1 100644 --- a/pkg/silo/errors.go +++ b/pkg/silo/errors.go @@ -17,10 +17,21 @@ package silo -import "errors" +import ( + "errors" + "fmt" +) var ( ErrReadingNextInput = errors.New("error while reading next input") ErrPersistingData = errors.New("error while persisting data") ErrReadingPersistedData = errors.New("error while reading persisted data") ) + +type ConfigScanAliasIsNotIncludedError struct { + alias string +} + +func (e *ConfigScanAliasIsNotIncludedError) Error() string { + return fmt.Sprintf("configuration error : alias [%s] is not included", e.alias) +} diff --git a/pkg/silo/options.go b/pkg/silo/options.go new file mode 100644 index 0000000..ed1c7b7 --- /dev/null +++ b/pkg/silo/options.go @@ -0,0 +1,76 @@ +// Copyright (C) 2024 CGI France +// +// This file is part of SILO. +// +// SILO is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// SILO is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with SILO. If not, see . + +package silo + +type Option interface { + applier +} + +type option func(*Config) error + +func (f option) apply(cfg *Config) error { + return f(cfg) +} + +type applier interface { + apply(cfg *Config) error +} + +func Alias(key, alias string) Option { //nolint:ireturn + applier := func(cfg *Config) error { + cfg.Aliases[key] = alias + + return nil + } + + return option(applier) +} + +func Include(key string) Option { //nolint:ireturn + applier := func(cfg *Config) error { + cfg.Include[key] = true + + return nil + } + + return option(applier) +} + +func WithAliases(aliases map[string]string) Option { //nolint:ireturn + applier := func(cfg *Config) error { + for key, alias := range aliases { + cfg.Aliases[key] = alias + } + + return nil + } + + return option(applier) +} + +func WithKeys(keys []string) Option { //nolint:ireturn + applier := func(cfg *Config) error { + for _, key := range keys { + cfg.Include[key] = true + } + + return nil + } + + return option(applier) +} From 0618cf52db3df2eee825c997c7dc2c7e6207e29c Mon Sep 17 00:00:00 2001 From: Adrien Aury <44274230+adrienaury@users.noreply.github.com> Date: Wed, 27 Mar 2024 09:16:11 +0000 Subject: [PATCH 02/10] test(scan): fix venom tests --- test/suites/02-scan.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/suites/02-scan.yml b/test/suites/02-scan.yml index feca70e..a4e6509 100644 --- a/test/suites/02-scan.yml +++ b/test/suites/02-scan.yml @@ -27,14 +27,14 @@ testcases: steps: - script: silo scan ../silos/sparse < ../data/clients_sparse.jsonl assertions: - - result.systemout ShouldContainSubstring "Scanned 3 rows, found 3 links" + - result.systemout ShouldContainSubstring "Scanned 3 rows, found 2 links" - result.code ShouldEqual 0 - name: update silo steps: - script: silo scan ../silos/sparse < 
../data/clients_sparse.jsonl assertions: - - result.systemout ShouldContainSubstring "Scanned 3 rows, found 3 links" + - result.systemout ShouldContainSubstring "Scanned 3 rows, found 2 links" - result.code ShouldEqual 0 - name: passthrough From 0034d2d3d2ded9510a73df48b44a5edf116b55d1 Mon Sep 17 00:00:00 2001 From: Adrien Aury <44274230+adrienaury@users.noreply.github.com> Date: Wed, 27 Mar 2024 10:35:07 +0000 Subject: [PATCH 03/10] fix: self reference optimization --- pkg/silo/driver.go | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/pkg/silo/driver.go b/pkg/silo/driver.go index 96c2366..99718b2 100644 --- a/pkg/silo/driver.go +++ b/pkg/silo/driver.go @@ -121,11 +121,11 @@ func (d *Driver) Scan(input DataRowReader, observers ...ScanObserver) error { break } - links := d.scan(datarow) + nodes, links := d.scan(datarow) log.Info().Int("links", len(links)).Interface("row", datarow).Msg("datarow scanned") - if err := d.ingest(datarow, links, observers...); err != nil { + if err := d.ingest(datarow, nodes, links, observers...); err != nil { return err } } @@ -133,20 +133,25 @@ func (d *Driver) Scan(input DataRowReader, observers ...ScanObserver) error { return nil } -func (d *Driver) ingest(datarow DataRow, links []DataLink, observers ...ScanObserver) error { +func (d *Driver) ingest(datarow DataRow, nodes []DataNode, links []DataLink, observers ...ScanObserver) error { for _, link := range links { if err := d.backend.Store(link.E1, link.E2); err != nil { return fmt.Errorf("%w: %w", ErrPersistingData, err) } - if link.E1 != link.E2 { - if err := d.backend.Store(link.E2, link.E1); err != nil { - return fmt.Errorf("%w: %w", ErrPersistingData, err) - } + if err := d.backend.Store(link.E2, link.E1); err != nil { + return fmt.Errorf("%w: %w", ErrPersistingData, err) + } - for _, observer := range observers { - observer.IngestedLink(link) - } + for _, observer := range observers { + observer.IngestedLink(link) + } + } 
+ + // optimization : self reference is useful only if no link has been found, and nodes will contain a single node + if len(links) == 0 && len(nodes) > 0 { + if err := d.backend.Store(nodes[0], nodes[0]); err != nil { + return fmt.Errorf("%w: %w", ErrPersistingData, err) } } @@ -157,13 +162,13 @@ func (d *Driver) ingest(datarow DataRow, links []DataLink, observers ...ScanObse return nil } -func (cfg *Config) scan(datarow DataRow) []DataLink { +func (d *Driver) scan(datarow DataRow) ([]DataNode, []DataLink) { nodes := []DataNode{} links := []DataLink{} for key, value := range datarow { - if _, included := cfg.Include[key]; value != nil && (included || len(cfg.Include) == 0) { - if alias, exist := cfg.Aliases[key]; exist { + if _, included := d.Config.Include[key]; value != nil && (included || len(d.Config.Include) == 0) { + if alias, exist := d.Config.Aliases[key]; exist { key = alias } @@ -171,10 +176,6 @@ func (cfg *Config) scan(datarow DataRow) []DataLink { } } - if len(nodes) == 1 { - links = append(links, DataLink{E1: nodes[0], E2: nodes[0]}) - } - // find all pairs in nodes for i := 0; i < len(nodes); i++ { for j := i + 1; j < len(nodes); j++ { @@ -182,5 +183,5 @@ func (cfg *Config) scan(datarow DataRow) []DataLink { } } - return links + return nodes, links } From 1a0b00963bd25b5d528b63283d190b889204499c Mon Sep 17 00:00:00 2001 From: Adrien Aury <44274230+adrienaury@users.noreply.github.com> Date: Wed, 27 Mar 2024 10:46:49 +0000 Subject: [PATCH 04/10] feat(scan): rename only to include --- CHANGELOG.md | 4 ++-- README.md | 28 ++++++++++++++++++++++++++++ internal/app/cli/scan.go | 10 +++++----- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6c359e..132d82b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,8 +16,8 @@ Types of changes ## [0.2.0] -- `Added` flag `--only` (short `-o`) to only scan a specific list of fields -- `Added` flag `--alias` (short `-a`) to rename fields on the fly +- `Added` flag 
`--include` (short `-i`) to only scan a specific list of fields, this flag is repeatable +- `Added` flag `--alias` (short `-a`) to rename fields on the fly, this flag is repeatable - `Fixed` self reference link are no longer counted in the links counter while scanning ## [0.1.0] diff --git a/README.md b/README.md index d438aff..27b253c 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,34 @@ $ silo scan my-silo < input.jsonl Analysis data is persisted on disk on the `my-silo` path relative to the current directory. +#### passthrough stdin to stdout + +Use `--passthrough` (short : `-p`) to pass input to stdout instead of displaying information. + +```console +$ silo scan my-silo --passthrough < input.jsonl +{"ID_CLIENT":"0001","EMAIL_CLIENT":"jonh.doe@domain.com","ACCOUNT_NUMBER":null} +{"ID_CLIENT":null,"EMAIL_CLIENT":null,"ACCOUNT_NUMBER":"C01"} +``` + +#### include only specific fields/columns + +Use `--include <field>` (short : `-i <field>`, repeatable) to select only given columns to scan. + +```console +$ silo scan my-silo --include ID_CLIENT --include EMAIL_CLIENT < input.jsonl +⣾ Scanned 5 rows, found 15 links (4084 row/s) [0s] +``` + +#### rename fields/columns on the fly + +Use `--alias <field>=<alias>` (short : `-a <field>=<alias>`, repeatable) to rename fields before storing links. + +```console +$ silo scan my-silo --alias ID_CLIENT=CLIENT --alias EMAIL_CLIENT=EMAIL < input.jsonl +⣾ Scanned 5 rows, found 15 links (4084 row/s) [0s] +``` + ### silo dump The silo dump command is used to dump each connected entity into a file. This allows users to create a referential of all entities discovered within the JSONLine data.
Here's how to use it: diff --git a/internal/app/cli/scan.go b/internal/app/cli/scan.go index 2e35bdb..121b914 100644 --- a/internal/app/cli/scan.go +++ b/internal/app/cli/scan.go @@ -30,7 +30,7 @@ import ( func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.File) *cobra.Command { var ( passthrough bool - only []string + include []string aliases map[string]string ) @@ -40,14 +40,14 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F Example: " lino pull database --table client | " + parent + " scan clients", Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { - if err := scan(cmd, args[0], passthrough, only, aliases); err != nil { + if err := scan(cmd, args[0], passthrough, include, aliases); err != nil { log.Fatal().Err(err).Int("return", 1).Msg("end SILO") } }, } cmd.Flags().BoolVarP(&passthrough, "passthrough", "p", false, "pass stdin to stdout") - cmd.Flags().StringSliceVarP(&only, "only", "o", []string{}, "only scan these columns, exclude all others") + cmd.Flags().StringSliceVarP(&include, "include", "i", []string{}, "include only these columns, exclude all others") cmd.Flags().StringToStringVarP(&aliases, "alias", "a", map[string]string{}, "use given aliases for each columns") cmd.SetOut(stdout) @@ -57,7 +57,7 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F return cmd } -func scan(cmd *cobra.Command, path string, passthrough bool, only []string, aliases map[string]string) error { +func scan(cmd *cobra.Command, path string, passthrough bool, include []string, aliases map[string]string) error { backend, err := infra.NewBackend(path) if err != nil { return fmt.Errorf("%w", err) @@ -65,7 +65,7 @@ func scan(cmd *cobra.Command, path string, passthrough bool, only []string, alia defer backend.Close() - driver := silo.NewDriver(backend, nil, silo.WithKeys(only), silo.WithAliases(aliases)) + driver := silo.NewDriver(backend, nil, silo.WithKeys(include), 
silo.WithAliases(aliases)) var reader silo.DataRowReader From 36ff70c6b030dcc7df0d166edeae8bd7013fa982 Mon Sep 17 00:00:00 2001 From: Adrien Aury <44274230+adrienaury@users.noreply.github.com> Date: Wed, 27 Mar 2024 11:04:20 +0000 Subject: [PATCH 05/10] feat(dump): wip! include only fields --- pkg/silo/config.go | 22 ++++++++++++---------- pkg/silo/driver.go | 10 +++++----- pkg/silo/model_entity.go | 18 +++++++++++------- pkg/silo/options.go | 30 +++++++++++++++++++----------- 4 files changed, 47 insertions(+), 33 deletions(-) diff --git a/pkg/silo/config.go b/pkg/silo/config.go index e63689d..8258744 100644 --- a/pkg/silo/config.go +++ b/pkg/silo/config.go @@ -19,25 +19,27 @@ package silo import "errors" -type Config struct { - Include map[string]bool - Aliases map[string]string +type config struct { + include map[string]bool + includeList []string + aliases map[string]string } -func DefaultConfig() *Config { - config := Config{ - Include: map[string]bool{}, - Aliases: map[string]string{}, +func DefaultConfig() *config { + config := config{ + include: map[string]bool{}, + includeList: []string{}, + aliases: map[string]string{}, } return &config } -func (cfg *Config) validate() error { +func (cfg *config) validate() error { var errs []error - for key := range cfg.Aliases { - if _, ok := cfg.Include[key]; !ok && len(cfg.Include) > 0 { + for key := range cfg.aliases { + if _, ok := cfg.include[key]; !ok && len(cfg.include) > 0 { errs = append(errs, &ConfigScanAliasIsNotIncludedError{alias: key}) } } diff --git a/pkg/silo/driver.go b/pkg/silo/driver.go index 99718b2..4347829 100644 --- a/pkg/silo/driver.go +++ b/pkg/silo/driver.go @@ -26,7 +26,7 @@ import ( ) type Driver struct { - *Config + *config backend Backend writer DumpWriter } @@ -52,7 +52,7 @@ func NewDriver(backend Backend, writer DumpWriter, options ...Option) *Driver { return &Driver{ backend: backend, writer: writer, - Config: config, + config: config, } } @@ -71,7 +71,7 @@ func (d *Driver) Dump() error { 
break } - entity := NewEntity(entryNode) + entity := NewEntity(d.config.includeList, d.writer, entryNode) if err := d.writer.Write(entryNode, entity.UUID()); err != nil { return fmt.Errorf("%w", err) @@ -167,8 +167,8 @@ func (d *Driver) scan(datarow DataRow) ([]DataNode, []DataLink) { links := []DataLink{} for key, value := range datarow { - if _, included := d.Config.Include[key]; value != nil && (included || len(d.Config.Include) == 0) { - if alias, exist := d.Config.Aliases[key]; exist { + if _, included := d.config.include[key]; value != nil && (included || len(d.config.include) == 0) { + if alias, exist := d.config.aliases[key]; exist { key = alias } diff --git a/pkg/silo/model_entity.go b/pkg/silo/model_entity.go index ec059c2..0bb77f9 100644 --- a/pkg/silo/model_entity.go +++ b/pkg/silo/model_entity.go @@ -31,16 +31,20 @@ const ( ) type Entity struct { - nodes map[DataNode]int - counts map[string]int - uuid string + include []string + nodes map[DataNode]int + counts map[string]int + uuid string + writer DumpWriter } -func NewEntity(nodes ...DataNode) Entity { +func NewEntity(include []string, writer DumpWriter, nodes ...DataNode) Entity { entity := Entity{ - nodes: make(map[DataNode]int, defaultEntitySize), - counts: make(map[string]int, defaultEntitySize), - uuid: uuid.NewString(), + include: include, + nodes: make(map[DataNode]int, defaultEntitySize), + counts: make(map[string]int, defaultEntitySize), + uuid: uuid.NewString(), + writer: writer, } for _, node := range nodes { entity.Append(node) diff --git a/pkg/silo/options.go b/pkg/silo/options.go index ed1c7b7..a16364d 100644 --- a/pkg/silo/options.go +++ b/pkg/silo/options.go @@ -21,19 +21,19 @@ type Option interface { applier } -type option func(*Config) error +type option func(*config) error -func (f option) apply(cfg *Config) error { +func (f option) apply(cfg *config) error { return f(cfg) } type applier interface { - apply(cfg *Config) error + apply(cfg *config) error } func Alias(key, alias 
string) Option { //nolint:ireturn - applier := func(cfg *Config) error { - cfg.Aliases[key] = alias + applier := func(cfg *config) error { + cfg.aliases[key] = alias return nil } @@ -42,8 +42,12 @@ func Alias(key, alias string) Option { //nolint:ireturn } func Include(key string) Option { //nolint:ireturn - applier := func(cfg *Config) error { - cfg.Include[key] = true + applier := func(cfg *config) error { + if _, exist := cfg.include[key]; !exist { + cfg.includeList = append(cfg.includeList, key) + } + + cfg.include[key] = true return nil } @@ -52,9 +56,9 @@ func Include(key string) Option { //nolint:ireturn } func WithAliases(aliases map[string]string) Option { //nolint:ireturn - applier := func(cfg *Config) error { + applier := func(cfg *config) error { for key, alias := range aliases { - cfg.Aliases[key] = alias + cfg.aliases[key] = alias } return nil @@ -64,9 +68,13 @@ func WithAliases(aliases map[string]string) Option { //nolint:ireturn } func WithKeys(keys []string) Option { //nolint:ireturn - applier := func(cfg *Config) error { + applier := func(cfg *config) error { for _, key := range keys { - cfg.Include[key] = true + if _, exist := cfg.include[key]; !exist { + cfg.includeList = append(cfg.includeList, key) + } + + cfg.include[key] = true } return nil From 8e889402f622314633620bffc49283c8d582b98b Mon Sep 17 00:00:00 2001 From: Adrien Aury <44274230+adrienaury@users.noreply.github.com> Date: Wed, 27 Mar 2024 11:04:40 +0000 Subject: [PATCH 06/10] feat(dump): wip! 
add include flag --- internal/app/cli/dump.go | 12 +++++++++--- internal/app/cli/scan.go | 4 +++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/internal/app/cli/dump.go b/internal/app/cli/dump.go index 3deeea3..65377a1 100644 --- a/internal/app/cli/dump.go +++ b/internal/app/cli/dump.go @@ -28,18 +28,24 @@ import ( ) func NewDumpCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.File) *cobra.Command { + var include []string + cmd := &cobra.Command{ //nolint:exhaustruct Use: "dump path", Short: "Dump silo database stored in given path into stdout", Example: " " + parent + " dump clients", Args: cobra.ExactArgs(1), Run: func(_ *cobra.Command, args []string) { - if err := dump(args[0]); err != nil { + if err := dump(args[0], include); err != nil { log.Fatal().Err(err).Int("return", 1).Msg("end SILO") } }, } + cmd.Flags().StringSliceVarP(&include, "include", "i", []string{}, "include only these columns, exclude all others") + + cmd.Flags().SortFlags = false + cmd.SetOut(stdout) cmd.SetErr(stderr) cmd.SetIn(stdin) @@ -47,7 +53,7 @@ func NewDumpCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F return cmd } -func dump(path string) error { +func dump(path string, include []string) error { backend, err := infra.NewBackend(path) if err != nil { return fmt.Errorf("%w", err) @@ -55,7 +61,7 @@ func dump(path string) error { defer backend.Close() - driver := silo.NewDriver(backend, infra.NewDumpJSONLine()) + driver := silo.NewDriver(backend, infra.NewDumpJSONLine(), silo.WithKeys(include)) if err := driver.Dump(); err != nil { return fmt.Errorf("%w", err) diff --git a/internal/app/cli/scan.go b/internal/app/cli/scan.go index 121b914..445bd82 100644 --- a/internal/app/cli/scan.go +++ b/internal/app/cli/scan.go @@ -37,7 +37,7 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F cmd := &cobra.Command{ //nolint:exhaustruct Use: "scan path", Short: "Ingest data from stdin and update silo database 
stored in given path", - Example: " lino pull database --table client | " + parent + " scan clients", + Example: " " + parent + " scan clients < clients.jsonl", Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { if err := scan(cmd, args[0], passthrough, include, aliases); err != nil { @@ -50,6 +50,8 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F cmd.Flags().StringSliceVarP(&include, "include", "i", []string{}, "include only these columns, exclude all others") cmd.Flags().StringToStringVarP(&aliases, "alias", "a", map[string]string{}, "use given aliases for each columns") + cmd.Flags().SortFlags = false + cmd.SetOut(stdout) cmd.SetErr(stderr) cmd.SetIn(stdin) From 2c9534523eb821d28df84e9c49b9b2dd43a34e2a Mon Sep 17 00:00:00 2001 From: Adrien Aury <44274230+adrienaury@users.noreply.github.com> Date: Wed, 27 Mar 2024 11:34:22 +0000 Subject: [PATCH 07/10] feat(dump): include flag + detect complete entity --- pkg/silo/driver.go | 16 +++++++++++++--- pkg/silo/model_entity.go | 41 +++++++++++++++++++++++++--------------- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/pkg/silo/driver.go b/pkg/silo/driver.go index 4347829..8312947 100644 --- a/pkg/silo/driver.go +++ b/pkg/silo/driver.go @@ -71,9 +71,9 @@ func (d *Driver) Dump() error { break } - entity := NewEntity(d.config.includeList, d.writer, entryNode) + entity := NewEntity(d.config.includeList, entryNode) - if err := d.writer.Write(entryNode, entity.UUID()); err != nil { + if err := d.write(entryNode, entity.UUID()); err != nil { return fmt.Errorf("%w", err) } @@ -95,7 +95,7 @@ func (d *Driver) dump(snapshot Snapshot, node DataNode, entity Entity) error { for _, connectedNode := range connectedNodes { if entity.Append(connectedNode) { - if err := d.writer.Write(connectedNode, entity.UUID()); err != nil { + if err := d.write(connectedNode, entity.UUID()); err != nil { return fmt.Errorf("%w", err) } @@ -108,6 +108,16 @@ func (d *Driver) 
dump(snapshot Snapshot, node DataNode, entity Entity) error { return nil } +func (d *Driver) write(node DataNode, uuid string) error { + if _, included := d.config.include[node.Key]; included || len(d.config.include) == 0 { + if err := d.writer.Write(node, uuid); err != nil { + return fmt.Errorf("%w", err) + } + } + + return nil +} + func (d *Driver) Scan(input DataRowReader, observers ...ScanObserver) error { defer input.Close() diff --git a/pkg/silo/model_entity.go b/pkg/silo/model_entity.go index 0bb77f9..299f01c 100644 --- a/pkg/silo/model_entity.go +++ b/pkg/silo/model_entity.go @@ -25,9 +25,10 @@ import ( const defaultEntitySize = 10 const ( - statusEntityOK = "consistent" - statusEntityPartial = "partial" + statusEntityComplete = "complete" + statusEntityConsistent = "consistent" statusEntityInconsistent = "inconsistent" + statusEntityEmpty = "empty" ) type Entity struct { @@ -35,16 +36,14 @@ type Entity struct { nodes map[DataNode]int counts map[string]int uuid string - writer DumpWriter } -func NewEntity(include []string, writer DumpWriter, nodes ...DataNode) Entity { +func NewEntity(include []string, nodes ...DataNode) Entity { entity := Entity{ include: include, nodes: make(map[DataNode]int, defaultEntitySize), counts: make(map[string]int, defaultEntitySize), uuid: uuid.NewString(), - writer: writer, } for _, node := range nodes { entity.Append(node) @@ -75,26 +74,38 @@ func (s Entity) UUID() string { return s.uuid } -//nolint:zerologlint func (s Entity) Finalize() { - msg := log.Info().Str("status", statusEntityOK) + msg := log.Info().Str("status", statusEntityConsistent) - for _, count := range s.counts { + counts := s.counts + + if len(s.include) > 0 { + counts = make(map[string]int, len(s.include)) + for _, key := range s.include { + if s.counts[key] > 0 { + counts[key] = s.counts[key] + } + } + } + + if len(counts) == len(s.include) && len(s.include) > 0 { + msg.Str("status", statusEntityComplete) + } else if len(counts) == 0 { + msg.Str("status", 
statusEntityEmpty) + } + + for _, count := range counts { if count > 1 { msg = log.Warn().Str("status", statusEntityInconsistent) break } - - if count == 0 { - msg = log.Warn().Str("status", statusEntityPartial) - } } - msg = msg.Str("uuid", s.UUID()) + msg.Str("uuid", s.UUID()) - for id, count := range s.counts { - msg.Int(id, count) + for id, count := range counts { + msg.Int("count-"+id, count) } msg.Msg("entity identified") From 766db4c67d793bcdef8c6718e34dd3092b3f1bdc Mon Sep 17 00:00:00 2001 From: Adrien Aury <44274230+adrienaury@users.noreply.github.com> Date: Wed, 27 Mar 2024 12:25:58 +0000 Subject: [PATCH 08/10] feat(dump): add dump observers --- CHANGELOG.md | 2 +- pkg/silo/config.go | 2 +- pkg/silo/driven.go | 4 ++++ pkg/silo/driver.go | 12 +++++++++--- pkg/silo/model_entity.go | 26 +++++++++++++++++--------- 5 files changed, 32 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 132d82b..69ba4b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ Types of changes ## [0.2.0] -- `Added` flag `--include` (short `-i`) to only scan a specific list of fields, this flag is repeatable +- `Added` flag `--include` (short `-i`) to only scan/dump a specific list of fields, this flag is repeatable - `Added` flag `--alias` (short `-a`) to rename fields on the fly, this flag is repeatable - `Fixed` self reference link are no longer counted in the links counter while scanning diff --git a/pkg/silo/config.go b/pkg/silo/config.go index 8258744..dfecb9a 100644 --- a/pkg/silo/config.go +++ b/pkg/silo/config.go @@ -25,7 +25,7 @@ type config struct { aliases map[string]string } -func DefaultConfig() *config { +func newConfig() *config { config := config{ include: map[string]bool{}, includeList: []string{}, diff --git a/pkg/silo/driven.go b/pkg/silo/driven.go index 475753d..7a6b2e2 100644 --- a/pkg/silo/driven.go +++ b/pkg/silo/driven.go @@ -43,3 +43,7 @@ type ScanObserver interface { IngestedRow(row DataRow) IngestedLink(link DataLink) } 
+ +type DumpObserver interface { + Entity(status Status, counts map[string]int) +} diff --git a/pkg/silo/driver.go b/pkg/silo/driver.go index 8312947..1b1d099 100644 --- a/pkg/silo/driver.go +++ b/pkg/silo/driver.go @@ -33,7 +33,7 @@ type Driver struct { func NewDriver(backend Backend, writer DumpWriter, options ...Option) *Driver { errs := []error{} - config := DefaultConfig() + config := newConfig() for _, option := range options { if err := option.apply(config); err != nil { @@ -56,7 +56,7 @@ func NewDriver(backend Backend, writer DumpWriter, options ...Option) *Driver { } } -func (d *Driver) Dump() error { +func (d *Driver) Dump(observers ...DumpObserver) error { snapshot := d.backend.Snapshot() defer snapshot.Close() @@ -81,7 +81,13 @@ func (d *Driver) Dump() error { return fmt.Errorf("%w", err) } - entity.Finalize() + status, counts := entity.Finalize() + + for _, observer := range observers { + if observer != nil { + observer.Entity(status, counts) + } + } } return nil diff --git a/pkg/silo/model_entity.go b/pkg/silo/model_entity.go index 299f01c..1bf4d3c 100644 --- a/pkg/silo/model_entity.go +++ b/pkg/silo/model_entity.go @@ -24,11 +24,13 @@ import ( const defaultEntitySize = 10 +type Status string + const ( - statusEntityComplete = "complete" - statusEntityConsistent = "consistent" - statusEntityInconsistent = "inconsistent" - statusEntityEmpty = "empty" + StatusEntityComplete Status = "complete" + StatusEntityConsistent Status = "consistent" + StatusEntityInconsistent Status = "inconsistent" + StatusEntityEmpty Status = "empty" ) type Entity struct { @@ -74,9 +76,10 @@ func (s Entity) UUID() string { return s.uuid } -func (s Entity) Finalize() { - msg := log.Info().Str("status", statusEntityConsistent) +func (s Entity) Finalize() (Status, map[string]int) { + msg := log.Info().Str("status", string(StatusEntityConsistent)) + status := StatusEntityConsistent counts := s.counts if len(s.include) > 0 { @@ -89,14 +92,17 @@ func (s Entity) Finalize() { } if 
len(counts) == len(s.include) && len(s.include) > 0 { - msg.Str("status", statusEntityComplete) + msg.Str("status", string(StatusEntityComplete)) + status = StatusEntityComplete } else if len(counts) == 0 { - msg.Str("status", statusEntityEmpty) + msg.Str("status", string(StatusEntityEmpty)) + status = StatusEntityEmpty } for _, count := range counts { if count > 1 { - msg = log.Warn().Str("status", statusEntityInconsistent) + msg = log.Warn().Str("status", string(StatusEntityInconsistent)) + status = StatusEntityInconsistent break } @@ -109,4 +115,6 @@ func (s Entity) Finalize() { } msg.Msg("entity identified") + + return status, counts } From 36190612da68aa348dccc366bac5a3e62908d56a Mon Sep 17 00:00:00 2001 From: Adrien Aury <44274230+adrienaury@users.noreply.github.com> Date: Wed, 27 Mar 2024 12:43:41 +0000 Subject: [PATCH 09/10] feat(dump): add watch flag --- CHANGELOG.md | 1 + internal/app/cli/dump.go | 19 +++++-- internal/infra/dump_observer.go | 87 +++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 4 deletions(-) create mode 100644 internal/infra/dump_observer.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 69ba4b2..15e949e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Types of changes - `Added` flag `--include` (short `-i`) to only scan/dump a specific list of fields, this flag is repeatable - `Added` flag `--alias` (short `-a`) to rename fields on the fly, this flag is repeatable +- `Added` flag `--watch` (short `-w`) to the dump command - `Fixed` self reference link are no longer counted in the links counter while scanning ## [0.1.0] diff --git a/internal/app/cli/dump.go b/internal/app/cli/dump.go index 65377a1..228b304 100644 --- a/internal/app/cli/dump.go +++ b/internal/app/cli/dump.go @@ -28,7 +28,10 @@ import ( ) func NewDumpCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.File) *cobra.Command { - var include []string + var ( + include []string + watch bool + ) cmd := &cobra.Command{ 
//nolint:exhaustruct Use: "dump path", @@ -36,13 +39,14 @@ func NewDumpCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F Example: " " + parent + " dump clients", Args: cobra.ExactArgs(1), Run: func(_ *cobra.Command, args []string) { - if err := dump(args[0], include); err != nil { + if err := dump(args[0], include, watch); err != nil { log.Fatal().Err(err).Int("return", 1).Msg("end SILO") } }, } cmd.Flags().StringSliceVarP(&include, "include", "i", []string{}, "include only these columns, exclude all others") + cmd.Flags().BoolVarP(&watch, "watch", "w", false, "watch statistics about dumped entities in stderr") cmd.Flags().SortFlags = false @@ -53,7 +57,7 @@ func NewDumpCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F return cmd } -func dump(path string, include []string) error { +func dump(path string, include []string, watch bool) error { backend, err := infra.NewBackend(path) if err != nil { return fmt.Errorf("%w", err) @@ -63,7 +67,14 @@ func dump(path string, include []string) error { driver := silo.NewDriver(backend, infra.NewDumpJSONLine(), silo.WithKeys(include)) - if err := driver.Dump(); err != nil { + if watch { + observer := infra.NewDumpObserver() + defer observer.Close() + + if err := driver.Dump(observer); err != nil { + return fmt.Errorf("%w", err) + } + } else if err := driver.Dump(); err != nil { return fmt.Errorf("%w", err) } diff --git a/internal/infra/dump_observer.go b/internal/infra/dump_observer.go new file mode 100644 index 0000000..7265094 --- /dev/null +++ b/internal/infra/dump_observer.go @@ -0,0 +1,87 @@ +// Copyright (C) 2024 CGI France +// +// This file is part of SILO. +// +// SILO is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// SILO is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with SILO. If not, see <https://www.gnu.org/licenses/>. + +package infra + +import ( + "fmt" + "os" + "time" + + "github.com/cgi-fr/silo/pkg/silo" + "github.com/schollz/progressbar/v3" +) + +type DumpObserver struct { + countTotal int + countComplete int + countConsistent int + countInconsistent int + countEmpty int + bar *progressbar.ProgressBar +} + +func NewDumpObserver() *DumpObserver { + //nolint:gomnd + pgb := progressbar.NewOptions(-1, + progressbar.OptionSetDescription("Dumping ... "), + progressbar.OptionSetItsString("entity"), + progressbar.OptionSetWriter(os.Stderr), + progressbar.OptionShowIts(), + progressbar.OptionSpinnerType(11), + progressbar.OptionThrottle(time.Millisecond*10), + progressbar.OptionOnCompletion(func() { fmt.Fprintln(os.Stderr) }), + // progressbar.OptionShowDescriptionAtLineEnd(), + ) + + return &DumpObserver{ + countTotal: 0, + countComplete: 0, + countConsistent: 0, + countInconsistent: 0, + countEmpty: 0, + bar: pgb, + } +} + +func (o *DumpObserver) Entity(status silo.Status, _ map[string]int) { + o.countTotal++ + + switch status { + case silo.StatusEntityComplete: + o.countComplete++ + case silo.StatusEntityConsistent: + o.countConsistent++ + case silo.StatusEntityInconsistent: + o.countInconsistent++ + case silo.StatusEntityEmpty: + o.countEmpty++ + } + + _ = o.bar.Add(1) + + o.bar.Describe(fmt.Sprintf("Dumped %d entities / complete=%d / incomplete=%d / inconsistent=%d / empty=%d", + o.countTotal, + o.countComplete, + o.countConsistent, + o.countInconsistent, + o.countEmpty)) +} + +func (o *DumpObserver) Close() { + _ = o.bar.Close() +} From 49c76e014a19f595ec666b81794174151be77239 Mon Sep 17 00:00:00 2001 From: Adrien Aury
<44274230+adrienaury@users.noreply.github.com> Date: Wed, 27 Mar 2024 12:49:53 +0000 Subject: [PATCH 10/10] feat(dump): rename incomplete to consistent --- internal/infra/dump_observer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/infra/dump_observer.go b/internal/infra/dump_observer.go index 7265094..c4d3b0a 100644 --- a/internal/infra/dump_observer.go +++ b/internal/infra/dump_observer.go @@ -74,7 +74,7 @@ func (o *DumpObserver) Entity(status silo.Status, _ map[string]int) { _ = o.bar.Add(1) - o.bar.Describe(fmt.Sprintf("Dumped %d entities / complete=%d / incomplete=%d / inconsistent=%d / empty=%d", + o.bar.Describe(fmt.Sprintf("Dumped %d entities / complete=%d / consistent=%d / inconsistent=%d / empty=%d", o.countTotal, o.countComplete, o.countConsistent,