Skip to content
This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Commit

Permalink
Create a list of fields to clean outside of worker
Browse files Browse the repository at this point in the history
Signed-off-by: Olga Bulat <[email protected]>
  • Loading branch information
obulat committed Feb 5, 2023
1 parent b7cc4f1 commit 84ac386
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions ingestion_server/ingestion_server/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def test_tls_supported(cls, url):
return True


def _clean_data_worker(rows, temp_table, sources_config, table):
def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
log.info("Starting data cleaning worker")
global_field_to_func = sources_config["*"]["fields"]
worker_conn = database_connect()
Expand All @@ -216,7 +216,7 @@ def _clean_data_worker(rows, temp_table, sources_config, table):
}

start_time = time.time()
cleaned_values = {field: [] for field in _get_cleanable_fields(table)}
cleaned_values = {field: [] for field in all_fields}
for row in rows:
# Map fields that need updating to their cleaning functions
source = row["source"]
Expand Down Expand Up @@ -349,7 +349,7 @@ def clean_image_data(table):
end = job_size * n
last_end = end
# Arguments for parallel _clean_data_worker calls
jobs.append((batch[start:end], temp_table, source_config, table))
jobs.append((batch[start:end], temp_table, source_config, _get_cleanable_fields("image")))
pool = multiprocessing.Pool(processes=num_workers)
log.info(f"Starting {len(jobs)} cleaning jobs")
conn.commit()
Expand Down

0 comments on commit 84ac386

Please sign in to comment.