add export proto to parquet example #110
Conversation
Force-pushed 125d82c to e5244e0
Force-pushed f828872 to 54ad2a9
Force-pushed 54ad2a9 to 9128e4c
export_proto_to_parquet/LICENSE (outdated)
Can you remove this file?
export_proto_to_parquet/README.md (outdated)
Can we change this sample to `cloud_export_to_parquet` so it's clear that, unlike every other sample we write, it's cloud specific? The proto part isn't important, I don't think.
export_proto_to_parquet/README.md (outdated)
Can you reference this sample from the list in the README?
export_proto_to_parquet/README.md (outdated)
Run the workflow:

```bash
python run_workflow.py
```
```suggestion
python create_schedule.py
```
Can we change the filename to be a bit clearer on what's happening in the script?
export_proto_to_parquet/README.md (outdated)
This is an example workflow to convert exported file from proto to parquet file. The workflow is an hourly schedule

To use this code, make sure you have a [Temporal Cluster running](https://docs.temporal.io/docs/server/quick-install/) first.
Most of our samples link back to the primary README on how to get set up and therefore can just discuss how to run this specific piece. You don't need all the extra venv/activate/etc. parts. See the READMEs in other samples and follow those.
```python
action=ScheduleActionStartWorkflow(
    ProtoToParquet.run,
    wf_input,
    id=f"{WORKFLOW_ID_PREFIX}-{datetime.now()}",
```
The schedule already appends a unique value; there's no value in adding your own here IMO.
So I have shared some screenshots using this type of workflow ID, and would like to keep this format.
I think we may need to be able to improve the sample in the future regardless of how it's referenced in a post. The screenshot can differ from the since-improved code, but up to you, no strong need to change this now.
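For illustration, here's a minimal sketch of what the reviewer is describing: the Schedules API appends its own unique suffix to the workflow ID of each run it starts, so a plain prefix is enough. The schedule ID, task queue name, and hourly spec below are assumptions; `ProtoToParquet`, `wf_input`, and `WORKFLOW_ID_PREFIX` are the names from the snippet above.

```python
from datetime import timedelta

from temporalio.client import (
    Client,
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleIntervalSpec,
    ScheduleSpec,
)

async def create_hourly_schedule() -> None:
    client = await Client.connect("localhost:7233")
    await client.create_schedule(
        "hourly-proto-to-parquet",  # hypothetical schedule ID
        Schedule(
            action=ScheduleActionStartWorkflow(
                ProtoToParquet.run,
                wf_input,
                # No datetime suffix needed: the schedule appends a unique
                # value to this ID for every run it starts.
                id=WORKFLOW_ID_PREFIX,
                task_queue="DATA_TRANSFORMATION_TASK_QUEUE",  # hypothetical
            ),
            spec=ScheduleSpec(
                intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))]
            ),
        ),
    )
```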
```python
@dataclass
class DataTransAndLandActivitiyInput:
```
```suggestion
class DataTransAndLandActivityInput:
```
export_proto_to_parquet/workflows.py (outdated)
```python
)

# Read Input File
object_keys_output = await workflow.execute_activity_method(
```
How big might this output get?
Should not be too big. These are file names in S3.
export_proto_to_parquet/workflows.py (outdated)
```python
)

# Read Input File
object_keys_output = await workflow.execute_activity_method(
```
Note, there may not be benefit to splitting this across activities compared to a simple activity that heartbeats and does all the work
So the benefit is that I think we could multi-thread the data_trans_and_land activity. That's taking the majority of the time right now.
But this isn't doing anything concurrently today, it's one at a time. But I agree in general.
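To illustrate the reviewer's earlier point, here is a hedged sketch of the single-activity alternative: one activity walks every key itself and heartbeats between files. `get_object_keys` and the consolidated input dataclass are hypothetical stand-ins for the PR's own helpers.

```python
from dataclasses import dataclass

from temporalio import activity

@dataclass
class ProcessAllFilesInput:  # hypothetical consolidated input
    export_s3_bucket: str
    output_s3_bucket: str
    write_path: str

@activity.defn
def process_all_files(activity_input: ProcessAllFilesInput) -> None:
    """Do all the per-file work in one activity, heartbeating as it goes."""
    for i, key in enumerate(get_object_keys(activity_input.export_s3_bucket)):
        data = get_data_from_object_key(activity_input.export_s3_bucket, key)
        parquet_data = convert_proto_to_parquet_flatten(data)
        save_to_sink(
            parquet_data, activity_input.output_s3_bucket, activity_input.write_path
        )
        # Heartbeat so a heartbeat timeout can detect a stuck worker; the
        # index is recorded as a detail that a retry could resume from.
        activity.heartbeat(i)
```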
export_proto_to_parquet/yarn.lock (outdated)
This file seems unnecessary
Force-pushed da7a0e2 to 6c737b5
Did quite a bit of refactoring on this. Found aioboto3 as an async package for S3 access. Note: I had to create a Poetry project inside my folder. The main reason is that one sample depends on Python 3.8, but in my case I have to use Python 3.9.
Force-pushed 6d80eff to 262eb2e
Force-pushed 262eb2e to 8af510b
README.md (outdated)
```diff
@@ -67,7 +67,7 @@ Some examples require extra dependencies. See each sample's directory for specif
 * [sentry](sentry) - Report errors to Sentry.
 * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
 * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
+* [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported workflow on an hourly basis
```
Can you retain the alphabetical order of the list?
cloud_export_to_parquet/README.md (outdated)
Please make sure your python is 3.9 above. For this sample, run:

```bash
poetry install
```
This should be something like `poetry install --with cloud_export_to_parquet`.
So, as the project depends on a different Python version, I can't merge them into the same pyproject.toml. The current pyproject.toml needs Python <= 3.8, but pandas needs Python >= 3.9. I tried using a pandas version that supports Python 3.8, but there are other dependency issues as well.
I think it should be ok to say in this sample's README that this sample is a bit special and requires >= 3.9. Is that possible or are you hitting other issues?
So that's why I keep a separate Poetry file inside the folder. That way, it can build independently of the root folder.
I think you can make the Python version higher on just your dependencies in your group, see https://python-poetry.org/docs/dependency-specification/#python-restricted-dependencies (if our Poetry version is too old, we can update). So you can say "You need Python 3.9+" and then run `poetry install --with cloud_export_to_parquet`.
Updated it.
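For reference, the python-restricted-dependencies approach the reviewer links to might look roughly like this in the root pyproject.toml; the group name and version constraints are illustrative assumptions, not copied from the repo.

```toml
[tool.poetry.group.cloud_export_to_parquet]
optional = true

[tool.poetry.group.cloud_export_to_parquet.dependencies]
# Restrict these dependencies to Python 3.9+ even though the project
# itself still allows 3.8 (versions are illustrative).
pandas = { version = "^2.0", python = ">=3.9" }
boto3 = { version = "^1.34", python = ">=3.9" }
```

With that in place, `poetry install --with cloud_export_to_parquet` pulls in the extra group on a 3.9+ interpreter.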
```python
key = activity_input.object_key
data = await get_data_from_object_key(activity_input.export_s3_bucket, key)
activity.logger.info("Convert proto to parquet for file: %s", key)
parquet_data = convert_proto_to_parquet_flatten(data)
```
I am concerned this is CPU-bound work, is it expensive (i.e. can it take a lot of time blocking the entire asyncio event loop)?
I think `def` activities would be better than `async def` activities for code that does data translation and such.
So while we do convert_proto_to_parquet_flatten, reading from S3 and writing to S3 are also I/O-heavy operations; it would be beneficial if we still keep it as async. I could improve the code a bit.
Also, I thought it's always recommended to use async def for activities? Am I wrong on this?
I was planning to add the following logic in workflows.py:

```python
# Create a list of coroutine objects
tasks = []
# Could spin up multiple threads to process files in parallel
for key in object_keys_output:
    data_trans_and_land_input = DataTransAndLandActivityInput(
        workflow_input.export_s3_bucket,
        key,
        workflow_input.output_s3_bucket,
        write_path,
    )
    # Convert proto to parquet and save to S3
    task = workflow.execute_activity(
        data_trans_and_land,
        data_trans_and_land_input,
        start_to_close_timeout=timedelta(hours=1),
        retry_policy=retry_policy,
    )
    tasks.append(task)

# Wait for all tasks to complete and gather results
_ = await asyncio.gather(
    *tasks, return_exceptions=True
)  # `return_exceptions=True` to handle exceptions
```

But as I only have 1 worker locally, it makes everything wait longer. But I think generally it's useful to keep the existing async logic. Let me know what you think.
> it would be beneficial if we still keep it as async

I don't think so in this case, I think normal `def` would be best.

> Also, I thought it's always recommended to use async def for activities? Am I wrong on this?

No, actually the opposite: you only use `async def` if you're sure you aren't going to block the thread. We had that async-default stance originally, but we've changed documentation, courses, and such to encourage just `def` instead.

> I was planning to add the following logic in workflows.py

No need, the workflow is good IMO.
Changed all functions to sync.
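As a sketch of what that change looks like in spirit (the body reuses the helper names quoted above, assuming sync variants of them), the activity becomes a plain `def`, so the worker runs it on its thread pool instead of the asyncio event loop:

```python
from temporalio import activity

@activity.defn
def data_trans_and_land(activity_input: DataTransAndLandActivityInput) -> str:
    """Sync activity: CPU-bound pandas/proto work no longer blocks the event loop."""
    key = activity_input.object_key
    # Assumed sync S3 read (plain boto3); runs in a worker thread
    data = get_data_from_object_key(activity_input.export_s3_bucket, key)
    activity.logger.info("Convert proto to parquet for file: %s", key)
    parquet_data = convert_proto_to_parquet_flatten(data)
    return save_to_sink(
        parquet_data, activity_input.output_s3_bucket, activity_input.write_path
    )
```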
```python
async def save_to_sink(data: pd.DataFrame, s3_bucket: str, write_path: str) -> str:
    """Function that save object to s3 bucket."""
    write_bytes = data.to_parquet(None, compression="snappy", index=False)
```
Is this an expensive operation that blocks the asyncio event loop?
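It likely is: `DataFrame.to_parquet` serializes the whole frame in-process. A hedged sketch of a sync version follows; the key layout and boto3 call are assumptions, not the PR's code.

```python
import boto3
import pandas as pd

def save_to_sink(data: pd.DataFrame, s3_bucket: str, write_path: str) -> str:
    """Save a DataFrame to S3 as parquet; a plain def keeps it off the event loop."""
    # to_parquet(None) returns the serialized bytes instead of writing a file
    write_bytes = data.to_parquet(None, compression="snappy", index=False)
    key = f"{write_path}/data.parquet"  # hypothetical key layout
    boto3.client("s3").put_object(Bucket=s3_bucket, Key=key, Body=write_bytes)
    return key
```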
Samples do not need their own pyproject.toml. You can just add a group to the top-level one
Same comment as above. Not able to use the same pyproject.toml.
Force-pushed 70a8366 to 9db26e6
```python
workflow_runner=SandboxedWorkflowRunner(
    restrictions=SandboxRestrictions.default.with_passthrough_modules("boto3")
),
activity_executor=ThreadPoolExecutor(5),
```
```suggestion
activity_executor=ThreadPoolExecutor(100),
```
Or you could make `max_concurrent_activities` smaller, your choice, but the thread pool in theory should not be smaller than `max_concurrent_activities`.
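A minimal sketch of keeping the two settings in sync; the task queue name, registered workflows/activities, and the shared constant are assumptions:

```python
from concurrent.futures import ThreadPoolExecutor

from temporalio.worker import Worker

MAX_CONCURRENT_ACTIVITIES = 100  # illustrative; size for your workload

worker = Worker(
    client,  # an already-connected temporalio Client
    task_queue="DATA_TRANSFORMATION_TASK_QUEUE",  # hypothetical name
    workflows=[ProtoToParquet],
    activities=[get_object_keys, data_trans_and_land],
    # Sync (def) activities run on this executor, so it should have at
    # least max_concurrent_activities threads or activities will queue.
    activity_executor=ThreadPoolExecutor(MAX_CONCURRENT_ACTIVITIES),
    max_concurrent_activities=MAX_CONCURRENT_ACTIVITIES,
)
```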
Force-pushed 417c557 to 5605bde
Force-pushed 5605bde to 9e82613
Great! This is a great looking sample that is nice and simple to read.
Looks like there may be some issues with MyPy. You may have to debug those, but then this will be ready to merge.
poetry.lock (outdated)
It looks like you inadvertently updated a lot of dependencies instead of just the ones for your project (this is probably causing the CI break). Usually we're ok with updating dependencies, but we may want it in a separate PR.