-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(airbyte-cdk): add gzipjson decoder #20
Conversation
📝 Walkthrough<details>
<summary>📝 Walkthrough</summary>
## Walkthrough
The changes in this pull request primarily enhance the `declarative_component_schema.yaml` and related files by introducing new decoder classes, specifically `CustomDecoder` and `GzipJsonDecoder`. These additions allow for custom decoding strategies and support for Gzip-compressed JSON data. The `decoder` properties in both `SimpleRetriever` and `AsyncRetriever` are updated to reference these new decoders. Additionally, modifications to existing decoder classes improve their return types and parsing methods, while tests for the new functionality are added to ensure proper operation.
## Changes
| File | Change Summary |
|----------------------------------------------------------------------|---------------------------------------------------------------------------------------------------|
| `airbyte_cdk/sources/declarative/declarative_component_schema.yaml` | Added `CustomDecoder` and `GzipJsonDecoder` classes; updated `decoder` properties in retrievers. |
| `airbyte_cdk/sources/declarative/decoders/__init__.py` | Imported `GzipJsonDecoder` and updated `__all__` to include it. |
| `airbyte_cdk/sources/declarative/decoders/json_decoder.py` | Modified `decode` method return types; added `GzipJsonDecoder` class; refactored parsing logic. |
| `airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py` | Updated constructor parameter types for `LegacyToPerPartitionStateMigration`. |
| `airbyte_cdk/sources/declarative/models/declarative_component_schema.py` | Added `CustomDecoder` and `GzipJsonDecoder` classes; updated `decoder` fields in retrievers. |
| `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` | Integrated new decoders into `ModelToComponentFactory`; added method for `GzipJsonDecoder`. |
| `unit_tests/sources/declarative/decoders/test_json_decoder.py` | Added tests for `GzipJsonDecoder` to validate decoding of gzip-compressed JSON responses. |
## Sequence Diagram(s)
```mermaid
sequenceDiagram
participant Client
participant API
participant GzipJsonDecoder
participant JsonDecoder
Client->>API: Request Gzip JSON Data
API-->>Client: Gzip Compressed JSON Response
Client->>GzipJsonDecoder: Decode Gzip JSON
GzipJsonDecoder->>JsonDecoder: Parse JSON Body
JsonDecoder-->>GzipJsonDecoder: Parsed Data
GzipJsonDecoder-->>Client: Decoded Data Warning Rate limit exceeded@artem1205 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 17 minutes and 22 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
/fix-pr |
/autofix
|
Hi, @artem1205 - sorry for the mixup. New slash command is "/autofix" as you found. :) |
Signed-off-by: Artem Inzhyyants <[email protected]>
[skip ci] Signed-off-by: Artem Inzhyyants <[email protected]>
/autofix
|
Signed-off-by: Artem Inzhyyants <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (7)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (2)
28-52
: The changes look good! Quick suggestion about type hints.The implementation is clean and I like how you extracted the parse logic. Would you consider adding a type hint for the yield in the docstring to make it crystal clear what's being yielded? Something like:
def parse_body_json( body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]], ) -> Generator[MutableMapping[str, Any], None, None]: """Parse JSON body into a generator of records. Yields: MutableMapping[str, Any]: Individual records from the JSON body """wdyt? 🤔
84-90
: Interesting TODO about configurable delimiters!The TODO raises a good point. Would you like help implementing configurable delimiters? We could add an optional delimiter parameter with '\n' as the default.
Let me know if you'd like me to help draft the implementation or create an issue to track this enhancement!
unit_tests/sources/declarative/decoders/test_json_decoder.py (2)
127-199
: Great test implementation! Here are some suggestions to make it even better 🚀The test looks solid with good parametrization and realistic test data. Would you consider adding a few more test cases to make it more robust? Here's what I'm thinking:
- Edge cases (wdyt?):
@pytest.mark.parametrize( "test_case,response_to_compress,expected_length", [ ("empty_list", "[]", 0), ("single_item", "[{}]", 1), ("invalid_json", "{invalid}", None), # Should raise JSONDecodeError ] )
- The test data looks quite repetitive. Maybe we could make it more concise? Something like:
test_data = [{"campaignId": i, "campaignName": f"campaign-{i}"} for i in range(5)] response_to_compress = json.dumps(test_data)
- We might want to verify the actual content of decoded items, not just the count:
decoded_items = list(GzipJsonDecoder(parameters={}, encoding=encoding).decode(response)) assert len(decoded_items) == 5 assert decoded_items[0]["campaignId"] == 214078428 # Verify actual content
135-135
: How about adding a docstring to explain the test's purpose? 📝Would you consider adding something like this?
def test_gzipjson_decoder(requests_mock, encoding): """ Test GzipJsonDecoder's ability to decode gzip-compressed JSON data. Tests: - Successful decompression and JSON parsing - Support for different encodings (utf-8, utf) - Handling of multi-item JSON arrays """airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
704-711
: Consider adding more documentation for the encoding field, wdyt?The GzipJsonDecoder class looks good, but the encoding field could benefit from more detailed documentation about its purpose and supported values.
Consider adding documentation like this:
- encoding: Optional[str] = "utf-8" + encoding: Optional[str] = Field( + "utf-8", + description="The character encoding to use for decoding the gzipped JSON content. Common values include 'utf-8', 'ascii', etc.", + examples=["utf-8", "ascii", "latin1"], + title="Character Encoding", + )airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1560-1565
: Consider adding error handling for encoding parameter, wdyt?The implementation looks good, but we might want to validate the encoding parameter to ensure it's a valid encoding supported by Python's codecs. This would help catch configuration errors early.
Here's a suggested implementation:
@staticmethod def create_gzipjson_decoder( model: GzipJsonDecoderModel, config: Config, **kwargs: Any ) -> GzipJsonDecoder: + try: + import codecs + codecs.lookup(model.encoding) + except LookupError: + raise ValueError(f"Invalid encoding '{model.encoding}'. Please provide a valid Python encoding name.") return GzipJsonDecoder(parameters={}, encoding=model.encoding)airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
1775-1791
: The GzipJsonDecoder looks good! Would you consider adding more documentation?The decoder implementation is clean and follows the pattern of other decoders. Perhaps we could enhance the description to include an example use case or mention common scenarios where this would be useful, wdyt?
title: GzipJson Decoder - description: Use this if the response is Gzip compressed Json. + description: Use this if the response is Gzip compressed Json. This is commonly used for APIs that compress their responses to reduce bandwidth usage, such as bulk data exports or large dataset APIs.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
(3 hunks)airbyte_cdk/sources/declarative/decoders/__init__.py
(1 hunks)airbyte_cdk/sources/declarative/decoders/json_decoder.py
(4 hunks)airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py
(2 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py
(3 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(7 hunks)unit_tests/sources/declarative/decoders/test_json_decoder.py
(3 hunks)
🔇 Additional comments (15)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (3)
7-8
: LGTM! Clean import additions.
The new imports support the gzip functionality and provide more precise typing with MutableMapping.
66-68
: LGTM! Consistent type update.
The MutableMapping return type change aligns with the other decoders.
93-101
: The implementation looks good! A couple of thoughts for discussion:
-
Should we add error handling for invalid gzip data? The current implementation might raise cryptic errors if the content isn't properly gzipped. wdyt about wrapping the decompress call in a try-except?
-
Interesting point raised in the previous review about making this more generic. We could potentially split this into:
class GzipDecoder(Decoder): def __init__(self, inner_decoder: Decoder): self.inner_decoder = inner_decoder
This would allow composition like
GzipDecoder(JsonDecoder())
. However, I understand the current approach is simpler and meets the immediate needs. What are your thoughts on this trade-off? 🤔
Let's check how gzip errors are handled in similar code:
airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py (3)
7-12
: LGTM! Clean import organization.
The new imports for CustomPartitionRouter
and CustomIncrementalSync
are well-organized within the existing import group. 👍
39-40
: Nice type hint updates! More flexible now.
The constructor now accepts both custom and standard implementations for partition router and cursor. This aligns well with the PR's goal of supporting custom components. 🎯
39-40
: 🛠️ Refactor suggestion
Should we update the _get_partition_field parameter type hint?
I noticed that while the constructor accepts CustomPartitionRouter | SubstreamPartitionRouter
, the _get_partition_field
method still has just SubstreamPartitionRouter
in its type hint. This might cause type checking issues. Wdyt about updating it to match? 🤔
- def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
+ def _get_partition_field(self, partition_router: CustomPartitionRouter | SubstreamPartitionRouter) -> str:
Also applies to: 53-53
unit_tests/sources/declarative/decoders/test_json_decoder.py (1)
4-4
: LGTM! Clean import additions.
The new imports are well-organized and follow the existing pattern in the file.
Also applies to: 17-17
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
690-702
: LGTM! The CustomDecoder class looks well-structured.
The implementation follows the established patterns for custom components in the codebase, with proper documentation and field definitions. The class_name field's description clearly indicates the requirement for the decoder to be a subclass of Decoder.
1646-1655
: LGTM! The decoder updates in both retrievers are consistent.
The new CustomDecoder and GzipJsonDecoder have been properly added to both SimpleRetriever and AsyncRetriever's decoder Union types, maintaining consistency across the codebase.
Also applies to: 1715-1724
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
61-61
: LGTM!
The GzipJsonDecoder import is correctly added and alphabetically ordered.
412-412
: LGTM!
The CustomDecoderModel mapping is correctly added to support custom decoder implementations.
436-436
: LGTM!
The GzipJsonDecoderModel mapping is correctly added to support gzip JSON decoding.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)
1753-1774
: LGTM! The CustomDecoder definition looks well-structured.
The component follows the established patterns for custom components in the schema, with clear documentation and proper type definitions.
2446-2451
: LGTM! The SimpleRetriever decoder references are properly updated.
The new decoders are correctly added to the list of available decoders, maintaining alphabetical order and consistent formatting.
2564-2569
: LGTM! The AsyncRetriever decoder references mirror the SimpleRetriever changes.
The changes are consistent with the SimpleRetriever updates, ensuring both retriever types support the new decoders.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (3)
6-8
: Consider documenting the rationale for MutableMapping.I notice we're switching from
Mapping
toMutableMapping
in the imports. This allows modification of the decoded data, which could be useful for transformations. Wdyt about adding a brief comment explaining why mutability is needed? This would help future maintainers understand the design choice.
84-90
: Consider addressing the TODO about delimiters.The TODO comment references an internal issue about configurable delimiters. Since we're touching this code, would you like to tackle this enhancement now? I can help draft the implementation if you're interested.
93-101
: Consider enhancing error handling for gzip operations.The implementation looks clean! However, we might want to handle gzip-specific errors separately from JSON parsing errors. Wdyt about catching and logging specific exceptions like
OSError
for corrupt gzip data?Also, regarding the earlier discussion about genericity - I agree with the current approach of keeping it simple. We can always abstract it later if we need more combinations of compression and formats.
def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: - raw_string = decompress(response.content).decode(encoding=self.encoding) - yield from self.parse_body_json(orjson.loads(raw_string)) + try: + raw_string = decompress(response.content).decode(encoding=self.encoding) + yield from self.parse_body_json(orjson.loads(raw_string)) + except OSError as e: + logger.warning(f"Failed to decompress gzip content: {e}") + yield {}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/decoders/json_decoder.py
(4 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (2)
28-36
: LGTM! Nice refactoring of the decode method.
The separation of concerns between decode
and parse_body_json
is clean and reusable. The type hints are precise and helpful.
43-52
: Consider adding validation for non-dict items in the list.
The parse_body_json
method handles empty lists and non-list inputs well. However, what if the list contains non-dict items? Wdyt about adding a type check in the yield loop to ensure we're only yielding dictionaries?
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (4)
8-8
: Type hints look good! Small suggestion on ordering.The type hints are more precise now. What do you think about ordering them alphabetically for consistency? Like:
Any, Generator, List, Mapping, MutableMapping, Optional
? wdyt?
37-41
: Consider enhancing the error message?The current warning message shows the raw response which might be too verbose. What do you think about adding more context about what specifically failed during JSON parsing? Maybe include the error message from the JSONDecodeError? wdyt?
- f"Response cannot be parsed into json: {response.status_code=}, {response.text=}" + f"Failed to parse JSON response (status={response.status_code}): {str(e)}"
93-95
: Add docstring explaining the purpose?Would it be helpful to add a docstring explaining when to use this decoder and what encoding values are supported? wdyt?
106-110
: Consider adding error handling for decompression?The decode method could fail if the content isn't actually gzipped. What do you think about adding try/except to handle decompression errors gracefully? wdyt?
Also, regarding the previous discussion about making this more generic - we could consider a composition-based approach in the future where a GzipDecoder wraps any other decoder. But that would be a larger refactor, so this implementation looks good for now!
def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: - raw_string = decompress(response.content).decode(encoding=self.encoding or "utf-8") - yield from self.parse_body_json(orjson.loads(raw_string)) + try: + raw_string = decompress(response.content).decode(encoding=self.encoding or "utf-8") + yield from self.parse_body_json(orjson.loads(raw_string)) + except OSError as e: + logger.error(f"Failed to decompress gzipped content: {str(e)}") + yield {}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/decoders/json_decoder.py
(4 hunks)airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py
(4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (4)
4-4
: LGTM! Clean standard library imports.
The new imports are well-organized and necessary for the gzip functionality.
Also applies to: 7-7
28-36
: Nice refactoring of the decode method!
The extraction of parsing logic and handling of empty lists improves code organization.
66-68
: LGTM! Consistent type system improvements.
The return type changes to MutableMapping are consistent across all decoders.
Also applies to: 84-86
97-104
: Nice encoding validation!
Good catch on validating the encoding early in post_init. This prevents issues later during actual decoding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of comments around typing but the rest seems fine
airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
436-436
: Consider aligning method naming with other decodersThe mapping uses
create_gzipjson_decoder
while other similar decoders use underscore format (e.g.,create_json_decoder
). Would you consider renaming tocreate_gzip_json_decoder
for consistency, wdyt?
1565-1570
: Consider adding docstring and handling model parametersA few suggestions to align with other decoder implementations:
- Add docstring documentation like other methods
- Pass model parameters to the decoder:
@staticmethod def create_gzipjson_decoder( model: GzipJsonDecoderModel, config: Config, **kwargs: Any ) -> GzipJsonDecoder: - return GzipJsonDecoder(parameters={}, encoding=model.encoding) + return GzipJsonDecoder(parameters=model.parameters or {}, encoding=model.encoding)What do you think about these changes to maintain consistency with other decoder implementations?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py
(3 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
61-61
: LGTM! Import follows existing patterns
The GzipJsonDecoder import is correctly placed in alphabetical order within the decoders import group.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
What
resolving amazon ads low-code async migration
add custom decoder to implement gzip jsonl decoder
How
add custom decoder
Review guide
airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
User Impact
Can this PR be safely reverted and rolled back?
Summary by CodeRabbit
New Features
CustomDecoder
andGzipJsonDecoder
for enhanced data decoding capabilities.ModelToComponentFactory
to support new decoders.Bug Fixes
Tests
GzipJsonDecoder
to ensure proper handling of gzip-compressed JSON responses.