add export proto to parquet example #110

Merged (8 commits), Apr 23, 2024

alice-yin
Contributor

What was changed

Why?

Checklist

  1. Closes

  2. How was this tested:

  3. Any docs updates needed?

@CLAassistant

CLAassistant commented Apr 18, 2024

CLA assistant check
All committers have signed the CLA.

@alice-yin alice-yin force-pushed the aliceyin/add_export_proto_to_parquet branch from 125d82c to e5244e0 Compare April 18, 2024 23:17
@alice-yin alice-yin requested a review from cretz April 18, 2024 23:18
@alice-yin alice-yin force-pushed the aliceyin/add_export_proto_to_parquet branch 5 times, most recently from f828872 to 54ad2a9 Compare April 19, 2024 03:40
@alice-yin alice-yin force-pushed the aliceyin/add_export_proto_to_parquet branch from 54ad2a9 to 9128e4c Compare April 19, 2024 03:45
Member

Can you remove this file?

Member

@cretz cretz Apr 19, 2024

Can we rename this sample to cloud_export_to_parquet so it's clear that, unlike every other sample we write, it's cloud specific? I don't think the proto part is important.

Member

Can you reference this sample from the list in the README?

Run the workflow:

```bash
python run_workflow.py
```

Member

Suggested change
python run_workflow.py
python create_schedule.py

Can we change the filename to be a bit clearer on what's happening in the script?


This is an example workflow to convert exported file from proto to parquet file. The workflow is an hourly schedule

To use this code, make sure you have a [Temporal Cluster running](https://docs.temporal.io/docs/server/quick-install/) first.
Member

Most of our samples link back to the primary README for how to get set up, so they only need to discuss how to run this specific piece. You don't need all the extra venv/activate/etc. parts. See the READMEs in other samples and follow those.

action=ScheduleActionStartWorkflow(
ProtoToParquet.run,
wf_input,
id=f"{WORKFLOW_ID_PREFIX}-{datetime.now()}",
Member

The schedule already appends a unique value, so there's no value in adding your own here IMO.

Contributor Author

I have shared some screenshots that use this workflow ID format, so I would like to keep it.

Member

@cretz cretz Apr 19, 2024

I think we may need to be able to improve the sample in the future regardless of how it's referenced in a post. I think the screenshot can differ from the since-improved code, but up to you, no strong need to change this now.
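
To illustrate the point about the ID, here is a minimal standard-library sketch (not the sample's code). It shows that an f-string like the one in the diff is evaluated once, when the schedule action is built, so the timestamp records when the schedule was created rather than when each run starts. The value of `WORKFLOW_ID_PREFIX` and the fixed `created_at` time are assumptions made for the illustration.

```python
from datetime import datetime, timezone

# Hypothetical prefix; the real value lives in the sample and is not shown in this thread.
WORKFLOW_ID_PREFIX = "cloud-export-to-parquet"


def build_workflow_id(now: datetime) -> str:
    """Mirror the f-string from the diff: prefix plus str(datetime)."""
    return f"{WORKFLOW_ID_PREFIX}-{now}"


# Fixed timestamp so the result is reproducible (an assumption for this sketch).
created_at = datetime(2024, 4, 19, 3, 40, tzinfo=timezone.utc)
workflow_id = build_workflow_id(created_at)

if __name__ == "__main__":
    # The string is computed once here; it does not change for later scheduled runs.
    print(workflow_id)
```

Note that `str(datetime)` also puts a space and colons into the ID. Per the review comment, it is the schedule that makes each run's ID unique.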



@dataclass
class DataTransAndLandActivitiyInput:
Member

Suggested change
class DataTransAndLandActivitiyInput:
class DataTransAndLandActivityInput:

)

# Read Input File
object_keys_output = await workflow.execute_activity_method(
Member

How big might this output get?

Contributor Author

It should not be too big. These are file names in S3.
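
One rough way to answer the size question is to measure the serialized list of keys. The sketch below assumes the activity result is encoded roughly as JSON and uses made-up key names. It is only an estimate, not a statement about Temporal's exact encoding or limits.

```python
import json
from typing import List


def estimated_payload_bytes(object_keys: List[str]) -> int:
    """Approximate the serialized size of a list of S3 object keys, in bytes."""
    return len(json.dumps(object_keys).encode("utf-8"))


if __name__ == "__main__":
    # Hypothetical key layout; real export keys may be longer or shorter.
    keys = [f"exports/2024/04/19/03/file-{i:05d}.proto" for i in range(1000)]
    print(f"{len(keys)} keys -> about {estimated_payload_bytes(keys)} bytes")
```

If the number of files per hour can grow without bound, this estimate grows linearly with it, which is the concern behind the reviewer's question.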

)

# Read Input File
object_keys_output = await workflow.execute_activity_method(
Member

Note, there may not be benefit to splitting this across activities compared to a simple activity that heartbeats and does all the work

Contributor Author

The benefit, I think, is that we could run the data_trans_and_land activity on multiple threads. That activity takes the majority of the time right now.

Member

But this isn't doing anything concurrently today, it's one at a time. But I agree in general.

Member

This file seems unnecessary

@alice-yin alice-yin force-pushed the aliceyin/add_export_proto_to_parquet branch from da7a0e2 to 6c737b5 Compare April 19, 2024 20:04
@alice-yin alice-yin requested a review from cretz April 19, 2024 20:05
@alice-yin
Contributor Author

alice-yin commented Apr 19, 2024

Did quite a bit of refactoring on this. Found aioboto3 as an async package for S3 access.

Note: I had to create a separate Poetry project inside my folder. The main reason is that one sample depends on Python 3.8, but in my case I have to use Python 3.9.

@alice-yin alice-yin force-pushed the aliceyin/add_export_proto_to_parquet branch 2 times, most recently from 6d80eff to 262eb2e Compare April 19, 2024 21:58
@alice-yin alice-yin force-pushed the aliceyin/add_export_proto_to_parquet branch from 262eb2e to 8af510b Compare April 19, 2024 22:43
README.md Outdated
@@ -67,7 +67,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [sentry](sentry) - Report errors to Sentry.
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.

* [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported workflow on an hourly basis
Member

Can you retain the alphabetical order of the list?


Please make sure your python is 3.9 above. For this sample, run:

poetry install
Member

This should be something like poetry install --with cloud_export_to_parquet

Contributor Author

The samples depend on different Python versions, so I can't merge them into the same pyproject.toml. The current pyproject.toml needs Python <= 3.8, but pandas needs Python >= 3.9. I tried a pandas version that works with Python 3.8, but there were other dependency issues as well.

Member

@cretz cretz Apr 22, 2024

I think it should be ok to say in this sample's README that this sample is a bit special and requires >= 3.9. Is that possible or are you hitting other issues?

Contributor Author

That's why I keep a separate Poetry file inside the folder, so that it can build independently of the root folder.

Member

I think you can raise the Python version on just the dependencies in your group, see https://python-poetry.org/docs/dependency-specification/#python-restricted-dependencies (if our Poetry version is too old, we can update it). So you can say "You need Python 3.9+ and then run poetry install --with cloud_export_to_parquet".

Contributor Author

Updated it.

key = activity_input.object_key
data = await get_data_from_object_key(activity_input.export_s3_bucket, key)
activity.logger.info("Convert proto to parquet for file: %s", key)
parquet_data = convert_proto_to_parquet_flatten(data)
Member

I am concerned this is CPU bound work, is it expensive (i.e. can it take a lot of time blocking the entire asyncio event loop)?

I think def activities would be better than async def activities for code that does data translation and such.

Contributor Author

While we do run convert_proto_to_parquet_flatten, reading from S3 and writing to S3 are also I/O-heavy operations, so it would be beneficial to keep it async. I could improve the code a bit.

Also I thought it's always recommended to use async def for activities? Am I wrong on this?

Contributor Author

@alice-yin alice-yin Apr 22, 2024

I was planning to add the following logic in workflows.py

```python
# Create a list of coroutine objects
tasks = []

# Could spin up multiple threads to process files in parallel
for key in object_keys_output:
    data_trans_and_land_input = DataTransAndLandActivityInput(
        workflow_input.export_s3_bucket,
        key,
        workflow_input.output_s3_bucket,
        write_path,
    )
    # Convert proto to parquet and save to S3
    task = workflow.execute_activity(
        data_trans_and_land,
        data_trans_and_land_input,
        start_to_close_timeout=timedelta(hours=1),
        retry_policy=retry_policy,
    )
    tasks.append(task)

# Wait for all tasks to complete and gather results
_ = await asyncio.gather(
    *tasks, return_exceptions=True
)  # `return_exceptions=True` to handle exceptions
```

But as I only have 1 worker locally, it makes everything wait longer. Still, I think it's generally useful to keep the existing async logic. Let me know what you think.
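
As a side note on the proposed `asyncio.gather(..., return_exceptions=True)` call: with that flag, failures are not raised but returned in the result list, so discarding the result with `_ =` would hide them. The following is a minimal standard-library sketch of inspecting the results. `convert_one` is a hypothetical stand-in for the activity call, not the sample's code.

```python
import asyncio
from typing import List, Tuple


async def convert_one(key: str) -> str:
    """Hypothetical stand-in for one conversion task; fails for '.bad' keys."""
    await asyncio.sleep(0)
    if key.endswith(".bad"):
        raise ValueError(f"cannot convert {key}")
    return key + ".parquet"


async def convert_all(keys: List[str]) -> Tuple[List[str], List[Tuple[str, BaseException]]]:
    # With return_exceptions=True, exceptions come back as list items, in input order.
    results = await asyncio.gather(
        *(convert_one(k) for k in keys), return_exceptions=True
    )
    succeeded = [r for r in results if not isinstance(r, BaseException)]
    failed = [(k, r) for k, r in zip(keys, results) if isinstance(r, BaseException)]
    return succeeded, failed


if __name__ == "__main__":
    ok, bad = asyncio.run(convert_all(["a.proto", "b.bad", "c.proto"]))
    print("succeeded:", ok)
    print("failed:", [(k, repr(e)) for k, e in bad])
```

Separating successes from failures this way keeps one bad file from silently disappearing from the run.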

Member

> it would be benefit if we still keep it as async

I don't think so in this case, I think normal def would be best

> Also I thought it's always recommended to use async def for activities? Am I wrong on this?

No, actually the opposite, you only use async def if you're sure you aren't going to block the thread. We had that async-default stance originally but we've changed documentation and courses and such to encourage just def instead.

> I was planning to add the following logic in workflows.py

No need, workflow is good IMO

Contributor Author

Changed all functions to sync.
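
The concern in this thread, that blocking work inside an `async def` stalls the whole event loop, can be seen in a small standard-library sketch. It does not use the Temporal SDK: `convert` is a hypothetical stand-in that blocks with `time.sleep`, and a background "ticker" task stands in for everything else the loop needs to do.

```python
import asyncio
import time
from typing import List


def convert(data: bytes) -> int:
    """Hypothetical blocking step (stand-in for the proto-to-parquet conversion)."""
    time.sleep(0.2)
    return len(data)


async def ticker(ticks: List[float], stop: asyncio.Event) -> None:
    # Records a tick every 10 ms, but only while the event loop is free to run it.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(offload: bool) -> int:
    ticks: List[float] = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(ticks, stop))
    await asyncio.sleep(0)  # let the ticker record its first tick
    if offload:
        # Run the blocking call on a worker thread; the loop stays responsive.
        await asyncio.get_running_loop().run_in_executor(None, convert, b"payload")
    else:
        # Call it directly inside the coroutine; nothing else can run meanwhile.
        convert(b"payload")
    stop.set()
    await task
    return len(ticks)


if __name__ == "__main__":
    print("ticks while blocking the loop:", asyncio.run(count_ticks(offload=False)))
    print("ticks with the call offloaded:", asyncio.run(count_ticks(offload=True)))
```

When the call runs directly in the coroutine, the ticker only gets its initial tick. When it is moved to a thread, the ticker keeps running. That is the same reasoning behind preferring plain `def` activities for work that may block.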


async def save_to_sink(data: pd.DataFrame, s3_bucket: str, write_path: str) -> str:
"""Function that save object to s3 bucket."""
write_bytes = data.to_parquet(None, compression="snappy", index=False)
Member

Is this an expensive operation that blocks the asyncio event loop?

Member

Samples do not need their own pyproject.toml. You can just add a group to the top-level one

Contributor Author

Same comment as above: I'm not able to use the same pyproject.toml.

@alice-yin alice-yin requested a review from cretz April 22, 2024 18:38
@alice-yin alice-yin force-pushed the aliceyin/add_export_proto_to_parquet branch from 70a8366 to 9db26e6 Compare April 22, 2024 19:10
workflow_runner=SandboxedWorkflowRunner(
restrictions=SandboxRestrictions.default.with_passthrough_modules("boto3")
),
activity_executor=ThreadPoolExecutor(5),
Member

Suggested change
activity_executor=ThreadPoolExecutor(5),
activity_executor=ThreadPoolExecutor(100),

Or you could make max_concurrent_activities smaller, your choice, but in theory the thread pool should not be smaller than max_concurrent_activities.
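
The sizing advice can be illustrated without the Temporal worker. The sketch below uses only `concurrent.futures` and hypothetical sleeping tasks: if more tasks are handed to the pool than it has threads, the extra ones wait in the queue, so the effective concurrency is capped by the pool size.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(pool_size: int, num_tasks: int, work_seconds: float = 0.2) -> int:
    """Submit num_tasks blocking tasks and return the most that ran at the same time."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task() -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(work_seconds)  # stand-in for blocking activity work
        with lock:
            running -= 1

    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        futures = [pool.submit(task) for _ in range(num_tasks)]
        for future in futures:
            future.result()
    return peak


if __name__ == "__main__":
    print("pool of 2, 6 tasks -> peak", peak_concurrency(2, 6))
    print("pool of 6, 6 tasks -> peak", peak_concurrency(6, 6))
```

With a pool smaller than the number of tasks allowed to run at once, the remaining tasks queue behind the busy threads, which is why the reviewer suggests matching the two settings.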

@alice-yin alice-yin requested a review from cretz April 22, 2024 21:34
@alice-yin alice-yin force-pushed the aliceyin/add_export_proto_to_parquet branch 3 times, most recently from 417c557 to 5605bde Compare April 22, 2024 23:44
@alice-yin alice-yin force-pushed the aliceyin/add_export_proto_to_parquet branch from 5605bde to 9e82613 Compare April 22, 2024 23:46
Member

@cretz cretz left a comment

Great! This is a great looking sample that is nice and simple to read.

Looks like there may be some issues with MyPy. You may have to debug those, but then this will be ready to merge.

poetry.lock Outdated
Member

@cretz cretz Apr 23, 2024

It looks like you inadvertently updated a lot of dependencies instead of just the ones for your project (this is probably causing the CI break). Usually we're ok with updating dependencies, but we may want it in a separate PR.

@alice-yin alice-yin merged commit f625172 into main Apr 23, 2024
8 checks passed
@cretz cretz deleted the aliceyin/add_export_proto_to_parquet branch April 23, 2024 18:00