Skip to content

Commit

Permalink
DCV-3111 bluegreen: wait for queries ending between steps
Browse files Browse the repository at this point in the history
  • Loading branch information
BAntonellini committed Dec 19, 2024
1 parent abdae19 commit 95a67d9
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions dbt_coves/tasks/blue_green/clone_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from rich.console import Console
from snowflake.connector import DictCursor
from snowflake.connector.connection import SnowflakeConnection

console = Console()

Expand Down Expand Up @@ -140,7 +141,8 @@ def __init__(self, con, threads):
self.threads = threads
self.register_command_thread = 0
self.thread_commands = [[] for _ in range(self.threads)]
self.con = con
self.con: SnowflakeConnection = con
self.pending_queries = []

def register_command(self, command: str):
"""
Expand Down Expand Up @@ -168,7 +170,24 @@ def run_commands(self, commands):
None
"""
for command in commands:
self.con.cursor().execute_async(command)
cur = self.con.cursor()
cur.execute_async(command)
self.pending_queries.append(cur.sfqid)

def wait_for_completion(self):
"""
Waits for all queries to complete.
"""
while self.pending_queries:
self.pending_queries = [
q
for q in self.pending_queries
if self.con.is_still_running(self.con.get_query_status_throw_if_error(q))
]
console.print(
f"Waiting for {len(self.pending_queries)} queries to complete before continuing."
)
time.sleep(1)

def run(self):
"""
Expand All @@ -185,6 +204,7 @@ def run(self):
# complete the processes
for proc in procs:
proc.join()
self.wait_for_completion()


# if __name__ == "__main__":
Expand Down

0 comments on commit 95a67d9

Please sign in to comment.