Added Multiprocessing #68

Draft · wants to merge 10 commits into master
36 changes: 24 additions & 12 deletions README.md
@@ -1,17 +1,29 @@
# brave-news-source-suggestion

Pipeline for producing the source embedding representations and similarity matrix needed for source suggestion feature in Brave News.
Service for producing the source embedding representations and similarity matrix needed for the source suggestion feature in Brave News.

## Scripts
Run the scripts in the order in which they are presented.

`source-feed-accumulator.py`: parses periodically Brave News's feed, creating articles buckets for each source. These buckets are collected in `articles_history.csv` and catalogued by the `publisher_id` attribute.

`sources-similarity-matrix.py`: takes in the source buckets and produces an 512-dimensional embedding for each source, built as the mean of the 512-dimensional embeddings of all articles belonging to the source, as generated by the Universal Sentence Encoder model (https://arxiv.org/abs/1803.11175).
## Installation

## Outputs

`source_embeddings.csv`: [`index | publisher_id | 0 | 1 ... | ... 511`] stores all the 512-dimensional embeddings for each source under its `publisher_name`.

`source_similarity_t10.json` stores the top-10 most similar sources, with similarity score, for each source.
```
pip install -r requirements.txt
```

## Scripts
**source-feed-accumulator.py**: periodically parses the Brave News feed, collecting articles for each source in `articles_history.csv`. For each article, we store the `publisher_id` attribute.

**sources-similarity-matrix.py**: takes the article history as input and produces a 512-dimensional embedding for each source, using the Universal Sentence Encoder model (https://arxiv.org/abs/1803.11175). Once an embedding has been computed for each source, a source similarity matrix is produced.
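
For illustration, here is a minimal, self-contained sketch of the centroid-plus-angular-similarity computation described above. It assumes the English Universal Sentence Encoder from TF Hub; the function names and example titles are illustrative, not identifiers from this repository:

```
import math

import numpy as np
import tensorflow as tf
import tensorflow_hub as hub

# Illustrative only: the same English USE model listed in config.py (downloads on first use).
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")

def source_embedding(titles):
    # Source representation: mean of the 512-d title embeddings, L2-normalized.
    centroid = embed(titles).numpy().mean(axis=0, keepdims=True)
    return tf.nn.l2_normalize(centroid, axis=1).numpy()

def angular_similarity(a, b):
    # 1 - arccos(cos)/pi: identical sources score 1.0, orthogonal ones 0.5.
    cos = float(np.clip(np.dot(a, b.T), -1.0, 1.0))
    return 1.0 - math.acos(cos) / math.pi

source_a = source_embedding(["Rate cuts expected this quarter", "Markets rally on jobs data"])
source_b = source_embedding(["Central bank signals rate cuts", "Stocks climb after payrolls report"])
print(angular_similarity(source_a, source_b))
```

This mirrors what `get_source_representation_from_titles` and `compute_source_similarity` do in `source-similarity-matrix.py`.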

## Running locally
To collect and accumulate article history:
```
export NO_UPLOAD=1
export NO_DOWNLOAD=1
python source-feed-accumulator.py
```

To compute source embeddings and produce the source similarity matrix:
```
export NO_UPLOAD=1
export NO_DOWNLOAD=1
python sources-similarity-matrix.py
```
9 changes: 9 additions & 0 deletions config.py
@@ -1,4 +1,5 @@
import os
from multiprocessing import cpu_count

# Disable uploads to S3. Useful when running locally or in CI.
NO_UPLOAD = os.getenv('NO_UPLOAD', None)
@@ -13,6 +14,7 @@
LANG_REGION_MODEL_MAP = os.getenv('LANG_REGION_MODEL_MAP', [
('en_US', "https://tfhub.dev/google/universal-sentence-encoder/4"),
('en_CA', "https://tfhub.dev/google/universal-sentence-encoder/4"),
('en_GB', "https://tfhub.dev/google/universal-sentence-encoder/4"),
('es_ES', 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'),
('es_MX', 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'),
('pt_BR', 'https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3'),
@@ -32,3 +34,10 @@
SOURCE_SIMILARITY_T10_HR = os.getenv('SOURCE_SIMILARITY_T10_HR', "source_similarity_t10_hr.{LANG_REGION}")

SOURCE_EMBEDDINGS = os.getenv('SOURCE_EMBEDDINGS', "SOURCE_EMBEDDINGS.{LANG_REGION}")

# Number of worker processes to spawn for all multiprocessing tasks.
CONCURRENCY = cpu_count()

if SENTRY_URL := os.getenv('SENTRY_URL'):
    import sentry_sdk
    sentry_sdk.init(dsn=SENTRY_URL, traces_sample_rate=0)
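
A possible variant, if the pool size ever needs to be capped per environment, would be to read `CONCURRENCY` the same way the other settings above are read; this is only an illustrative sketch, not what the file does:

```
import os
from multiprocessing import cpu_count

# Illustrative sketch: allow an environment override, defaulting to all available cores.
CONCURRENCY = int(os.getenv('CONCURRENCY', cpu_count()))
```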
1 change: 1 addition & 0 deletions requirements.txt
@@ -3,6 +3,7 @@ numpy==1.23.5
pandas==1.5.1
requests==2.28.1
scipy==1.9.3
sentry-sdk==1.12.0
tensorflow==2.9.3
tensorflow_text==2.9.0
tensorflow_hub==0.12.0
12 changes: 11 additions & 1 deletion source-feed-accumulator.py
@@ -4,6 +4,7 @@
import pandas as pd
from structlog import get_logger
from tqdm import tqdm
from multiprocessing import Pool as ProcessPool

import config
from utils import download_file, upload_file
@@ -28,7 +29,9 @@ def accumulate_articles(articles, lang_region):
f.write('"' + '","'.join([title, description, publish_time, publisher_id]) + '"\n')


for lang_region, model in config.LANG_REGION_MODEL_MAP:
def main(lang_region_model_map):
lang_region, _ = lang_region_model_map

logger.info(f"Starting feeds accumulator for {lang_region}")

feed_file = f'{config.FEED_JSON_FILE.format(LANG_REGION=lang_region)}.json'
@@ -56,3 +59,10 @@ def accumulate_articles(articles, lang_region):
f"source-suggestions/{config.ARTICLE_HISTORY_FILE.format(LANG_REGION=lang_region)}")

logger.info("Finished sanitizing articles_history.")



if __name__ == '__main__':
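    # Fan out one worker process per (lang_region, model) entry; draining the
    # imap_unordered iterator simply waits for every region to finish.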
with ProcessPool(config.CONCURRENCY) as pool:
for item in pool.imap_unordered(main, config.LANG_REGION_MODEL_MAP):
pass
121 changes: 77 additions & 44 deletions source-similarity-matrix.py
@@ -9,49 +9,81 @@
import tensorflow_text
from structlog import get_logger
from tqdm import tqdm
from multiprocessing import Pool as ProcessPool

import config
from utils import download_file, upload_file

logger = get_logger()

def upload_source_sim_files(lang_region):
logger.info("Uploading sources sim files")
if not config.NO_UPLOAD:
upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json',
config.PUB_S3_BUCKET,
f"source-suggestions/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json")

upload_file(
config.OUTPUT_DIR + "/" + f'/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json',
config.PUB_S3_BUCKET,
f"source-suggestions/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json")

upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv',
config.PUB_S3_BUCKET,
f"source-suggestions/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv")

def clean_source_similarity_file(sources_data, sources_sim_data):
    sources_id = {source.get("publisher_id") for source in sources_data}

    # Drop similarity entries whose publisher no longer exists, and drop
    # suggestions that point to publishers no longer in the sources list.
    for s_id in list(sources_sim_data):
        if s_id not in sources_id:
            sources_sim_data.pop(s_id, None)
            continue

        sources_sim_data[s_id] = [suggestion for suggestion in sources_sim_data[s_id]
                                  if suggestion["source"] in sources_id]

def embed(input):
return model(input)
return sources_sim_data


# Take centroid of 512-d embeddings
def get_source_representation_from_titles(titles):
source_repr = np.zeros((1, 512))
if len(titles) < config.MINIMUM_ARTICLE_HISTORY_SIZE:
return source_repr
def update_source_sim_files(model_lang):
def embed(input):
return model(input)

for title in titles:
source_repr += embed([title])[0]
norm_repr = tf.nn.l2_normalize(source_repr / len(titles), axis=1)
return norm_repr.numpy()
# Take centroid of 512-d embeddings
def get_source_representation_from_titles(titles):
source_repr = np.zeros((1, 512))
if len(titles) < config.MINIMUM_ARTICLE_HISTORY_SIZE:
return source_repr

for title in titles:
source_repr += embed([title])[0]
norm_repr = tf.nn.l2_normalize(source_repr / len(titles), axis=1)
return norm_repr.numpy()

def compute_source_similarity(source1, source2, t='dot'):
cosine_similarities = np.dot(source1, np.transpose(source2))
clip_cosine_similarity = tf.clip_by_value(cosine_similarities, -1.0, 1.0)
score = 1.0 - tf.acos(clip_cosine_similarity) / math.pi
return score

def compute_source_similarity(source1, source2, t='dot'):
cosine_similarities = np.dot(source1, np.transpose(source2))
clip_cosine_similarity = tf.clip_by_value(cosine_similarities, -1.0, 1.0)
score = 1.0 - tf.acos(clip_cosine_similarity) / math.pi
return score

def compute_source_representation_from_articles(articles_df, publisher_id):
publisher_bucket_df = articles_df[articles_df.publisher_id == publisher_id]

source_titles = [title for title in publisher_bucket_df.title.to_numpy() if title is not None]
return get_source_representation_from_titles(source_titles)
def compute_source_representation_from_articles(articles_df, publisher_id):
publisher_bucket_df = articles_df[articles_df.publisher_id == publisher_id]

source_titles = [title for title in publisher_bucket_df.title.to_numpy() if title is not None]
return get_source_representation_from_titles(source_titles)

def get_source_id_for_title(title, sources_df):
return sources_df[sources_df.publisher_name == title].publisher_id.to_numpy()[0]

def get_source_id_for_title(title, sources_df):
return sources_df[sources_df.publisher_name == title].publisher_id.to_numpy()[0]

# Compute similarity matrix for all existing LANG_REGION pairs
for lang_region, model_url in config.LANG_REGION_MODEL_MAP:

# Compute similarity matrix for all existing LANG_REGION pairs
lang_region, model_url = model_lang
logger.info(f"Started computing similarity matrix for {lang_region} using {model_url}")

pathlib.Path(config.OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -94,8 +126,7 @@ def get_source_id_for_title(title, sources_df):
logger.info(f"Computing sources representations for {lang_region}")
sources_representation = pd.DataFrame({'publisher_id': publisher_ids})
sources_representation = pd.concat([sources_representation, pd.DataFrame(reprs)], axis=1)
sources_representation.to_csv(
f'output/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv', header=None)
sources_representation.to_csv(f'output/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv', header=None)
logger.info("Finished building source embeddings.")

# For each source pair, compute pair similarity
Expand Down Expand Up @@ -130,30 +161,32 @@ def get_source_id_for_title(title, sources_df):
# Only include suggestion if within 10% of the best match's score
top_similarity_score = 0.0
if sources_ranking:
top_similarity_score = sources_ranking[0][1]
top_similarity_score = sources_ranking[0][1]
similarity_cutoff = config.SIMILARITY_CUTOFF_RATIO * top_similarity_score
top10_dictionary[source_id] = [{'source': get_source_id_for_title(source[0], sources_df), 'score': source[1]} for
top10_dictionary[source_id] = [{'source': get_source_id_for_title(source[0], sources_df), 'score': source[1]}
for
source in sources_ranking[:10] if source[1] > similarity_cutoff]
top10_dictionary_human_readable[feed] = [{'source': source[0], 'score': source[1]} for source in
sources_ranking[:10] if source[1] > similarity_cutoff]

logger.info("Removing un-matched sources")
top10_dictionary = clean_source_similarity_file(sources_data, top10_dictionary)

logger.info("Outputting sources similarities files")
with open(f'output/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json', 'w', encoding='utf-8') as f:
json.dump(top10_dictionary, f, ensure_ascii=True)
with open(f'output/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json', 'w', encoding='utf-8') as f:
json.dump(top10_dictionary_human_readable,
f, ensure_ascii=True)
logger.info("Script has finished running.")
with open(f'output/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json', 'w') as f:
json.dump(top10_dictionary, f)
with open(f'output/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json', 'w') as f:
json.dump(top10_dictionary_human_readable, f)

if not config.NO_UPLOAD:
upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json',
config.PUB_S3_BUCKET,
f"source-suggestions/{config.SOURCE_SIMILARITY_T10.format(LANG_REGION=lang_region)}.json")
upload_source_sim_files(lang_region)
logger.info(f"Updated similarity file for {lang_region}")

upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json',
config.PUB_S3_BUCKET,
f"source-suggestions/{config.SOURCE_SIMILARITY_T10_HR.format(LANG_REGION=lang_region)}.json")

upload_file(config.OUTPUT_DIR + "/" + f'/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv',
config.PUB_S3_BUCKET,
f"source-suggestions/{config.SOURCE_EMBEDDINGS.format(LANG_REGION=lang_region)}.csv")
if __name__ == '__main__':
# Compute similarity matrix for all existing LANG_REGION pairs
    logger.info(f"Number of worker processes: {config.CONCURRENCY}")
    with ProcessPool(config.CONCURRENCY) as pool:
        # pool.map would block and return a completed list, so wrap imap_unordered in tqdm
        # and drain it to get a live progress bar as each LANG_REGION finishes.
        for _ in tqdm(pool.imap_unordered(update_source_sim_files, config.LANG_REGION_MODEL_MAP),
                      total=len(config.LANG_REGION_MODEL_MAP)):
            pass

logger.info("Script has finished running.")
5 changes: 4 additions & 1 deletion utils.py
@@ -1,4 +1,5 @@
import logging
import mimetypes

import boto3
from botocore.exceptions import ClientError
@@ -17,9 +18,11 @@ def upload_file(file_name, bucket, object_name=None):
if object_name is None:
object_name = file_name
try:
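        # Guess the Content-Type from the file extension so S3 serves JSON/CSV with the
        # correct MIME type, falling back to a generic binary type for unknown extensions.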
content_type = mimetypes.guess_type(file_name)[0] or 'binary/octet-stream'
s3_client.upload_file(file_name, bucket, object_name, ExtraArgs={
'GrantRead': f'id={config.BRAVE_TODAY_CLOUDFRONT_CANONICAL_ID}',
'GrantFullControl': f'id={config.BRAVE_TODAY_CANONICAL_ID}'
'GrantFullControl': f'id={config.BRAVE_TODAY_CANONICAL_ID}',
'ContentType': content_type
})

except ClientError as e: