Pull upstream changes #1

Open · wants to merge 49 commits into master

Changes from all commits · 49 commits
cf63f84
Fix mocking of entity.build_entity_query
amCap1712 May 26, 2022
166ba52
Setup real database for tests
amCap1712 May 7, 2022
fc5cf88
Fix SQLAlchemy warning to use text directly
amCap1712 May 7, 2022
b73e209
Remove volume mount from docker-compose.test.yml
amCap1712 May 18, 2022
55c5560
Optionally accept a session parameter in indexing functions
amCap1712 May 31, 2022
ce8bf44
Add indexing tests for entities using actual data
amCap1712 May 7, 2022
dff0a11
Improve test script
yvanzo May 31, 2022
bd99fd4
Upgrade SQLAlchemy (1/3) from 1.0.19 to 1.1.18
yvanzo Jan 23, 2020
7a2d2ad
Fix column type check in last_model_in_path
mwiencek Jan 23, 2020
f53b8b7
Fix debug message by creating list from generator
yvanzo Feb 3, 2020
129b2d4
Update psycopg requirement to -binary
reosarevok Aug 26, 2021
631b7ec
Update to 1.4, fix missing text() error
reosarevok Aug 30, 2021
8b16c86
Use load_only rather than many defer calls
reosarevok Aug 31, 2021
fce9789
Change relationship.table to relationship.mapper
amCap1712 Aug 31, 2021
e96bb3b
Document filter_valid_annotations
reosarevok Aug 31, 2021
34139b1
remove __tablename__ from column list
amCap1712 Sep 1, 2021
615ce79
Do not use @hybrid_property in paths
amCap1712 Sep 6, 2021
e668d96
Update SQLAlchemy documentation link
amCap1712 Sep 16, 2021
62b786e
Try fixing AttributeError for session
amCap1712 Jun 23, 2022
5d86c7a
Accept session attribute in index_entity and live_index_entity
amCap1712 Jun 23, 2022
73e2528
Document requiring MBDB materialized tables
yvanzo Oct 15, 2022
cef9055
Merge pull request #111 from yvanzo/sqlalchemy13
yvanzo Oct 15, 2022
2bbacca
SEARCH-675: Document using RabbitMQ (#139)
yvanzo Oct 15, 2022
2789d18
Fix deprecation warnings in SIR (#140)
amCap1712 Oct 17, 2022
2f8862c
Fix overlapping relationship SAWarning in custom models
amCap1712 Oct 16, 2022
9cb44fa
Fix implicitly coercing SELECT object warning
amCap1712 Oct 16, 2022
63aa1cb
Remove unused import
amCap1712 Oct 25, 2022
cc3edad
Revert "Use load_only rather than many defer calls"
amCap1712 Oct 25, 2022
6599b4c
Do not defer CompositeProperty
amCap1712 Oct 25, 2022
05c277d
Upgrade SQLAlchemy to latest version
amCap1712 Oct 27, 2022
99b3208
Eagerly load area fields
amCap1712 Oct 28, 2022
dd87d0d
Eagerly load artist.sort_name in event indexing
amCap1712 Oct 31, 2022
45085da
Use mapper attribute instead of hybrid property
amCap1712 Oct 31, 2022
da0da07
Use mapper attribute instead of hybrid property
amCap1712 Oct 31, 2022
16577db
Eagerly load artist_alias.gid in release group indexing (#148)
amCap1712 Nov 4, 2022
89ef1ab
Eagerly load artist.comment in release indexing
amCap1712 Nov 1, 2022
80954d6
Eagerly load packaging.gid in release indexing
amCap1712 Nov 1, 2022
a062672
Use mapper attribute instead of hybrid property
amCap1712 Nov 7, 2022
a596aa6
Eagerly load area0.type.name and area0.ended
amCap1712 Nov 7, 2022
ade0073
Eagerly load area attributes in place indexing
amCap1712 Nov 7, 2022
c93c640
Fix loading of release-group first_release_date
amCap1712 Nov 7, 2022
4b0cab7
Fix loading of recording first_release_date
amCap1712 Nov 7, 2022
a39d968
Fix loading of release amazon asin
amCap1712 Nov 7, 2022
17a8340
Eagerly load area_type related attributes for artist core
amCap1712 Nov 7, 2022
e9f6181
Eagerly load area attributes in label indexing
amCap1712 Nov 7, 2022
ee18b5a
Add test case for work that includes recording links
amCap1712 Nov 14, 2022
d322f80
Update extrapaths fields indexed for work
amCap1712 Nov 8, 2022
dc96c30
Avoid doing recording count in sql query for work
amCap1712 Nov 14, 2022
e9e6364
Amend 2bbacca6: Declare exchanges/queues on move (#159)
yvanzo May 31, 2023
25 changes: 25 additions & 0 deletions config.test.ini
@@ -0,0 +1,25 @@
[database]
dbname = musicbrainz_test
host = musicbrainz_db
password =
port = 5432
user = musicbrainz

[solr]
uri = SKIP
batch_size = 60

[sir]
import_threads = 2
query_batch_size = 20000
wscompat = on

[rabbitmq]
host = SKIP
user = SKIP
password = SKIP
vhost = SKIP
prefetch_count = 350

[sentry]
dsn = SKIP
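
The test image copies this file to config.ini (see Dockerfile.test below), so the test suite picks it up through sir's normal configuration path. As a rough illustration of how these values are consumed, here is a minimal sketch using the Python 2 standard-library ConfigParser; the reader below is hypothetical and only stands in for sir's actual config module:

# Hypothetical reader for config.test.ini, shown for illustration only.
from ConfigParser import SafeConfigParser   # Python 2, per the test image

config = SafeConfigParser()
config.read("config.ini")  # Dockerfile.test copies config.test.ini here

dbname = config.get("database", "dbname")               # "musicbrainz_test"
batch_size = config.getint("sir", "query_batch_size")   # 20000
# Sections set to SKIP (solr, rabbitmq, sentry) are placeholders; the
# indexing tests only need the database connection.
print("indexing %s in batches of %d" % (dbname, batch_size))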
10 changes: 8 additions & 2 deletions docker/Dockerfile.test
@@ -3,6 +3,10 @@ FROM metabrainz/python:2.7-20220421
RUN mkdir /code
WORKDIR /code

ENV DOCKERIZE_VERSION v0.6.1
RUN wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
&& tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz

# Python dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends \
@@ -23,8 +27,10 @@ RUN pip install -r requirements.txt
RUN pip install -r requirements_dev.txt

COPY . /code/
RUN cp config.test.ini config.ini

CMD py.test --junitxml=/data/test_report.xml \
CMD dockerize -wait tcp://musicbrainz_db:5432 -timeout 600s \
bash -c "py.test --junitxml=/data/test_report.xml \
--cov=sir \
--cov-report xml:/data/coverage.xml \
--cov-report html:/data/coverage-html
--cov-report html:/data/coverage-html"
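
The dockerize call makes the test container block until Postgres accepts TCP connections instead of failing on a startup race. For readers unfamiliar with dockerize, the sketch below is a rough Python equivalent of `-wait tcp://musicbrainz_db:5432 -timeout 600s`; it is an illustration, not part of this PR:

# Rough Python equivalent of `dockerize -wait tcp://musicbrainz_db:5432
# -timeout 600s`: poll until the port accepts a TCP connection.
import socket
import time

def wait_for_tcp(host, port, timeout=600, interval=1):
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            socket.create_connection((host, port), timeout=5).close()
            return
        except socket.error:
            time.sleep(interval)  # not up yet, retry
    raise RuntimeError("%s:%d not reachable after %ds" % (host, port, timeout))

wait_for_tcp("musicbrainz_db", 5432)
# Note: an open port only means Postgres is listening; the test database
# may still be loading data at this point.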
10 changes: 9 additions & 1 deletion docker/docker-compose.test.yml
@@ -1,8 +1,16 @@
# Docker Compose file for testing
version: "2"
version: "3.8"
services:

test:
build:
context: ..
dockerfile: ./docker/Dockerfile.test
depends_on:
- musicbrainz_db

musicbrainz_db:
image: metabrainz/musicbrainz-test-database:production
environment:
POSTGRES_HOST_AUTH_METHOD: trust
PGDATA: /var/lib/postgresql-musicbrainz/data
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -34,7 +34,7 @@
autoclass_content = "both"

intersphinx_mapping = {'python': ('https://docs.python.org/2.7', None),
'sqla': ('http://docs.sqlalchemy.org/en/rel_1_0/', None),
'sqla': ('https://docs.sqlalchemy.org/en/14/', None),
'solr': ('https://pythonhosted.org//solrpy/', None),
'amqp': ('https://amqp.readthedocs.org/en/latest', None)}

1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -10,6 +10,7 @@ Contents:
usage
import
queues
service/index
api

Indices and tables
6 changes: 6 additions & 0 deletions docs/source/service/index.rst
@@ -0,0 +1,6 @@
.. _service:

Service maintenance
===================

.. include:: rabbitmq.rst
105 changes: 105 additions & 0 deletions docs/source/service/rabbitmq.rst
@@ -0,0 +1,105 @@
.. _rabbitmq:

RabbitMQ
--------

Maintenance
~~~~~~~~~~~

Requirements
++++++++++++

* Tolerance to connectivity issues:
When running in watch mode, losing the connection to RabbitMQ can cause the indexer
to stall indefinitely.
To recover, the container running the indexer has to be restarted manually.
See the ticket `SEARCH-678 <https://tickets.metabrainz.org/browse/SEARCH-678>`_
for follow-up on improving tolerance.
* Maintenance mode:
It doesn’t exist.
To perform maintenance operations, switching to another instance of RabbitMQ
is required to prevent any data loss, even for a short period of time.
* Data importance:
The RabbitMQ instance is conveying notification messages about changes that
must be made to the search indexes.
If any message is lost, all search indexes would have to be rebuilt,
which currently takes hours and implies downtime for searches.
See the ticket `SEARCH-674 <https://tickets.metabrainz.org/browse/SEARCH-674>`_
for follow-up on rebuilding with zero-downtime.
* Data persistence:
Messages are expected to be processed within seconds (or minutes during
activity peaks), so there is no need for persistent volumes.
Losing these messages isn’t critical, as search indexes can be
rebuilt in hours, so there is no need for backups either.

Procedures
++++++++++


* Start service:

See :ref:`amqp`

* Reload service configuration:

After:

* Check the indexer logs to ensure that it did not stall and that it continues
to process new messages.

* Stop service:

Before:

* Uninstall search triggers
* Stop the live indexer

It implies that search indexes will be outdated for good.
Updating search indexes requires rebuilding them, which takes hours of downtime.

* Restart service:

It implies that search indexes will likely be missing some updates.
Updating search indexes requires rebuilding them, which takes hours of downtime.

* Move service:

* Create vhost, user, permissions, queues in the new instance
* Declare exchanges and queues as described in :ref:`amqp`
* Update broker in PostgreSQL to point to the new instance
* Once the queues in the old instance are empty,
switch the live indexer to the new instance

Neither data loss nor downtime will occur.

* Remove service:

Before:

* Uninstall search triggers
* Stop the live indexer

It implies that search indexes will be outdated for good.
Updating search indexes requires rebuilding them, which takes hours of downtime.

Implementation details
~~~~~~~~~~~~~~~~~~~~~~

* Connectivity issues are reported through both Docker logs and Sentry.
* Producer and consumer are set up separately as follows:

* Producer is `pg_amqp`, used by triggers in the Postgres database.

* ack mode: transactional
* heartbeat timeout: (not using 0.8 version)
* message protocol version: 0.8

* Consumer is `sir`, running in watch mode for live indexing.

* ack mode: basic/manual
* heartbeat timeout: (not configured/server’s default)
* message protocol version: 0.9.1

* There are known issues related to queue declaration; see :ref:`amqp`
* Connections are not named properly (just using proxy interface IP and port)
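
To make the "declare exchanges and queues" step of the move procedure concrete, here is a hedged sketch using py-amqp, the client library sir's documentation links to. The exchange and queue names, types, and connection details below are placeholders; the authoritative topology and the known declaration issues are in the :ref:`amqp` setup page:

# Hypothetical sketch only: names and topology are placeholders; see the
# :ref:`amqp` setup documentation for the real ones.
import amqp

conn = amqp.Connection(host="new-rabbitmq:5672", userid="sir",
                       password="secret", virtual_host="/search")
conn.connect()
channel = conn.channel()

# Durable exchange that the PostgreSQL triggers publish to (placeholder name).
channel.exchange_declare("search", "fanout", durable=True, auto_delete=False)
# Durable queue that the live indexer consumes from, bound to the exchange.
channel.queue_declare("search.index", durable=True, auto_delete=False)
channel.queue_bind("search.index", "search", routing_key="")
conn.close()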

3 changes: 3 additions & 0 deletions docs/source/setup/amqp.rst
@@ -23,6 +23,9 @@ Database
Sir requires that you both install an extension into your MusicBrainz database
and add triggers to it.

It also requires the materialized (or denormalized) tables of the MusicBrainz
database to have been built.

AMQP Extension
++++++++++++++

4 changes: 2 additions & 2 deletions requirements.txt
@@ -3,10 +3,10 @@ backports.functools_lru_cache==1.0.1
enum34==1.1.6
git+https://github.com/amCap1712/[email protected]#egg=mbdata
git+https://github.com/metabrainz/[email protected]#egg=mb-rngpy
psycopg2==2.8.4
psycopg2-binary==2.8.4
retrying==1.3.3
pysolr==3.8.1
sqlalchemy==1.0.19
sqlalchemy==1.4.41
requests==2.22.0
ujson==1.35
sentry-sdk==1.3.1
1 change: 0 additions & 1 deletion requirements_dev.txt
@@ -1,4 +1,3 @@
pytest==4.6.9
pytest-cov==2.8.1
mock==3.0.5
pysqlite==2.8.3
4 changes: 2 additions & 2 deletions sir/amqp/handler.py
@@ -378,7 +378,7 @@ def _index_by_fk(self, parsed_message):
# to update the related entities. For 'one to many' relationships, the related
# entity would have had an update trigger firing off to unlink the `index_entity`
# before `index_entity` itself is deleted, so we can ignore those.
relevant_rels = dict((r.table.name, (list(r.local_columns)[0].name, list(r.remote_side)[0]))
relevant_rels = dict((r.mapper.persist_selectable.name, (list(r.local_columns)[0].name, list(r.remote_side)[0]))
for r in class_mapper(index_model).mapper.relationships
if r.direction.name == 'MANYTOONE')
for core_name, path in update_map[parsed_message.table_name]:
@@ -398,7 +398,7 @@ def _index_by_fk(self, parsed_message):
related_model, new_path = second_last_model_in_path(entity.model, path)
related_table_name = ""
if related_model:
related_table_name = class_mapper(related_model).mapped_table.name
related_table_name = class_mapper(related_model).persist_selectable.name
if related_table_name in relevant_rels:
with db_session_ctx(self.db_session) as session:
select_query = None
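
The two hunks above are the SQLAlchemy 1.4 rename: Mapper.mapped_table (and the RelationshipProperty.table shortcut) became Mapper.persist_selectable. A small sketch of the new spelling, using an arbitrary mbdata model for illustration:

# Sketch of the persist_selectable spelling used above (illustrative model).
from sqlalchemy.orm import class_mapper
from mbdata.models import Artist

mapper = class_mapper(Artist)
print(mapper.persist_selectable.name)  # "artist"; formerly mapper.mapped_table

for rel in mapper.relationships:
    if rel.direction.name == "MANYTOONE":
        # rel.mapper is the related class's mapper; its persist_selectable
        # replaces the deprecated rel.table attribute.
        print(rel.key, rel.mapper.persist_selectable.name)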
21 changes: 13 additions & 8 deletions sir/indexing.py
@@ -12,6 +12,7 @@
from logging import getLogger, DEBUG, INFO
from pysolr import SolrError
from sqlalchemy import and_
from sqlalchemy.orm import Session
from .util import SIR_EXIT
from ctypes import c_bool

@@ -188,9 +189,10 @@ def _index_entity_process_wrapper(args, live=False):
signal.signal(signal.SIGTERM, signal.SIG_DFL)

try:
session = Session(util.engine())
if live:
return live_index_entity(*args)
return index_entity(*args)
return live_index_entity(session, *args)
return index_entity(session, *args)
except Exception as exc:
logger.error("Failed to import %s with id in bounds %s",
args[0],
@@ -199,12 +201,13 @@
raise


def index_entity(entity_name, bounds, data_queue):
def index_entity(session, entity_name, bounds, data_queue):
"""
Retrieve rows for a single entity type identified by ``entity_name``,
convert them to a dict with :func:`sir.indexing.query_result_to_dict` and
put the dicts into ``queue``.

:param sqlalchemy.orm.Session session:
:param str entity_name:
:param bounds:
:type bounds: (int, int)
@@ -217,15 +220,16 @@ def index_entity(entity_name, bounds, data_queue):
condition = and_(model.id >= lower_bound, model.id < upper_bound)
else:
condition = model.id >= lower_bound
_query_database(entity_name, condition, data_queue)
_query_database(session, entity_name, condition, data_queue)


def live_index_entity(entity_name, ids, data_queue):
def live_index_entity(session, entity_name, ids, data_queue):
"""
Retrieve rows for a single entity type identified by ``entity_name``,
convert them to a dict with :func:`sir.indexing.query_result_to_dict` and
put the dicts into ``queue``.

:param sqlalchemy.orm.Session session:
:param str entity_name:
:param [int] ids:
:param Queue.Queue data_queue:
@@ -234,10 +238,10 @@ def live_index_entity(entity_name, ids, data_queue):
return
condition = and_(SCHEMA[entity_name].model.id.in_(ids))
logger.debug("Importing %s new rows for entity %s", len(ids), entity_name)
_query_database(entity_name, condition, data_queue)
_query_database(session, entity_name, condition, data_queue)


def _query_database(entity_name, condition, data_queue):
def _query_database(session, entity_name, condition, data_queue):
"""
Retrieve rows for a single entity type identified by ``entity_name``,
convert them to a dict with :func:`sir.indexing.query_result_to_dict` and
@@ -254,7 +258,8 @@ def _query_database(entity_name, condition, data_queue):
search_entity = SCHEMA[entity_name]
model = search_entity.model
row_converter = search_entity.query_result_to_dict
with util.db_session_ctx(util.db_session()) as session:

with session:
query = search_entity.query.filter(condition).with_session(session)
total_records = 0
for row in query:
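
With the session parameter threaded through, the caller now owns the session's lifecycle and the `with session:` block above closes it once the query completes. A hedged usage sketch of the new signatures (queue consumption elided, values illustrative):

# Usage sketch of the new explicit-session signatures (illustrative only).
from multiprocessing import Queue
from sqlalchemy.orm import Session
from sir import util
from sir.indexing import index_entity, live_index_entity

data_queue = Queue()
session = Session(util.engine())  # as in _index_entity_process_wrapper above

# Reindex artists with ids 1..19999 (the upper bound is exclusive):
index_entity(session, "artist", (1, 20000), data_queue)

# Live-index a specific set of rows; a fresh Session is needed because
# _query_database closes the one it is given.
live_index_entity(Session(util.engine()), "artist", [5, 42, 1337], data_queue)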
12 changes: 9 additions & 3 deletions sir/querying.py
@@ -3,10 +3,12 @@
import logging


from sqlalchemy import func
from sqlalchemy import func, text
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.orm.interfaces import ONETOMANY, MANYTOONE
from sqlalchemy.orm.properties import RelationshipProperty
from sqlalchemy.ext.hybrid import HYBRID_PROPERTY


logger = logging.getLogger("sir")

@@ -29,6 +31,10 @@ def iterate_path_values(path, obj):
returned by the :func:`getattr` call will be returned and added to the
list of values for this field.

.. warning::

Hybrid attributes like @hybrid_property are currently not supported.

To give an example, lets presume the object we're starting with is an
instance of :class:`~mbdata.models.Artist` and the path is
"begin_area.name". The first :func:`getattr` call will be::
@@ -107,10 +113,10 @@ def iter_bounds(db_session, column, batch_size, importlimit):
from_self(column)

if batch_size > 1:
q = q.filter("rownum %% %d=1" % batch_size)
q = q.filter(text("rownum % :batch_size=1").bindparams(batch_size=batch_size))

if importlimit:
q = q.filter("rownum <= %d" % (importlimit))
q = q.filter(text("rownum <= :import_limit").bindparams(import_limit=importlimit))

intervals = [id for id in q]
bounds = []
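
The two filters above show the SQLAlchemy 1.4 pattern this PR adopts throughout: textual fragments are wrapped in text() with bound parameters instead of being %-interpolated into strings, which 1.4 rejects in Query.filter(). A standalone sketch of the same pattern:

# Standalone sketch of the text()-with-bindparams pattern used above.
from sqlalchemy import text

batch_size, importlimit = 10000, 50000
batch_clause = text("rownum % :batch_size=1").bindparams(batch_size=batch_size)
limit_clause = text("rownum <= :import_limit").bindparams(import_limit=importlimit)

# q = q.filter(batch_clause).filter(limit_clause)   # as in iter_bounds
print(str(batch_clause))   # rownum % :batch_size=1  (value bound, not inlined)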