This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Commit

Save cleaned data to tsv
Signed-off-by: Olga Bulat <[email protected]>
obulat committed Feb 5, 2023
1 parent f084149 commit b5f06f6
Showing 7 changed files with 4,558 additions and 11 deletions.
90 changes: 79 additions & 11 deletions ingestion_server/ingestion_server/cleanup.py
@@ -3,14 +3,15 @@
This includes cleaning up malformed URLs and filtering out undesirable tags.
"""

import csv
import logging as log
import multiprocessing
import time
import uuid
from urllib.parse import urlparse

import requests as re
import re as regex
import tldextract
from psycopg2.extras import DictCursor, Json

@@ -72,6 +73,18 @@ class CleanupFunctions:
Cleanup functions are dispatched in the _cleanup_config dictionary.
"""

@staticmethod
def cleanup_wiki_title(title):
"""
Remove the "File:" prefix and the image filetype suffix from the title if it exists. If no change is
made, return None.
"""
pat = regex.compile(r"File:?(.*?)(?:\.(jpg|jpeg|png|gif|bmp|svg))?$")
if match := pat.match(title):
clean_title = match.group(1).replace("'", "''")
return f"'{clean_title}'"
return None
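
For orientation, a minimal sketch (not part of the diff) of how this new function is expected to behave; the quoted return value is ready to be interpolated into the SQL UPDATE built further down, and the import path below is an assumption based on the file's location:

# Illustrative only (not part of this commit).
from ingestion_server.cleanup import CleanupFunctions

assert CleanupFunctions.cleanup_wiki_title("File:Sunset over the lake.jpg") == "'Sunset over the lake'"
assert CleanupFunctions.cleanup_wiki_title("File:L'Arc de Triomphe.png") == "'L''Arc de Triomphe'"
assert CleanupFunctions.cleanup_wiki_title("A title without the prefix") is None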

@staticmethod
def cleanup_url(url, tls_support):
"""
@@ -147,12 +160,22 @@ def cleanup_tags(tags):
"creator_url": CleanupFunctions.cleanup_url,
"foreign_landing_url": CleanupFunctions.cleanup_url,
}
},
"wikimedia": {
"fields": {
"title": CleanupFunctions.cleanup_wiki_title,
}
}
}
}
}
}

# Extract global and source-specific field names from _cleanup_config for a specific table
def _get_cleanable_fields(table):
cleanable_fields = []
for source in _cleanup_config["tables"][table]["sources"].values():
cleanable_fields += list(source["fields"].keys())
return cleanable_fields
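
As a quick illustration (not part of the commit), the helper flattens the global "*" fields and every source-specific field for a table into one list; a hypothetical call could look like:

# Illustrative only; the exact contents depend on the full _cleanup_config,
# which is only partially visible in this diff.
fields = _get_cleanable_fields("image")
# e.g. ["tags", "url", "creator_url", "foreign_landing_url", "title"]
# -- the global "*" fields followed by source-specific ones such as the
#    newly added wikimedia "title".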

class TlsTest:
"""
@@ -184,19 +207,31 @@ def test_tls_supported(cls, url):
return True


def _clean_data_worker(rows, temp_table, sources_config):
def _clean_data_worker(rows, temp_table, sources_config, table):
log.info("Starting data cleaning worker")
global_field_to_func = sources_config["*"]["fields"]
worker_conn = database_connect()
log.info("Data cleaning worker connected to database")
write_cur = worker_conn.cursor(cursor_factory=DictCursor)
log.info(f"Cleaning {len(rows)} rows")
tls_cache = {}
# Seed the cache with hosts that are already known to support TLS
tls_cache = {
'www.flickr.com': True,
'commons.wikimedia.org': True,
'https://www.eol.org/': True,
'.geograph.org.uk': True,
'.eol.org': True,
'.digitaltmuseum.org': True,
'www.geograph.org.uk': True,
}

start_time = time.time()
cleaned_values = {field: [] for field in _get_cleanable_fields(table)}
for row in rows:
# Map fields that need updating to their cleaning functions
source = row["source"]
_id = row["id"]
identifier = row["identifier"]
if source in sources_config:
source_field_to_func = sources_config[source]["fields"]
# Merge source-local and global function field mappings
@@ -220,6 +255,8 @@ def _clean_data_worker(rows, temp_table, sources_config):
update_field_expressions = []
for field in cleaned_data:
update_field_expressions.append(f"{field} = {cleaned_data[field]}")
cleaned_values[field].append((identifier, cleaned_data[field]))

if len(update_field_expressions) > 0:
update_query = f"""UPDATE {temp_table} SET
{', '.join(update_field_expressions)} WHERE id = {_id}
@@ -233,15 +270,41 @@ def _clean_data_worker(rows, temp_table, sources_config):
end_time = time.time()
total_time = end_time - start_time
log.info(f"Worker finished batch in {total_time}")
return True
return cleaned_values
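
To make the new return value concrete, a sketch (not in the commit) of the shape a single worker now hands back instead of a bare True; the identifiers and values below are made up:

# Illustrative only: one field-to-rows mapping per worker. Keys come from
# _get_cleanable_fields(table); each tuple is (row identifier, cleaned value),
# with the value already quoted/serialised for SQL.
example_worker_result = {
    "url": [("11111111-1111-1111-1111-111111111111", "'https://example.com/image.jpg'")],
    "creator_url": [],
    "foreign_landing_url": [],
    "tags": [],
    "title": [("22222222-2222-2222-2222-222222222222", "'Sunset over the lake'")],
}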

def save_cleaned_data(results):
log.info("Saving cleaned data...")
start_time = time.time()

results_to_save: dict[str, list[tuple[str, str|Json]]] = {}
# Results is a list of dicts, where each dict is a mapping of field name to
# a list of tuples of (identifier, cleaned_value). There are as many dicts
# as there are workers. We need to merge the lists of tuples for each field
# name.
for result in results:
for field in result:
if field not in results_to_save:
results_to_save[field] = []
results_to_save[field].extend(result[field])
cleanup_counts = {}
for field, cleaned_items in results_to_save.items():
cleanup_counts[field] = len(cleaned_items) if cleaned_items else 0
if cleaned_items:
with open(f"{field}.tsv", "a") as f:
csv_writer = csv.writer(f, delimiter="\t")
csv_writer.writerows(cleaned_items)

end_time = time.time()
total_time = end_time - start_time
log.info(f"Finished saving cleaned data in {total_time}")
return cleanup_counts
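
A hedged usage sketch (not part of the commit) of the merge-and-write step above, with two made-up worker results; each non-empty field is appended to its own <field>.tsv file as tab-separated (identifier, cleaned_value) rows:

# Illustrative only: merging two hypothetical worker results.
worker_results = [
    {"title": [("11111111-1111-1111-1111-111111111111", "'First title'")], "url": []},
    {"title": [("22222222-2222-2222-2222-222222222222", "'Second title'")], "url": []},
]
counts = save_cleaned_data(worker_results)
# counts == {"title": 2, "url": 0}
# title.tsv now ends with two rows, e.g.
#   11111111-1111-1111-1111-111111111111<TAB>'First title'
#   22222222-2222-2222-2222-222222222222<TAB>'Second title'
# url.tsv is not written at all, because its list is empty.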


def clean_image_data(table):
"""
Clean up data loaded from upstream that is unsuitable for prod before going live.
:param table: The staging table for the new data
:return: None
"""

@@ -262,7 +325,7 @@ def clean_image_data(table):
fields_to_clean.add(f)

cleanup_selection = (
f"SELECT id, source, " f"{', '.join(fields_to_clean)} from temp_import_{table}"
f"SELECT id, identifier, source, " f"{', '.join(fields_to_clean)} from temp_import_{table}"
)
log.info(f'Running cleanup on selection "{cleanup_selection}"')
conn = database_connect(autocommit=True)
@@ -281,6 +344,8 @@ def clean_image_data(table):
jobs = []
num_workers = multiprocessing.cpu_count()
num_cleaned = 0
cleaned_counts_by_field = {field: 0 for field in fields_to_clean}

while batch:
# Divide updates into jobs for parallel execution.
batch_start_time = time.time()
@@ -294,22 +359,25 @@ def clean_image_data(table):
end = job_size * n
last_end = end
# Arguments for parallel _clean_data_worker calls
jobs.append((batch[start:end], temp_table, source_config))
jobs.append((batch[start:end], temp_table, source_config, table))
pool = multiprocessing.Pool(processes=num_workers)
log.info(f"Starting {len(jobs)} cleaning jobs")
conn.commit()
pool.starmap(_clean_data_worker, jobs)
results = pool.starmap(_clean_data_worker, jobs)
batch_cleaned_counts = save_cleaned_data(results)
for field in batch_cleaned_counts:
cleaned_counts_by_field[field] += batch_cleaned_counts[field]
pool.close()
num_cleaned += len(batch)
batch_end_time = time.time()
rate = len(batch) / (batch_end_time - batch_start_time)
log.info(f"Batch finished, records/s: cleanup_rate={rate}")
log.info(f"Fetching next batch. Records cleaned so far: {num_cleaned}")
log.info(f"Fetching next batch. Records cleaned so far: {num_cleaned}, counts: {batch_cleaned_counts}")
jobs = []
batch = iter_cur.fetchmany(size=CLEANUP_BUFFER_SIZE)
conn.commit()
iter_cur.close()
conn.close()
end_time = time.time()
cleanup_time = end_time - start_time
log.info(f"Cleaned all records in {cleanup_time} seconds")
log.info(f"Cleaned all records in {cleanup_time} seconds, counts: {cleaned_counts_by_field}")