From f3037a2091ea07da8e05442d5f6c5bf6b36cec73 Mon Sep 17 00:00:00 2001 From: "V. Ganesh" Date: Mon, 5 Dec 2022 17:31:32 +0530 Subject: [PATCH] add profiling log for livy sessions --- dbt/adapters/spark_livy/livysession.py | 29 +++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/spark_livy/livysession.py b/dbt/adapters/spark_livy/livysession.py index 7d10532b7..c0d0feea7 100644 --- a/dbt/adapters/spark_livy/livysession.py +++ b/dbt/adapters/spark_livy/livysession.py @@ -86,6 +86,9 @@ def create_session(self, data): except requests.exceptions.JSONDecodeError as json_err: raise Exception("Json decode error to get session_id") from json_err + logger.debug(f"Obtained new livy session: {self.session_id}") + logger.debug("Polling to check if session is idle") + # Wait for started state while True: res = requests.get( @@ -95,6 +98,8 @@ def create_session(self, data): verify=self.verify_ssl_certificate ).json() + logger.debug(f"Session state is {res['state']}") + if res['state'] == 'idle': break if res['state'] == 'dead': @@ -104,9 +109,10 @@ def create_session(self, data): ) return + logger.debug(f"Sleeping {DEFAULT_POLL_WAIT} seconds before next poll") time.sleep(DEFAULT_POLL_WAIT) - logger.debug(f"Creating new livy session: {self.session_id}") + logger.debug(f"Created new livy session: {self.session_id}") return self.session_id @@ -248,18 +254,24 @@ def _getLivySQL(self, sql): def _getLivyResult(self, res_obj): json_res = res_obj.json() + statement_id = repr(json_res['id']) + logger.debug(f"{self.session_id}: Awaiting query results for {statement_id}") + while True: res = requests.get( - self.connect_url + '/sessions/' + self.session_id + '/statements/' + repr(json_res['id']), + self.connect_url + '/sessions/' + self.session_id + '/statements/' + statement_id, headers=self.headers, auth=self.auth, verify=self.verify_ssl_certificate ).json() # print(res) + logger.debug(f"{self.session_id}: Query status for {statement_id}: {res['state']}") if res['state'] == 'available': return res + + logger.debug(f"{self.session_id}: Waiting {DEFAULT_POLL_WAIT} seconds to poll next for: {statement_id}") time.sleep(DEFAULT_POLL_WAIT) def execute(self, sql: str, *parameters: Any) -> None: @@ -287,8 +299,10 @@ def execute(self, sql: str, *parameters: Any) -> None: # TODO: handle parameterised sql + logger.debug(f"{self.session_id}: Sumitting query: {sql}") + res = self._getLivyResult(self._submitLivyCode(self._getLivySQL(sql))) - + if (res['output']['status'] == 'ok'): # values = res['output']['data']['application/json'] values = res['output']['data']['application/json'] @@ -300,6 +314,8 @@ def execute(self, sql: str, *parameters: Any) -> None: else: self._rows = [] self._schema = [] + + logger.debug(f"{self.session_id}: Query execution over") else: self._rows = None self._schema = None @@ -412,6 +428,7 @@ def __init__(self): self.livy_global_session = None def connect(self, connect_url, user, password, auth_type, session_params, verify_ssl_certificate): + logger.debug("Starting livy session") if auth_type and auth_type.lower() == "kerberos": logger.debug("Using Kerberos auth") auth = HTTPKerberosAuth() @@ -431,10 +448,15 @@ def connect(self, connect_url, user, password, auth_type, session_params, verify if (self.livy_global_session == None): self.livy_global_session = LivySession(connect_url, auth, headers, verify_ssl_certificate) + logger.debug("Creating new livy session") self.livy_global_session.create_session(data) + logger.debug(f"New livy session created {self.livy_global_session.session_id}") elif not self.livy_global_session.is_valid_session(): + logger.debug(f"Deleting previous invalid session {self.livy_global_session.session_id}") self.livy_global_session.delete_session() + logger.debug("Creating new livy session") self.livy_global_session.create_session(data) + logger.debug(f"New livy session created {self.livy_global_session.session_id}") else: logger.debug(f"Reusing session: {self.livy_global_session.session_id}") @@ -446,6 +468,7 @@ def connect(self, connect_url, user, password, auth_type, session_params, verify session_params, verify_ssl_certificate ) + logger.debug("Livy session connected") return livyConnection