From d1ff871b4a0a00a4220abd9a8130b830ad40e8b8 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Tue, 12 Nov 2024 12:36:40 +0800 Subject: [PATCH 1/4] [flyteadmin] Make New DC and Old DC Default Inputs Compatible Signed-off-by: Future-Outlier --- flyteadmin/pkg/errors/errors.go | 147 +++++++++++++++++++++++++++++++- 1 file changed, 145 insertions(+), 2 deletions(-) diff --git a/flyteadmin/pkg/errors/errors.go b/flyteadmin/pkg/errors/errors.go index 5fc48b0b67..9c3bd14374 100644 --- a/flyteadmin/pkg/errors/errors.go +++ b/flyteadmin/pkg/errors/errors.go @@ -2,14 +2,17 @@ package errors import ( + "bytes" "context" "fmt" - "strings" - + "github.com/flyteorg/flyte/flytestdlib/pbhash" "github.com/golang/protobuf/proto" + "github.com/shamaton/msgpack/v2" "github.com/wI2L/jsondiff" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "reflect" + "strings" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -181,7 +184,147 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi return statusErr } +func IsSameDCFormat(oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) bool { + oldParams := oldSpec.GetDefaultInputs().GetParameters() + newParams := newSpec.GetDefaultInputs().GetParameters() + + // Step 1: Check if both maps have the same keys + if len(oldParams) != len(newParams) { + return false + } + for key := range oldParams { + if _, exists := newParams[key]; !exists { + return false + } + } + + // Step 2: Compare the corresponding values + for key, oldParam := range oldParams { + newParam := newParams[key] + + // Compare Parameter values + if !parametersAreEqual(oldParam, newParam) { + return false + } + } + + return true +} + +func parametersAreEqual(oldParam, newParam *core.Parameter) bool { + oldDefault := oldParam.GetDefault() + newDefault := newParam.GetDefault() + + // If both defaults are nil, they are equal + if oldDefault == nil && newDefault == nil { + return true + } + + // If one is nil and the other is not, they are not equal + if (oldDefault == nil) != (newDefault == nil) { + return false + } + + // Step 2.1: Use pbhash to compare the two values + oldHash, err1 := pbhash.ComputeHash(context.Background(), oldDefault) + newHash, err2 := pbhash.ComputeHash(context.Background(), newDefault) + if err1 != nil || err2 != nil { + return false + } + if bytes.Equal(oldHash, newHash) { + return true + } + + // Step 2.2: Check for Scalar.Generic and Scalar.Binary cases + oldScalar := oldDefault.GetScalar() + newScalar := newDefault.GetScalar() + + // If both scalars are nil, they are not equal + if oldScalar == nil && newScalar == nil { + return false + } + + // Check if one is Scalar.Generic and the other is Scalar.Binary + if isGenericScalar(oldScalar) && isBinaryScalar(newScalar) { + decodedNew, err := decodeBinaryLiteral(newScalar.GetBinary().GetValue()) + if err != nil { + return false + } + stMap := oldScalar.GetGeneric().AsMap() + return deepEqualMaps(stMap, decodedNew) + } + + if isBinaryScalar(oldScalar) && isGenericScalar(newScalar) { + decodedOld, err := decodeBinaryLiteral(oldScalar.GetBinary().GetValue()) + if err != nil { + return false + } + stMap := newScalar.GetGeneric().AsMap() + return deepEqualMaps(decodedOld, stMap) + } + + // If neither special case applies, they are not equal + return false +} + +func isBinaryScalar(scalar *core.Scalar) bool { + return scalar != nil && scalar.GetBinary() != nil +} + +func isGenericScalar(scalar *core.Scalar) bool { + return scalar != nil && scalar.GetGeneric() != nil +} + +func decodeBinaryLiteral(binaryData []byte) (interface{}, error) { + if binaryData == nil { + return nil, fmt.Errorf("binary data is nil") + } + + // Declare 'decoded' as an empty interface to hold any type + var decoded interface{} + err := msgpack.Unmarshal(binaryData, &decoded) + if err != nil { + return nil, err + } + return decoded, nil +} + +func deepEqualMaps(a, b interface{}) bool { + normalizedA := normalizeMapKeys(a) + normalizedB := normalizeMapKeys(b) + return reflect.DeepEqual(normalizedA, normalizedB) +} + +func normalizeMapKeys(data interface{}) interface{} { + switch v := data.(type) { + case map[interface{}]interface{}: + m := make(map[string]interface{}) + for key, value := range v { + keyStr := fmt.Sprintf("%v", key) + m[keyStr] = normalizeMapKeys(value) + } + return m + case map[string]interface{}: + m := make(map[string]interface{}) + for key, value := range v { + m[key] = normalizeMapKeys(value) + } + return m + case []interface{}: + for i, value := range v { + v[i] = normalizeMapKeys(value) + } + return v + default: + return v + } +} + func NewLaunchPlanExistsDifferentStructureError(ctx context.Context, request *admin.LaunchPlanCreateRequest, oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) FlyteAdminError { + if IsSameDCFormat(oldSpec, newSpec) { + return NewLaunchPlanExistsIdenticalStructureError(ctx, request) + } + errorMsg := fmt.Sprintf("%v launch plan with different structure already exists. (Please register a new version of the launch plan):\n", request.Id.Name) diff, _ := jsondiff.Compare(oldSpec, newSpec) rdiff, _ := jsondiff.Compare(newSpec, oldSpec) From c81a385e93965900fd7fd1a3b2527b999e8213a2 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 13 Nov 2024 21:39:44 +0800 Subject: [PATCH 2/4] nit Signed-off-by: Future-Outlier --- flyteadmin/pkg/errors/errors.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flyteadmin/pkg/errors/errors.go b/flyteadmin/pkg/errors/errors.go index 9c3bd14374..a78852ea32 100644 --- a/flyteadmin/pkg/errors/errors.go +++ b/flyteadmin/pkg/errors/errors.go @@ -211,6 +211,7 @@ func IsSameDCFormat(oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec return true } +// todo: if collection type or map type, we should handle it func parametersAreEqual(oldParam, newParam *core.Parameter) bool { oldDefault := oldParam.GetDefault() newDefault := newParam.GetDefault() From 163d8c1b8b857c94100b29859061d70d22132dec Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 13 Nov 2024 23:27:41 +0800 Subject: [PATCH 3/4] better commetns Signed-off-by: Future-Outlier --- flyteadmin/pkg/errors/errors.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/flyteadmin/pkg/errors/errors.go b/flyteadmin/pkg/errors/errors.go index a78852ea32..d564ec1d28 100644 --- a/flyteadmin/pkg/errors/errors.go +++ b/flyteadmin/pkg/errors/errors.go @@ -185,6 +185,13 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi } func IsSameDCFormat(oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) bool { + // Compare dataclass formats by: + // 1. Retrieving default inputs. + // 2. Ensuring both dataclasses have the same keys and length. + // 3. Deserializing values to map[string]interface{} and comparing them. + // 4. If a value is not a dataclass, use pbhash to compare directly, similar to the launch plan manager. + // Reference: https://github.com/flyteorg/flyte/blob/5f695895ed6a7fa45d980023ab874591184b0fc1/flyteadmin/pkg/manager/impl/launch_plan_manager.go + oldParams := oldSpec.GetDefaultInputs().GetParameters() newParams := newSpec.GetDefaultInputs().GetParameters() @@ -211,7 +218,7 @@ func IsSameDCFormat(oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec return true } -// todo: if collection type or map type, we should handle it +// TODO: Handle collection and map types func parametersAreEqual(oldParam, newParam *core.Parameter) bool { oldDefault := oldParam.GetDefault() newDefault := newParam.GetDefault() @@ -226,7 +233,7 @@ func parametersAreEqual(oldParam, newParam *core.Parameter) bool { return false } - // Step 2.1: Use pbhash to compare the two values + // Step 2.1: Use pbhash to compare the two values directly oldHash, err1 := pbhash.ComputeHash(context.Background(), oldDefault) newHash, err2 := pbhash.ComputeHash(context.Background(), newDefault) if err1 != nil || err2 != nil { @@ -245,7 +252,7 @@ func parametersAreEqual(oldParam, newParam *core.Parameter) bool { return false } - // Check if one is Scalar.Generic and the other is Scalar.Binary + // Step 2.3: Check if one is Scalar.Generic and the other is Scalar.Binary if isGenericScalar(oldScalar) && isBinaryScalar(newScalar) { decodedNew, err := decodeBinaryLiteral(newScalar.GetBinary().GetValue()) if err != nil { From 05fd0d57fd8f4ce87840b4b9a1ff9b8db92b635b Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Thu, 14 Nov 2024 00:38:48 +0800 Subject: [PATCH 4/4] fix lint Signed-off-by: Future-Outlier --- flyteadmin/pkg/errors/errors.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flyteadmin/pkg/errors/errors.go b/flyteadmin/pkg/errors/errors.go index d564ec1d28..79569fc7e0 100644 --- a/flyteadmin/pkg/errors/errors.go +++ b/flyteadmin/pkg/errors/errors.go @@ -5,18 +5,19 @@ import ( "bytes" "context" "fmt" - "github.com/flyteorg/flyte/flytestdlib/pbhash" + "reflect" + "strings" + "github.com/golang/protobuf/proto" "github.com/shamaton/msgpack/v2" "github.com/wI2L/jsondiff" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "reflect" - "strings" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/pbhash" ) type FlyteAdminError interface {