Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][flyteadmin] Make New DC and Old DC Default Inputs Compatible #5991

Closed
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 146 additions & 2 deletions flyteadmin/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
package errors

import (
"bytes"
"context"
"fmt"
"strings"

"github.com/flyteorg/flyte/flytestdlib/pbhash"
"github.com/golang/protobuf/proto"
"github.com/shamaton/msgpack/v2"
"github.com/wI2L/jsondiff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"reflect"
"strings"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -181,7 +184,148 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi
return statusErr
}

func IsSameDCFormat(oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) bool {
oldParams := oldSpec.GetDefaultInputs().GetParameters()
newParams := newSpec.GetDefaultInputs().GetParameters()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am writing more comments here to let people understand how this algorithm works.

// Step 1: Check if both maps have the same keys
if len(oldParams) != len(newParams) {
return false
}
for key := range oldParams {
if _, exists := newParams[key]; !exists {
return false
}
}

// Step 2: Compare the corresponding values
for key, oldParam := range oldParams {
newParam := newParams[key]

// Compare Parameter values
if !parametersAreEqual(oldParam, newParam) {
return false
}
}

return true
}

// todo: if collection type or map type, we should handle it
func parametersAreEqual(oldParam, newParam *core.Parameter) bool {
oldDefault := oldParam.GetDefault()
newDefault := newParam.GetDefault()

// If both defaults are nil, they are equal
if oldDefault == nil && newDefault == nil {
return true
}

// If one is nil and the other is not, they are not equal
if (oldDefault == nil) != (newDefault == nil) {
return false
}

// Step 2.1: Use pbhash to compare the two values
oldHash, err1 := pbhash.ComputeHash(context.Background(), oldDefault)
newHash, err2 := pbhash.ComputeHash(context.Background(), newDefault)
if err1 != nil || err2 != nil {
return false
}
if bytes.Equal(oldHash, newHash) {
return true
}

// Step 2.2: Check for Scalar.Generic and Scalar.Binary cases
oldScalar := oldDefault.GetScalar()
newScalar := newDefault.GetScalar()

// If both scalars are nil, they are not equal
if oldScalar == nil && newScalar == nil {
return false
}

// Check if one is Scalar.Generic and the other is Scalar.Binary
if isGenericScalar(oldScalar) && isBinaryScalar(newScalar) {
decodedNew, err := decodeBinaryLiteral(newScalar.GetBinary().GetValue())
if err != nil {
return false
}
stMap := oldScalar.GetGeneric().AsMap()
return deepEqualMaps(stMap, decodedNew)
}

if isBinaryScalar(oldScalar) && isGenericScalar(newScalar) {
decodedOld, err := decodeBinaryLiteral(oldScalar.GetBinary().GetValue())
if err != nil {
return false
}
stMap := newScalar.GetGeneric().AsMap()
return deepEqualMaps(decodedOld, stMap)
}

// If neither special case applies, they are not equal
return false
}

func isBinaryScalar(scalar *core.Scalar) bool {
return scalar != nil && scalar.GetBinary() != nil
}

func isGenericScalar(scalar *core.Scalar) bool {
return scalar != nil && scalar.GetGeneric() != nil
}

func decodeBinaryLiteral(binaryData []byte) (interface{}, error) {
if binaryData == nil {
return nil, fmt.Errorf("binary data is nil")
}

// Declare 'decoded' as an empty interface to hold any type
var decoded interface{}
err := msgpack.Unmarshal(binaryData, &decoded)
if err != nil {
return nil, err
}
return decoded, nil
}

func deepEqualMaps(a, b interface{}) bool {
normalizedA := normalizeMapKeys(a)
normalizedB := normalizeMapKeys(b)
return reflect.DeepEqual(normalizedA, normalizedB)
}

func normalizeMapKeys(data interface{}) interface{} {
switch v := data.(type) {
case map[interface{}]interface{}:
m := make(map[string]interface{})
for key, value := range v {
keyStr := fmt.Sprintf("%v", key)
m[keyStr] = normalizeMapKeys(value)
}
return m
case map[string]interface{}:
m := make(map[string]interface{})
for key, value := range v {
m[key] = normalizeMapKeys(value)
}
return m
case []interface{}:
for i, value := range v {
v[i] = normalizeMapKeys(value)
}
return v
default:
return v
}
}

func NewLaunchPlanExistsDifferentStructureError(ctx context.Context, request *admin.LaunchPlanCreateRequest, oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) FlyteAdminError {
if IsSameDCFormat(oldSpec, newSpec) {
return NewLaunchPlanExistsIdenticalStructureError(ctx, request)
}

errorMsg := fmt.Sprintf("%v launch plan with different structure already exists. (Please register a new version of the launch plan):\n", request.Id.Name)
diff, _ := jsondiff.Compare(oldSpec, newSpec)
rdiff, _ := jsondiff.Compare(newSpec, oldSpec)
Expand Down
Loading