Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][flyteadmin] Make New DC and Old DC Default Inputs Compatible #5991

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions flyteadmin/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@
package errors

import (
"bytes"
"context"
"fmt"
"reflect"
"strings"

"github.com/golang/protobuf/proto"
"github.com/shamaton/msgpack/v2"
"github.com/wI2L/jsondiff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/pbhash"
)

type FlyteAdminError interface {
Expand Down Expand Up @@ -181,7 +185,155 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi
return statusErr
}

func IsSameDCFormat(oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) bool {
// Compare dataclass formats by:
// 1. Retrieving default inputs.
// 2. Ensuring both dataclasses have the same keys and length.
// 3. Deserializing values to map[string]interface{} and comparing them.
// 4. If a value is not a dataclass, use pbhash to compare directly, similar to the launch plan manager.
// Reference: https://github.com/flyteorg/flyte/blob/5f695895ed6a7fa45d980023ab874591184b0fc1/flyteadmin/pkg/manager/impl/launch_plan_manager.go

oldParams := oldSpec.GetDefaultInputs().GetParameters()
newParams := newSpec.GetDefaultInputs().GetParameters()

// Step 1: Check if both maps have the same keys
if len(oldParams) != len(newParams) {
return false
}
for key := range oldParams {
if _, exists := newParams[key]; !exists {
return false
}
}

// Step 2: Compare the corresponding values
for key, oldParam := range oldParams {
newParam := newParams[key]

// Compare Parameter values
if !parametersAreEqual(oldParam, newParam) {
return false
}
}

return true
}

// TODO: Handle collection and map types
func parametersAreEqual(oldParam, newParam *core.Parameter) bool {
oldDefault := oldParam.GetDefault()
newDefault := newParam.GetDefault()

// If both defaults are nil, they are equal
if oldDefault == nil && newDefault == nil {
return true
}

// If one is nil and the other is not, they are not equal
if (oldDefault == nil) != (newDefault == nil) {
return false
}

// Step 2.1: Use pbhash to compare the two values directly
oldHash, err1 := pbhash.ComputeHash(context.Background(), oldDefault)
newHash, err2 := pbhash.ComputeHash(context.Background(), newDefault)
if err1 != nil || err2 != nil {
return false
}
if bytes.Equal(oldHash, newHash) {
return true
}

// Step 2.2: Check for Scalar.Generic and Scalar.Binary cases
oldScalar := oldDefault.GetScalar()
newScalar := newDefault.GetScalar()

// If both scalars are nil, they are not equal
if oldScalar == nil && newScalar == nil {
return false
}

// Step 2.3: Check if one is Scalar.Generic and the other is Scalar.Binary
if isGenericScalar(oldScalar) && isBinaryScalar(newScalar) {
decodedNew, err := decodeBinaryLiteral(newScalar.GetBinary().GetValue())
if err != nil {
return false
}
stMap := oldScalar.GetGeneric().AsMap()
return deepEqualMaps(stMap, decodedNew)
}

if isBinaryScalar(oldScalar) && isGenericScalar(newScalar) {
decodedOld, err := decodeBinaryLiteral(oldScalar.GetBinary().GetValue())
if err != nil {
return false
}
stMap := newScalar.GetGeneric().AsMap()
return deepEqualMaps(decodedOld, stMap)
}

// If neither special case applies, they are not equal
return false
}

func isBinaryScalar(scalar *core.Scalar) bool {
return scalar != nil && scalar.GetBinary() != nil
}

func isGenericScalar(scalar *core.Scalar) bool {
return scalar != nil && scalar.GetGeneric() != nil
}

func decodeBinaryLiteral(binaryData []byte) (interface{}, error) {
if binaryData == nil {
return nil, fmt.Errorf("binary data is nil")
}

// Declare 'decoded' as an empty interface to hold any type
var decoded interface{}
err := msgpack.Unmarshal(binaryData, &decoded)
if err != nil {
return nil, err
}
return decoded, nil
}

func deepEqualMaps(a, b interface{}) bool {
normalizedA := normalizeMapKeys(a)
normalizedB := normalizeMapKeys(b)
return reflect.DeepEqual(normalizedA, normalizedB)
}

func normalizeMapKeys(data interface{}) interface{} {
switch v := data.(type) {
case map[interface{}]interface{}:
m := make(map[string]interface{})
for key, value := range v {
keyStr := fmt.Sprintf("%v", key)
m[keyStr] = normalizeMapKeys(value)
}
return m
case map[string]interface{}:
m := make(map[string]interface{})
for key, value := range v {
m[key] = normalizeMapKeys(value)
}
return m
case []interface{}:
for i, value := range v {
v[i] = normalizeMapKeys(value)
}
return v
default:
return v
}
}

func NewLaunchPlanExistsDifferentStructureError(ctx context.Context, request *admin.LaunchPlanCreateRequest, oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) FlyteAdminError {
if IsSameDCFormat(oldSpec, newSpec) {
return NewLaunchPlanExistsIdenticalStructureError(ctx, request)
}

errorMsg := fmt.Sprintf("%v launch plan with different structure already exists. (Please register a new version of the launch plan):\n", request.Id.Name)
diff, _ := jsondiff.Compare(oldSpec, newSpec)
rdiff, _ := jsondiff.Compare(newSpec, oldSpec)
Expand Down
Loading