Skip to content

Commit

Permalink
Revise to get all engine adapters; adapt parallel integration tests t…
Browse files Browse the repository at this point in the history
…o support this
  • Loading branch information
themisvaltinos committed Jan 3, 2025
1 parent 03812b0 commit e9bc257
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 25 deletions.
18 changes: 4 additions & 14 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,10 @@ def engine_adapter(self) -> EngineAdapter:
@property
def snapshot_evaluator(self) -> SnapshotEvaluator:
if not self._snapshot_evaluator:
if snapshot_gateways := self._snapshot_gateways:
self._create_engine_adapters(set(snapshot_gateways.values()))
self._snapshot_evaluator = SnapshotEvaluator(
{
gateway: adapter.with_log_level(logging.INFO)
for gateway, adapter in self._engine_adapters.items()
for gateway, adapter in self.engine_adapters.items()
},
ddl_concurrent_tasks=self.concurrent_tasks,
selected_gateway=self.selected_gateway,
Expand Down Expand Up @@ -2025,21 +2023,13 @@ def _snapshot_gateways(self) -> t.Dict[str, str]:

@cached_property
def engine_adapters(self) -> t.Dict[str, EngineAdapter]:
"""Returns all engine adapters for the gateways defined in the configs."""
self._create_engine_adapters()
return self._engine_adapters

def _create_engine_adapters(self, gateways: t.Optional[t.Set] = None) -> None:
"""Create engine adapters for the gateways, when none provided include all defined in the configs."""

"""Returns all the engine adapters for the gateways defined in the configuration."""
for gateway_name in self.config.gateways:
if gateway_name != self.selected_gateway and (
gateways is None or gateway_name in gateways
):
if gateway_name != self.selected_gateway:
connection = self.config.get_connection(gateway_name)
adapter = connection.create_engine_adapter()
self.concurrent_tasks = min(self.concurrent_tasks, connection.concurrent_tasks)
self._engine_adapters[gateway_name] = adapter
return self._engine_adapters

def _get_engine_adapter(self, gateway: t.Optional[str] = None) -> EngineAdapter:
if gateway:
Expand Down
1 change: 1 addition & 0 deletions tests/core/engine_adapter/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ def create_context(
],
)
if config_mutator:
config.gateways = {self.gateway: config.gateways[self.gateway]}
config_mutator(self.gateway, config)

gateway_config = config.gateways[self.gateway]
Expand Down
4 changes: 4 additions & 0 deletions tests/core/engine_adapter/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,8 @@ def test_sushi(ctx: TestContext, tmp_path_factory: pytest.TempPathFactory):
personal_paths=[pathlib.Path("~/.sqlmesh/config.yaml").expanduser()],
)

# To enable parallelism in integration tests
config.gateways = {ctx.gateway: config.gateways[ctx.gateway]}
current_gateway_config = config.gateways[ctx.gateway]
current_gateway_config.state_schema = sushi_state_schema

Expand Down Expand Up @@ -1730,6 +1732,8 @@ def _normalize_snowflake(name: str, prefix_regex: str = "(sqlmesh__)(.*)"):
if config.model_defaults.dialect != ctx.dialect:
config.model_defaults = config.model_defaults.copy(update={"dialect": ctx.dialect})

# To enable parallelism in integration tests
config.gateways = {ctx.gateway: config.gateways[ctx.gateway]}
current_gateway_config = config.gateways[ctx.gateway]

if ctx.dialect == "athena":
Expand Down
8 changes: 3 additions & 5 deletions tests/core/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,10 +757,8 @@ def test_multi_gateway_config(tmp_path, mocker: MockerFixture):
new_callable=mocker.PropertyMock(return_value={"snapshot": "athena"}),
)

ctx._create_engine_adapters()

assert isinstance(ctx._connection_config, RedshiftConnectionConfig)
assert len(ctx._engine_adapters) == 2
assert isinstance(ctx._engine_adapters["athena"], AthenaEngineAdapter)
assert isinstance(ctx._engine_adapters["redshift"], RedshiftEngineAdapter)
assert len(ctx.engine_adapters) == 2
assert isinstance(ctx.engine_adapters["athena"], AthenaEngineAdapter)
assert isinstance(ctx.engine_adapters["redshift"], RedshiftEngineAdapter)
assert ctx.engine_adapter == ctx._get_engine_adapter("redshift")
3 changes: 1 addition & 2 deletions tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,7 @@ def test_gateway_specific_adapters(copy_to_temp_path, mocker):

ctx = Context(paths=path, config="isolated_systems_config")

ctx._create_engine_adapters({"test"})
assert len(ctx._engine_adapters) == 2
assert len(ctx.engine_adapters) == 3
assert ctx.engine_adapter == ctx._get_engine_adapter()
assert ctx._get_engine_adapter("test") == ctx._engine_adapters["test"]

Expand Down
3 changes: 0 additions & 3 deletions tests/core/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6635,7 +6635,6 @@ def test_gateway_specific_render(assert_exp_eq) -> None:
)
context = Context(config=config)
assert context.engine_adapter == context._engine_adapters["main"]
assert len(context._engine_adapters) == 1

@model(
name="dummy_model",
Expand All @@ -6652,8 +6651,6 @@ def dummy_model_entry(evaluator: MacroEvaluator) -> exp.Select:
assert isinstance(dummy_model, SqlModel)
assert dummy_model.gateway == "duckdb"

# Calling render with a model with a non-default gateway should create
# the engine adapters and render with the model specified gateway
assert_exp_eq(
context.render("dummy_model"),
"""
Expand Down
1 change: 0 additions & 1 deletion tests/core/test_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2021,7 +2021,6 @@ def test_test_with_gateway_specific_model(tmp_path: Path, mocker: MockerFixture)
)

assert context.engine_adapter == context._engine_adapters["main"]
assert len(context._engine_adapters) == 1
with pytest.raises(
SQLMeshError, match=r"Gateway 'wrong' not found in the available engine adapters."
):
Expand Down

0 comments on commit e9bc257

Please sign in to comment.