diff --git a/db/db.go b/db/db.go
index 140fc8c..bff8e96 100644
--- a/db/db.go
+++ b/db/db.go
@@ -339,6 +339,15 @@ type Recommendation struct {
 	RecommendedVal int64
 }
 
+// BadInputError is a special error indicating bad user input, as opposed to a database error
+type BadInputError struct {
+	Details error
+}
+
+func (bi BadInputError) Error() string {
+	return bi.Details.Error()
+}
+
 // DBType - database type
 type DBType struct {
 	Driver DialectName // driver name (used in the code)
diff --git a/db/helpers.go b/db/helpers.go
index 62b8a60..1a1aab9 100644
--- a/db/helpers.go
+++ b/db/helpers.go
@@ -277,6 +277,10 @@ func ParseScheme(s string) (scheme string, uri string, err error) {
 	return parts[0], parts[1], nil
 }
 
+// FormatTimeStamp renders a timestamp as nanoseconds since the Unix epoch, e.g. "1257894000000000000ns"
+func FormatTimeStamp(timestamp time.Time) string {
+	return fmt.Sprintf("%vns", timestamp.UTC().UnixNano())
+}
+
 // Cond represents a condition
 type Cond struct {
 	Col string
diff --git a/db/search/compare.go b/db/search/compare.go
new file mode 100644
index 0000000..4b548a6
--- /dev/null
+++ b/db/search/compare.go
@@ -0,0 +1,64 @@
+package search
+
+import (
+	"fmt"
+
+	"github.com/acronis/perfkit/db"
+)
+
+type comparator[T Searchable[T]] func(a, b T) bool
+type comparators[T Searchable[T]] map[string]map[string]comparator[T]
+
+func makeComparator[T Searchable[T]](values []string, comparable comparators[T]) (comparator[T], error) {
+	var less func(a, b T) bool
+	if len(values) == 0 {
+		return less, nil
+	}
+
+	var finalLess func(a, b T) bool
+
+	for i := len(values) - 1; i >= 0; i-- {
+		value := values[i]
+
+		fnc, field, err := db.ParseFunc(value)
+		if err != nil {
+			return nil, err
+		}
+
+		if fnc == "" {
+			return nil, fmt.Errorf("empty order function")
+		}
+
+		if field == "" {
+			return nil, fmt.Errorf("empty order field")
+		}
+
+		fieldComparators, ok := comparable[field]
+		if !ok {
+			return nil, fmt.Errorf("bad order field '%v'", field)
+		}
+
+		less, ok := fieldComparators[fnc]
+		if !ok {
+			return nil, fmt.Errorf("bad order function '%v'", fnc)
+		}
+
+		if finalLess == nil {
+			finalLess = less
+		} else {
+			var deepLess = finalLess
+
+			finalLess = func(a, b T) bool {
+				if less(a, b) {
+					return true
+				} else if less(b, a) {
+					return false
+				}
+
+				return deepLess(a, b)
+			}
+		}
+	}
+
+	return finalLess, nil
+}
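+
+// Illustrative sketch (assumes a hypothetical entity type `task` with Queue
+// and ID fields): makeComparator composes per-field comparators, ordering by
+// the first sort key and falling back to the next one on ties.
+//
+//	var cmps = comparators[task]{
+//		"queue": {"asc": func(a, b task) bool { return a.Queue < b.Queue }},
+//		"id":    {"asc": func(a, b task) bool { return a.ID < b.ID }},
+//	}
+//	less, err := makeComparator[task]([]string{"asc(queue)", "asc(id)"}, cmps)
+//	// less orders by queue first, then by id as a tie-breaker.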
diff --git a/db/search/cursor.go b/db/search/cursor.go
new file mode 100644
index 0000000..8446e13
--- /dev/null
+++ b/db/search/cursor.go
@@ -0,0 +1,247 @@
+package search
+
+import (
+	"fmt"
+
+	"github.com/acronis/perfkit/db"
+)
+
+// CursorIterable is an interface for entities that can be iterated by cursor
+type CursorIterable[T any] interface {
+	Unique(field string) bool
+	Nullable(field string) bool
+	Cursor(field string) (string, error)
+}
+
+type sorting struct {
+	Field string
+	Func  string
+}
+
+func uniqueSort[T CursorIterable[T]](encodedSorts []string, cursors map[string]string, instance T) ([]string, []sorting, error) {
+	var hasUniqueSorting = false
+	var uniqueOrderDirection int
+
+	var encoded []string
+	var sorts []sorting
+
+	for _, v := range encodedSorts {
+		var fnc, field, err = db.ParseFunc(v)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		var unique = instance.Unique(field)
+		var nullable = instance.Nullable(field)
+		hasUniqueSorting = unique && !nullable
+
+		encoded = append(encoded, v)
+		sorts = append(sorts, sorting{
+			Field: field,
+			Func:  fnc,
+		})
+
+		switch fnc {
+		case "asc":
+			uniqueOrderDirection++
+		case "desc":
+			uniqueOrderDirection--
+		}
+
+		if unique {
+			if !nullable {
+				break
+			} else if cursors != nil {
+				if val, ok := cursors[field]; ok && val != db.SpecialConditionIsNull {
+					if fnc != "desc" {
+						break
+					}
+				}
+			}
+		}
+	}
+
+	if !hasUniqueSorting {
+		if uniqueOrderDirection >= 0 {
+			encoded = append(encoded, "asc(id)")
+			sorts = append(sorts, sorting{Field: "id", Func: "asc"})
+		} else {
+			encoded = append(encoded, "desc(id)")
+			sorts = append(sorts, sorting{Field: "id", Func: "desc"})
+		}
+	}
+
+	return encoded, sorts, nil
+}
+
+func orderCondition(val, fnc string) (expr string, flag bool, err error) {
+	var direction string
+	switch fnc {
+	case "asc":
+		switch val {
+		case db.SpecialConditionIsNull:
+			return db.SpecialConditionIsNotNull, false, nil
+		case db.SpecialConditionIsNotNull:
+			return "", true, nil
+		default:
+			direction = "gt"
+		}
+	case "desc":
+		switch val {
+		case db.SpecialConditionIsNotNull:
+			return db.SpecialConditionIsNull, false, nil
+		case db.SpecialConditionIsNull:
+			return "", true, nil
+		default:
+			direction = "lt"
+		}
+	default:
+		return "", false, fmt.Errorf("missing ordering for cursor")
+	}
+
+	return fmt.Sprintf("%s(%v)", direction, val), false, nil
+}
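+
+// To make the mapping concrete (illustrative, not exhaustive):
+//
+//	orderCondition("10", "asc")                       -> "gt(10)", false, nil
+//	orderCondition("10", "desc")                      -> "lt(10)", false, nil
+//	orderCondition(db.SpecialConditionIsNull, "asc")  -> db.SpecialConditionIsNotNull, false, nil
+//	orderCondition(db.SpecialConditionIsNull, "desc") -> "", true, nil (empty: the caller skips this token)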
+
+func splitQueryOnLightWeightQueries[T CursorIterable[T]](pt PageToken, instance T) ([]PageToken, error) {
+	var tokens []PageToken
+
+	if len(pt.Fields) == 0 {
+		tokens = append(tokens, pt)
+		return tokens, nil
+	}
+
+	// check for unique sorting
+	var encodedSorts, sorts, err = uniqueSort(pt.Order, pt.Cursor, instance)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(pt.Cursor) == 0 {
+		pt.Order = encodedSorts
+		tokens = append(tokens, pt)
+		return tokens, nil
+	}
+
+	// construct sort map for fast access
+	var orderFunctions = map[string]string{}
+	for _, sort := range sorts {
+		orderFunctions[sort.Field] = sort.Func
+	}
+
+	// add condition based on cursor
+	var whereFromCursor = func(fld, val string, pt *PageToken) (bool, error) {
+		var filter, empty, filterErr = orderCondition(val, orderFunctions[fld])
+		if filterErr != nil {
+			return false, filterErr
+		}
+
+		if empty {
+			return true, nil
+		}
+
+		pt.Filter[fld] = append(pt.Filter[fld], filter)
+		return false, nil
+	}
+
+	for cursor := range pt.Cursor {
+		if _, ok := orderFunctions[cursor]; !ok {
+			return nil, fmt.Errorf("prohibited cursor, not mentioned in order: %v", cursor)
+		}
+	}
+
+	// split into one lightweight page token per sort field
+	for i := range sorts {
+		var cpt = pt
+		var last = len(sorts) - 1 - i
+
+		// copy filters
+		cpt.Filter = make(map[string][]string, len(sorts)-1-i)
+		for k, v := range pt.Filter {
+			cpt.Filter[k] = v
+		}
+
+		// add equal condition on all fields except last in sorts
+		for j := 0; j <= last-1; j++ {
+			var fld = sorts[j].Field
+			var val = pt.Cursor[fld]
+
+			cpt.Filter[fld] = append(cpt.Filter[fld], val)
+		}
+
+		// add gt / lt condition for last sorting
+		var empty bool
+		if val, ok := cpt.Cursor[sorts[last].Field]; ok {
+			if empty, err = whereFromCursor(sorts[last].Field, val, &cpt); err != nil {
+				return nil, err
+			}
+		} else {
+			continue
+		}
+
+		if empty {
+			continue
+		}
+
+		// Add only needed sort to cpt
+		cpt.Order = []string{}
+		for j := last; j <= len(sorts)-1; j++ {
+			cpt.Order = append(cpt.Order, encodedSorts[j])
+
+			var sortField = sorts[j].Field
+
+			if instance.Unique(sortField) {
+				if !instance.Nullable(sortField) {
+					break
+				}
+
+				var becomeUnique = false
+				// for ASC, a non-null filter value means all null rows have already been selected
+				// for DESC, nulls can start at any row
+				if sorts[j].Func == "asc" {
+					for _, val := range cpt.Filter[sortField] {
+						if val != db.SpecialConditionIsNull {
+							becomeUnique = true
+							break
+						}
+					}
+				}
+				if becomeUnique {
+					break
+				}
+			}
+		}
+
+		cpt.Cursor = nil
+
+		tokens = append(tokens, cpt)
+	}
+
+	return tokens, nil
+}
+
+func createNextCursorBasedPageToken[T CursorIterable[T]](previousPageToken PageToken, items []T, limit int64, instance T) (*PageToken, error) {
+	if int64(len(items)) < limit {
+		return nil, nil
+	}
+
+	var pt PageToken
+	pt.Cursor = make(map[string]string)
+	pt.Fields = previousPageToken.Fields
+
+	var encoded, sorts, err = uniqueSort(previousPageToken.Order, previousPageToken.Cursor, instance)
+	if err != nil {
+		return nil, err
+	}
+	pt.Order = encoded
+
+	var last = items[len(items)-1]
+	for _, sort := range sorts {
+		var value string
+		if value, err = last.Cursor(sort.Field); err != nil {
+			return nil, err
+		}
+		pt.Cursor[sort.Field] = value
+	}
+
+	return &pt, nil
+}
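+
+// A worked example of the split (illustrative; mirrors the unit tests below):
+// for Order = [asc(state), asc(resultCode)] and Cursor = {state: S,
+// resultCode: R, id: N}, the query is split into three lightweight queries:
+//
+//	state = S AND resultCode = R AND id > N   ORDER BY asc(id)
+//	state = S AND resultCode > R              ORDER BY asc(resultCode), asc(id)
+//	state > S                                 ORDER BY asc(state), asc(resultCode), asc(id)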
[]string{"asc(id)"}, + sorts: []sorting{ + { + Field: "id", + Func: "asc", + }, + }, + }, + { + initialSort: []string{"asc(state)", "asc(resultCode)"}, + + encoded: []string{"asc(state)", "asc(resultCode)", "asc(id)"}, + sorts: []sorting{ + { + Field: "state", + Func: "asc", + }, + { + Field: "resultCode", + Func: "asc", + }, + { + Field: "id", + Func: "asc", + }, + }, + }, + { + initialSort: []string{"desc(state)", "desc(resultCode)"}, + + encoded: []string{"desc(state)", "desc(resultCode)", "desc(id)"}, + sorts: []sorting{ + { + Field: "state", + Func: "desc", + }, + { + Field: "resultCode", + Func: "desc", + }, + { + Field: "id", + Func: "desc", + }, + }, + }, + { + initialSort: []string{"desc(enqueuedAt)"}, + + encoded: []string{"desc(enqueuedAt)"}, + sorts: []sorting{ + { + Field: "enqueuedAt", + Func: "desc", + }, + }, + }, + { + initialSort: []string{"asc(completedAt)", "asc(id)"}, + + encoded: []string{"asc(completedAt)", "asc(id)"}, + sorts: []sorting{ + { + Field: "completedAt", + Func: "asc", + }, + { + Field: "id", + Func: "asc", + }, + }, + }, + { + initialSort: []string{"asc(completedAt)", "desc(id)"}, + + encoded: []string{"asc(completedAt)", "desc(id)"}, + sorts: []sorting{ + { + Field: "completedAt", + Func: "asc", + }, + { + Field: "id", + Func: "desc", + }, + }, + }, + { + initialSort: []string{"asc(completedAt)"}, + + encoded: []string{"asc(completedAt)", "asc(id)"}, + sorts: []sorting{ + { + Field: "completedAt", + Func: "asc", + }, + { + Field: "id", + Func: "asc", + }, + }, + }, + { + initialSort: []string{"asc(id)", "asc(completedAt)"}, + + encoded: []string{"asc(id)"}, + sorts: []sorting{ + { + Field: "id", + Func: "asc", + }, + }, + }, + } + + for _, test := range tests { + var encodedSorts, sorts, err = uniqueSort[testStruct](test.initialSort, nil, testStruct{}) + if err != nil { + if test.err != nil { + assert.Equalf(t, test.err.Error(), err.Error(), "failure in test for sorts %v", test.initialSort) + } else { + t.Errorf("failure in test for sorts %v, err: %v", test.initialSort, err) + } + + continue + } + + assert.Equalf(t, test.encoded, encodedSorts, "failure in test for sorts %v, err: %v", test.initialSort, err) + assert.Equalf(t, test.sorts, sorts, "failure in test for sorts %v, err: %v", test.initialSort, err) + } +} + +func TestSplitQueryOnLightWeightQueries(t *testing.T) { + type splitTest struct { + pt PageToken + uf map[string]bool + pts []PageToken + err error + } + + var tests = []splitTest{ + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(resultCode)"}, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(resultCode)", "asc(id)"}, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"desc(enqueuedAt)"}, + Cursor: map[string]string{ + "enqueuedAt": "10", + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "enqueuedAt": {"lt(10)"}, + }, + Order: []string{"desc(enqueuedAt)"}, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(resultCode)"}, + Cursor: map[string]string{ + "state": string(stateCompleted), + "resultCode": string(doneAbandoned), + "id": id2str(10), + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": {string(stateCompleted)}, + "resultCode": {string(doneAbandoned)}, + "id": {"gt(10)"}, + }, + Order: []string{"asc(id)"}, + }, + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": 
{string(stateCompleted)}, + "resultCode": {fmt.Sprintf("gt(%s)", doneAbandoned)}, + }, + Order: []string{"asc(resultCode)", "asc(id)"}, + }, + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": {fmt.Sprintf("gt(%s)", stateCompleted)}, + }, + Order: []string{"asc(state)", "asc(resultCode)", "asc(id)"}, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(resultCode)"}, + Cursor: map[string]string{ + "state": string(stateEnqueued), + "resultCode": db.SpecialConditionIsNull, + "id": id2str(10), + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": {string(stateEnqueued)}, + "resultCode": {db.SpecialConditionIsNull}, + "id": {"gt(10)"}, + }, + Order: []string{"asc(id)"}, + }, + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": {string(stateEnqueued)}, + "resultCode": {db.SpecialConditionIsNotNull}, + }, + Order: []string{"asc(resultCode)", "asc(id)"}, + }, + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": {fmt.Sprintf("gt(%s)", stateEnqueued)}, + }, + Order: []string{"asc(state)", "asc(resultCode)", "asc(id)"}, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(resultCode)"}, + Cursor: map[string]string{ + "state": string(stateCompleted), + "resultCode": string(doneError), + "id": id2str(10), + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": {string(stateCompleted)}, + "resultCode": {string(doneError)}, + "id": {"gt(10)"}, + }, + Order: []string{"asc(id)"}, + }, + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": {string(stateCompleted)}, + "resultCode": {fmt.Sprintf("gt(%s)", string(doneError))}, + }, + Order: []string{"asc(resultCode)", "asc(id)"}, + }, + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": {fmt.Sprintf("gt(%s)", stateCompleted)}, + }, + Order: []string{"asc(state)", "asc(resultCode)", "asc(id)"}, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"desc(state)", "desc(resultCode)"}, + Cursor: map[string]string{ + "state": string(stateEnqueued), + "resultCode": db.SpecialConditionIsNull, + "id": id2str(10), + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": {string(stateEnqueued)}, + "resultCode": {db.SpecialConditionIsNull}, + "id": {"lt(10)"}, + }, + Order: []string{"desc(id)"}, + }, + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "state": {fmt.Sprintf("lt(%s)", stateEnqueued)}, + }, + Order: []string{"desc(state)", "desc(resultCode)", "desc(id)"}, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{}, // Count + Order: []string{"asc(some_id)", "asc(time)", "asc(id)"}, + Filter: map[string][]string{ + "some_id": {"2000"}, + "time": {"567"}, + "id": {"10"}, + }, + }, + pts: []PageToken{ + { + Fields: []string{}, // Count + Order: []string{"asc(some_id)", "asc(time)", "asc(id)"}, + Filter: map[string][]string{ + "some_id": {"2000"}, + "time": {"567"}, + "id": {"10"}, + }, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"asc(completedAt)", "asc(id)"}, + Cursor: map[string]string{ + "completedAt": db.SpecialConditionIsNull, + "id": "10", + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "completedAt": {db.SpecialConditionIsNull}, + "id": {"gt(10)"}, + }, + Order: []string{"asc(id)"}, + }, + { + Fields: []string{"id"}, + 
Filter: map[string][]string{ + "completedAt": {db.SpecialConditionIsNotNull}, + }, + Order: []string{"asc(completedAt)"}, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"asc(completedAt)", "desc(id)"}, + Cursor: map[string]string{ + "completedAt": db.SpecialConditionIsNull, + "id": "10", + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "completedAt": {db.SpecialConditionIsNull}, + "id": {"lt(10)"}, + }, + Order: []string{"desc(id)"}, + }, + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "completedAt": {db.SpecialConditionIsNotNull}, + }, + Order: []string{"asc(completedAt)"}, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Cursor: map[string]string{ + "completedAt": db.SpecialConditionIsNull, + "id": "10", + }, + Order: []string{"asc(completedAt)"}, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "completedAt": {db.SpecialConditionIsNull}, + "id": {"gt(10)"}, + }, + Order: []string{"asc(id)"}, + }, + { + Fields: []string{"id"}, + Filter: map[string][]string{ + "completedAt": {db.SpecialConditionIsNotNull}, + }, + Order: []string{"asc(completedAt)"}, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"asc(completedAt)"}, + Cursor: map[string]string{ + "completedAt": "15", + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Order: []string{"asc(completedAt)"}, + Filter: map[string][]string{ + "completedAt": {"gt(15)"}, + }, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"desc(completedAt)"}, + Cursor: map[string]string{ + "completedAt": "15", + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Order: []string{"desc(completedAt)", "desc(id)"}, + Filter: map[string][]string{ + "completedAt": {"lt(15)"}, + }, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"desc(completedAt)", "desc(id)"}, + Cursor: nil, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Order: []string{"desc(completedAt)", "desc(id)"}, + Filter: nil, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"desc(completedAt)"}, + Cursor: nil, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Order: []string{"desc(completedAt)", "desc(id)"}, + Filter: nil, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"desc(completedAt)"}, + Cursor: map[string]string{ + "completedAt": db.SpecialConditionIsNull, + "id": "10", + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Order: []string{"desc(id)"}, + Filter: map[string][]string{ + "completedAt": {db.SpecialConditionIsNull}, + "id": {"lt(10)"}, + }, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(completedAt)"}, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(completedAt)", "asc(id)"}, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(completedAt)"}, + Cursor: map[string]string{ + "state": string(stateEnqueued), + "completedAt": db.SpecialConditionIsNull, + "id": "10", + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Order: []string{"asc(id)"}, + Filter: map[string][]string{ + "state": {string(stateEnqueued)}, + "completedAt": {db.SpecialConditionIsNull}, + "id": {"gt(10)"}, + }, + }, + { + Fields: []string{"id"}, + Order: []string{"asc(completedAt)"}, + Filter: map[string][]string{ + "state": 
{string(stateEnqueued)}, + "completedAt": {db.SpecialConditionIsNotNull}, + }, + }, + { + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(completedAt)", "asc(id)"}, + Filter: map[string][]string{ + "state": {fmt.Sprintf("gt(%s)", stateEnqueued)}, + }, + }, + }, + }, + { + pt: PageToken{ + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(startedAt)", "asc(completedAt)"}, + Cursor: map[string]string{ + "state": string(stateEnqueued), + "startedAt": db.SpecialConditionIsNull, + "completedAt": db.SpecialConditionIsNull, + "id": "10", + }, + }, + pts: []PageToken{ + { + Fields: []string{"id"}, + Order: []string{"asc(id)"}, + Filter: map[string][]string{ + "state": {string(stateEnqueued)}, + "startedAt": {db.SpecialConditionIsNull}, + "completedAt": {db.SpecialConditionIsNull}, + "id": {"gt(10)"}, + }, + }, + { + Fields: []string{"id"}, + Order: []string{"asc(completedAt)"}, + Filter: map[string][]string{ + "state": {string(stateEnqueued)}, + "startedAt": {db.SpecialConditionIsNull}, + "completedAt": {db.SpecialConditionIsNotNull}, + }, + }, + { + Fields: []string{"id"}, + Order: []string{"asc(startedAt)"}, + Filter: map[string][]string{ + "state": {string(stateEnqueued)}, + "startedAt": {db.SpecialConditionIsNotNull}, + }, + }, + { + Fields: []string{"id"}, + Order: []string{"asc(state)", "asc(startedAt)", "asc(completedAt)", "asc(id)"}, + Filter: map[string][]string{ + "state": {fmt.Sprintf("gt(%s)", string(stateEnqueued))}, + }, + }, + }, + }, + } + + for _, test := range tests { + var res, err = splitQueryOnLightWeightQueries[testStruct](test.pt, testStruct{}) + if err != nil { + if test.err != nil { + assert.Equalf(t, test.err.Error(), err.Error(), "failure in test for pt %v with unique values %v", test.pt, test.uf) + } else { + t.Errorf("failure in test for pt %v with unique values %v, err: %v", test.pt, test.uf, err) + } + + continue + } + + require.Equal(t, len(test.pts), len(res)) + for i := range test.pts { + require.Equal(t, test.pts[i], res[i]) + } + } +} + +func TestCreateNextCursorBasedPageTokenForTasks(t *testing.T) { + type taskCursorTest struct { + items []testStruct + pt *PageToken + err error + } + + var tests = []taskCursorTest{ + { + items: nil, + pt: nil, + err: nil, + }, + } + + for _, test := range tests { + var res, err = createNextCursorBasedPageToken[testStruct](PageToken{}, test.items, 10, testStruct{}) + if err != nil { + assert.Equalf(t, test.err.Error(), err.Error(), "failure in test for tasks %v", test.items) + continue + } + + assert.Equalf(t, test.pt, res, "wrong empty value in test for tasks %v", test.items) + } +} diff --git a/db/search/engine.go b/db/search/engine.go new file mode 100644 index 0000000..f26e59e --- /dev/null +++ b/db/search/engine.go @@ -0,0 +1,457 @@ +package search + +import ( + "fmt" + "strconv" + "sync" + "time" + + "github.com/acronis/perfkit/db" +) + +func id2str(id int64) string { + return fmt.Sprintf("%v", id) +} + +func str2id(s string) (int64, error) { + return strconv.ParseInt(s, 10, 64) +} + +func int64Min(a, b int64) int64 { + if a < b { + return a + } + return b +} + +// Comparable is an interface for entities that can be compared +type Comparable[T any] interface { + Less(T) bool + Equal(T) bool +} + +type Searchable[T any] interface { + Comparable[T] + CursorIterable[T] +} + +type Searcher[T any] interface { + Search(*db.SelectCtrl) ([]T, int64, error) +} + +type loadedPage[T Searchable[T]] struct { + Items []T + Size int64 + Timestamp string +} + +type searchEngine[T Searchable[T]] struct { + lods 
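+
+// A minimal Searcher sketch (illustrative; the entity type `task` and the
+// storage details are assumed): any backend that can execute a db.SelectCtrl
+// against one source qualifies.
+//
+//	type tableSearcher struct{ /* db handle, table name, ... */ }
+//
+//	func (s tableSearcher) Search(ctrl *db.SelectCtrl) ([]task, int64, error) {
+//		// translate ctrl.Where / ctrl.Order / ctrl.Page into a query and run it
+//		return nil, 0, nil
+//	}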
+
+type loadedPage[T Searchable[T]] struct {
+	Items     []T
+	Size      int64
+	Timestamp string
+}
+
+type searchEngine[T Searchable[T]] struct {
+	lods         map[string][]string
+	comparators  map[string]map[string]comparator[T]
+	equalFunc    comparator[T]
+	loadFunc     func(Searcher[T], *db.SelectCtrl) ([]T, int64, error)
+	uniqueFields map[string]bool
+	cursorGen    func(entity *T, field string) (string, error)
+}
+
+func cleanSources[T Searchable[T]](sources map[string]Searcher[T]) map[string]Searcher[T] {
+	for key, source := range sources {
+		if source == nil {
+			delete(sources, key)
+		}
+	}
+
+	return sources
+}
+
+func (e *searchEngine[T]) search(sources map[string]Searcher[T], pt PageToken, limit int64, cursorPagingEnabled bool, instance T) (*loadedPage[T], *PageToken, error) {
+	var finalPage = loadedPage[T]{Timestamp: db.FormatTimeStamp(time.Now())}
+
+	sources = cleanSources(sources)
+	if len(sources) == 0 {
+		return &finalPage, nil, nil
+	}
+
+	var sorts []string
+	if cursorPagingEnabled {
+		if encoded, _, sortsErr := uniqueSort(pt.Order, pt.Cursor, instance); sortsErr != nil {
+			return nil, nil, db.BadInputError{Details: fmt.Errorf("unique sort creation error: %v", sortsErr)}
+		} else {
+			sorts = encoded
+		}
+	} else {
+		sorts = pt.Order
+	}
+
+	var less, cmpErr = makeComparator(sorts, e.comparators)
+	if cmpErr != nil {
+		return nil, nil, db.BadInputError{Details: fmt.Errorf("comparator creation error: %v", cmpErr)}
+	}
+
+	var pts []PageToken
+	var offsets map[string]int64
+	if cursorPagingEnabled {
+		var err error
+		if pts, err = splitQueryOnLightWeightQueries(pt, instance); err != nil {
+			return nil, nil, db.BadInputError{Details: fmt.Errorf("query split error: %v", err)}
+		}
+	} else {
+		pts = append(pts, pt)
+		offsets = pt.Offsets
+		sources = cutSourcesByOffsets(sources, offsets)
+	}
+
+	var finalList = []T{}
+	var finalCount int64
+	for _, cpt := range pts {
+		var pages, counts, err = e.load(sources, cpt, limit)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		if len(cpt.Fields) == 0 {
+			for _, count := range counts {
+				finalCount += count
+			}
+			continue
+		}
+
+		pages = cleanPages(pages)
+		if less == nil && len(pages) > 1 {
+			return nil, nil, db.BadInputError{Details: fmt.Errorf("failed to merge multiple pages, order is empty")}
+		}
+		finalList, offsets = e.merge(finalList, pages, limit, less, offsets)
+
+		if len(finalList) >= int(limit) {
+			break
+		}
+	}
+
+	if len(pt.Fields) == 0 {
+		finalPage.Size = finalCount
+		return &finalPage, nil, nil
+	}
+
+	var nextPageToken *PageToken
+	var err error
+	if cursorPagingEnabled {
+		if nextPageToken, err = createNextCursorBasedPageToken(pt, finalList, limit, instance); err != nil {
+			return nil, nil, db.BadInputError{Details: fmt.Errorf("next page token creation error: %v", err)}
+		}
+	} else {
+		nextPageToken = createNextOffsetBasedPageToken(pt, offsets)
+	}
+
+	finalPage.Items = finalList
+	finalPage.Size = int64(len(finalPage.Items))
+
+	return &finalPage, nextPageToken, nil
+}
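+
+// Typical call shape (illustrative; engine construction and sources are
+// assumed): the engine fans the query out to every source, merges the pages,
+// and returns the next page token, if any.
+//
+//	page, next, err := engine.search(sources, pt, 100, true, task{})
+//	if err == nil && next != nil {
+//		token, _ := next.Pack() // hand to the client for the next page
+//		_ = token
+//	}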
+
+type loadResult[T Searchable[T]] struct {
+	key   string
+	items []T
+	count int64
+	err   error
+}
+
+func (e *searchEngine[T]) load(sources map[string]Searcher[T], pt PageToken, limit int64) (pages map[string][]T, counts map[string]int64, err error) {
+	sources = cleanSources(sources)
+
+	pages = map[string][]T{}
+	counts = map[string]int64{}
+	if len(sources) == 0 {
+		return nil, nil, nil
+	}
+
+	if len(sources) == 1 {
+		for key, source := range sources {
+			var offset = pt.Offsets[key]
+			var ctrl = newSelectCtrl(pt, limit, offset)
+			var items, count, err = e.loadFunc(source, ctrl)
+			if err != nil {
+				return nil, nil, err
+			}
+			pages[key] = items
+			counts[key] = count
+			return pages, counts, nil
+		}
+	}
+
+	var results = make([]loadResult[T], len(sources))
+	var wg sync.WaitGroup
+	wg.Add(len(sources))
+
+	var i = 0
+	for k, s := range sources {
+		go func(key string, source Searcher[T], index int) {
+			var offset = pt.Offsets[key]
+			var ctrl = newSelectCtrl(pt, limit, offset)
+
+			var result = loadResult[T]{key: key}
+			result.items, result.count, result.err = e.loadFunc(source, ctrl)
+			results[index] = result
+
+			wg.Done()
+		}(k, s, i)
+
+		i++
+	}
+	wg.Wait()
+
+	for _, result := range results {
+		if result.err != nil {
+			return nil, nil, result.err
+		}
+
+		// nolint:gocritic // TODO: >= is suspicious here. should be > ?
+		if len(result.items) >= 0 {
+			pages[result.key] = result.items
+			counts[result.key] = result.count
+		}
+	}
+
+	return pages, counts, nil
+}
+
+type pageToMerge[T Searchable[T]] struct {
+	key   string
+	items []T
+	used  int64
+}
+
+// Not safe: callers must first check isSafeHeadPlus(index)
+func (p *pageToMerge[T]) headPlus(index int64) T {
+	return p.items[p.used+index]
+}
+
+func (p *pageToMerge[T]) isSafeHeadPlus(index int64) bool {
+	return p.used+index < int64(len(p.items))
+}
+
+func cleanPages[T Searchable[T]](pages map[string][]T) map[string][]T {
+	for key, page := range pages {
+		if len(page) == 0 {
+			delete(pages, key)
+		}
+	}
+
+	return pages
+}
+
+// nolint:funlen // TODO: Try to split this function
+func (e *searchEngine[T]) merge(finalList []T, pages map[string][]T, limit int64, less comparator[T], offsets map[string]int64) ([]T, map[string]int64) {
+	pages = cleanPages(pages)
+
+	if len(pages) == 0 {
+		return finalList, nil
+	}
+
+	var usedTombstones = map[string]int64{}
+	var pagesToMerge []pageToMerge[T]
+	for key, page := range pages {
+		pagesToMerge = append(pagesToMerge, pageToMerge[T]{
+			key:   key,
+			items: page,
+			used:  0,
+		})
+	}
+
+	// Algorithm explanation
+	//
+	// At the beginning we have
+	// 1. Final list
+	// 2. Multiple (or single) arrays in any order
+	//
+	// Elements in each array are sorted in 'less' order.
+	// Assume 'less' order is just asc(id).
+	// So, we may have:
+	//
+	// Final list:
+	// 1 - 2 - 3 - 4
+	//
+	// Pages to merge:
+	// 7 - 8 - 9
+	// 9 - 10 - 12
+	// 5 - 6 - 11
+	//
+	// Limit: 7
+	//
+	// The initial finalList can be empty or can already contain some elements.
+
+	for len(pagesToMerge) > 0 && int64(len(finalList)) < limit {
+		// If there is only one page, append it without merge
+		// Final list: 1 - 2 - 3 - 4
+		// Page to merge: 5 - 6 - 7
+		// Limit: 7
+		//
+		// Final list: 1 - 2 - 3 - 4 - 5 - 6 - 7
+		if len(pagesToMerge) == 1 {
+			var numberOfItemsToTake = int64Min(limit-int64(len(finalList)), int64(len(pagesToMerge[0].items))-pagesToMerge[0].used)
+			finalList = append(finalList, pagesToMerge[0].items[pagesToMerge[0].used:pagesToMerge[0].used+numberOfItemsToTake]...)
+			pagesToMerge[0].used += numberOfItemsToTake
+			break
+		}
+
+		// If there are multiple pages, maintain the loop invariant.
+		// Loop invariant: the heads of all loaded pages have to be arranged in the order defined by less(a, b).
+		// Head is page.items[p.used]
+		// (i) - means head
+		// If it was:
+		// Pages to merge:
+		// (7) - 8 - 9
+		// (9) - 10 - 12
+		// (5) - 6 - 11
+		//
+		// We should make:
+		// Pages to merge:
+		// (5) - 6 - 11
+		// (7) - 8 - 9
+		// (9) - 10 - 12
+		//
+		// Sort to invariant condition:
+		var swapped = true
+		for swapped {
+			swapped = false
+			for i := 1; i < len(pagesToMerge); i++ {
+				if less(pagesToMerge[i].headPlus(0), pagesToMerge[i-1].headPlus(0)) {
+					pagesToMerge[i], pagesToMerge[i-1] = pagesToMerge[i-1], pagesToMerge[i]
+					swapped = true
+				}
+			}
+		}
+
+		// Most of the logic is executed only with pages 0 and 1 of the list, so let's define them.
+		var first = pagesToMerge[0]
+		var second = pagesToMerge[1]
+
+		// We should find the minimal element of the first page that is larger than the head of the second one
+		// (i) - means head
+		// If it was:
+		// Pages to merge:
+		// (5) - 6 - 11
+		// (7) - 8 - 9
+		// (9) - 10 - 12
+		//
+		// We should make:
+		// Pages to merge:
+		// 5 - 6 - (11)
+		// (7) - 8 - 9
+		// (9) - 10 - 12
+		var numberOfItemsInFinalList = int64(len(finalList))
+		var numberOfItemsToTake int64 = 0
+		for numberOfItemsInFinalList+numberOfItemsToTake < limit &&
+			first.isSafeHeadPlus(numberOfItemsToTake) &&
+			less(first.headPlus(numberOfItemsToTake), second.headPlus(0)) {
+			numberOfItemsToTake++
+		}
+
+		// We should find equal elements across the pages to merge and deduplicate them
+		// An important assumption here:
+		// duplicates can be present in no more than two pages.
+		//
+		// (i) - means head
+		// If it was:
+		// Pages to merge:
+		// 7 - 8 - (9)
+		// (9) - 10 - 12
+		// 5 - 6 - (11)
+		//
+		// We should make:
+		// Pages to merge:
+		// 7 - 8 - 9 - ()
+		// 9 - (10) - 12
+		// 5 - 6 - (11)
+		for numberOfItemsInFinalList+numberOfItemsToTake < limit &&
+			first.isSafeHeadPlus(numberOfItemsToTake) &&
+			second.isSafeHeadPlus(0) &&
+			e.equalFunc(first.headPlus(numberOfItemsToTake), second.headPlus(0)) {
+			numberOfItemsToTake++
+			second.used++
+		}
+
+		// Not same, not less and not bigger
+		// Increment numberOfItemsToTake anyway to avoid an infinite loop
+		if numberOfItemsToTake == 0 {
+			numberOfItemsToTake++
+		}
+
+		// Append the smallest items to the final page
+		// (i) - means head
+		// If it was:
+		// Final list: 1 - 2 - 3 - 4
+		// Pages to merge:
+		// 5 - 6 - (11)
+		// (7) - 8 - 9
+		// (9) - 10 - 12
+		//
+		// We should make:
+		// Final list: 1 - 2 - 3 - 4 - 5 - 6
+		// Pages to merge:
+		// 5 - 6 - (11)
+		// (7) - 8 - 9
+		// (9) - 10 - 12
+		finalList = append(finalList, first.items[first.used:first.used+numberOfItemsToTake]...)
+		first.used += numberOfItemsToTake
+		pagesToMerge[0] = first
+		pagesToMerge[1] = second
+
+		// Drop pages to merge with all used elements
+		// (i) - means head
+		// If it was:
+		// Pages to merge:
+		// 7 - 8 - 9 - ()
+		// 9 - (10) - 12
+		// 5 - 6 - (11)
+		//
+		// We should make:
+		// Pages to merge:
+		// 9 - (10) - 12
+		// 5 - 6 - (11)
+		for i := 0; i < len(pagesToMerge); i++ {
+			if pagesToMerge[i].isSafeHeadPlus(0) {
+				continue
+			}
+			for j := i + 1; j < len(pagesToMerge); j++ {
+				pagesToMerge[j], pagesToMerge[j-1] = pagesToMerge[j-1], pagesToMerge[j]
+			}
+
+			var pageToRemove = pagesToMerge[len(pagesToMerge)-1]
+			if pageToRemove.used == limit {
+				usedTombstones[pageToRemove.key] = pageToRemove.used
+			}
+
+			pagesToMerge = pagesToMerge[:len(pagesToMerge)-1]
+			i--
+		}
+	}
+
+	var lastMergeOffsets map[string]int64
+	for _, page := range pagesToMerge {
+		if page.used == int64(len(page.items)) && int64(len(page.items)) < limit {
+			continue
+		}
+		if lastMergeOffsets == nil {
+			lastMergeOffsets = map[string]int64{}
+		}
+		lastMergeOffsets[page.key] = page.used
+	}
+
+	for src, offset := range usedTombstones {
+		if lastMergeOffsets == nil {
+			lastMergeOffsets = map[string]int64{}
+		}
+		lastMergeOffsets[src] = offset
+	}
+
+	if offsets != nil {
+		for src := range lastMergeOffsets {
+			if used, ok := offsets[src]; ok {
+				lastMergeOffsets[src] += used
+			}
+		}
+	}
+	offsets = lastMergeOffsets
+
+	return finalList, offsets
+}
diff --git a/db/search/offsets.go b/db/search/offsets.go
new file mode 100644
index 0000000..edb20ed
--- /dev/null
+++ b/db/search/offsets.go
@@ -0,0 +1,29 @@
+package search
+
+func cutSourcesByOffsets[T Searchable[T]](sources map[string]Searcher[T], offsets map[string]int64) map[string]Searcher[T] {
+	if offsets == nil {
+		return sources
+	}
+
+	for src := range sources {
+		if _, ok := offsets[src]; !ok {
+			delete(sources, src)
+		}
+	}
+
+	return sources
+}
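+
+// Offsets semantics (illustrative): a source that is absent from a non-nil
+// offsets map was exhausted on a previous page and is dropped; a present
+// source resumes from its recorded position. E.g. with offsets
+// {"default:1": 30}, only source "default:1" is kept, and its next select
+// starts at offset 30.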
+
+// nolint:unparam // TODO: Add more error handling?
+func createNextOffsetBasedPageToken(previousPageToken PageToken, usedFromSources map[string]int64) *PageToken {
+	if usedFromSources == nil {
+		return nil
+	}
+
+	return &PageToken{
+		Fields:  previousPageToken.Fields,
+		Offsets: usedFromSources,
+		Filter:  previousPageToken.Filter,
+		Order:   previousPageToken.Order,
+	}
+}
diff --git a/db/search/paging.go b/db/search/paging.go
new file mode 100644
index 0000000..b7a7e92
--- /dev/null
+++ b/db/search/paging.go
@@ -0,0 +1,67 @@
+package search
+
+import (
+	"encoding/base64"
+	"encoding/json"
+
+	"github.com/acronis/perfkit/db"
+)
+
+// PageToken describes one page of a search: fields, filters, ordering, and the paging position
+type PageToken struct {
+	Fields  []string            `json:"fields"`
+	Filter  map[string][]string `json:"filter"`
+	Order   []string            `json:"order"`
+	Offsets map[string]int64    `json:"offsets"`
+	Cursor  map[string]string   `json:"cursor"`
+}
+
+func (pt *PageToken) HasFilter(key string) bool {
+	if pt.Filter == nil {
+		return false
+	}
+	f, ok := pt.Filter[key]
+	return ok && len(f) > 0
+}
+
+func (pt *PageToken) HasOrder(key string) bool {
+	if pt.Order == nil {
+		return false
+	}
+	for _, o := range pt.Order {
+		_, field, err := db.ParseFunc(o)
+		if err != nil {
+			return false
+		}
+		if key == field {
+			return true
+		}
+	}
+	return false
+}
+
+func (pt *PageToken) Pack() (string, error) {
+	var b, err = json.Marshal(&pt)
+	if err != nil {
+		return "", err
+	}
+
+	return base64.StdEncoding.EncodeToString(b), nil
+}
+
+func (pt *PageToken) Unpack(s string) error {
+	var b, err = base64.StdEncoding.DecodeString(s)
+	if err != nil {
+		return err
+	}
+
+	return json.Unmarshal(b, pt)
+}
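+
+// Round-trip sketch (illustrative): tokens travel to and from the client as
+// base64-encoded JSON.
+//
+//	var pt = PageToken{Fields: []string{"id"}}
+//	s, _ := pt.Pack() // e.g. "eyJmaWVsZHMiOlsiaWQiXSwi..."
+//
+//	var parsed PageToken
+//	_ = parsed.Unpack(s) // parsed now equals pt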
+
+func newSelectCtrl(pt PageToken, limit int64, offset int64) *db.SelectCtrl {
+	return &db.SelectCtrl{
+		Fields: pt.Fields,
+		Page:   db.Page{Limit: limit, Offset: offset},
+		Where:  pt.Filter,
+		Order:  pt.Order,
+	}
+}
diff --git a/db/search/paging_test.go b/db/search/paging_test.go
new file mode 100644
index 0000000..20e86d7
--- /dev/null
+++ b/db/search/paging_test.go
@@ -0,0 +1,114 @@
+package search
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestPageTokenPacking(t *testing.T) {
+	var pts, ptd PageToken
+	var s string
+	var err error
+
+	var eq = func(l, r PageToken) bool {
+		return reflect.DeepEqual(l, r)
+	}
+
+	if s, err = pts.Pack(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if err = ptd.Unpack(s); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if !eq(pts, ptd) {
+		t.Error("tokens differ", pts, ptd)
+		return
+	}
+
+	pts.Fields = []string{"id", "uuid"}
+
+	if s, err = pts.Pack(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if err = ptd.Unpack(s); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if !eq(pts, ptd) {
+		t.Error("tokens differ", pts, ptd)
+		return
+	}
+
+	pts.Offsets = map[string]int64{}
+	pts.Offsets["default:1"] = 100500
+
+	if s, err = pts.Pack(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if err = ptd.Unpack(s); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if !eq(pts, ptd) {
+		t.Error("tokens differ", pts, ptd)
+		return
+	}
+
+	pts.Filter = map[string][]string{
+		"xk": {"xv"},
+		"yk": {"yv1, yv2"},
+		"zk": {},
+		"nk": nil,
+	}
+
+	if s, err = pts.Pack(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if err = ptd.Unpack(s); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if !eq(pts, ptd) {
+		t.Error("tokens differ", pts, ptd)
+		return
+	}
+}
+
+func TestCursorGen(t *testing.T) {
+	var pt = PageToken{
+		Filter: map[string][]string{
+			"f1": {"1"},
+		},
+		Order: []string{"asc(f2)"},
+	}
+
+	var ctrl = newSelectCtrl(pt, 10, 0)
+
+	if ctrl.Page.Limit != 10 || ctrl.Page.Offset != 0 {
+		t.Error("wrong page", ctrl.Page)
+		return
+	}
+
+	if len(ctrl.Order) != 1 || ctrl.Order[0] != "asc(f2)" {
+		t.Error("wrong order", ctrl.Order)
+		return
+	}
+
+	if len(ctrl.Where) != 1 || len(ctrl.Where["f1"]) != 1 || ctrl.Where["f1"][0] != "1" {
+		t.Error("wrong where", ctrl.Where)
+		return
+	}
+}