Use deterministic job_ids to avoid retrying successful queries #977
Conversation
…ed in previous run
…ed in previous run
…mcknight/adap-924
dbt/adapters/bigquery/connections.py (Outdated)
def cancel_open(self) -> None:
    # previously just: pass
    names = []
    this_connection = self.get_if_exists()
    with self.lock:
        for thread_id, connection in self.thread_connections.items():
            if connection is this_connection:
                continue

            if connection.handle is not None and connection.state == ConnectionState.OPEN:
                client = connection.handle
                for job_id in self.jobs_by_thread.get(thread_id, []):

                    def fn():
                        return client.cancel_job(job_id)

                    self._retry_and_handle(msg=f"Cancel job: {job_id}", conn=connection, fn=fn)

                self.close(connection)

            if connection.name is not None:
                names.append(connection.name)
Do we need to return names here? If we do, we need to re-add it and change the function's return type to match what's in dbt-core on the SQLConnectionManager. Within dbt-bigquery the only place we call this is in unit tests, so that means modifying two unit tests to take the change into account.
All integration/functional tests are failing now.
Solved this part of it by not going through the defined client and just using conn.name.
@colin-rogers-dbt it's possible we may still have to use this. It seems to pass locally, but I'm not a fan of the interaction when you add a breakpoint around ln524.
…der to create unique job_id
…mcknight/adap-924
                self.close(connection)

            if connection.name is not None:
Do we specifically want all connections which are not this_connection? Or do we only want connections in which we cancelled jobs? In this current flow, a connection for which connection.state == ConnectionState.CLOSED will show up in names, which doesn't feel like an intuitive list to get from cancel_open.
If I backtrack cancel_open to core, it looks like one of the only places we call it is cancel_open_connections, which does make me think the desired result is that all closed connections are accounted for?
In other words we should only be returning the connections which we cancelled during this call, right?
            if connection is this_connection:
                continue

            if connection.handle is not None and connection.state == ConnectionState.OPEN:
This could be over-engineering, but I would consider putting the contents of this if block into its own method.
To be clear, are you referring to lines 308-321?
I'm referring to lines 310-318. Then this would look like:
names = []
this_connection = self.get_if_exists()
with self.lock:
    for thread_id, connection in self.thread_connections.items():
        if connection is this_connection:
            continue
        if connection.handle and connection.state == ConnectionState.OPEN:
            self.close_thread(thread_id, connection)  # or whatever name you choose
        if name := connection.name:
            names.append(name)
return names
Something else worth considering is whether we want to handle all of the threads within a connection. I don't know if there's more than one thread for a connection, but I feel like there is. If there's a connection with more than one thread, you'll close that connection in the second condition above when you get to the first thread. Then you'll skip past the second condition for every other thread, since connection.state should be closed at that point.
I think what you probably want is a list of job_ids by connection. Then for each connection you would cancel its jobs. Once all jobs are cancelled, close the connection.
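Roughly something like this (just a sketch, not a full implementation; it assumes jobs_by_thread, _retry_and_handle, and the connection attributes behave as they do in the current diff):

# sketch only: group tracked job ids by connection so a connection shared by
# several threads is only closed after all of its jobs have been cancelled
jobs_by_connection = {}  # id(connection) -> (connection, [job_ids])
for thread_id, connection in self.thread_connections.items():
    if connection is this_connection:
        continue
    entry = jobs_by_connection.setdefault(id(connection), (connection, []))
    entry[1].extend(self.jobs_by_thread.get(thread_id, []))

names = []
for connection, job_ids in jobs_by_connection.values():
    if connection.handle is not None and connection.state == ConnectionState.OPEN:
        client = connection.handle
        for job_id in job_ids:
            self._retry_and_handle(
                msg=f"Cancel job: {job_id}",
                conn=connection,
                fn=lambda job_id=job_id: client.cancel_job(job_id),
            )
        # only close once every job tracked for this connection is cancelled
        self.close(connection)
        # only report connections we actually cancelled/closed in this call
        if connection.name is not None:
            names.append(connection.name)
return names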
dbt/adapters/bigquery/connections.py (Outdated)
# build out deterministic_id
model_name = conn.credentials.schema  # schema name as model name is not
invocation_id = str(uuid.uuid4())
job_id = define_job_id(model_name, invocation_id)
I don't think uuid.uuid4() is deterministic, which means job_id is not either. Have you considered an md5 hash of sufficient attributes (model, connection name, etc.)?
Currently calling uuid directly as part of getting the unit tests swapped over for the functionality. I think the initial/current plan was to use the invocation_id we define via tracking in core (https://docs.getdbt.com/reference/dbt-jinja-functions/invocation_id), which is itself a uuid based on the docs.
Instead of using invocation_id (which we only sometimes have) should we just use the actual query text (which we have to have)?
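A sketch of what a deterministic id along those lines could look like, hashing the query text plus a couple of identifying attributes (the function and parameter names here are illustrative, not what the PR uses):

import hashlib


def define_job_id(sql: str, model_name: str, invocation_id: str) -> str:
    # same sql + model + invocation always produces the same job_id, so a
    # retry can look the existing job up instead of re-submitting the query
    raw = f"{model_name}|{invocation_id}|{sql}"
    return hashlib.md5(raw.encode("utf-8")).hexdigest()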
dbt/adapters/bigquery/jobs.py (Outdated)
def define_job_id(model_name, invocation_id):
    job_id = f"{model_name}_{invocation_id}"
What's the constraint on job_id? Is there a max length? Can all characters that go into a model name also be used in a job id?
Will definitely have to test this. I think all characters are fine since we should just be combining two strings, but the length may hit a limit.
I would want to make sure that we can submit a job_id to BQ with some weird characters. People put all kinds of things in their model names. An alternative is to hash the model_name so that it's only alpha-numeric.
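A sketch of that hashing idea (assuming BigQuery job ids only accept letters, digits, underscores, and dashes, which is worth double-checking against the docs; the helper name is illustrative):

import hashlib
import re


def sanitize_for_job_id(model_name: str) -> str:
    # keep a readable prefix but replace anything BigQuery might reject,
    # and append a short hash so two sanitized names can't collide
    safe = re.sub(r"[^A-Za-z0-9_-]", "_", model_name)
    digest = hashlib.md5(model_name.encode("utf-8")).hexdigest()[:8]
    return f"{safe}_{digest}"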
+1. If we want to stick with uuid, we can just generate a deterministic one with uuid.uuid5.
def define_job_id(sql, invocation_id=None):
    if invocation_id:
        job_id = str(uuid.uuid5(invocation_id, sql))
    else:
        job_id = str(uuid.uuid5(_INVOCATION_ID, sql))
    job_id = job_id.replace("-", "_")
    return job_id
I would leverage a macro to let end users override that logic to make it unique across invocations if needed
This PR has been marked as Stale because it has been open with no activity as of late. If you would like the PR to remain open, please comment on the PR or else it will be closed in 7 days.
Although we are closing this PR as stale, it can still be reopened to continue development. Just add a comment to notify the maintainers.
resolves #949
docs dbt-labs/docs.getdbt.com/#
Problem
Currently, if we experience a transient exception like RemoteDisconnected, we can sometimes end up re-running a query that has already been successfully kicked off.
Solution
On retry, poll by a deterministic job_id to see whether a successful job has already been kicked off.
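A rough sketch of the idea with the google-cloud-bigquery client (the helper name and the exact checks are illustrative, not necessarily what the PR implements):

from google.api_core.exceptions import NotFound
from google.cloud import bigquery


def submit_or_reuse(client: bigquery.Client, sql: str, job_id: str):
    # on retry, look up the deterministic job_id first: if a previous attempt
    # already submitted this query successfully, poll that job instead of
    # running the statement a second time
    try:
        existing = client.get_job(job_id)
        if existing.error_result is None:
            return existing
    except NotFound:
        pass
    return client.query(sql, job_id=job_id)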
Checklist