diff --git a/airbyte_cdk/sources/declarative/interpolation/jinja.py b/airbyte_cdk/sources/declarative/interpolation/jinja.py index ec5e861c..3bb9b0c2 100644 --- a/airbyte_cdk/sources/declarative/interpolation/jinja.py +++ b/airbyte_cdk/sources/declarative/interpolation/jinja.py @@ -4,7 +4,7 @@ import ast from functools import cache -from typing import Any, Mapping, Optional, Tuple, Type +from typing import Any, Mapping, Optional, Set, Tuple, Type from jinja2 import meta from jinja2.environment import Template @@ -27,7 +27,35 @@ class StreamPartitionAccessEnvironment(SandboxedEnvironment): def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool: if attr in ["_partition"]: return True - return super().is_safe_attribute(obj, attr, value) + return super().is_safe_attribute(obj, attr, value) # type: ignore # for some reason, mypy says 'Returning Any from function declared to return "bool"' + + +# These aliases are used to deprecate existing keywords without breaking all existing connectors. +_ALIASES = { + "stream_interval": "stream_slice", # Use stream_interval to access incremental_sync values + "stream_partition": "stream_slice", # Use stream_partition to access partition router's values +} + +# These extensions are not installed so they're not currently a problem, +# but we're still explicitly removing them from the jinja context. +# At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks +_RESTRICTED_EXTENSIONS = ["jinja2.ext.loopcontrols"] # Adds support for break continue in loops + +# By default, these Python builtin functions are available in the Jinja context. +# We explicitly remove them because of the potential security risk. +# Please add a unit test to test_jinja.py when adding a restriction. +_RESTRICTED_BUILTIN_FUNCTIONS = [ + "range" +] # The range function can cause very expensive computations + +_ENVIRONMENT = StreamPartitionAccessEnvironment() +_ENVIRONMENT.filters.update(**filters) +_ENVIRONMENT.globals.update(**macros) + +for extension in _RESTRICTED_EXTENSIONS: + _ENVIRONMENT.extensions.pop(extension, None) +for builtin in _RESTRICTED_BUILTIN_FUNCTIONS: + _ENVIRONMENT.globals.pop(builtin, None) class JinjaInterpolation(Interpolation): @@ -48,34 +76,6 @@ class JinjaInterpolation(Interpolation): Additional information on jinja templating can be found at https://jinja.palletsprojects.com/en/3.1.x/templates/# """ - # These aliases are used to deprecate existing keywords without breaking all existing connectors. - ALIASES = { - "stream_interval": "stream_slice", # Use stream_interval to access incremental_sync values - "stream_partition": "stream_slice", # Use stream_partition to access partition router's values - } - - # These extensions are not installed so they're not currently a problem, - # but we're still explicitely removing them from the jinja context. - # At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks - RESTRICTED_EXTENSIONS = ["jinja2.ext.loopcontrols"] # Adds support for break continue in loops - - # By default, these Python builtin functions are available in the Jinja context. - # We explicitely remove them because of the potential security risk. - # Please add a unit test to test_jinja.py when adding a restriction. - RESTRICTED_BUILTIN_FUNCTIONS = [ - "range" - ] # The range function can cause very expensive computations - - def __init__(self) -> None: - self._environment = StreamPartitionAccessEnvironment() - self._environment.filters.update(**filters) - self._environment.globals.update(**macros) - - for extension in self.RESTRICTED_EXTENSIONS: - self._environment.extensions.pop(extension, None) - for builtin in self.RESTRICTED_BUILTIN_FUNCTIONS: - self._environment.globals.pop(builtin, None) - def eval( self, input_str: str, @@ -86,7 +86,7 @@ def eval( ) -> Any: context = {"config": config, **additional_parameters} - for alias, equivalent in self.ALIASES.items(): + for alias, equivalent in _ALIASES.items(): if alias in context: # This is unexpected. We could ignore or log a warning, but failing loudly should result in fewer surprises raise ValueError( @@ -105,6 +105,7 @@ def eval( raise Exception(f"Expected a string, got {input_str}") except UndefinedError: pass + # If result is empty or resulted in an undefined error, evaluate and return the default string return self._literal_eval(self._eval(default, context), valid_types) @@ -132,16 +133,16 @@ def _eval(self, s: Optional[str], context: Mapping[str, Any]) -> Optional[str]: return s @cache - def _find_undeclared_variables(self, s: Optional[str]) -> set[str]: + def _find_undeclared_variables(self, s: Optional[str]) -> Set[str]: """ Find undeclared variables and cache them """ - ast = self._environment.parse(s) # type: ignore # parse is able to handle None + ast = _ENVIRONMENT.parse(s) # type: ignore # parse is able to handle None return meta.find_undeclared_variables(ast) @cache - def _compile(self, s: Optional[str]) -> Template: + def _compile(self, s: str) -> Template: """ We must cache the Jinja Template ourselves because we're using `from_string` instead of a template loader """ - return self._environment.from_string(s) # type: ignore [arg-type] # Expected `str | Template` but passed `str | None` + return _ENVIRONMENT.from_string(s)