Skip to content

Commit

Permalink
add retry
Browse files Browse the repository at this point in the history
  • Loading branch information
v-chen_data committed Jun 17, 2024
1 parent 618db6f commit 6e55ec6
Showing 1 changed file with 46 additions and 29 deletions.
75 changes: 46 additions & 29 deletions scripts/data_prep/convert_delta_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

MINIMUM_DB_CONNECT_DBR_VERSION = '14.1'
MINIMUM_SQ_CONNECT_DBR_VERSION = '12.2'
MAX_RETRY = 5

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -368,36 +369,52 @@ def fetch(
dbsql (databricks.sql.connect): dbsql session
"""
cursor = dbsql.cursor() if dbsql is not None else None
for row_retry in range(MAX_RETRY):
try:
ans = run_query(
f'SELECT COUNT(*) FROM {tablename}',
method,
cursor,
sparkSession,
)
nrows = [row.asDict() for row in ans
][0].popitem()[1] # pyright: ignore
log.info(f'total_rows = {nrows}')
break
except Exception as e:
if row_retry == MAX_RETRY - 1:
raise RuntimeError(
f'Error in get total rows from {tablename}. Restart sparkSession and try again',
) from e
else:
log.warning(
f'Error in get total rows from {tablename}, trying again...'
)

try:
ans = run_query(
f'SELECT COUNT(*) FROM {tablename}',
method,
cursor,
sparkSession,
)
nrows = [row.asDict() for row in ans][0].popitem()[1] # pyright: ignore
log.info(f'total_rows = {nrows}')
except Exception as e:
raise RuntimeError(
f'Error in get total rows from {tablename}. Restart sparkSession and try again',
) from e

try:
ans = run_query(
f'SHOW COLUMNS IN {tablename}',
method,
cursor,
sparkSession,
)
columns = [row.asDict().popitem()[1] for row in ans] # pyright: ignore
order_by = columns[0]
columns_str = ','.join(columns)
log.info(f'order by column {order_by}')
except Exception as e:
raise RuntimeError(
f'Error in get columns from {tablename}. Restart sparkSession and try again',
) from e
for row_retry in range(MAX_RETRY):
try:
ans = run_query(
f'SHOW COLUMNS IN {tablename}',
method,
cursor,
sparkSession,
)
columns = [
row.asDict().popitem()[1] for row in ans
] # pyright: ignore
order_by = columns[0]
columns_str = ','.join(columns)
log.info(f'order by column {order_by}')
break
except Exception as e:
if row_retry == MAX_RETRY - 1:
raise RuntimeError(
f'Error in get columns from {tablename}. Restart sparkSession and try again',
) from e
else:
log.warning(
f'Error in get columns from {tablename}, trying again...'
)

if method == 'dbconnect' and sparkSession is not None:
log.info(f'{processes=}')
Expand Down

0 comments on commit 6e55ec6

Please sign in to comment.