Skip to content

Commit

Permalink
only lock during creation of pandas dataframe in output role
Browse files: browse the repository at this point in the history
  • Loading branch information
maurerle committed Nov 25, 2024
1 parent b9cf60e commit a48f2f7
Showing 1 changed file with 24 additions and 25 deletions.
49 changes: 24 additions & 25 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,40 +294,39 @@ async def store_dfs(self):
return

for table in self.write_dfs.keys():
with self.locks[table]:
if len(self.write_dfs[table]) == 0:
if len(self.write_dfs[table]) == 0:
continue

with self.locks[table]:
# concat all dataframes
# use join='outer' to keep all columns and fill missing values with NaN
df = pd.concat(self.write_dfs[table], axis=0, join="outer")
self.write_dfs[table] = []

df.reset_index()
if df.empty:
continue
df.reset_index()
if df.empty:
continue

Check warning on line 307 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L307

Added line #L307 was not covered by tests

df = df.apply(check_for_tensors)
df = df.apply(check_for_tensors)

if self.export_csv_path:
data_path = self.export_csv_path / f"{table}.csv"
df.to_csv(
data_path,
mode="a",
header=not data_path.exists(),
float_format="%.5g",
)
if self.export_csv_path:
data_path = self.export_csv_path / f"{table}.csv"
df.to_csv(

Check warning on line 313 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L312-L313

Added lines #L312 - L313 were not covered by tests
data_path,
mode="a",
header=not data_path.exists(),
float_format="%.5g",
)

if self.db is not None:
try:
with self.db.begin() as db:
df.to_sql(table, db, if_exists="append")
except (ProgrammingError, OperationalError, DataError):
self.check_columns(table, df)
# now try again
with self.db.begin() as db:
df.to_sql(table, db, if_exists="append")
if self.db is not None:
try:
with self.db.begin() as db:
df.to_sql(table, db, if_exists="append")
except (ProgrammingError, OperationalError, DataError):
self.check_columns(table, df)

Check warning on line 325 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L324-L325

Added lines #L324 - L325 were not covered by tests
# now try again
with self.db.begin() as db:
df.to_sql(table, db, if_exists="append")

Check warning on line 328 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L327-L328

Added lines #L327 - L328 were not covered by tests

self.write_dfs[table] = []

self.current_dfs_size = 0

Expand Down

0 comments on commit a48f2f7

Please sign in to comment.