Skip to content

Commit

Permalink
Binary IDL Attribute Access for Map Task
Browse files Browse the repository at this point in the history
Signed-off-by: Future-Outlier <[email protected]>
  • Loading branch information
Future-Outlier committed Oct 30, 2024
1 parent 01c3519 commit 182f5e1
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 148 deletions.
28 changes: 23 additions & 5 deletions flytepropeller/pkg/controller/nodes/attr_path_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ func resolveAttrPathInPromise(ctx context.Context, datastore *storage.DataStore,
}
currVal = currVal.GetCollection().GetLiterals()[attr.GetIntValue()]
index++
// scalar is always the leaf, so we can break here
case *core.Literal_Scalar:
default:
break
}
}
Expand Down Expand Up @@ -107,9 +106,7 @@ func resolveAttrPathInPbStruct(nodeID string, st *structpb.Struct, bindAttrPath
}

// resolveAttrPathInBinary resolves the binary idl object (e.g. dataclass, pydantic basemodel) with attribute path
func resolveAttrPathInBinary(nodeID string, binaryIDL *core.Binary, bindAttrPath []*core.PromiseAttribute) (*core.
Literal,
error) {
func resolveAttrPathInBinary(nodeID string, binaryIDL *core.Binary, bindAttrPath []*core.PromiseAttribute) (*core.Literal, error) {

binaryBytes := binaryIDL.GetValue()
serializationFormat := binaryIDL.GetTag()
Expand Down Expand Up @@ -165,6 +162,27 @@ func resolveAttrPathInBinary(nodeID string, binaryIDL *core.Binary, bindAttrPath
}
}

// if currVal is list, convert it to literal collection
// This is for map task handling
if collection, ok := currVal.([]any); ok {
literals := make([]*core.Literal, len(collection))
for i, v := range collection {
resolvedBinaryBytes, err := msgpack.Marshal(v)
if err != nil {
return nil, err
}

Check warning on line 173 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L172-L173

Added lines #L172 - L173 were not covered by tests
literals[i] = constructResolvedBinary(resolvedBinaryBytes, serializationFormat)
}

return &core.Literal{
Value: &core.Literal_Collection{
Collection: &core.LiteralCollection{
Literals: literals,
},
},
}, nil
}

// Marshal the current value to MessagePack bytes
resolvedBinaryBytes, err := msgpack.Marshal(currVal)
if err != nil {
Expand Down
Loading

0 comments on commit 182f5e1

Please sign in to comment.