-
Notifications
You must be signed in to change notification settings - Fork 299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Support creation and reading of StructuredDataset with local or remote uri #2914
[BUG] Support creation and reading of StructuredDataset with local or remote uri #2914
Conversation
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
this is awesome |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is amazing, @JiangJiaWei1103
I am testing this.
def _set_literal(self, ctx: FlyteContext, expected: LiteralType) -> None: | ||
""" | ||
Explicitly set the StructuredDataset Literal to handle the following cases: | ||
|
||
1. Read a dataframe from a StructuredDataset with an uri, for example: | ||
|
||
@task | ||
def return_sd() -> StructuredDataset: | ||
sd = StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet") | ||
df = sd.open(pd.DataFrame).all() | ||
return df | ||
|
||
For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5954. | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It means a lot to hear it from you!
to_literal = loop_manager.synced(flyte_dataset_transformer.async_to_literal) | ||
self._literal_sd = to_literal(ctx, self, StructuredDataset, expected).scalar.structured_dataset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if is here the best way to write it.
cc @wild-endeavor @thomasjpfan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing this out. I'm also pondering if this is a good practice...
Will be glad to learn more from you guys!
@task | ||
def upload_pqt_to_s3(local_path: str, remote_path: str) -> None: | ||
"""Upload local temp parquet file to s3 object storage""" | ||
with tempfile.TemporaryDirectory() as tmp_dir: | ||
fs = FileAccessProvider( | ||
local_sandbox_dir=tmp_dir, | ||
raw_output_prefix="s3://my-s3-bucket" | ||
) | ||
fs.upload(local_path, remote_path) | ||
|
||
@task | ||
def read_sd_from_uri(uri: str) -> pd.DataFrame: | ||
sd = StructuredDataset(uri=uri, file_format="parquet") | ||
df = sd.open(pd.DataFrame).all() | ||
|
||
return df | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @JiangJiaWei1103
you should put these tasks in a workflow,
which is closer to the reality that users use it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem, let me modify the test snippets.
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
ff = FlyteFile(path=REMOTE_PATH) | ||
with ff.open(mode="rb") as f: | ||
df_s3 = pd.read_parquet(f) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't do this in a workflow, workflow is like a bridge to build your DAG.
task is the place to write Python code.
https://docs.flyte.org/en/latest/user_guide/concepts/main_concepts/workflows.html#divedeep-workflows
REMOTE_PATH = "s3://my-s3-bucket/my-test/df.parquet" | ||
df = generate_pandas() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why REMOTE_PATH = "s3://my-s3-bucket/my-test/df.parquet"
but not
REMOTE_PATH = "s3://my-s3-bucket/df.parquet"
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot that I just created my-test
dir and uploaded df.parquet
manually last time. Hence, the test seems to pass without an error locally.
Your are right. We can just use REMOTE_PATH = "s3://my-s3-bucket/df.parquet"
. Also, the uploading logic in the previous comment should be modified, too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes the best way to test this in your computer.
(dev) future@outlier ~ % pwd
/Users/future-outlier/code/dev/flytekit
(dev) future@outlier ~ % pytest -s test_file.py
This is how flytekit run the uni test.
or you can just run
make unit_test
under the folder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2914 +/- ##
===========================================
+ Coverage 46.91% 75.81% +28.89%
===========================================
Files 199 199
Lines 20840 20860 +20
Branches 2681 2685 +4
===========================================
+ Hits 9778 15815 +6037
+ Misses 10582 4281 -6301
- Partials 480 764 +284 ☔ View full report in Codecov by Sentry. |
Hi @Future-Outlier, As demonstrated by CI build errors, it seems that we have no chance to interact with s3 during unit test because there's no sandbox built in this stage. Maybe we could just delete all unit tests which need s3 communication in this PR and retain the local case, e.g., As for remote run behavior, we just use integration test to make sure the bug fix works in the sandbox env. Do you think these cases cover all the desired behaviors we've discussed so far? If it's insufficient, we could discuss more test cases that will be beneficial to real-world usage. I'll move on to the next issue and keep refining this one with you. |
This is ok if your update in this PR is tested.
|
Signed-off-by: JiaWei Jiang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
really enjoy to work with you, thank you
Thanks for all the support! |
Tracking issue
Closes #5954.
Why are the changes needed?
When
StructuredDataset
is instantiated with anuri
, the operation of directly reading the dataframe fails. There exist two cases to discuss:StructuredDataset
with a local file path asuri
:StructuredDataset
with a remote file path (e.g., s3 object storage) asuri
:Both cases should successfully read and return
pd.DataFrame
.What changes were proposed in this pull request?
As commented here, users have no options to set
_literal_sd
inStructuredDataset
. To prevent_literal_sd
from beingNoneType
, we useStructuredDatasetTransformerEngine
to construct aStructuredDataset
literal and assign it to_literal_sd
ofStructuredDataset
type.How was this patch tested?
This patch is tested through a newly added unit test.
Setup process
Check all the applicable boxes
Related PRs
Docs link