diff --git a/sources/pg_replication/helpers.py b/sources/pg_replication/helpers.py index 71e97497e..27a4692cf 100644 --- a/sources/pg_replication/helpers.py +++ b/sources/pg_replication/helpers.py @@ -603,20 +603,20 @@ def process_msg(self, msg: ReplicationMessage) -> None: - `target_batch_size` is reached - a table's schema has changed """ - op = (msg.payload[:1]).decode("utf-8") - if op == "B": - self.last_commit_ts = Begin(msg.payload).commit_ts - elif op == "C": - self.process_commit(msg) - elif op == "R": - self.process_relation(Relation(msg.payload)) - elif op == "I": + op = msg.payload[:1] + if op == b"I": self.process_change(Insert(msg.payload), msg.data_start) - elif op == "U": + elif op == b"U": self.process_change(Update(msg.payload), msg.data_start) - elif op == "D": + elif op == b"D": self.process_change(Delete(msg.payload), msg.data_start) - elif op == "T": + elif op == b"B": + self.last_commit_ts = Begin(msg.payload).commit_ts + elif op == b"C": + self.process_commit(msg) + elif op == b"R": + self.process_relation(Relation(msg.payload)) + elif op == b"T": logger.warning( "The truncate operation is currently not supported. " "Truncate replication messages are ignored."