From 233797fc2ed93a9772b2b25a543fe6b03b55b745 Mon Sep 17 00:00:00 2001
From: Adrien Aury <44274230+adrienaury@users.noreply.github.com>
Date: Wed, 27 Mar 2024 09:13:11 +0000
Subject: [PATCH 01/10] feat(scan): alias and filters
---
CHANGELOG.md | 6 ++++
internal/app/cli/scan.go | 14 +++++---
pkg/silo/config.go | 50 ++++++++++++++++++++++++++
pkg/silo/driver.go | 43 ++++++++++++++++++-----
pkg/silo/errors.go | 13 ++++++-
pkg/silo/options.go | 76 ++++++++++++++++++++++++++++++++++++++++
6 files changed, 188 insertions(+), 14 deletions(-)
create mode 100644 pkg/silo/config.go
create mode 100644 pkg/silo/options.go
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b276206..f6c359e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,12 @@ Types of changes
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.
+## [0.2.0]
+
+- `Added` flag `--only` (short `-o`) to only scan a specific list of fields
+- `Added` flag `--alias` (short `-a`) to rename fields on the fly
+- `Fixed` self reference link are no longer counted in the links counter while scanning
+
## [0.1.0]
- `Added` initial version
diff --git a/internal/app/cli/scan.go b/internal/app/cli/scan.go
index d3cebf1..2e35bdb 100644
--- a/internal/app/cli/scan.go
+++ b/internal/app/cli/scan.go
@@ -28,7 +28,11 @@ import (
)
func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.File) *cobra.Command {
- var passthrough bool
+ var (
+ passthrough bool
+ only []string
+ aliases map[string]string
+ )
cmd := &cobra.Command{ //nolint:exhaustruct
Use: "scan path",
@@ -36,13 +40,15 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F
Example: " lino pull database --table client | " + parent + " scan clients",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
- if err := scan(cmd, args[0], passthrough); err != nil {
+ if err := scan(cmd, args[0], passthrough, only, aliases); err != nil {
log.Fatal().Err(err).Int("return", 1).Msg("end SILO")
}
},
}
cmd.Flags().BoolVarP(&passthrough, "passthrough", "p", false, "pass stdin to stdout")
+ cmd.Flags().StringSliceVarP(&only, "only", "o", []string{}, "only scan these columns, exclude all others")
+ cmd.Flags().StringToStringVarP(&aliases, "alias", "a", map[string]string{}, "use given aliases for each columns")
cmd.SetOut(stdout)
cmd.SetErr(stderr)
@@ -51,7 +57,7 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F
return cmd
}
-func scan(cmd *cobra.Command, path string, passthrough bool) error {
+func scan(cmd *cobra.Command, path string, passthrough bool, only []string, aliases map[string]string) error {
backend, err := infra.NewBackend(path)
if err != nil {
return fmt.Errorf("%w", err)
@@ -59,7 +65,7 @@ func scan(cmd *cobra.Command, path string, passthrough bool) error {
defer backend.Close()
- driver := silo.NewDriver(backend, nil)
+ driver := silo.NewDriver(backend, nil, silo.WithKeys(only), silo.WithAliases(aliases))
var reader silo.DataRowReader
diff --git a/pkg/silo/config.go b/pkg/silo/config.go
new file mode 100644
index 0000000..e63689d
--- /dev/null
+++ b/pkg/silo/config.go
@@ -0,0 +1,50 @@
+// Copyright (C) 2024 CGI France
+//
+// This file is part of SILO.
+//
+// SILO is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// SILO is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with SILO. If not, see <http://www.gnu.org/licenses/>.
+
+package silo
+
+import "errors"
+
+type Config struct {
+ Include map[string]bool
+ Aliases map[string]string
+}
+
+func DefaultConfig() *Config {
+ config := Config{
+ Include: map[string]bool{},
+ Aliases: map[string]string{},
+ }
+
+ return &config
+}
+
+func (cfg *Config) validate() error {
+ var errs []error
+
+ for key := range cfg.Aliases {
+ if _, ok := cfg.Include[key]; !ok && len(cfg.Include) > 0 {
+ errs = append(errs, &ConfigScanAliasIsNotIncludedError{alias: key})
+ }
+ }
+
+ if len(errs) != 0 {
+ return errors.Join(errs...)
+ }
+
+ return nil
+}
diff --git a/pkg/silo/driver.go b/pkg/silo/driver.go
index b573519..96c2366 100644
--- a/pkg/silo/driver.go
+++ b/pkg/silo/driver.go
@@ -26,14 +26,33 @@ import (
)
type Driver struct {
+ *Config
backend Backend
writer DumpWriter
}
-func NewDriver(backend Backend, writer DumpWriter) *Driver {
+func NewDriver(backend Backend, writer DumpWriter, options ...Option) *Driver {
+ errs := []error{}
+ config := DefaultConfig()
+
+ for _, option := range options {
+ if err := option.apply(config); err != nil {
+ errs = append(errs, err)
+ }
+ }
+
+ if err := config.validate(); err != nil {
+ errs = append(errs, err)
+ }
+
+ if len(errs) > 0 {
+ panic(errs)
+ }
+
return &Driver{
backend: backend,
writer: writer,
+ Config: config,
}
}
@@ -102,7 +121,7 @@ func (d *Driver) Scan(input DataRowReader, observers ...ScanObserver) error {
break
}
- links := Scan(datarow)
+ links := d.scan(datarow)
log.Info().Int("links", len(links)).Interface("row", datarow).Msg("datarow scanned")
@@ -120,12 +139,14 @@ func (d *Driver) ingest(datarow DataRow, links []DataLink, observers ...ScanObse
return fmt.Errorf("%w: %w", ErrPersistingData, err)
}
- if err := d.backend.Store(link.E2, link.E1); err != nil {
- return fmt.Errorf("%w: %w", ErrPersistingData, err)
- }
+ if link.E1 != link.E2 {
+ if err := d.backend.Store(link.E2, link.E1); err != nil {
+ return fmt.Errorf("%w: %w", ErrPersistingData, err)
+ }
- for _, observer := range observers {
- observer.IngestedLink(link)
+ for _, observer := range observers {
+ observer.IngestedLink(link)
+ }
}
}
@@ -136,12 +157,16 @@ func (d *Driver) ingest(datarow DataRow, links []DataLink, observers ...ScanObse
return nil
}
-func Scan(datarow DataRow) []DataLink {
+func (cfg *Config) scan(datarow DataRow) []DataLink {
nodes := []DataNode{}
links := []DataLink{}
for key, value := range datarow {
- if value != nil {
+ if _, included := cfg.Include[key]; value != nil && (included || len(cfg.Include) == 0) {
+ if alias, exist := cfg.Aliases[key]; exist {
+ key = alias
+ }
+
nodes = append(nodes, DataNode{Key: key, Data: value})
}
}
diff --git a/pkg/silo/errors.go b/pkg/silo/errors.go
index cf477c6..4956ba1 100644
--- a/pkg/silo/errors.go
+++ b/pkg/silo/errors.go
@@ -17,10 +17,21 @@
package silo
-import "errors"
+import (
+ "errors"
+ "fmt"
+)
var (
ErrReadingNextInput = errors.New("error while reading next input")
ErrPersistingData = errors.New("error while persisting data")
ErrReadingPersistedData = errors.New("error while reading persisted data")
)
+
+type ConfigScanAliasIsNotIncludedError struct {
+ alias string
+}
+
+func (e *ConfigScanAliasIsNotIncludedError) Error() string {
+ return fmt.Sprintf("configuration error : alias [%s] is not included", e.alias)
+}
diff --git a/pkg/silo/options.go b/pkg/silo/options.go
new file mode 100644
index 0000000..ed1c7b7
--- /dev/null
+++ b/pkg/silo/options.go
@@ -0,0 +1,76 @@
+// Copyright (C) 2024 CGI France
+//
+// This file is part of SILO.
+//
+// SILO is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// SILO is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with SILO. If not, see <http://www.gnu.org/licenses/>.
+
+package silo
+
+type Option interface {
+ applier
+}
+
+type option func(*Config) error
+
+func (f option) apply(cfg *Config) error {
+ return f(cfg)
+}
+
+type applier interface {
+ apply(cfg *Config) error
+}
+
+func Alias(key, alias string) Option { //nolint:ireturn
+ applier := func(cfg *Config) error {
+ cfg.Aliases[key] = alias
+
+ return nil
+ }
+
+ return option(applier)
+}
+
+func Include(key string) Option { //nolint:ireturn
+ applier := func(cfg *Config) error {
+ cfg.Include[key] = true
+
+ return nil
+ }
+
+ return option(applier)
+}
+
+func WithAliases(aliases map[string]string) Option { //nolint:ireturn
+ applier := func(cfg *Config) error {
+ for key, alias := range aliases {
+ cfg.Aliases[key] = alias
+ }
+
+ return nil
+ }
+
+ return option(applier)
+}
+
+func WithKeys(keys []string) Option { //nolint:ireturn
+ applier := func(cfg *Config) error {
+ for _, key := range keys {
+ cfg.Include[key] = true
+ }
+
+ return nil
+ }
+
+ return option(applier)
+}
From 0618cf52db3df2eee825c997c7dc2c7e6207e29c Mon Sep 17 00:00:00 2001
From: Adrien Aury <44274230+adrienaury@users.noreply.github.com>
Date: Wed, 27 Mar 2024 09:16:11 +0000
Subject: [PATCH 02/10] test(scan): fix venom tests
---
test/suites/02-scan.yml | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/test/suites/02-scan.yml b/test/suites/02-scan.yml
index feca70e..a4e6509 100644
--- a/test/suites/02-scan.yml
+++ b/test/suites/02-scan.yml
@@ -27,14 +27,14 @@ testcases:
steps:
- script: silo scan ../silos/sparse < ../data/clients_sparse.jsonl
assertions:
- - result.systemout ShouldContainSubstring "Scanned 3 rows, found 3 links"
+ - result.systemout ShouldContainSubstring "Scanned 3 rows, found 2 links"
- result.code ShouldEqual 0
- name: update silo
steps:
- script: silo scan ../silos/sparse < ../data/clients_sparse.jsonl
assertions:
- - result.systemout ShouldContainSubstring "Scanned 3 rows, found 3 links"
+ - result.systemout ShouldContainSubstring "Scanned 3 rows, found 2 links"
- result.code ShouldEqual 0
- name: passthrough
From 0034d2d3d2ded9510a73df48b44a5edf116b55d1 Mon Sep 17 00:00:00 2001
From: Adrien Aury <44274230+adrienaury@users.noreply.github.com>
Date: Wed, 27 Mar 2024 10:35:07 +0000
Subject: [PATCH 03/10] fix: self reference optimization
---
pkg/silo/driver.go | 37 +++++++++++++++++++------------------
1 file changed, 19 insertions(+), 18 deletions(-)
diff --git a/pkg/silo/driver.go b/pkg/silo/driver.go
index 96c2366..99718b2 100644
--- a/pkg/silo/driver.go
+++ b/pkg/silo/driver.go
@@ -121,11 +121,11 @@ func (d *Driver) Scan(input DataRowReader, observers ...ScanObserver) error {
break
}
- links := d.scan(datarow)
+ nodes, links := d.scan(datarow)
log.Info().Int("links", len(links)).Interface("row", datarow).Msg("datarow scanned")
- if err := d.ingest(datarow, links, observers...); err != nil {
+ if err := d.ingest(datarow, nodes, links, observers...); err != nil {
return err
}
}
@@ -133,20 +133,25 @@ func (d *Driver) Scan(input DataRowReader, observers ...ScanObserver) error {
return nil
}
-func (d *Driver) ingest(datarow DataRow, links []DataLink, observers ...ScanObserver) error {
+func (d *Driver) ingest(datarow DataRow, nodes []DataNode, links []DataLink, observers ...ScanObserver) error {
for _, link := range links {
if err := d.backend.Store(link.E1, link.E2); err != nil {
return fmt.Errorf("%w: %w", ErrPersistingData, err)
}
- if link.E1 != link.E2 {
- if err := d.backend.Store(link.E2, link.E1); err != nil {
- return fmt.Errorf("%w: %w", ErrPersistingData, err)
- }
+ if err := d.backend.Store(link.E2, link.E1); err != nil {
+ return fmt.Errorf("%w: %w", ErrPersistingData, err)
+ }
- for _, observer := range observers {
- observer.IngestedLink(link)
- }
+ for _, observer := range observers {
+ observer.IngestedLink(link)
+ }
+ }
+
+ // optimization : self reference is useful only if no link has been found, and nodes will contain a single node
+ if len(links) == 0 && len(nodes) > 0 {
+ if err := d.backend.Store(nodes[0], nodes[0]); err != nil {
+ return fmt.Errorf("%w: %w", ErrPersistingData, err)
}
}
@@ -157,13 +162,13 @@ func (d *Driver) ingest(datarow DataRow, links []DataLink, observers ...ScanObse
return nil
}
-func (cfg *Config) scan(datarow DataRow) []DataLink {
+func (d *Driver) scan(datarow DataRow) ([]DataNode, []DataLink) {
nodes := []DataNode{}
links := []DataLink{}
for key, value := range datarow {
- if _, included := cfg.Include[key]; value != nil && (included || len(cfg.Include) == 0) {
- if alias, exist := cfg.Aliases[key]; exist {
+ if _, included := d.Config.Include[key]; value != nil && (included || len(d.Config.Include) == 0) {
+ if alias, exist := d.Config.Aliases[key]; exist {
key = alias
}
@@ -171,10 +176,6 @@ func (cfg *Config) scan(datarow DataRow) []DataLink {
}
}
- if len(nodes) == 1 {
- links = append(links, DataLink{E1: nodes[0], E2: nodes[0]})
- }
-
// find all pairs in nodes
for i := 0; i < len(nodes); i++ {
for j := i + 1; j < len(nodes); j++ {
@@ -182,5 +183,5 @@ func (cfg *Config) scan(datarow DataRow) []DataLink {
}
}
- return links
+ return nodes, links
}
From 1a0b00963bd25b5d528b63283d190b889204499c Mon Sep 17 00:00:00 2001
From: Adrien Aury <44274230+adrienaury@users.noreply.github.com>
Date: Wed, 27 Mar 2024 10:46:49 +0000
Subject: [PATCH 04/10] feat(scan): rename only to include
---
CHANGELOG.md | 4 ++--
README.md | 28 ++++++++++++++++++++++++++++
internal/app/cli/scan.go | 10 +++++-----
3 files changed, 35 insertions(+), 7 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f6c359e..132d82b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,8 +16,8 @@ Types of changes
## [0.2.0]
-- `Added` flag `--only` (short `-o`) to only scan a specific list of fields
-- `Added` flag `--alias` (short `-a`) to rename fields on the fly
+- `Added` flag `--include` (short `-i`) to only scan a specific list of fields, this flag is repeatable
+- `Added` flag `--alias` (short `-a`) to rename fields on the fly, this flag is repeatable
- `Fixed` self reference link are no longer counted in the links counter while scanning
## [0.1.0]
diff --git a/README.md b/README.md
index d438aff..27b253c 100644
--- a/README.md
+++ b/README.md
@@ -65,6 +65,34 @@ $ silo scan my-silo < input.jsonl
Analysis data is persisted on disk on the `my-silo` path relative to the current directory.
+#### passthrough stdin to stdout
+
+Use `--passthrough` (short : `-p`) to pass input to stdout instead of displaying information.
+
+```console
+$ silo scan my-silo --passthrough < input.jsonl
+{"ID_CLIENT":"0001","EMAIL_CLIENT":"jonh.doe@domain.com","ACCOUNT_NUMBER":null}
+{"ID_CLIENT":null,"EMAIL_CLIENT":null,"ACCOUNT_NUMBER":"C01"}
+```
+
+#### include only specific fields/columns
+
+Use `--include <field>` (short : `-i <field>`, repeatable) to select only given columns to scan.
+
+```console
+$ silo scan my-silo --include ID_CLIENT --include EMAIL_CLIENT < input.jsonl
+⣾ Scanned 5 rows, found 15 links (4084 row/s) [0s]
+```
+
+#### rename fields/columns on the fly
+
+Use `--alias <field>=<alias>` (short : `-a <field>=<alias>`, repeatable) to rename fields before storing links.
+
+```console
+$ silo scan my-silo --alias ID_CLIENT=CLIENT --alias EMAIL_CLIENT=EMAIL < input.jsonl
+⣾ Scanned 5 rows, found 15 links (4084 row/s) [0s]
+```
+
### silo dump
The silo dump command is used to dump each connected entity into a file. This allows users to create a referential of all entities discovered within the JSONLine data. Here's how to use it:
diff --git a/internal/app/cli/scan.go b/internal/app/cli/scan.go
index 2e35bdb..121b914 100644
--- a/internal/app/cli/scan.go
+++ b/internal/app/cli/scan.go
@@ -30,7 +30,7 @@ import (
func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.File) *cobra.Command {
var (
passthrough bool
- only []string
+ include []string
aliases map[string]string
)
@@ -40,14 +40,14 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F
Example: " lino pull database --table client | " + parent + " scan clients",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
- if err := scan(cmd, args[0], passthrough, only, aliases); err != nil {
+ if err := scan(cmd, args[0], passthrough, include, aliases); err != nil {
log.Fatal().Err(err).Int("return", 1).Msg("end SILO")
}
},
}
cmd.Flags().BoolVarP(&passthrough, "passthrough", "p", false, "pass stdin to stdout")
- cmd.Flags().StringSliceVarP(&only, "only", "o", []string{}, "only scan these columns, exclude all others")
+ cmd.Flags().StringSliceVarP(&include, "include", "i", []string{}, "include only these columns, exclude all others")
cmd.Flags().StringToStringVarP(&aliases, "alias", "a", map[string]string{}, "use given aliases for each columns")
cmd.SetOut(stdout)
@@ -57,7 +57,7 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F
return cmd
}
-func scan(cmd *cobra.Command, path string, passthrough bool, only []string, aliases map[string]string) error {
+func scan(cmd *cobra.Command, path string, passthrough bool, include []string, aliases map[string]string) error {
backend, err := infra.NewBackend(path)
if err != nil {
return fmt.Errorf("%w", err)
@@ -65,7 +65,7 @@ func scan(cmd *cobra.Command, path string, passthrough bool, only []string, alia
defer backend.Close()
- driver := silo.NewDriver(backend, nil, silo.WithKeys(only), silo.WithAliases(aliases))
+ driver := silo.NewDriver(backend, nil, silo.WithKeys(include), silo.WithAliases(aliases))
var reader silo.DataRowReader
From 36ff70c6b030dcc7df0d166edeae8bd7013fa982 Mon Sep 17 00:00:00 2001
From: Adrien Aury <44274230+adrienaury@users.noreply.github.com>
Date: Wed, 27 Mar 2024 11:04:20 +0000
Subject: [PATCH 05/10] feat(dump): wip! include only fields
---
pkg/silo/config.go | 22 ++++++++++++----------
pkg/silo/driver.go | 10 +++++-----
pkg/silo/model_entity.go | 18 +++++++++++-------
pkg/silo/options.go | 30 +++++++++++++++++++-----------
4 files changed, 47 insertions(+), 33 deletions(-)
diff --git a/pkg/silo/config.go b/pkg/silo/config.go
index e63689d..8258744 100644
--- a/pkg/silo/config.go
+++ b/pkg/silo/config.go
@@ -19,25 +19,27 @@ package silo
import "errors"
-type Config struct {
- Include map[string]bool
- Aliases map[string]string
+type config struct {
+ include map[string]bool
+ includeList []string
+ aliases map[string]string
}
-func DefaultConfig() *Config {
- config := Config{
- Include: map[string]bool{},
- Aliases: map[string]string{},
+func DefaultConfig() *config {
+ config := config{
+ include: map[string]bool{},
+ includeList: []string{},
+ aliases: map[string]string{},
}
return &config
}
-func (cfg *Config) validate() error {
+func (cfg *config) validate() error {
var errs []error
- for key := range cfg.Aliases {
- if _, ok := cfg.Include[key]; !ok && len(cfg.Include) > 0 {
+ for key := range cfg.aliases {
+ if _, ok := cfg.include[key]; !ok && len(cfg.include) > 0 {
errs = append(errs, &ConfigScanAliasIsNotIncludedError{alias: key})
}
}
diff --git a/pkg/silo/driver.go b/pkg/silo/driver.go
index 99718b2..4347829 100644
--- a/pkg/silo/driver.go
+++ b/pkg/silo/driver.go
@@ -26,7 +26,7 @@ import (
)
type Driver struct {
- *Config
+ *config
backend Backend
writer DumpWriter
}
@@ -52,7 +52,7 @@ func NewDriver(backend Backend, writer DumpWriter, options ...Option) *Driver {
return &Driver{
backend: backend,
writer: writer,
- Config: config,
+ config: config,
}
}
@@ -71,7 +71,7 @@ func (d *Driver) Dump() error {
break
}
- entity := NewEntity(entryNode)
+ entity := NewEntity(d.config.includeList, d.writer, entryNode)
if err := d.writer.Write(entryNode, entity.UUID()); err != nil {
return fmt.Errorf("%w", err)
@@ -167,8 +167,8 @@ func (d *Driver) scan(datarow DataRow) ([]DataNode, []DataLink) {
links := []DataLink{}
for key, value := range datarow {
- if _, included := d.Config.Include[key]; value != nil && (included || len(d.Config.Include) == 0) {
- if alias, exist := d.Config.Aliases[key]; exist {
+ if _, included := d.config.include[key]; value != nil && (included || len(d.config.include) == 0) {
+ if alias, exist := d.config.aliases[key]; exist {
key = alias
}
diff --git a/pkg/silo/model_entity.go b/pkg/silo/model_entity.go
index ec059c2..0bb77f9 100644
--- a/pkg/silo/model_entity.go
+++ b/pkg/silo/model_entity.go
@@ -31,16 +31,20 @@ const (
)
type Entity struct {
- nodes map[DataNode]int
- counts map[string]int
- uuid string
+ include []string
+ nodes map[DataNode]int
+ counts map[string]int
+ uuid string
+ writer DumpWriter
}
-func NewEntity(nodes ...DataNode) Entity {
+func NewEntity(include []string, writer DumpWriter, nodes ...DataNode) Entity {
entity := Entity{
- nodes: make(map[DataNode]int, defaultEntitySize),
- counts: make(map[string]int, defaultEntitySize),
- uuid: uuid.NewString(),
+ include: include,
+ nodes: make(map[DataNode]int, defaultEntitySize),
+ counts: make(map[string]int, defaultEntitySize),
+ uuid: uuid.NewString(),
+ writer: writer,
}
for _, node := range nodes {
entity.Append(node)
diff --git a/pkg/silo/options.go b/pkg/silo/options.go
index ed1c7b7..a16364d 100644
--- a/pkg/silo/options.go
+++ b/pkg/silo/options.go
@@ -21,19 +21,19 @@ type Option interface {
applier
}
-type option func(*Config) error
+type option func(*config) error
-func (f option) apply(cfg *Config) error {
+func (f option) apply(cfg *config) error {
return f(cfg)
}
type applier interface {
- apply(cfg *Config) error
+ apply(cfg *config) error
}
func Alias(key, alias string) Option { //nolint:ireturn
- applier := func(cfg *Config) error {
- cfg.Aliases[key] = alias
+ applier := func(cfg *config) error {
+ cfg.aliases[key] = alias
return nil
}
@@ -42,8 +42,12 @@ func Alias(key, alias string) Option { //nolint:ireturn
}
func Include(key string) Option { //nolint:ireturn
- applier := func(cfg *Config) error {
- cfg.Include[key] = true
+ applier := func(cfg *config) error {
+ if _, exist := cfg.include[key]; !exist {
+ cfg.includeList = append(cfg.includeList, key)
+ }
+
+ cfg.include[key] = true
return nil
}
@@ -52,9 +56,9 @@ func Include(key string) Option { //nolint:ireturn
}
func WithAliases(aliases map[string]string) Option { //nolint:ireturn
- applier := func(cfg *Config) error {
+ applier := func(cfg *config) error {
for key, alias := range aliases {
- cfg.Aliases[key] = alias
+ cfg.aliases[key] = alias
}
return nil
@@ -64,9 +68,13 @@ func WithAliases(aliases map[string]string) Option { //nolint:ireturn
}
func WithKeys(keys []string) Option { //nolint:ireturn
- applier := func(cfg *Config) error {
+ applier := func(cfg *config) error {
for _, key := range keys {
- cfg.Include[key] = true
+ if _, exist := cfg.include[key]; !exist {
+ cfg.includeList = append(cfg.includeList, key)
+ }
+
+ cfg.include[key] = true
}
return nil
From 8e889402f622314633620bffc49283c8d582b98b Mon Sep 17 00:00:00 2001
From: Adrien Aury <44274230+adrienaury@users.noreply.github.com>
Date: Wed, 27 Mar 2024 11:04:40 +0000
Subject: [PATCH 06/10] feat(dump): wip! add include flag
---
internal/app/cli/dump.go | 12 +++++++++---
internal/app/cli/scan.go | 4 +++-
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/internal/app/cli/dump.go b/internal/app/cli/dump.go
index 3deeea3..65377a1 100644
--- a/internal/app/cli/dump.go
+++ b/internal/app/cli/dump.go
@@ -28,18 +28,24 @@ import (
)
func NewDumpCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.File) *cobra.Command {
+ var include []string
+
cmd := &cobra.Command{ //nolint:exhaustruct
Use: "dump path",
Short: "Dump silo database stored in given path into stdout",
Example: " " + parent + " dump clients",
Args: cobra.ExactArgs(1),
Run: func(_ *cobra.Command, args []string) {
- if err := dump(args[0]); err != nil {
+ if err := dump(args[0], include); err != nil {
log.Fatal().Err(err).Int("return", 1).Msg("end SILO")
}
},
}
+ cmd.Flags().StringSliceVarP(&include, "include", "i", []string{}, "include only these columns, exclude all others")
+
+ cmd.Flags().SortFlags = false
+
cmd.SetOut(stdout)
cmd.SetErr(stderr)
cmd.SetIn(stdin)
@@ -47,7 +53,7 @@ func NewDumpCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F
return cmd
}
-func dump(path string) error {
+func dump(path string, include []string) error {
backend, err := infra.NewBackend(path)
if err != nil {
return fmt.Errorf("%w", err)
@@ -55,7 +61,7 @@ func dump(path string) error {
defer backend.Close()
- driver := silo.NewDriver(backend, infra.NewDumpJSONLine())
+ driver := silo.NewDriver(backend, infra.NewDumpJSONLine(), silo.WithKeys(include))
if err := driver.Dump(); err != nil {
return fmt.Errorf("%w", err)
diff --git a/internal/app/cli/scan.go b/internal/app/cli/scan.go
index 121b914..445bd82 100644
--- a/internal/app/cli/scan.go
+++ b/internal/app/cli/scan.go
@@ -37,7 +37,7 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F
cmd := &cobra.Command{ //nolint:exhaustruct
Use: "scan path",
Short: "Ingest data from stdin and update silo database stored in given path",
- Example: " lino pull database --table client | " + parent + " scan clients",
+ Example: " " + parent + " scan clients < clients.jsonl",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
if err := scan(cmd, args[0], passthrough, include, aliases); err != nil {
@@ -50,6 +50,8 @@ func NewScanCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F
cmd.Flags().StringSliceVarP(&include, "include", "i", []string{}, "include only these columns, exclude all others")
cmd.Flags().StringToStringVarP(&aliases, "alias", "a", map[string]string{}, "use given aliases for each columns")
+ cmd.Flags().SortFlags = false
+
cmd.SetOut(stdout)
cmd.SetErr(stderr)
cmd.SetIn(stdin)
From 2c9534523eb821d28df84e9c49b9b2dd43a34e2a Mon Sep 17 00:00:00 2001
From: Adrien Aury <44274230+adrienaury@users.noreply.github.com>
Date: Wed, 27 Mar 2024 11:34:22 +0000
Subject: [PATCH 07/10] feat(dump): include flag + detect complete entity
---
pkg/silo/driver.go | 16 +++++++++++++---
pkg/silo/model_entity.go | 41 +++++++++++++++++++++++++---------------
2 files changed, 39 insertions(+), 18 deletions(-)
diff --git a/pkg/silo/driver.go b/pkg/silo/driver.go
index 4347829..8312947 100644
--- a/pkg/silo/driver.go
+++ b/pkg/silo/driver.go
@@ -71,9 +71,9 @@ func (d *Driver) Dump() error {
break
}
- entity := NewEntity(d.config.includeList, d.writer, entryNode)
+ entity := NewEntity(d.config.includeList, entryNode)
- if err := d.writer.Write(entryNode, entity.UUID()); err != nil {
+ if err := d.write(entryNode, entity.UUID()); err != nil {
return fmt.Errorf("%w", err)
}
@@ -95,7 +95,7 @@ func (d *Driver) dump(snapshot Snapshot, node DataNode, entity Entity) error {
for _, connectedNode := range connectedNodes {
if entity.Append(connectedNode) {
- if err := d.writer.Write(connectedNode, entity.UUID()); err != nil {
+ if err := d.write(connectedNode, entity.UUID()); err != nil {
return fmt.Errorf("%w", err)
}
@@ -108,6 +108,16 @@ func (d *Driver) dump(snapshot Snapshot, node DataNode, entity Entity) error {
return nil
}
+func (d *Driver) write(node DataNode, uuid string) error {
+ if _, included := d.config.include[node.Key]; included || len(d.config.include) == 0 {
+ if err := d.writer.Write(node, uuid); err != nil {
+ return fmt.Errorf("%w", err)
+ }
+ }
+
+ return nil
+}
+
func (d *Driver) Scan(input DataRowReader, observers ...ScanObserver) error {
defer input.Close()
diff --git a/pkg/silo/model_entity.go b/pkg/silo/model_entity.go
index 0bb77f9..299f01c 100644
--- a/pkg/silo/model_entity.go
+++ b/pkg/silo/model_entity.go
@@ -25,9 +25,10 @@ import (
const defaultEntitySize = 10
const (
- statusEntityOK = "consistent"
- statusEntityPartial = "partial"
+ statusEntityComplete = "complete"
+ statusEntityConsistent = "consistent"
statusEntityInconsistent = "inconsistent"
+ statusEntityEmpty = "empty"
)
type Entity struct {
@@ -35,16 +36,14 @@ type Entity struct {
nodes map[DataNode]int
counts map[string]int
uuid string
- writer DumpWriter
}
-func NewEntity(include []string, writer DumpWriter, nodes ...DataNode) Entity {
+func NewEntity(include []string, nodes ...DataNode) Entity {
entity := Entity{
include: include,
nodes: make(map[DataNode]int, defaultEntitySize),
counts: make(map[string]int, defaultEntitySize),
uuid: uuid.NewString(),
- writer: writer,
}
for _, node := range nodes {
entity.Append(node)
@@ -75,26 +74,38 @@ func (s Entity) UUID() string {
return s.uuid
}
-//nolint:zerologlint
func (s Entity) Finalize() {
- msg := log.Info().Str("status", statusEntityOK)
+ msg := log.Info().Str("status", statusEntityConsistent)
- for _, count := range s.counts {
+ counts := s.counts
+
+ if len(s.include) > 0 {
+ counts = make(map[string]int, len(s.include))
+ for _, key := range s.include {
+ if s.counts[key] > 0 {
+ counts[key] = s.counts[key]
+ }
+ }
+ }
+
+ if len(counts) == len(s.include) && len(s.include) > 0 {
+ msg.Str("status", statusEntityComplete)
+ } else if len(counts) == 0 {
+ msg.Str("status", statusEntityEmpty)
+ }
+
+ for _, count := range counts {
if count > 1 {
msg = log.Warn().Str("status", statusEntityInconsistent)
break
}
-
- if count == 0 {
- msg = log.Warn().Str("status", statusEntityPartial)
- }
}
- msg = msg.Str("uuid", s.UUID())
+ msg.Str("uuid", s.UUID())
- for id, count := range s.counts {
- msg.Int(id, count)
+ for id, count := range counts {
+ msg.Int("count-"+id, count)
}
msg.Msg("entity identified")
From 766db4c67d793bcdef8c6718e34dd3092b3f1bdc Mon Sep 17 00:00:00 2001
From: Adrien Aury <44274230+adrienaury@users.noreply.github.com>
Date: Wed, 27 Mar 2024 12:25:58 +0000
Subject: [PATCH 08/10] feat(dump): add dump observers
---
CHANGELOG.md | 2 +-
pkg/silo/config.go | 2 +-
pkg/silo/driven.go | 4 ++++
pkg/silo/driver.go | 12 +++++++++---
pkg/silo/model_entity.go | 26 +++++++++++++++++---------
5 files changed, 32 insertions(+), 14 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 132d82b..69ba4b2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,7 +16,7 @@ Types of changes
## [0.2.0]
-- `Added` flag `--include` (short `-i`) to only scan a specific list of fields, this flag is repeatable
+- `Added` flag `--include` (short `-i`) to only scan/dump a specific list of fields, this flag is repeatable
- `Added` flag `--alias` (short `-a`) to rename fields on the fly, this flag is repeatable
- `Fixed` self reference link are no longer counted in the links counter while scanning
diff --git a/pkg/silo/config.go b/pkg/silo/config.go
index 8258744..dfecb9a 100644
--- a/pkg/silo/config.go
+++ b/pkg/silo/config.go
@@ -25,7 +25,7 @@ type config struct {
aliases map[string]string
}
-func DefaultConfig() *config {
+func newConfig() *config {
config := config{
include: map[string]bool{},
includeList: []string{},
diff --git a/pkg/silo/driven.go b/pkg/silo/driven.go
index 475753d..7a6b2e2 100644
--- a/pkg/silo/driven.go
+++ b/pkg/silo/driven.go
@@ -43,3 +43,7 @@ type ScanObserver interface {
IngestedRow(row DataRow)
IngestedLink(link DataLink)
}
+
+type DumpObserver interface {
+ Entity(status Status, counts map[string]int)
+}
diff --git a/pkg/silo/driver.go b/pkg/silo/driver.go
index 8312947..1b1d099 100644
--- a/pkg/silo/driver.go
+++ b/pkg/silo/driver.go
@@ -33,7 +33,7 @@ type Driver struct {
func NewDriver(backend Backend, writer DumpWriter, options ...Option) *Driver {
errs := []error{}
- config := DefaultConfig()
+ config := newConfig()
for _, option := range options {
if err := option.apply(config); err != nil {
@@ -56,7 +56,7 @@ func NewDriver(backend Backend, writer DumpWriter, options ...Option) *Driver {
}
}
-func (d *Driver) Dump() error {
+func (d *Driver) Dump(observers ...DumpObserver) error {
snapshot := d.backend.Snapshot()
defer snapshot.Close()
@@ -81,7 +81,13 @@ func (d *Driver) Dump() error {
return fmt.Errorf("%w", err)
}
- entity.Finalize()
+ status, counts := entity.Finalize()
+
+ for _, observer := range observers {
+ if observer != nil {
+ observer.Entity(status, counts)
+ }
+ }
}
return nil
diff --git a/pkg/silo/model_entity.go b/pkg/silo/model_entity.go
index 299f01c..1bf4d3c 100644
--- a/pkg/silo/model_entity.go
+++ b/pkg/silo/model_entity.go
@@ -24,11 +24,13 @@ import (
const defaultEntitySize = 10
+type Status string
+
const (
- statusEntityComplete = "complete"
- statusEntityConsistent = "consistent"
- statusEntityInconsistent = "inconsistent"
- statusEntityEmpty = "empty"
+ StatusEntityComplete Status = "complete"
+ StatusEntityConsistent Status = "consistent"
+ StatusEntityInconsistent Status = "inconsistent"
+ StatusEntityEmpty Status = "empty"
)
type Entity struct {
@@ -74,9 +76,10 @@ func (s Entity) UUID() string {
return s.uuid
}
-func (s Entity) Finalize() {
- msg := log.Info().Str("status", statusEntityConsistent)
+func (s Entity) Finalize() (Status, map[string]int) {
+ msg := log.Info().Str("status", string(StatusEntityConsistent))
+ status := StatusEntityConsistent
counts := s.counts
if len(s.include) > 0 {
@@ -89,14 +92,17 @@ func (s Entity) Finalize() {
}
if len(counts) == len(s.include) && len(s.include) > 0 {
- msg.Str("status", statusEntityComplete)
+ msg.Str("status", string(StatusEntityComplete))
+ status = StatusEntityComplete
} else if len(counts) == 0 {
- msg.Str("status", statusEntityEmpty)
+ msg.Str("status", string(StatusEntityEmpty))
+ status = StatusEntityEmpty
}
for _, count := range counts {
if count > 1 {
- msg = log.Warn().Str("status", statusEntityInconsistent)
+ msg = log.Warn().Str("status", string(StatusEntityInconsistent))
+ status = StatusEntityInconsistent
break
}
@@ -109,4 +115,6 @@ func (s Entity) Finalize() {
}
msg.Msg("entity identified")
+
+ return status, counts
}
From 36190612da68aa348dccc366bac5a3e62908d56a Mon Sep 17 00:00:00 2001
From: Adrien Aury <44274230+adrienaury@users.noreply.github.com>
Date: Wed, 27 Mar 2024 12:43:41 +0000
Subject: [PATCH 09/10] feat(dump): add watch flag
---
CHANGELOG.md | 1 +
internal/app/cli/dump.go | 19 +++++--
internal/infra/dump_observer.go | 87 +++++++++++++++++++++++++++++++++
3 files changed, 103 insertions(+), 4 deletions(-)
create mode 100644 internal/infra/dump_observer.go
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 69ba4b2..15e949e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,7 @@ Types of changes
- `Added` flag `--include` (short `-i`) to only scan/dump a specific list of fields, this flag is repeatable
- `Added` flag `--alias` (short `-a`) to rename fields on the fly, this flag is repeatable
+- `Added` flag `--watch` (short `-w`) to the dump command
- `Fixed` self reference link are no longer counted in the links counter while scanning
## [0.1.0]
diff --git a/internal/app/cli/dump.go b/internal/app/cli/dump.go
index 65377a1..228b304 100644
--- a/internal/app/cli/dump.go
+++ b/internal/app/cli/dump.go
@@ -28,7 +28,10 @@ import (
)
func NewDumpCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.File) *cobra.Command {
- var include []string
+ var (
+ include []string
+ watch bool
+ )
cmd := &cobra.Command{ //nolint:exhaustruct
Use: "dump path",
@@ -36,13 +39,14 @@ func NewDumpCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F
Example: " " + parent + " dump clients",
Args: cobra.ExactArgs(1),
Run: func(_ *cobra.Command, args []string) {
- if err := dump(args[0], include); err != nil {
+ if err := dump(args[0], include, watch); err != nil {
log.Fatal().Err(err).Int("return", 1).Msg("end SILO")
}
},
}
cmd.Flags().StringSliceVarP(&include, "include", "i", []string{}, "include only these columns, exclude all others")
+ cmd.Flags().BoolVarP(&watch, "watch", "w", false, "watch statistics about dumped entities in stderr")
cmd.Flags().SortFlags = false
@@ -53,7 +57,7 @@ func NewDumpCommand(parent string, stderr *os.File, stdout *os.File, stdin *os.F
return cmd
}
-func dump(path string, include []string) error {
+func dump(path string, include []string, watch bool) error {
backend, err := infra.NewBackend(path)
if err != nil {
return fmt.Errorf("%w", err)
@@ -63,7 +67,14 @@ func dump(path string, include []string) error {
driver := silo.NewDriver(backend, infra.NewDumpJSONLine(), silo.WithKeys(include))
- if err := driver.Dump(); err != nil {
+ if watch {
+ observer := infra.NewDumpObserver()
+ defer observer.Close()
+
+ if err := driver.Dump(observer); err != nil {
+ return fmt.Errorf("%w", err)
+ }
+ } else if err := driver.Dump(); err != nil {
return fmt.Errorf("%w", err)
}
diff --git a/internal/infra/dump_observer.go b/internal/infra/dump_observer.go
new file mode 100644
index 0000000..7265094
--- /dev/null
+++ b/internal/infra/dump_observer.go
@@ -0,0 +1,87 @@
+// Copyright (C) 2024 CGI France
+//
+// This file is part of SILO.
+//
+// SILO is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// SILO is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with SILO. If not, see <https://www.gnu.org/licenses/>.
+
+package infra
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/cgi-fr/silo/pkg/silo"
+ "github.com/schollz/progressbar/v3"
+)
+
+type DumpObserver struct {
+ countTotal int
+ countComplete int
+ countConsistent int
+ countInconsistent int
+ countEmpty int
+ bar *progressbar.ProgressBar
+}
+
+func NewDumpObserver() *DumpObserver {
+ //nolint:gomnd
+ pgb := progressbar.NewOptions(-1,
+ progressbar.OptionSetDescription("Dumping ... "),
+ progressbar.OptionSetItsString("entity"),
+ progressbar.OptionSetWriter(os.Stderr),
+ progressbar.OptionShowIts(),
+ progressbar.OptionSpinnerType(11),
+ progressbar.OptionThrottle(time.Millisecond*10),
+ progressbar.OptionOnCompletion(func() { fmt.Fprintln(os.Stderr) }),
+ // progressbar.OptionShowDescriptionAtLineEnd(),
+ )
+
+ return &DumpObserver{
+ countTotal: 0,
+ countComplete: 0,
+ countConsistent: 0,
+ countInconsistent: 0,
+ countEmpty: 0,
+ bar: pgb,
+ }
+}
+
+func (o *DumpObserver) Entity(status silo.Status, _ map[string]int) {
+ o.countTotal++
+
+ switch status {
+ case silo.StatusEntityComplete:
+ o.countComplete++
+ case silo.StatusEntityConsistent:
+ o.countConsistent++
+ case silo.StatusEntityInconsistent:
+ o.countInconsistent++
+ case silo.StatusEntityEmpty:
+ o.countEmpty++
+ }
+
+ _ = o.bar.Add(1)
+
+ o.bar.Describe(fmt.Sprintf("Dumped %d entities / complete=%d / incomplete=%d / inconsistent=%d / empty=%d",
+ o.countTotal,
+ o.countComplete,
+ o.countConsistent,
+ o.countInconsistent,
+ o.countEmpty))
+}
+
+func (o *DumpObserver) Close() {
+ _ = o.bar.Close()
+}
From 49c76e014a19f595ec666b81794174151be77239 Mon Sep 17 00:00:00 2001
From: Adrien Aury <44274230+adrienaury@users.noreply.github.com>
Date: Wed, 27 Mar 2024 12:49:53 +0000
Subject: [PATCH 10/10] feat(dump): rename incomplete to consistent
---
internal/infra/dump_observer.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/internal/infra/dump_observer.go b/internal/infra/dump_observer.go
index 7265094..c4d3b0a 100644
--- a/internal/infra/dump_observer.go
+++ b/internal/infra/dump_observer.go
@@ -74,7 +74,7 @@ func (o *DumpObserver) Entity(status silo.Status, _ map[string]int) {
_ = o.bar.Add(1)
- o.bar.Describe(fmt.Sprintf("Dumped %d entities / complete=%d / incomplete=%d / inconsistent=%d / empty=%d",
+ o.bar.Describe(fmt.Sprintf("Dumped %d entities / complete=%d / consistent=%d / inconsistent=%d / empty=%d",
o.countTotal,
o.countComplete,
o.countConsistent,