diff --git a/README.md b/README.md
index 8f4c591..2faad64 100644
--- a/README.md
+++ b/README.md
@@ -1,17 +1,29 @@
 # brave-news-source-suggestion
-Pipeline for producing the source embedding representations and similarity matrix needed for source suggestion feature in Brave News.
+Service for producing the source embedding representations and similarity matrix needed for the source suggestion feature in Brave News.
 
-## Scripts
-Run the scripts in the order in which they are presented.
-
-`source-feed-accumulator.py`: parses periodically Brave News's feed, creating articles buckets for each source. These buckets are collected in `articles_history.csv` and catalogued by the `publisher_id` attribute.
-
-`sources-similarity-matrix.py`: takes in the source buckets and produces an 512-dimensional embedding for each source, built as the mean of the 512-dimensional embeddings of all articles belonging to the source, as generated by the Universal Sentence Encoder model (https://arxiv.org/abs/1803.11175).
+## Installation
 
-## Outputs
-
-`source_embeddings.csv`: [`index | publisher_id | 0 | 1 ... | ... 511`] stores all the 512-dimensional embeddings for each source under its `publisher_name`.
-
-`source_similarity_t10.json` stores the top-10 most similar sources, with similarity score, for each source.
+```
+pip install -r requirements.txt
+```
+## Scripts
+**source-feed-accumulator.py**: parses the Brave News feed periodically, collecting articles for each source in `articles_history.csv`. For each article, we store the `publisher_id` attribute.
+
+**source-similarity-matrix.py**: takes the article history as input and produces a 512-dimensional embedding for each source, using the Universal Sentence Encoder model (https://arxiv.org/abs/1803.11175). Once an embedding is computed for each source, a source similarity matrix is produced.
+
+## Running locally
+To collect and accumulate article history:
+```
+export NO_UPLOAD=1
+export NO_DOWNLOAD=1
+python source-feed-accumulator.py
+```
+
+To compute source embeddings and produce the source similarity matrix:
+```
+export NO_UPLOAD=1
+export NO_DOWNLOAD=1
+python source-similarity-matrix.py
+```
diff --git a/config.py b/config.py
index c4a963e..f692f33 100644
--- a/config.py
+++ b/config.py
@@ -1,4 +1,5 @@
 import os
+from multiprocessing import cpu_count
 
 # Disable uploads to S3. Useful when running locally or in CI.
 NO_UPLOAD = os.getenv('NO_UPLOAD', None)
@@ -13,6 +14,7 @@ LANG_REGION_MODEL_MAP = os.getenv('LANG_REGION_MODEL_MAP', [
     ('en_US', "https://tfhub.dev/google/universal-sentence-encoder/4"),
     ('en_CA', "https://tfhub.dev/google/universal-sentence-encoder/4"),
+    ('en_GB', "https://tfhub.dev/google/universal-sentence-encoder/4"),
     ('es_ES', 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'),
     ('es_MX', 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'),
     ('pt_BR', 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'),
@@ -32,3 +34,10 @@
 SOURCE_SIMILARITY_T10_HR = os.getenv('SOURCE_SIMILARITY_T10_HR', "source_similarity_t10_hr.{LANG_REGION}")
 
 SOURCE_EMBEDDINGS = os.getenv('SOURCE_EMBEDDINGS', "SOURCE_EMBEDDINGS.{LANG_REGION}")
+
+# Set the number of processes to spawn for all multiprocessing tasks.
+CONCURRENCY = cpu_count()
+
+if SENTRY_URL := os.getenv('SENTRY_URL'):
+    import sentry_sdk
+    sentry_sdk.init(dsn=SENTRY_URL, traces_sample_rate=0)
diff --git a/requirements.txt b/requirements.txt
index 37edd34..786145c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,6 +3,7 @@ numpy==1.23.5
 pandas==1.5.1
 requests==2.28.1
 scipy==1.9.3
+sentry-sdk==1.12.0
 tensorflow==2.9.3
 tensorflow_text==2.9.0
 tensorflow_hub==0.12.0
diff --git a/source-feed-accumulator.py b/source-feed-accumulator.py
index d0b15ed..7c8d17c 100644
--- a/source-feed-accumulator.py
+++ b/source-feed-accumulator.py
@@ -4,6 +4,7 @@ import pandas as pd
 from structlog import get_logger
 from tqdm import tqdm
+from multiprocessing import Pool as ProcessPool
 
 import config
 from utils import download_file, upload_file
@@ -28,7 +29,9 @@ def accumulate_articles(articles, lang_region):
             f.write('"' + '","'.join([title, description, publish_time, publisher_id]) + '"\n')
 
 
-for lang_region, model in config.LANG_REGION_MODEL_MAP:
+def main(lang_region_model_map):
+    lang_region, _ = lang_region_model_map
+
     logger.info(f"Starting feeds accumulator for {lang_region}")
 
     feed_file = f'{config.FEED_JSON_FILE.format(LANG_REGION=lang_region)}.json'
@@ -56,3 +59,10 @@
                     f"source-suggestions/{config.ARTICLE_HISTORY_FILE.format(LANG_REGION=lang_region)}")
 
     logger.info("Finished sanitizing articles_history.")
+
+
+
+if __name__ == '__main__':
+    with ProcessPool(config.CONCURRENCY) as pool:
+        for item in pool.imap_unordered(main, config.LANG_REGION_MODEL_MAP):
+            pass
\ No newline at end of file
diff --git a/source-similarity-matrix.py b/source-similarity-matrix.py
index 355f8a3..b5e9760 100644
--- a/source-similarity-matrix.py
+++ b/source-similarity-matrix.py
@@ -9,49 +9,81 @@ import tensorflow_text
 from structlog import get_logger
 from tqdm import tqdm
+from multiprocessing import Pool as ProcessPool
 
 import config
 from utils import download_file, upload_file
 
 logger = get_logger()
 
+def upload_source_sim_files(lang_region):
+    logger.info("Uploading sources sim files")
+    if not config.NO_UPLOAD:
+        upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json',
+                    config.PUB_S3_BUCKET,
+                    f"source-suggestions/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json")
+
+        upload_file(
+            config.OUTPUT_DIR + "/" + f'/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json',
+            config.PUB_S3_BUCKET,
+            f"source-suggestions/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json")
+
+        upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv',
+                    config.PUB_S3_BUCKET,
+                    f"source-suggestions/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv")
+
+def clean_source_similarity_file(sources_data, sources_sim_data):
+    sources_id = [sources.get("publisher_id") for sources in sources_data]
+
+    for s_id in sources_id:
+        if s_id not in sources_sim_data:
+            sources_sim_data.pop(s_id, None)
+            continue
+
+        if s_id in sources_sim_data:
+            sources_sim_data[s_id] = [
+                suggestion for suggestion in sources_sim_data[s_id]
+                if suggestion["source"] in sources_id]
 
-def embed(input):
-    return model(input)
+    return sources_sim_data
 
 
-# Take centroid of 512-d embeddings
-def get_source_representation_from_titles(titles):
-    source_repr = np.zeros((1, 512))
-    if len(titles) < config.MINIMUM_ARTICLE_HISTORY_SIZE:
-        return source_repr
+def update_source_sim_files(model_lang):
+    def embed(input):
+        return model(input)
 
-    for title in titles:
-        source_repr += embed([title])[0]
-    norm_repr = tf.nn.l2_normalize(source_repr / len(titles), axis=1)
-    return norm_repr.numpy()
+    # Take centroid of 512-d embeddings
+    def get_source_representation_from_titles(titles):
+        source_repr = np.zeros((1, 512))
+        if len(titles) < config.MINIMUM_ARTICLE_HISTORY_SIZE:
+            return source_repr
+        for title in titles:
+            source_repr += embed([title])[0]
+        norm_repr = tf.nn.l2_normalize(source_repr / len(titles), axis=1)
+        return norm_repr.numpy()
 
 
-def compute_source_similarity(source1, source2, t='dot'):
-    cosine_similarities = np.dot(source1, np.transpose(source2))
-    clip_cosine_similarity = tf.clip_by_value(cosine_similarities, -1.0, 1.0)
-    score = 1.0 - tf.acos(clip_cosine_similarity) / math.pi
-    return score
+    def compute_source_similarity(source1, source2, t='dot'):
+        cosine_similarities = np.dot(source1, np.transpose(source2))
+        clip_cosine_similarity = tf.clip_by_value(cosine_similarities, -1.0, 1.0)
+        score = 1.0 - tf.acos(clip_cosine_similarity) / math.pi
+        return score
 
 
-def compute_source_representation_from_articles(articles_df, publisher_id):
-    publisher_bucket_df = articles_df[articles_df.publisher_id == publisher_id]
-    source_titles = [title for title in publisher_bucket_df.title.to_numpy() if title is not None]
-    return get_source_representation_from_titles(source_titles)
+    def compute_source_representation_from_articles(articles_df, publisher_id):
+        publisher_bucket_df = articles_df[articles_df.publisher_id == publisher_id]
+        source_titles = [title for title in publisher_bucket_df.title.to_numpy() if title is not None]
+        return get_source_representation_from_titles(source_titles)
 
 
-def get_source_id_for_title(title, sources_df):
-    return sources_df[sources_df.publisher_name == title].publisher_id.to_numpy()[0]
+    def get_source_id_for_title(title, sources_df):
+        return sources_df[sources_df.publisher_name == title].publisher_id.to_numpy()[0]
 
 
-# Compute similarity matrix for all existing LANG_REGION pairs
-for lang_region, model_url in config.LANG_REGION_MODEL_MAP:
+
+    # Compute similarity matrix for all existing LANG_REGION pairs
+    lang_region, model_url = model_lang
     logger.info(f"Started computing similarity matrix for {lang_region} using {model_url}")
 
     pathlib.Path(config.OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
@@ -94,8 +126,7 @@ def get_source_id_for_title(title, sources_df):
     logger.info(f"Computing sources representations for {lang_region}")
     sources_representation = pd.DataFrame({'publisher_id': publisher_ids})
     sources_representation = pd.concat([sources_representation, pd.DataFrame(reprs)], axis=1)
-    sources_representation.to_csv(
-        f'output/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv', header=None)
+    sources_representation.to_csv(f'output/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv', header=None)
     logger.info("Finished building source embeddings.")
 
     # For each source pair, compute pair similarity
@@ -130,30 +161,32 @@ def get_source_id_for_title(title, sources_df):
         # Only include suggestion if within 10% of the best match's score
         top_similarity_score = 0.0
         if sources_ranking:
-           top_similarity_score = sources_ranking[0][1]
+            top_similarity_score = sources_ranking[0][1]
         similarity_cutoff = config.SIMILARITY_CUTOFF_RATIO * top_similarity_score
 
-        top10_dictionary[source_id] = [{'source': get_source_id_for_title(source[0], sources_df), 'score': source[1]} for
+        top10_dictionary[source_id] = [{'source': get_source_id_for_title(source[0], sources_df), 'score': source[1]}
+                                       for
                                        source in sources_ranking[:10] if source[1] > similarity_cutoff]
         top10_dictionary_human_readable[feed] = [{'source': source[0], 'score': source[1]} for source in
                                                  sources_ranking[:10] if source[1] > similarity_cutoff]
 
+    logger.info("Removing un-matched sources")
+    top10_dictionary = clean_source_similarity_file(sources_data, top10_dictionary)
+
     logger.info("Outputting sources similarities files")
-    with open(f'output/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json', 'w', encoding='utf-8') as f:
-        json.dump(top10_dictionary, f, ensure_ascii=True)
-    with open(f'output/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json', 'w', encoding='utf-8') as f:
-        json.dump(top10_dictionary_human_readable,
-                  f, ensure_ascii=True)
-    logger.info("Script has finished running.")
+    with open(f'output/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json', 'w') as f:
+        json.dump(top10_dictionary, f)
+    with open(f'output/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json', 'w') as f:
+        json.dump(top10_dictionary_human_readable, f)
 
-    if not config.NO_UPLOAD:
-        upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json',
-                    config.PUB_S3_BUCKET,
-                    f"source-suggestions/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json")
+    upload_source_sim_files(lang_region)
+    logger.info(f"Updated similarity file for {lang_region}")
 
-        upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json',
-                    config.PUB_S3_BUCKET,
-                    f"source-suggestions/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json")
-        upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv',
-                    config.PUB_S3_BUCKET,
-                    f"source-suggestions/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv")
+if __name__ == '__main__':
+    # Compute similarity matrix for all existing LANG_REGION pairs
+    logger.info(f"Number of cpu : {config.CONCURRENCY}")
+    with ProcessPool(config.CONCURRENCY) as pool:
+        tqdm(pool.map(update_source_sim_files, config.LANG_REGION_MODEL_MAP),
+             total=len(config.LANG_REGION_MODEL_MAP))
+
+    logger.info("Script has finished running.")
\ No newline at end of file
diff --git a/utils.py b/utils.py
index 0c70ac6..b799d06 100644
--- a/utils.py
+++ b/utils.py
@@ -1,4 +1,5 @@
 import logging
+import mimetypes
 
 import boto3
 from botocore.exceptions import ClientError
@@ -17,9 +18,11 @@ def upload_file(file_name, bucket, object_name=None):
     if object_name is None:
         object_name = file_name
     try:
+        content_type = mimetypes.guess_type(file_name)[0] or 'binary/octet-stream'
         s3_client.upload_file(file_name, bucket, object_name, ExtraArgs={
             'GrantRead': f'id={config.BRAVE_TODAY_CLOUDFRONT_CANONICAL_ID}',
-            'GrantFullControl': f'id={config.BRAVE_TODAY_CANONICAL_ID}'
+            'GrantFullControl': f'id={config.BRAVE_TODAY_CANONICAL_ID}',
+            'ContentType': content_type
         })
     except ClientError as e: