Skip to content

Commit

Permalink
Add record/replay support (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
peterallenwebb authored Jul 16, 2024
1 parent 7a4c021 commit bc042ce
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 9 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240716-172442.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add support for experimental record/replay testing.
time: 2024-07-16T17:24:42.271859-04:00
custom:
Author: peterallenwebb
Issue: "123"
34 changes: 25 additions & 9 deletions dbt/adapters/postgres/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from dbt.adapters.contracts.connection import AdapterResponse, Credentials
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.events.types import TypeCodeNotFound
from dbt.adapters.postgres.record import PostgresRecordReplayHandle
from dbt.adapters.sql import SQLConnectionManager
from dbt_common.exceptions import DbtDatabaseError, DbtRuntimeError
from dbt_common.events.functions import warn_or_error
from dbt_common.helper_types import Port
from dbt_common.record import get_record_mode_from_env, RecorderMode
from mashumaro.jsonschema.annotations import Maximum, Minimum
import psycopg2
from typing_extensions import Annotated
Expand Down Expand Up @@ -132,17 +134,31 @@ def open(cls, connection):
kwargs["application_name"] = credentials.application_name

def connect():
handle = psycopg2.connect(
dbname=credentials.database,
user=credentials.user,
host=credentials.host,
password=credentials.password,
port=credentials.port,
connect_timeout=credentials.connect_timeout,
**kwargs,
)
handle = None

# In replay mode, we won't connect to a real database at all, while
# in record and diff modes we do, but insert an intermediate handle
# object which monitors native connection activity.
rec_mode = get_record_mode_from_env()
if rec_mode != RecorderMode.REPLAY:
handle = psycopg2.connect(
dbname=credentials.database,
user=credentials.user,
host=credentials.host,
password=credentials.password,
port=credentials.port,
connect_timeout=credentials.connect_timeout,
**kwargs,
)

if rec_mode is not None:
# If using the record/replay mechanism, regardless of mode, we
# use a wrapper.
handle = PostgresRecordReplayHandle(handle, connection)

if credentials.role:
handle.cursor().execute("set role {}".format(credentials.role))

return handle

retryable_exceptions = [
Expand Down
2 changes: 2 additions & 0 deletions dbt/adapters/postgres/record/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from dbt.adapters.postgres.record.cursor.cursor import PostgresRecordReplayCursor
from dbt.adapters.postgres.record.handle import PostgresRecordReplayHandle
15 changes: 15 additions & 0 deletions dbt/adapters/postgres/record/cursor/cursor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from dbt_common.record import record_function

from dbt.adapters.record import RecordReplayCursor

from dbt.adapters.postgres.record.cursor.status import CursorGetStatusMessageRecord


class PostgresRecordReplayCursor(RecordReplayCursor):
"""A custom extension of RecordReplayCursor that adds the statusmessage
property which is specific to psycopg."""

@property
@record_function(CursorGetStatusMessageRecord, method=True, id_field_name="connection_name")
def statusmessage(self):
return self.native_cursor.statusmessage
21 changes: 21 additions & 0 deletions dbt/adapters/postgres/record/cursor/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import dataclasses
from typing import Optional

from dbt_common.record import Record, Recorder


@dataclasses.dataclass
class CursorGetStatusMessageParams:
connection_name: str


@dataclasses.dataclass
class CursorGetStatusMessageResult:
msg: Optional[str]


@Recorder.register_record_type
class CursorGetStatusMessageRecord(Record):
params_cls = CursorGetStatusMessageParams
result_cls = CursorGetStatusMessageResult
group = "Database"
12 changes: 12 additions & 0 deletions dbt/adapters/postgres/record/handle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dbt.adapters.record import RecordReplayHandle

from dbt.adapters.postgres.record.cursor.cursor import PostgresRecordReplayCursor


class PostgresRecordReplayHandle(RecordReplayHandle):
"""A custom extension of RecordReplayHandle that returns
a psycopg-specific PostgresRecordReplayCursor object."""

def cursor(self):
cursor = None if self.native_handle is None else self.native_handle.cursor()
return PostgresRecordReplayCursor(cursor, self.connection)

0 comments on commit bc042ce

Please sign in to comment.