diff --git a/weave/trace_server/clickhouse_trace_server_migrator.py b/weave/trace_server/clickhouse_trace_server_migrator.py index 9b5074330c0..b174be7a6e8 100644 --- a/weave/trace_server/clickhouse_trace_server_migrator.py +++ b/weave/trace_server/clickhouse_trace_server_migrator.py @@ -2,6 +2,7 @@ import logging import os from typing import Optional +import re from clickhouse_connect.driver.client import Client as CHClient @@ -40,7 +41,24 @@ def __init__( self._initialize_migration_db() def _format_replicated_sql(self, sql_query: str) -> str: - return sql_query + """Format SQL query to use replicated engines if replicated mode is enabled. + + Converts MergeTree engine variants to their Replicated counterparts: + - MergeTree -> ReplicatedMergeTree + - SummingMergeTree -> ReplicatedSummingMergeTree + - etc. + """ + if not self.replicated: + return sql_query + + # Match "ENGINE = MergeTree" followed by word boundary + pattern = r'ENGINE\s*=\s*(\w+)?MergeTree\b' + + def replace_engine(match): + engine_prefix = match.group(1) or "" + return f'ENGINE = Replicated{engine_prefix}MergeTree' + + return re.sub(pattern, replace_engine, sql_query, flags=re.IGNORECASE) def _create_db_sql(self, db_name: str) -> str: replicated_engine = ""