Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PDOK-17304] add ETLFilter to Collection.Search #10

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ type Search struct {
// One or more optional templates that make up the autosuggestions. Uses Go text/template syntax to reference fields.
SuggestTemplates []string `yaml:"suggestTemplates,omitempty" json:"suggestTemplates,omitempty"`

// SQLite WHERE clause used to filter features during import/ETL.
// Provide only the condition itself, without the WHERE keyword (e.g. `fid <= 2`).
// +Optional
ETLFilter string `yaml:"etlFilter,omitempty" json:"etlFilter,omitempty"`
roelarents marked this conversation as resolved.
Show resolved Hide resolved

// Version of the collection used to link to search results
Version int `yaml:"version,omitempty" json:"version,omitempty" default:"1"`

Expand Down
4 changes: 2 additions & 2 deletions internal/etl/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type Extract interface {

// Extract raw records from source database to be transformed and loaded into target search index
Extract(table config.FeatureTable, fields []string, limit int, offset int) ([]t.RawRecord, error)
Extract(table config.FeatureTable, fields []string, where string, limit int, offset int) ([]t.RawRecord, error)

// Close connection to source database
Close()
Expand Down Expand Up @@ -81,7 +81,7 @@ func ImportFile(cfg *config.Config, searchIndex string, filePath string, table c
// import records in batches depending on page size
offset := 0
for {
sourceRecords, err := source.Extract(table, collection.Search.Fields, pageSize, offset)
sourceRecords, err := source.Extract(table, collection.Search.Fields, collection.Search.ETLFilter, pageSize, offset)
if err != nil {
return fmt.Errorf("failed extracting source records: %w", err)
}
Expand Down
85 changes: 54 additions & 31 deletions internal/etl/etl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,39 +51,62 @@ func TestImportGeoPackage(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
ctx := context.Background()

// given
dbPort, postgisContainer, err := setupPostgis(ctx, t)
if err != nil {
t.Error(err)
tests := []struct {
name string
where string
count int
}{
{
name: "import everything",
where: "",
count: 33030 * 2, // * 2 because 2 suggest templates
},
{
name: "with where clause",
where: "fid <= 2",
count: 2 * 2, // * 2 because 2 suggest templates
},
}
defer terminateContainer(ctx, t, postgisContainer)

dbConn := fmt.Sprintf("postgres://postgres:[email protected]:%d/%s?sslmode=disable", dbPort.Int(), "test_db")

cfg, err := config.NewConfig(pwd + "/testdata/config.yaml")
if err != nil {
t.Error(err)
for _, tt := range tests {
ctx := context.Background()

// given
dbPort, postgisContainer, err := setupPostgis(ctx, t)
if err != nil {
t.Error(err)
}
defer terminateContainer(ctx, t, postgisContainer)

dbConn := fmt.Sprintf("postgres://postgres:[email protected]:%d/%s?sslmode=disable", dbPort.Int(), "test_db")

cfg, err := config.NewConfig(pwd + "/testdata/config.yaml")
if err != nil {
t.Error(err)
}
assert.NotNil(t, cfg)
for _, collection := range cfg.Collections {
if collection.Search != nil {
collection.Search.ETLFilter = tt.where
}
}

// when/then
err = CreateSearchIndex(dbConn, "search_index")
assert.NoError(t, err)

table := config.FeatureTable{Name: "addresses", FID: "fid", Geom: "geom"}
err = ImportFile(cfg, "search_index", pwd+"/testdata/addresses-crs84.gpkg", table, 1000, dbConn)
assert.NoError(t, err)

// check nr of records
db, err := pgx.Connect(ctx, dbConn)
assert.NoError(t, err)
var count int
err = db.QueryRow(ctx, "select count(*) from search_index").Scan(&count)
defer db.Close(ctx)
assert.NoError(t, err)
assert.Equal(t, tt.count, count)
}
assert.NotNil(t, cfg)

// when/then
err = CreateSearchIndex(dbConn, "search_index")
assert.NoError(t, err)

table := config.FeatureTable{Name: "addresses", FID: "fid", Geom: "geom"}
err = ImportFile(cfg, "search_index", pwd+"/testdata/addresses-crs84.gpkg", table, 1000, dbConn)
assert.NoError(t, err)

// check nr of records
db, err := pgx.Connect(ctx, dbConn)
assert.NoError(t, err)
var count int
err = db.QueryRow(ctx, "select count(*) from search_index").Scan(&count)
defer db.Close(ctx)
assert.NoError(t, err)
assert.Equal(t, 33030*2, count)
}

func setupPostgis(ctx context.Context, t *testing.T) (nat.Port, testcontainers.Container, error) {
Expand Down
8 changes: 6 additions & 2 deletions internal/etl/extract/geopackage.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ func (g *GeoPackage) Close() {
_ = g.db.Close()
}

func (g *GeoPackage) Extract(table config.FeatureTable, fields []string, limit int, offset int) ([]t.RawRecord, error) {
func (g *GeoPackage) Extract(table config.FeatureTable, fields []string, where string, limit int, offset int) ([]t.RawRecord, error) {
if len(fields) == 0 {
return nil, errors.New("no fields provided to read from GeoPackage")
}
if where != "" {
where = "where " + where
}

// TODO: we might need to transform the bbox to WGS84 here
query := fmt.Sprintf(`
Expand All @@ -71,8 +74,9 @@ func (g *GeoPackage) Extract(table config.FeatureTable, fields []string, limit i
st_geometrytype(castautomagic(%[4]s)) as geom_type,
%[1]s -- all feature specific fields
from %[2]s
%[5]s
limit :limit
offset :offset`, strings.Join(fields, ","), table.Name, table.FID, table.Geom)
offset :offset`, strings.Join(fields, ","), table.Name, table.FID, table.Geom, where)

rows, err := g.db.NamedQuery(query, map[string]any{"limit": limit, "offset": offset})
if err != nil {
Expand Down
Loading