-
Notifications
You must be signed in to change notification settings - Fork 299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Support creation and reading of StructuredDataset with local or remote uri #2914
Changes from all commits
8d3ea89
204d549
56d9fd3
e2344e9
2a689b8
a30df5e
72382ca
14df985
0ef6b6b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
from flytekit.models import types as type_models | ||
from flytekit.models.literals import Binary, Literal, Scalar, StructuredDatasetMetadata | ||
from flytekit.models.types import LiteralType, SchemaType, StructuredDatasetType | ||
from flytekit.utils.asyn import loop_manager | ||
|
||
if typing.TYPE_CHECKING: | ||
import pandas as pd | ||
|
@@ -176,8 +177,32 @@ def all(self) -> DF: # type: ignore | |
if self._dataframe_type is None: | ||
raise ValueError("No dataframe type set. Use open() to set the local dataframe type you want to use.") | ||
ctx = FlyteContextManager.current_context() | ||
|
||
if self.uri is not None and self.dataframe is None: | ||
expected = TypeEngine.to_literal_type(StructuredDataset) | ||
self._set_literal(ctx, expected) | ||
|
||
return flyte_dataset_transformer.open_as(ctx, self.literal, self._dataframe_type, self.metadata) | ||
|
||
def _set_literal(self, ctx: FlyteContext, expected: LiteralType) -> None: | ||
""" | ||
Explicitly set the StructuredDataset Literal to handle the following cases: | ||
|
||
1. Read a dataframe from a StructuredDataset with an uri, for example: | ||
|
||
@task | ||
def return_sd() -> StructuredDataset: | ||
sd = StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet") | ||
df = sd.open(pd.DataFrame).all() | ||
return df | ||
|
||
For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5954. | ||
""" | ||
to_literal = loop_manager.synced(flyte_dataset_transformer.async_to_literal) | ||
self._literal_sd = to_literal(ctx, self, StructuredDataset, expected).scalar.structured_dataset | ||
Comment on lines
+201
to
+202
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if is here the best way to write it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing this out. I'm also pondering if this is a good practice... Will be glad to learn more from you guys! |
||
if self.metadata is None: | ||
self._metadata = self._literal_sd.metadata | ||
|
||
def iter(self) -> Generator[DF, None, None]: | ||
if self._dataframe_type is None: | ||
raise ValueError("No dataframe type set. Use open() to set the local dataframe type you want to use.") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It means a lot to hear it from you!