
[BUG] Support creation and reading of StructuredDataset with local or remote uri #2914

Conversation

JiangJiaWei1103
Contributor

@JiangJiaWei1103 JiangJiaWei1103 commented Nov 8, 2024

Tracking issue

Closes #5954.

Why are the changes needed?

When StructuredDataset is instantiated with a uri, directly reading the dataframe fails. There are two cases to discuss:

  1. Read the dataframe from a StructuredDataset with a local file path as the uri:

@task
def read_sd_from_local_uri() -> pd.DataFrame:
    sd = StructuredDataset(uri="./df.parquet", file_format="parquet")
    df = sd.open(pd.DataFrame).all()

    return df

  2. Read the dataframe from a StructuredDataset with a remote file path (e.g., s3 object storage) as the uri:

@task
def read_sd_from_remote_uri() -> pd.DataFrame:
    sd = StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet")
    df = sd.open(pd.DataFrame).all()

    return df

Both cases should successfully read and return pd.DataFrame.

What changes were proposed in this pull request?

As commented here, users have no option to set _literal_sd on a StructuredDataset. To prevent _literal_sd from being None, we use the StructuredDatasetTransformerEngine to construct a StructuredDataset literal and assign it to the _literal_sd attribute of the StructuredDataset instance.

How was this patch tested?

This patch is tested through a newly added unit test.

Setup process

git clone https://github.com/flyteorg/flytekit.git
gh pr checkout 2914
make setup && pip install -e .

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

@JiangJiaWei1103 JiangJiaWei1103 marked this pull request as ready for review November 10, 2024 06:43
@JiangJiaWei1103 JiangJiaWei1103 changed the title [WIP] Support creation and reading of StructuredDataset with remote uri Support creation and reading of StructuredDataset with local or remote uri Nov 10, 2024
@JiangJiaWei1103 JiangJiaWei1103 changed the title Support creation and reading of StructuredDataset with local or remote uri [BUG] Support creation and reading of StructuredDataset with local or remote uri Nov 10, 2024
@Future-Outlier Future-Outlier self-assigned this Nov 11, 2024
@kumare3
Contributor

kumare3 commented Nov 11, 2024

this is awesome

Member

@Future-Outlier Future-Outlier left a comment

This is amazing, @JiangJiaWei1103
I am testing this.

Comment on lines +187 to +200
def _set_literal(self, ctx: FlyteContext, expected: LiteralType) -> None:
    """
    Explicitly set the StructuredDataset Literal to handle the following cases:

    1. Read a dataframe from a StructuredDataset with a uri, for example:

    @task
    def return_sd() -> StructuredDataset:
        sd = StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet")
        df = sd.open(pd.DataFrame).all()
        return df

    For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5954.
    """
Member

Very nice comment.

Contributor Author

It means a lot to hear it from you!

Comment on lines +201 to +202
to_literal = loop_manager.synced(flyte_dataset_transformer.async_to_literal)
self._literal_sd = to_literal(ctx, self, StructuredDataset, expected).scalar.structured_dataset
Member

Not sure if this is the best way to write it.
cc @wild-endeavor @thomasjpfan

Contributor Author

Thanks for pointing this out. I'm also pondering if this is a good practice...

Will be glad to learn more from you guys!

Comment on lines 668 to 684
@task
def upload_pqt_to_s3(local_path: str, remote_path: str) -> None:
    """Upload a local temp parquet file to s3 object storage."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        fs = FileAccessProvider(
            local_sandbox_dir=tmp_dir,
            raw_output_prefix="s3://my-s3-bucket",
        )
        fs.upload(local_path, remote_path)

@task
def read_sd_from_uri(uri: str) -> pd.DataFrame:
    sd = StructuredDataset(uri=uri, file_format="parquet")
    df = sd.open(pd.DataFrame).all()

    return df

Member

Hi, @JiangJiaWei1103
you should put these tasks in a workflow,
which is closer to how users actually use them.

Contributor Author

No problem, let me modify the test snippets.

JiangJiaWei1103 and others added 3 commits November 13, 2024 21:55
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Comment on lines 705 to 707
ff = FlyteFile(path=REMOTE_PATH)
with ff.open(mode="rb") as f:
    df_s3 = pd.read_parquet(f)
Member

We shouldn't do this in a workflow; a workflow is like a bridge to build your DAG.
A task is the place to write Python code.

https://docs.flyte.org/en/latest/user_guide/concepts/main_concepts/workflows.html#divedeep-workflows


Comment on lines 715 to 717
REMOTE_PATH = "s3://my-s3-bucket/my-test/df.parquet"
df = generate_pandas()

Member

why REMOTE_PATH = "s3://my-s3-bucket/my-test/df.parquet" but not
REMOTE_PATH = "s3://my-s3-bucket/df.parquet"?

Contributor Author

@JiangJiaWei1103 JiangJiaWei1103 Nov 14, 2024

I forgot that I created the my-test dir and uploaded df.parquet manually last time. Hence, the test seemed to pass without an error locally.

You are right. We can just use REMOTE_PATH = "s3://my-s3-bucket/df.parquet". Also, the uploading logic in the previous comment should be modified, too.

Member

Yes, the best way is to test this on your computer:

(dev) future@outlier ~ % pwd
/Users/future-outlier/code/dev/flytekit
(dev) future@outlier ~ % pytest -s test_file.py

This is how flytekit runs the unit tests. Or you can just run

make unit_test

under the repo folder.

Member

This is the expected result.


codecov bot commented Nov 14, 2024

Codecov Report

Attention: Patch coverage is 22.22222% with 7 lines in your changes missing coverage. Please review.

Project coverage is 75.81%. Comparing base (51f9a3e) to head (14df985).
Report is 8 commits behind head on master.

Files with missing lines Patch % Lines
flytekit/types/structured/structured_dataset.py 22.22% 7 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2914       +/-   ##
===========================================
+ Coverage   46.91%   75.81%   +28.89%     
===========================================
  Files         199      199               
  Lines       20840    20860       +20     
  Branches     2681     2685        +4     
===========================================
+ Hits         9778    15815     +6037     
+ Misses      10582     4281     -6301     
- Partials      480      764      +284     


@JiangJiaWei1103
Contributor Author

Hi @Future-Outlier,

As demonstrated by the CI build errors, it seems we cannot interact with s3 during unit tests because no sandbox is built at that stage. Maybe we could just delete all unit tests that need s3 communication in this PR and retain the local case, e.g., sd = StructuredDataset(uri=uri, file_format="parquet") where uri is a local parquet file. What do you think?

As for the remote run behavior, we just use an integration test to make sure the bug fix works in the sandbox env. Do you think these cases cover all the desired behaviors we've discussed so far?

If it's insufficient, we could discuss more test cases that will be beneficial to real-world usage. I'll move on to the next issue and keep refining this one with you.

@Future-Outlier
Member

As demonstrated by CI build errors, it seems that we have no chance to interact with s3 during unit test because there's no sandbox built in this stage. Maybe we could just delete all unit tests which need s3 communication in this PR and retain the local case, e.g., sd = StructuredDataset(uri=uri, file_format="parquet") where uri is a local parquet file. What do you think?

This is ok if your update in this PR is tested.

As for remote run behavior, we just use integration test to make ....
This is not the same, since the default input differs from creating a structured dataset inside a task container.
But we can just test locally for now.

Member

@Future-Outlier Future-Outlier left a comment

Really enjoyed working with you, thank you!

@Future-Outlier Future-Outlier merged commit b04bc8d into flyteorg:master Nov 19, 2024
104 checks passed
@JiangJiaWei1103
Contributor Author

Thanks for all the support!

Successfully merging this pull request may close these issues.

[BUG] Structured Dataset create with remote uri and read directly will fail