diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index d717a69eb..a6ca0e923 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -207,7 +207,7 @@ def test_tls_supported(cls, url):
             return True
 
 
-def _clean_data_worker(rows, temp_table, sources_config, table):
+def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
     log.info("Starting data cleaning worker")
     global_field_to_func = sources_config["*"]["fields"]
     worker_conn = database_connect()
@@ -226,7 +226,7 @@ def _clean_data_worker(rows, temp_table, sources_config, table):
     }
 
     start_time = time.time()
-    cleaned_values = {field: [] for field in _get_cleanable_fields(table)}
+    cleaned_values = {field: [] for field in all_fields}
     for row in rows:
         # Map fields that need updating to their cleaning functions
         source = row["source"]
@@ -359,7 +359,7 @@ def clean_image_data(table):
                 end = job_size * n
                 last_end = end
                 # Arguments for parallel _clean_data_worker calls
-                jobs.append((batch[start:end], temp_table, source_config, table))
+                jobs.append((batch[start:end], temp_table, source_config, _get_cleanable_fields("image")))
     pool = multiprocessing.Pool(processes=num_workers)
     log.info(f"Starting {len(jobs)} cleaning jobs")
     conn.commit()