From c04973776f2ab8311c449da0a5ae18ed0b2488e8 Mon Sep 17 00:00:00 2001 From: Olga Bulat Date: Sun, 5 Feb 2023 11:32:39 +0300 Subject: [PATCH] Create a list of fields to clean outside of worker Signed-off-by: Olga Bulat --- ingestion_server/ingestion_server/cleanup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py index d717a69eb..a6ca0e923 100644 --- a/ingestion_server/ingestion_server/cleanup.py +++ b/ingestion_server/ingestion_server/cleanup.py @@ -207,7 +207,7 @@ def test_tls_supported(cls, url): return True -def _clean_data_worker(rows, temp_table, sources_config, table): +def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]): log.info("Starting data cleaning worker") global_field_to_func = sources_config["*"]["fields"] worker_conn = database_connect() @@ -226,7 +226,7 @@ def _clean_data_worker(rows, temp_table, sources_config, table): } start_time = time.time() - cleaned_values = {field: [] for field in _get_cleanable_fields(table)} + cleaned_values = {field: [] for field in all_fields} for row in rows: # Map fields that need updating to their cleaning functions source = row["source"] @@ -359,7 +359,7 @@ def clean_image_data(table): end = job_size * n last_end = end # Arguments for parallel _clean_data_worker calls - jobs.append((batch[start:end], temp_table, source_config, table)) + jobs.append((batch[start:end], temp_table, source_config, _get_cleanable_fields("image"))) pool = multiprocessing.Pool(processes=num_workers) log.info(f"Starting {len(jobs)} cleaning jobs") conn.commit()