Skip to content

Commit

Permalink
add profiling log for livy sessions
Browse files (browse the repository at this point in the history)
  • Loading branch information
tovganesh committed Dec 5, 2022
1 parent 5cb195a commit 28f1dff
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions dbt/adapters/spark_livy/livysession.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -86,6 +86,9 @@ def create_session(self, data):
except requests.exceptions.JSONDecodeError as json_err:
raise Exception("Json decode error to get session_id") from json_err

logger.debug(f"Obtained new livy session: {self.session_id}")
logger.debug(f"{self.session_id}: Polling to check if session is idle")

# Wait for started state
while True:
res = requests.get(
Expand All @@ -95,6 +98,8 @@ def create_session(self, data):
verify=self.verify_ssl_certificate
).json()

logger.debug(f"{self.session_id}: Session state is {res['state']}")

if res['state'] == 'idle':
break
if res['state'] == 'dead':
Expand All @@ -104,9 +109,10 @@ def create_session(self, data):
)
return

logger.debug(f"{self.session_id}: Sleeping {DEFAULT_POLL_WAIT} seconds before next poll")
time.sleep(DEFAULT_POLL_WAIT)

logger.debug(f"Creating new livy session: {self.session_id}")
logger.debug(f"Created new livy session: {self.session_id}")

return self.session_id

Expand Down Expand Up @@ -245,21 +251,24 @@ def _getLivySQL(self, sql):

return code

def _getLivyResult(self, res_obj):
json_res = res_obj.json()
def _getLivyResult(self, statement_id):
logger.debug(f"{self.session_id}: {statement_id}: Awaiting query results")

while True:
res = requests.get(
self.connect_url + '/sessions/' + self.session_id + '/statements/' + repr(json_res['id']),
self.connect_url + '/sessions/' + self.session_id + '/statements/' + statement_id,
headers=self.headers,
auth=self.auth,
verify=self.verify_ssl_certificate
).json()

# print(res)
logger.debug(f"{self.session_id}: {statement_id}: Query status: {res['state']}")

if res['state'] == 'available':
return res

logger.debug(f"{self.session_id}: {statement_id}: Sleeping {DEFAULT_POLL_WAIT} seconds to poll for status")
time.sleep(DEFAULT_POLL_WAIT)

def execute(self, sql: str, *parameters: Any) -> None:
Expand Down Expand Up @@ -287,8 +296,13 @@ def execute(self, sql: str, *parameters: Any) -> None:

# TODO: handle parameterised sql

res = self._getLivyResult(self._submitLivyCode(self._getLivySQL(sql)))

logger.debug(f"{self.session_id}: Submitting query: {sql}")

res_obj = self._submitLivyCode(self._getLivySQL(sql))
json_res = res_obj.json()
statement_id = repr(json_res['id'])
res = self._getLivyResult(statement_id)

if (res['output']['status'] == 'ok'):
# values = res['output']['data']['application/json']
values = res['output']['data']['application/json']
Expand All @@ -300,6 +314,8 @@ def execute(self, sql: str, *parameters: Any) -> None:
else:
self._rows = []
self._schema = []

logger.debug(f"{self.session_id}: {statement_id}: Query execution over")
else:
self._rows = None
self._schema = None
Expand Down Expand Up @@ -412,6 +428,7 @@ def __init__(self):
self.livy_global_session = None

def connect(self, connect_url, user, password, auth_type, session_params, verify_ssl_certificate):
logger.debug("Starting livy session")
if auth_type and auth_type.lower() == "kerberos":
logger.debug("Using Kerberos auth")
auth = HTTPKerberosAuth()
Expand All @@ -431,12 +448,17 @@ def connect(self, connect_url, user, password, auth_type, session_params, verify

if (self.livy_global_session == None):
self.livy_global_session = LivySession(connect_url, auth, headers, verify_ssl_certificate)
logger.debug("Creating new livy session")
self.livy_global_session.create_session(data)
logger.debug(f"New Livy session created: {self.livy_global_session.session_id}")
elif not self.livy_global_session.is_valid_session():
logger.debug(f"Deleting previous invalid session: {self.livy_global_session.session_id}")
self.livy_global_session.delete_session()
logger.debug("Creating new livy session")
self.livy_global_session.create_session(data)
logger.debug(f"New Livy session created: {self.livy_global_session.session_id}")
else:
logger.debug(f"Reusing session: {self.livy_global_session.session_id}")
logger.debug(f"Reusing Livy session: {self.livy_global_session.session_id}")

livyConnection = LivyConnection(
connect_url,
Expand All @@ -446,6 +468,7 @@ def connect(self, connect_url, user, password, auth_type, session_params, verify
session_params,
verify_ssl_certificate
)
logger.debug(f"{self.livy_global_session.session_id}: Livy session connected")

return livyConnection

Expand Down

0 comments on commit 28f1dff

Please sign in to comment.