Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add Risingwave Engine Adapter #3270

Closed
wants to merge 10 commits into from
1 change: 1 addition & 0 deletions .circleci/continue_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ workflows:
- spark
- clickhouse
- clickhouse-cluster
- risingwave
- engine_tests_cloud:
name: cloud_engine_<< matrix.engine >>
context:
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ spark-test: engine-spark-up
trino-test: engine-trino-up
pytest -n auto -x -m "trino or trino_iceberg or trino_delta" --retries 3 --junitxml=test-results/junit-trino.xml

risingwave-test: engine-risingwave-up
pytest -n auto -x -m "risingwave" --retries 3 --junitxml=test-results/junit-risingwave.xml

#################
# Cloud Engines #
#################
Expand Down
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@
"sse-starlette>=0.2.2",
"pyarrow",
],
"risingwave": [
"psycopg2",
],
},
classifiers=[
"Intended Audience :: Developers",
Expand Down
42 changes: 42 additions & 0 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1671,6 +1671,48 @@ def get_catalog(self) -> t.Optional[str]:
return self.catalog_name


class RisingwaveConnectionConfig(ConnectionConfig):
host: str
user: str
password: str
port: int
database: str
keepalives_idle: t.Optional[int] = None
connect_timeout: int = 10
role: t.Optional[str] = None
sslmode: t.Optional[str] = None

concurrent_tasks: int = 4
register_comments: bool = True
pre_ping: bool = True

type_: Literal["risingwave"] = Field(alias="type", default="risingwave")

@property
def _connection_kwargs_keys(self) -> t.Set[str]:
return {
"host",
"user",
"password",
"port",
"database",
"keepalives_idle",
"connect_timeout",
"role",
"sslmode",
}

@property
def _engine_adapter(self) -> t.Type[EngineAdapter]:
return engine_adapter.RisingwaveEngineAdapter

@property
def _connection_factory(self) -> t.Callable:
from psycopg2 import connect

return connect


CONNECTION_CONFIG_TO_TYPE = {
# Map all subclasses of ConnectionConfig to the value of their `type_` field.
tpe.all_field_infos()["type_"].default: tpe
Expand Down
2 changes: 2 additions & 0 deletions sqlmesh/core/engine_adapter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter
from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter
from sqlmesh.core.engine_adapter.risingwave import RisingwaveEngineAdapter

DIALECT_TO_ENGINE_ADAPTER = {
"hive": SparkEngineAdapter,
Expand All @@ -33,6 +34,7 @@
"mssql": MSSQLEngineAdapter,
"trino": TrinoEngineAdapter,
"athena": AthenaEngineAdapter,
"risingwave": RisingwaveEngineAdapter,
}

DIALECT_ALIASES = {
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class EngineAdapter:
SUPPORTS_REPLACE_TABLE = True
DEFAULT_CATALOG_TYPE = DIALECT
QUOTE_IDENTIFIERS_IN_VIEWS = True
SUPPORTS_RW_SINK = False

def __init__(
self,
Expand Down
Loading