Skip to content

Commit

Permalink
add profiling log for livy sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
tovganesh committed Dec 5, 2022
1 parent 5cb195a commit ab2e6f0
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions dbt/adapters/spark_livy/livysession.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ def create_session(self, data):
except requests.exceptions.JSONDecodeError as json_err:
raise Exception("Json decode error to get session_id") from json_err

logger.debug(f"Obtained new livy session: {self.session_id}")
logger.debug(f"{self.session_id} Polling to check if session is idle")

# Wait for started state
while True:
res = requests.get(
Expand All @@ -95,6 +98,8 @@ def create_session(self, data):
verify=self.verify_ssl_certificate
).json()

logger.debug(f"Session state is {res['state']}")

if res['state'] == 'idle':
break
if res['state'] == 'dead':
Expand All @@ -104,9 +109,10 @@ def create_session(self, data):
)
return

logger.debug(f"Sleeping {DEFAULT_POLL_WAIT} seconds before next poll")
time.sleep(DEFAULT_POLL_WAIT)

logger.debug(f"Creating new livy session: {self.session_id}")
logger.debug(f"Created new livy session: {self.session_id}")

return self.session_id

Expand Down Expand Up @@ -248,18 +254,24 @@ def _getLivySQL(self, sql):
def _getLivyResult(self, res_obj):
json_res = res_obj.json()

statement_id = repr(json_res['id'])
logger.debug(f"{self.session_id}: Awaiting query results for {statement_id}")

while True:
res = requests.get(
self.connect_url + '/sessions/' + self.session_id + '/statements/' + repr(json_res['id']),
self.connect_url + '/sessions/' + self.session_id + '/statements/' + statement_id,
headers=self.headers,
auth=self.auth,
verify=self.verify_ssl_certificate
).json()

# print(res)
logger.debug(f"{self.session_id}: Query status for {statement_id}: {res['state']}")

if res['state'] == 'available':
return res

logger.debug(f"{self.session_id}: Waiting {DEFAULT_POLL_WAIT} seconds to poll next for: {statement_id}")
time.sleep(DEFAULT_POLL_WAIT)

def execute(self, sql: str, *parameters: Any) -> None:
Expand Down Expand Up @@ -287,8 +299,10 @@ def execute(self, sql: str, *parameters: Any) -> None:

# TODO: handle parameterised sql

logger.debug(f"{self.session_id}: Submitting query: {sql}")

res = self._getLivyResult(self._submitLivyCode(self._getLivySQL(sql)))

if (res['output']['status'] == 'ok'):
# values = res['output']['data']['application/json']
values = res['output']['data']['application/json']
Expand All @@ -300,6 +314,8 @@ def execute(self, sql: str, *parameters: Any) -> None:
else:
self._rows = []
self._schema = []

logger.debug(f"{self.session_id}: Query execution over")
else:
self._rows = None
self._schema = None
Expand Down Expand Up @@ -412,6 +428,7 @@ def __init__(self):
self.livy_global_session = None

def connect(self, connect_url, user, password, auth_type, session_params, verify_ssl_certificate):
logger.debug("Starting livy session")
if auth_type and auth_type.lower() == "kerberos":
logger.debug("Using Kerberos auth")
auth = HTTPKerberosAuth()
Expand All @@ -431,10 +448,15 @@ def connect(self, connect_url, user, password, auth_type, session_params, verify

if (self.livy_global_session == None):
self.livy_global_session = LivySession(connect_url, auth, headers, verify_ssl_certificate)
logger.debug("Creating new livy session")
self.livy_global_session.create_session(data)
logger.debug(f"New livy session created {self.livy_global_session.session_id}")
elif not self.livy_global_session.is_valid_session():
logger.debug(f"Deleting previous invalid session {self.livy_global_session.session_id}")
self.livy_global_session.delete_session()
logger.debug("Creating new livy session")
self.livy_global_session.create_session(data)
logger.debug(f"New livy session created {self.livy_global_session.session_id}")
else:
logger.debug(f"Reusing session: {self.livy_global_session.session_id}")

Expand All @@ -446,6 +468,7 @@ def connect(self, connect_url, user, password, auth_type, session_params, verify
session_params,
verify_ssl_certificate
)
logger.debug("Livy session connected")

return livyConnection

Expand Down

0 comments on commit ab2e6f0

Please sign in to comment.