Skip to content

Commit

Permalink
feat: add search endpoint - connect to postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
rkettelerij committed Dec 4, 2024
1 parent 84612ff commit 668face
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 13 deletions.
11 changes: 9 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ func main() {
commonDBFlags[dbUsernameFlag],
commonDBFlags[dbPasswordFlag],
commonDBFlags[dbSslModeFlag],
&cli.PathFlag{
Name: searchIndexFlag,
EnvVars: []string{strcase.ToScreamingSnake(searchIndexFlag)},
Usage: "Name of search index to use",
Required: true,
Value: "search_index",
},
},
Action: func(c *cli.Context) error {
log.Println(c.Command.Usage)
Expand All @@ -173,8 +180,8 @@ func main() {
}
// Each OGC API building block makes use of said Engine
ogc.SetupBuildingBlocks(engine, dbConn)
// Start search logic
search.NewSearch(engine)
// Create search endpoint
search.NewSearch(engine, dbConn, c.String(searchIndexFlag))

return engine.Start(address, debugPort, shutdownDelay)
},
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/go-playground/validator/v10 v10.22.1
github.com/go-spatial/geom v0.1.0
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572
github.com/goccy/go-json v0.10.3
github.com/gomarkdown/markdown v0.0.0-20240930133441-72d49d9543d8
github.com/iancoleman/strcase v0.3.0
github.com/jackc/pgx/v5 v5.7.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEe
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM=
github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gomarkdown/markdown v0.0.0-20240930133441-72d49d9543d8 h1:4txT5G2kqVAKMjzidIabL/8KqjIK71yj30YOeuxLn10=
Expand Down
2 changes: 1 addition & 1 deletion internal/etl/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func newSourceToExtract(filePath string) (Extract, error) {

func newTargetToLoad(dbConn string) (Load, error) {
if strings.HasPrefix(dbConn, "postgres:") {
return load.NewPostgis(dbConn)
return load.NewPostgres(dbConn)
}
// add new targets here (elasticsearch, solr, etc)
return nil, fmt.Errorf("unsupported target database connection: %s", dbConn)
Expand Down
12 changes: 6 additions & 6 deletions internal/etl/load/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
pgxgeom "github.com/twpayne/pgx-geom"
)

type Postgis struct {
type Postgres struct {
db *pgx.Conn
ctx context.Context
}

func NewPostgis(dbConn string) (*Postgis, error) {
func NewPostgres(dbConn string) (*Postgres, error) {
ctx := context.Background()
db, err := pgx.Connect(ctx, dbConn)
if err != nil {
Expand All @@ -24,14 +24,14 @@ func NewPostgis(dbConn string) (*Postgis, error) {
if err := pgxgeom.Register(ctx, db); err != nil {
return nil, err
}
return &Postgis{db: db, ctx: ctx}, nil
return &Postgres{db: db, ctx: ctx}, nil
}

func (p *Postgis) Close() {
func (p *Postgres) Close() {
_ = p.db.Close(p.ctx)
}

func (p *Postgis) Load(records []t.SearchIndexRecord, index string) (int64, error) {
func (p *Postgres) Load(records []t.SearchIndexRecord, index string) (int64, error) {
loaded, err := p.db.CopyFrom(
p.ctx,
pgx.Identifier{index},
Expand All @@ -48,7 +48,7 @@ func (p *Postgis) Load(records []t.SearchIndexRecord, index string) (int64, erro
}

// Init initializes the search index
func (p *Postgis) Init(index string) error {
func (p *Postgres) Init(index string) error {
geometryType := `create type geometry_type as enum ('POINT', 'MULTIPOINT', 'LINESTRING', 'MULTILINESTRING', 'POLYGON', 'MULTIPOLYGON');`
_, err := p.db.Exec(p.ctx, geometryType)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions internal/search/datasources/datasource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package datasources

import (
"context"
)

// Datasource knows how to make different kinds of queries/actions on the underlying actual datastore.
// This abstraction allows the rest of the system to stay datastore agnostic.
type Datasource interface {
// Suggest returns a list of suggestions for the given input, or an error.
Suggest(ctx context.Context, suggestForThis string) ([]string, error)

// Close closes (connections to) the datasource gracefully
Close()
}
93 changes: 93 additions & 0 deletions internal/search/datasources/postgres/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package postgres

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
pgxgeom "github.com/twpayne/pgx-geom"

"strings"
"time"
)

// Postgres is a datasource backed by a single PostgreSQL connection.
// Note: NewPostgres fills this struct positionally, so the field order matters.
type Postgres struct {
// db is the connection used for all queries
db *pgx.Conn
// ctx is the context the connection was opened with, it's reused when closing the connection
ctx context.Context

// queryTimeout is the upper bound for the duration of a single Suggest query
queryTimeout time.Duration
// searchIndex is the name of the table queried by Suggest
searchIndex string
}

// NewPostgres connects to the PostgreSQL database identified by dbConn and returns a
// Postgres datasource which queries the given search index.
//
// queryTimeout bounds the duration of each individual query, searchIndex is the name of
// the table to search in. An error is returned when connecting fails or when support for
// Go <-> PostGIS conversions can't be registered on the connection. Callers should invoke
// Close once the datasource is no longer needed.
func NewPostgres(dbConn string, queryTimeout time.Duration, searchIndex string) (*Postgres, error) {
	ctx := context.Background()
	db, err := pgx.Connect(ctx, dbConn)
	if err != nil {
		return nil, fmt.Errorf("unable to connect to database: %w", err)
	}
	// add support for Go <-> PostGIS conversions
	if err := pgxgeom.Register(ctx, db); err != nil {
		// don't leak the freshly opened connection when registration fails
		_ = db.Close(ctx)
		return nil, fmt.Errorf("unable to register PostGIS support: %w", err)
	}
	return &Postgres{
		db:           db,
		ctx:          ctx,
		queryTimeout: queryTimeout,
		searchIndex:  searchIndex,
	}, nil
}

// Close closes the connection to the database using the context the connection
// was opened with. The error returned by the driver is explicitly discarded.
func (p *Postgres) Close() {
_ = p.db.Close(p.ctx)
}

// Suggest performs a prefix-based full-text search in the search index and returns the
// highlighted text (ts_headline output) of the best matching records.
//
// The input is split on whitespace, each term is turned into a prefix match (term:*) and
// all terms are AND-ed together. At most 500 matching rows are considered, these are grouped
// by display name and the 50 best ranked results are returned. The query is bound by the
// configured query timeout as well as by the given ctx.
//
// Returns nil (without querying the database) when the input contains no terms.
func (p *Postgres) Suggest(ctx context.Context, suggestForThis string) ([]string, error) {
	// Prepare dynamic full-text search query
	// Split terms by spaces and append :* to each term
	terms := strings.Fields(suggestForThis)
	if len(terms) == 0 {
		// an empty tsquery never matches anything, so skip the roundtrip to the database
		return nil, nil
	}
	for i, term := range terms {
		terms[i] = term + ":*"
	}
	searchTerm := strings.Join(terms, " & ")

	queryCtx, cancel := context.WithTimeout(ctx, p.queryTimeout)
	defer cancel()

	// The search term originates from the end user and is therefore passed as a query
	// parameter ($1) instead of being formatted into the SQL text, which would allow
	// SQL injection. Only the index name, which is supplied through configuration and
	// can't be sent as a parameter, is formatted into the statement.
	sqlQuery := fmt.Sprintf(
		`SELECT
			r.display_name AS display_name,
			max(r.rank) AS rank,
			max(r.highlighted_text) AS highlighted_text
		FROM (
			SELECT display_name,
				ts_rank_cd(ts, to_tsquery($1), 1) AS rank,
				ts_headline('dutch', suggest, to_tsquery($1)) AS highlighted_text
			FROM
				%s
			WHERE ts @@ to_tsquery($1) LIMIT 500
		) r
		GROUP BY display_name
		ORDER BY rank DESC, display_name ASC LIMIT 50`,
		p.searchIndex)

	// Execute query
	rows, err := p.db.Query(queryCtx, sqlQuery, searchTerm)
	if err != nil {
		return nil, fmt.Errorf("query '%s' failed: %w", sqlQuery, err)
	}
	defer rows.Close()

	if queryCtx.Err() != nil {
		return nil, queryCtx.Err()
	}

	var suggestions []string
	for rows.Next() {
		var displayName, highlightedText string
		var rank float64

		// Scan all selected columns
		if err := rows.Scan(&displayName, &rank, &highlightedText); err != nil {
			return nil, err
		}

		suggestions = append(suggestions, highlightedText) // or displayName, whichever you want to return
	}
	// rows.Next() also returns false when iteration was aborted (timeout, invalid
	// tsquery syntax, lost connection), so make sure such failures aren't silently
	// reported as an empty or truncated result.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("reading results of query '%s' failed: %w", sqlQuery, err)
	}

	return suggestions, nil
}
51 changes: 51 additions & 0 deletions internal/search/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package search

import (
stdjson "encoding/json"
"io"
"log"
"net/http"
"os"
"strconv"

"github.com/PDOK/gomagpie/internal/engine"
perfjson "github.com/goccy/go-json"
)

var (
// disableJSONPerfOptimization is read once from the DISABLE_JSON_PERF_OPTIMIZATION environment
// variable. The parse error is ignored, so an unset or unparsable value results in false.
disableJSONPerfOptimization, _ = strconv.ParseBool(os.Getenv("DISABLE_JSON_PERF_OPTIMIZATION"))
)

// serveJSON encodes input as JSON straight to the response output stream, after setting
// the given content type on the response. Note the output is *NOT* validated against the
// OpenAPI spec. When encoding fails, the failure is handed to handleJSONEncodingFailure.
func serveJSON(input any, contentType string, w http.ResponseWriter) {
	w.Header().Set(engine.HeaderContentType, contentType)

	err := getEncoder(w).Encode(input)
	if err == nil {
		return
	}
	handleJSONEncodingFailure(err, w)
}

// jsonEncoder is the minimal encoder contract used by serveJSON. It allows getEncoder
// to return either the Go stdlib encoder or the 3rd party (goccy/go-json) encoder.
type jsonEncoder interface {
// Encode writes the JSON encoding of input, or returns an error when encoding fails.
Encode(input any) error
}

// getEncoder creates a JSON encoder which writes to w. Escaping of '<', '>' and '&' is
// disabled (HTMLEscape is false). Especially the '&' is important since this character
// is used in the next/prev links.
func getEncoder(w io.Writer) jsonEncoder {
	if !disableJSONPerfOptimization {
		// default: ~7% overall faster 3rd party JSON encoder (in case of issues switch back to stdlib using env variable)
		fast := perfjson.NewEncoder(w)
		fast.SetEscapeHTML(false)
		return fast
	}
	// optimization disabled: fall back to the Go stdlib JSON encoder
	std := stdjson.NewEncoder(w)
	std.SetEscapeHTML(false)
	return std
}

// handleJSONEncodingFailure logs the given encoding error and renders a server error
// problem to w. The error itself is only logged, the message passed to RenderProblem
// is a fixed text.
func handleJSONEncodingFailure(err error, w http.ResponseWriter) {
log.Printf("JSON encoding failed: %v", err)
engine.RenderProblem(engine.ProblemServerError, w, "Failed to write JSON response")
}
33 changes: 29 additions & 4 deletions internal/search/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/PDOK/gomagpie/internal/engine"
ds "github.com/PDOK/gomagpie/internal/search/datasources"
"github.com/PDOK/gomagpie/internal/search/datasources/postgres"
)

// timeout is handed to the postgres datasource as its query timeout (see newDatasource).
const timeout = time.Second * 15

// Search holds the dependencies of the search endpoints: the engine on which
// the routes are registered and the datasource that is queried for suggestions.
type Search struct {
	engine     *engine.Engine
	datasource ds.Datasource
}

func NewSearch(e *engine.Engine) *Search {
func NewSearch(e *engine.Engine, dbConn string, searchIndex string) *Search {
s := &Search{
engine: e,
engine: e,
datasource: newDatasource(e, dbConn, searchIndex),
}
e.Router.Get("/search/suggest", s.Suggest())
return s
Expand All @@ -36,8 +43,17 @@ func (s *Search) Suggest() http.HandlerFunc {
delete(params, "f")
crs := params["crs"]
delete(params, "crs")
limit := params["limit"]
delete(params, "limit")

log.Printf("crs %s, limit %d, format %s, query %s, params %v", crs, limit, format, searchQuery, params)

log.Printf("crs %s, format %s, query %s, params %v", crs, format, searchQuery, params)
suggestions, err := s.datasource.Suggest(r.Context(), r.URL.Query().Get("q"))
if err != nil {
engine.RenderProblem(engine.ProblemServerError, w, err.Error())
return
}
serveJSON(suggestions, engine.MediaTypeGeoJSON, w)
}
}

Expand Down Expand Up @@ -66,3 +82,12 @@ func parseQueryParams(query url.Values) (map[string]any, error) {
}
return result, nil
}

// newDatasource creates the postgres datasource for the given connection string and
// search index, using the package level timeout as query timeout. The process is
// terminated (log.Fatalf) when the datasource can't be created. Closing the datasource
// is registered as a shutdown hook on the engine.
func newDatasource(e *engine.Engine, dbConn string, searchIndex string) ds.Datasource {
	pg, err := postgres.NewPostgres(dbConn, timeout, searchIndex)
	if err != nil {
		log.Fatalf("failed to create datasource: %v", err)
	}

	// release the database connection when the engine shuts down
	e.RegisterShutdownHook(pg.Close)
	return pg
}
Loading

0 comments on commit 668face

Please sign in to comment.