Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Low-Code Concurrent CDK): Refactor the low-code AsyncRetriever to use an underlying StreamSlicer #170

Merged
merged 4 commits into from
Dec 18, 2024

Conversation

brianjlai
Copy link
Contributor

@brianjlai brianjlai commented Dec 13, 2024

What is the issue

Something uncovered while using the latest version of the Concurrent CDK on source-sendgrid was that because it relied on the assumption all Retrievers had an underlying stream_slicer defined. This however was not the way the AsyncRetriever was implemented.

Implementation Details

To fix this, I've refactored the AsyncRetriever to follow a more established pattern exhibited by our existing Retrievers. Now the retriever will always have an underlying stream_slicer that can be invoked to generate the partitions within the concurrent framework. This allows us to continue to use the StreamSlicerPartitionGenerator.

This change was also designed to be non-breaking because we are not changing the developer facing interface. Instead we use the existing AsyncRetriever fields to construct the AsyncJobPartitionGenerator which is effectively an implementation detail.

Note:
This will not completely give us the ability to run streams using the AsyncRetriever in the concurrent framework. This just fixed the first issue identified in #168. I will work on a follow up PR that addresses the second issue. But I wanted to separate the PRs so I can release this refactor which should be a no-op for existing connectors like source-sendgrid and we should see no change in behavior. And because of that, I have not ungated the async report streams concurrent_declarative_source.py since we need to address part 2.

todo: add unit tests to AsyncJobPartitionRouter

Summary by CodeRabbit

Summary by CodeRabbit

  • New Features

    • Introduced AsyncJobPartitionRouter for improved management of asynchronous job handling.
    • Added AsyncRetriever component for managing asynchronous job operations.
  • Bug Fixes

    • Streamlined logic in AsyncRetriever to enhance performance and reduce complexity.
  • Tests

    • Added tests for the new AsyncRetriever and AsyncJobPartitionRouter components to ensure proper functionality and integration.
    • Created unit tests for AsyncJobPartitionRouter to validate its behavior with different partitioning scenarios.

@brianjlai brianjlai requested review from maxi297 and tolik0 December 13, 2024 05:27
@brianjlai brianjlai marked this pull request as ready for review December 13, 2024 05:27
Copy link
Contributor

coderabbitai bot commented Dec 13, 2024

📝 Walkthrough
📝 Walkthrough

Walkthrough

The changes in this pull request introduce the AsyncJobPartitionRouter class and modify the AsyncRetriever class to utilize this new router for managing asynchronous job retrieval. The create_async_retriever method in ModelToComponentFactory has been updated to instantiate AsyncJobPartitionRouter, enhancing modularity. The AsyncJobPartitionRouter class is designed to handle job creation, monitoring, and stream slicing, while the AsyncRetriever class has been streamlined by removing unnecessary orchestration logic. Additionally, tests have been updated to reflect these changes, ensuring proper integration of the new components.

Changes

File Path Change Summary
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Added import for AsyncJobPartitionRouter; modified create_async_retriever to use this router.
airbyte_cdk/sources/declarative/partition_routers/__init__.py Introduced AsyncJobPartitionRouter and updated __all__ to include it.
airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py Added new class AsyncJobPartitionRouter for managing asynchronous job creation and monitoring.
airbyte_cdk/sources/declarative/retrievers/async_retriever.py Updated AsyncRetriever to use AsyncJobPartitionRouter instead of SinglePartitionRouter; removed job orchestrator logic.
unit_tests/sources/declarative/async_job/test_integration.py Updated MockSource to instantiate AsyncJobPartitionRouter instead of SinglePartitionRouter.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py Added tests for AsyncRetriever and AsyncJobPartitionRouter to ensure proper integration and functionality.

Possibly related PRs


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ec98aa3 and 588fdb7.

📒 Files selected for processing (2)
  • unit_tests/sources/declarative/async_job/test_integration.py (3 hunks)
  • unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py
  • unit_tests/sources/declarative/async_job/test_integration.py

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Experiment)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (5)
airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (1)

51-56: Exception handling in fetch_records method is appropriate

Raising an AirbyteTracedException when the job orchestrator is not initialized is a good way to alert developers of improper usage. Have you considered adding guidance on how to properly initialize stream_slices() before calling fetch_records()? Wdyt?

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

3305-3396: Consider expanding tests to cover behavior of AsyncRetriever

While the current test verifies the instantiation and structure of the AsyncRetriever components, would it make sense to include tests that assert the actual behavior, such as making API calls or handling responses? This could help catch potential integration issues early. Wdyt?

unit_tests/sources/declarative/async_job/test_integration.py (1)

85-94: Consider making the job tracker limit configurable?

Currently using _NO_LIMIT (10000) for JobTracker. Would it make sense to make this configurable through test parameters to allow testing different scenarios? wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

2152-2160: Consider implementing parent stream bulk detection?

The has_bulk_parent is currently hardcoded to False. Should we implement the detection logic now to avoid technical debt? This could prevent potential issues if bulk parent streams are added later. wdyt?


2156-2157: Consider adding validation for job limit configuration?

The JobTracker is created with a hard limit of 1 job. When implementing the configurable limit mentioned in the FIXME comment, should we add validation to ensure the limit is positive and reasonable? wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ceebfda and a5321da.

📒 Files selected for processing (6)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2 hunks)
  • airbyte_cdk/sources/declarative/partition_routers/__init__.py (1 hunks)
  • airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/async_retriever.py (5 hunks)
  • unit_tests/sources/declarative/async_job/test_integration.py (2 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (7 hunks)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (1)

39-49: Great use of dictionary merging in stream_slices method

The merging of partition data with the completed partition information using the | operator is a clean and efficient approach. This ensures all relevant data is included in the StreamSlice. Nice work!

airbyte_cdk/sources/declarative/partition_routers/__init__.py (1)

5-19: Addition of AsyncJobPartitionRouter is correctly integrated

Including AsyncJobPartitionRouter in the imports and the __all__ list ensures it is properly exposed for use. Good job on keeping the module's exports up to date!

airbyte_cdk/sources/declarative/retrievers/async_retriever.py (1)

Line range hint 34-96: Refactor of AsyncRetriever enhances simplicity and clarity

Simplifying the AsyncRetriever by utilizing AsyncJobPartitionRouter directly in the stream_slicer streamlines the code and reduces unnecessary complexity. This makes the codebase more maintainable. Nice work!

unit_tests/sources/declarative/async_job/test_integration.py (1)

23-25: LGTM!

The import is correctly placed and follows Python import conventions.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

311-313: LGTM!

The import is correctly placed and follows Python import conventions.


2167-2172: LGTM!

The AsyncRetriever instantiation looks good with the new AsyncJobPartitionRouter integration.

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm fine with those changes but I'd like to see the tests to confirm

@brianjlai brianjlai changed the title bug(Low-Code Concurrent CDK): Refactor the low-code AsyncRetriever to use an underlying StreamSlicer fix(Low-Code Concurrent CDK): Refactor the low-code AsyncRetriever to use an underlying StreamSlicer Dec 13, 2024
@brianjlai
Copy link
Contributor Author

I think I'm fine with those changes but I'd like to see the tests to confirm

Yup definitely. I'm adding a couple more unit tests today and i tested this locally on Sendgrid, but I plan to regression test this against live traffic on source-sendgrid to validate that this is a drop in replacement end to end.

@github-actions github-actions bot added the bug Something isn't working label Dec 13, 2024
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (4)
unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py (3)

20-20: Consider adding a docstring for the _NO_LIMIT constant?

Adding a brief docstring explaining the purpose and significance of this value would help future maintainers understand its role in the tests. wdyt?


23-48: Consider adding error scenario test cases?

The happy path is well covered, but we might want to add tests for error scenarios. Some suggestions:

  1. Invalid configuration
  2. Failed job status
  3. Empty stream slices

Would you like me to help draft these additional test cases?


23-82: Consider adding test documentation and parameterization?

A few suggestions to improve test maintainability:

  1. Add docstrings describing the test scenarios and their purpose
  2. Consider using @pytest.mark.parametrize for testing different partition values and configurations
  3. Add comments explaining the expected behavior in each assertion block

This would make it easier for other developers to understand and maintain these tests. Thoughts?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2238-2239: Consider improving the comment about concurrent job limitations.

The FIXME comment could be more descriptive about why we're currently limiting to 1 concurrent job and what conditions need to be met to make it configurable. This would help future contributors understand the constraints and requirements.

What about updating it to something like this?

-                JobTracker(1),
-                # FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
+                JobTracker(1),
+                # TODO: Make the number of concurrent jobs configurable per connector.
+                # Currently limited to 1 for safety, but some connectors like source-salesforce 
+                # successfully use 5 concurrent jobs. This requires:
+                # 1. Connector-specific configuration
+                # 2. API-specific rate limiting handling
+                # 3. Testing with various load patterns
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a5321da and ec98aa3.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2 hunks)
  • airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (1 hunks)
  • unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

326-328: LGTM!

Clean import statement for the new AsyncJobPartitionRouter.


2249-2254: LGTM!

The AsyncRetriever instantiation looks good, using the new AsyncJobPartitionRouter as the stream slicer.


2234-2242: Consider making the number of concurrent jobs configurable.

The current implementation hardcodes the JobTracker to 1 concurrent job, but as noted in the existing comment, some connectors like source-salesforce successfully use 5 concurrent jobs in production.

This limitation could impact performance for connectors that can handle multiple concurrent jobs. Additionally, the comment about bulk parent streams suggests there might be more work needed in this area.

Let's verify the concurrent job usage in other connectors:

@brianjlai
Copy link
Contributor Author

live test results: https://github.com/airbytehq/airbyte/actions/runs/12360671390

results analysis:

  • Expected record count for contacts (the only async reports stream) matches
  • the mismatches on the config where is_resumable is going from True -> False is expected since this is moving more full refresh streams to concurrent which doesn't have RFR
  • there is an added change to the manifest because 6.7.2 made a change where the ResponseToFileExtractor was no longer assigned at default. This might have fixed a bug, but Sendgrid relied on this and since its an isolated use case, it makes more sense to just fix the manifest once if we're made the CDK work properly. line of code here: https://github.com/airbytehq/airbyte-python-cdk/pull/50/files#diff-1d9bc4ca384e5c00867da05e9db9d919aba908c59bb73aab83883b9ed20def05R2072

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks @brianjlai

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants