Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Added multi-shard search #8

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,15 @@ type Recommendation struct {
RecommendedVal int64
}

// BadInputError Special error indicating bad user input as opposed to a database error
type BadInputError struct {
Details error
}

func (bi BadInputError) Error() string {
return bi.Details.Error()
}

// DBType - database type
type DBType struct {
Driver DialectName // driver name (used in the code)
Expand Down
4 changes: 4 additions & 0 deletions db/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ func ParseScheme(s string) (scheme string, uri string, err error) {
return parts[0], parts[1], nil
}

func FormatTimeStamp(timestamp time.Time) string {
return fmt.Sprintf("%vns", timestamp.UTC().UnixNano())
}

// Cond represents a condition
type Cond struct {
Col string
Expand Down
64 changes: 64 additions & 0 deletions db/search/compare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package search

import (
"fmt"

"github.com/acronis/perfkit/db"
)

type comparator[T Searchable[T]] func(a, b T) bool
type comparators[T Searchable[T]] map[string]map[string]comparator[T]

func makeComparator[T Searchable[T]](values []string, comparable comparators[T]) (comparator[T], error) {
var less func(a, b T) bool
if len(values) == 0 {
return less, nil
}

var finalLess func(a, b T) bool

for i := len(values) - 1; i >= 0; i-- {
value := values[i]

fnc, field, err := db.ParseFunc(value)
if err != nil {
return nil, err
}

if fnc == "" {
return nil, fmt.Errorf("empty order function")
}

if field == "" {
return nil, fmt.Errorf("empty order field")
}

fieldComparators, ok := comparable[field]
if !ok {
return nil, fmt.Errorf("bad order field '%v'", field)
}

less, ok := fieldComparators[fnc]
if !ok {
return nil, fmt.Errorf("bad order function '%v'", fnc)
}

if finalLess == nil {
finalLess = less
} else {
var deepLess = finalLess

finalLess = func(a, b T) bool {
if less(a, b) {
return true
} else if less(b, a) {
return false
}

return deepLess(a, b)
}
}
}

return finalLess, nil
}
247 changes: 247 additions & 0 deletions db/search/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package search

import (
"fmt"

"github.com/acronis/perfkit/db"
)

// CursorIterable is an interface for entities that can be iterated by cursor
type CursorIterable[T any] interface {
Unique(field string) bool
Nullable(field string) bool
Cursor(field string) (string, error)
}

type sorting struct {
Field string
Func string
}

func uniqueSort[T CursorIterable[T]](encodedSorts []string, cursors map[string]string, instance T) ([]string, []sorting, error) {
var hasUniqueSorting = false
var uniqueOrderDirection int

var encoded []string
var sorts []sorting

for _, v := range encodedSorts {
var fnc, field, err = db.ParseFunc(v)
if err != nil {
return nil, nil, err
}

var unique = instance.Unique(field)
var nullable = instance.Nullable(field)
hasUniqueSorting = unique && !nullable

encoded = append(encoded, v)
sorts = append(sorts, sorting{
Field: field,
Func: fnc,
})

switch fnc {
case "asc":
uniqueOrderDirection++
case "desc":
uniqueOrderDirection--
}

if unique {
if !nullable {
break
} else if cursors != nil {
if val, ok := cursors[field]; ok && val != db.SpecialConditionIsNull {
if fnc != "desc" {
break
}
}
}
}
}

if !hasUniqueSorting {
if uniqueOrderDirection >= 0 {
encoded = append(encoded, "asc(id)")
sorts = append(sorts, sorting{Field: "id", Func: "asc"})
} else {
encoded = append(encoded, "desc(id)")
sorts = append(sorts, sorting{Field: "id", Func: "desc"})
}
}

return encoded, sorts, nil
}

func orderCondition(val, fnc string) (expr string, flag bool, err error) {
var direction string
switch fnc {
case "asc":
switch val {
case db.SpecialConditionIsNull:
return db.SpecialConditionIsNotNull, false, nil
case db.SpecialConditionIsNotNull:
return "", true, nil
default:
direction = "gt"
}
case "desc":
switch val {
case db.SpecialConditionIsNotNull:
return db.SpecialConditionIsNull, false, nil
case db.SpecialConditionIsNull:
return "", true, nil
default:
direction = "lt"
}
default:
return "", false, fmt.Errorf("missing ordering for cursor")
}

return fmt.Sprintf("%s(%v)", direction, val), false, nil
}

func splitQueryOnLightWeightQueries[T CursorIterable[T]](pt PageToken, instance T) ([]PageToken, error) {
var tokens []PageToken

if len(pt.Fields) == 0 {
tokens = append(tokens, pt)
return tokens, nil
}

// check for unique sorting
var encodedSorts, sorts, err = uniqueSort(pt.Order, pt.Cursor, instance)
if err != nil {
return nil, err
}

if len(pt.Cursor) == 0 {
pt.Order = encodedSorts
tokens = append(tokens, pt)
return tokens, nil
}

// construct sort map for fast access
var orderFunctions = map[string]string{}
for _, sort := range sorts {
orderFunctions[sort.Field] = sort.Func
}

// add condition based on cursor
var whereFromCursor = func(fld, val string, pt *PageToken) (bool, error) {
var filter, empty, filterErr = orderCondition(val, orderFunctions[fld])
if filterErr != nil {
return false, filterErr
}

if empty {
return true, nil
}

pt.Filter[fld] = append(pt.Filter[fld], filter)
return false, nil
}

for cursor := range pt.Cursor {
if _, ok := orderFunctions[cursor]; !ok {
return nil, fmt.Errorf("prohibited cursor, not mentioned it order: %v", cursor)
}
}

// split to x page tokens
for i := range sorts {
var cpt = pt
var last = len(sorts) - 1 - i

// copy filters
cpt.Filter = make(map[string][]string, len(sorts)-1-i)
for k, v := range pt.Filter {
cpt.Filter[k] = v
}

// add equal condition on all fields except last in sorts
for j := 0; j <= last-1; j++ {
var fld = sorts[j].Field
var val = pt.Cursor[fld]

cpt.Filter[fld] = append(cpt.Filter[fld], val)
}

// add gt / lt condition for last sorting
var empty bool
if val, ok := cpt.Cursor[sorts[last].Field]; ok {
if empty, err = whereFromCursor(sorts[last].Field, val, &cpt); err != nil {
return nil, err
}
} else {
continue
}

if empty {
continue
}

// Add only needed sort to cpt
cpt.Order = []string{}
for j := last; j <= len(sorts)-1; j++ {
cpt.Order = append(cpt.Order, encodedSorts[j])

var sortField = sorts[j].Field

if instance.Unique(sortField) {
if !instance.Nullable(sortField) {
break
}

var becomeUnique = false
// for ASC if we have a value, that means we already select all null rows
// for DESC Nulls can start at any row
if sorts[j].Func == "asc" {
for _, val := range cpt.Filter[sortField] {
if val != db.SpecialConditionIsNull {
becomeUnique = true
break
}
}
}
if becomeUnique {
break
}
}
}

cpt.Cursor = nil

tokens = append(tokens, cpt)
}

return tokens, nil
}

func createNextCursorBasedPageToken[T CursorIterable[T]](previousPageToken PageToken, items []T, limit int64, instance T) (*PageToken, error) {
if int64(len(items)) < limit {
return nil, nil
}

var pt PageToken
pt.Cursor = make(map[string]string)
pt.Fields = previousPageToken.Fields

var encoded, sorts, err = uniqueSort(previousPageToken.Order, previousPageToken.Cursor, instance)
if err != nil {
return nil, err
}
pt.Order = encoded

var last = items[len(items)-1]
for _, sort := range sorts {
var value string
if value, err = last.Cursor(sort.Field); err != nil {
return nil, err
}
pt.Cursor[sort.Field] = value
}

return &pt, nil
}
Loading