Fix DRM / Refactor schema loader (#55)
* Adds a set of Descriptor classes.
* A DescriptorsGetter class lets the DMM/DRM load descriptors from both a list of directories and a list of descriptor dictionaries (sketched below).
* Descriptor helper classes act as interfaces that encapsulate handling of each descriptor type.
* Dependency-injects the SQLAlchemy base to enable testing.
* Loads only JSON files (#44).
* Adds logging to conftest.
* Fixes DRM error.
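
The diff below exercises these classes through DescriptorsGetter(...) and attribute access on Descriptor objects. What follows is a minimal sketch of the interface implied by those call sites. The real implementations live in data_resource_api/app/descriptor.py, which this page does not render, so anything beyond the attributes actually used in the diff is an assumption.

# Hypothetical sketch of the descriptor interface implied by this commit's
# diff; only the attributes used in data_model_manager.py are shown.
import json
import os


class Descriptor:
    """Wraps a raw descriptor dict and exposes its parts as attributes."""

    def __init__(self, descriptor: dict, file_name: str = ''):
        self.descriptor = descriptor
        self.file_name = file_name
        # Key layout taken from the removed split_metadata_from_descriptor()
        # visible further down in this diff.
        self.table_name = descriptor['datastore']['tablename']
        self.table_schema = descriptor['datastore']['schema']
        self.api_schema = descriptor['api']['methods'][0]


class DescriptorsGetter:
    """Yields Descriptor objects from directories of JSON files and raw dicts."""

    def __init__(self, directories: list = [], custom_descriptors: list = []):
        self.directories = directories
        self.custom_descriptors = custom_descriptors

    def iter_descriptors(self):
        for directory in self.directories:
            for file_name in sorted(os.listdir(directory)):
                if not file_name.endswith('.json'):  # loads only JSON files (#44)
                    continue
                with open(os.path.join(directory, file_name)) as fh:
                    yield Descriptor(json.load(fh), file_name)
        for descriptor in self.custom_descriptors:
            yield Descriptor(descriptor)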
loganripplinger authored Mar 2, 2020
1 parent 3d5da74 commit 63f74df
Showing 19 changed files with 1,019 additions and 166 deletions.
11 changes: 6 additions & 5 deletions Dockerfile
@@ -1,16 +1,17 @@
 FROM python:3.7.3-slim
 WORKDIR /data-resource
-ADD migrations migrations
-ADD schema schema
-ADD alembic.ini alembic.ini
+RUN apt-get update && apt-get install -y python3-dev build-essential &&\
+pip install --upgrade pipenv
 ADD Pipfile Pipfile
 ADD Pipfile.lock Pipfile.lock
-RUN apt-get update && apt-get install -y python3-dev build-essential &&\
-pip install --upgrade pipenv && pipenv install --system &&\
+RUN pipenv install --system &&\
 apt-get remove -y python3-dev build-essential
 ADD wsgi.py wsgi.py
+ADD alembic.ini alembic.ini
 ADD data_model_manager_runner.py data_model_manager_runner.py
 ADD cmd.sh cmd.sh
 RUN chmod a+x cmd.sh
+ADD migrations migrations
+ADD schema schema
 ADD data_resource_api data_resource_api
 ENTRYPOINT [ "/data-resource/cmd.sh" ]
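
A note on the reordering: Docker invalidates the cache for a changed layer and every layer after it, so copying the frequently edited items (migrations, schema, application code) after the dependency-install steps lets routine schema and code edits reuse the cached apt and pipenv layers. The trade-off of splitting the RUN is that apt-get remove now executes in a different layer from the install, so the build tools still occupy space in the earlier layer of the final image.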
2 changes: 1 addition & 1 deletion data_resource_api/api/v1_0_0/resource_handler.py
@@ -18,7 +18,7 @@

 class ResourceHandler(object):
     def __init__(self):
-        self.logger = LogFactory.get_console_logger('data-model-manager')
+        self.logger = LogFactory.get_console_logger('resource-handler')

     def build_json_from_object(self, obj: object, restricted_fields: dict = []):
         resp = {key: str(value) if value is not None else '' for key, value in obj.__dict__.items(
138 changes: 57 additions & 81 deletions data_resource_api/app/data_model_manager.py
@@ -20,6 +20,12 @@
 from data_resource_api.factories.table_schema_types import TABLESCHEMA_TO_SQLALCHEMY_TYPES
 from data_resource_api.db import Base, Session, Log, Checksum
 from data_resource_api.logging import LogFactory
+from data_resource_api.utils import exponential_backoff
+from data_resource_api.app.descriptor import (
+    Descriptor,
+    DescriptorFileHelper,
+    DescriptorFromFile,
+    DescriptorsGetter)


 class DataModelDescriptor(object):
@@ -46,12 +52,16 @@ class DataModelManagerSync(object):
     """

-    def __init__(self):
+    def __init__(self, base: object = Base, descriptors: list = []):
         self.app_config = ConfigurationFactory.from_env()
         self.data_model_descriptors: DataModelDescriptor = []
-        self.orm_factory = ORMFactory()
+        self.orm_factory = ORMFactory(base)
         self.logger = LogFactory.get_console_logger('data-model-manager')

+        self.descriptor_directories = []
+        self.descriptor_directories.append(self.get_data_resource_schema_path())
+        self.custom_descriptors = descriptors
+
     def run(self):
         self.initalize_base_models()
         self.restore_models_from_database()
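
The new base and descriptors parameters let tests hand in an isolated SQLAlchemy declarative base and in-memory descriptor dicts instead of the module-level Base and the real schema directory. A hypothetical test sketch follows; it is not part of this commit, assumes an environment configured for ConfigurationFactory.from_env(), and borrows the datastore/api dict layout used elsewhere in the diff.

# Illustrative test sketch (not from this commit): inject a fresh declarative
# base so table definitions do not leak between tests.
from sqlalchemy.ext.declarative import declarative_base

from data_resource_api.app.data_model_manager import DataModelManagerSync


def test_accepts_injected_base_and_descriptors():
    test_base = declarative_base()  # fresh metadata, isolated from the app's Base
    descriptor = {
        'api': {'methods': [{}]},
        'datastore': {'tablename': 'widgets', 'schema': {'fields': []}},
    }
    manager = DataModelManagerSync(base=test_base, descriptors=[descriptor])
    # The manager should track the injected descriptors alongside the
    # default schema directory.
    assert manager.custom_descriptors == [descriptor]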
Expand All @@ -70,20 +80,13 @@ def initalize_base_models(self):
max_retries = 10
retries = 0

# needs unit tests
def sleep_exponential_backoff(wait_time, exponential_rate):
def wait_func():
nonlocal wait_time
wait_time *= exponential_rate
sleep(wait_time)
return wait_func

exponential_sleep = sleep_exponential_backoff(1, 1.5)
exponential_time = exponential_backoff(1, 1.5)

while not db_active and retries <= max_retries:
if retries != 0:
self.logger.info(f'Sleeping for {retry_wait} seconds...')
exponential_sleep()
sleep_time = exponential_time()
self.logger.info(f'Sleeping for {sleep_time} with exponential backoff...')
sleep(sleep_time)

retries += 1
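
The inline closure above moves to an exponential_backoff utility in data_resource_api.utils. Its body is not shown in this diff, but the call sites (exponential_time = exponential_backoff(1, 1.5), then sleep_time = exponential_time()) imply a closure that returns the next wait time and leaves the sleeping to the caller. The sketch below is that inference, not the committed code.

def exponential_backoff(wait_time: float, exponential_rate: float):
    # Inferred from the call sites above; the real body lives in
    # data_resource_api/utils and is not part of this rendered diff.
    def wait_func() -> float:
        nonlocal wait_time
        wait_time *= exponential_rate
        return wait_time  # caller logs it and calls sleep() itself
    return wait_func

With arguments (1, 1.5), successive calls yield waits of 1.5, 2.25, 3.375, ... seconds, and the loop above gives up after max_retries = 10 attempts.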

@@ -113,19 +116,26 @@ def wait_func():

         self.logger.info('Base models initalized.')

-    def restore_models_from_database(self):
+    # TODO integration test
+    def restore_models_from_database(self) -> None:
         """This method will load all stored descriptor files from DB
         into SQL Alchemy ORM models.
         """
         # query database for all jsonb in checksum table
         json_descriptor_list = self.get_stored_descriptors()

+        # if json_descriptor_list is empty can we return?
+
         # load each item into our models
         for descriptor in json_descriptor_list:
-            table_name, table_schema, api_schema = self.split_metadata_from_descriptor(descriptor)
-            data_model = self.orm_factory.create_orm_from_dict(
-                table_schema, table_name, api_schema)
+            self.load_descriptor_into_sql_alchemy_model(descriptor)

-        return
+    def load_descriptor_into_sql_alchemy_model(self, descriptor: dict) -> None:
+        desc = Descriptor(descriptor)
+        table_schema = desc.table_schema
+        table_name = desc.table_name
+        api_schema = desc.api_schema
+
+        data_model = self.orm_factory.create_orm_from_dict(
+            table_schema, table_name, api_schema)

     def get_sleep_interval(self):
         """Retrieve the thread's sleep interval.
@@ -272,31 +282,23 @@ def get_model_checksum(self, table_name: str):
         session.close()
         return checksum

-    def get_stored_descriptors(self):
+    def get_stored_descriptors(self) -> list:
         """
         Gets stored json models from database.
         Returns:
             list: List of JSON dictionaries
         """
         session = Session()
-        descriptor_list = []
+        descriptor_list = []  # list of json dict
         try:
             query = session.query(Checksum)
             for _row in query.all():
-                descriptor_list.append(_row.descriptor_json)  # may want to just yield this?
+                descriptor_list.append(_row.descriptor_json)
         except Exception as e:
             self.logger.exception('Error retrieving stored models')
         session.close()

-        #
-        # move this to its own function that restore_models_from_database is calling
-        # and take the yield from get_stored_models for the in variable
-        #
-        # load each item into a json object and put in a list
-        # json_descriptors_list = []
-        # for descriptor in descriptor_list:
-        #     descriptor_dict = json.load(descriptor)
-        #     json_descriptors_list.append(descriptor_dict)

         return descriptor_list

     def get_alembic_config(self):
@@ -354,45 +356,15 @@ def monitor_data_models(self):
         responsbility of iterating through a directory to find schema files to load.
         """
         self.logger.info('Checking data models')
-        schema_dir = self.get_data_resource_schema_path()
-
-        # Do some error checking on the provided path
-        if not os.path.exists(schema_dir) or not os.path.isdir(schema_dir):
-            self.logger.exception(
-                f"Unable to locate schema directory '{schema_dir}'")
-
-        # iterate over every descriptor file
-        schemas = os.listdir(schema_dir)
-        for schema in schemas:
-            # ignore folders
-            if os.path.isdir(os.path.join(schema_dir, schema)):
-                self.logger.exception(
-                    f"Cannot open a nested schema directory '{schema}'")
-                continue
-
-            # Open the file and store its json data and file name
-            try:
-                with open(os.path.join(schema_dir, schema), 'r') as fh:
-                    schema_dict = json.load(fh)
-
-                schema_filename = schema
-            except Exception as e:
-                self.logger.exception(
-                    f"Error loading json from schema file '{schema}'")
-
-            # Pass the json data and filename to the worker function
-            self.work_on_schema(schema_dict, schema_filename)
+        descriptors = DescriptorsGetter(self.descriptor_directories, self.custom_descriptors)
+        for descriptor in descriptors.iter_descriptors():
+            self.process_descriptor(descriptor)

         self.logger.info('Completed check of data models')

-    def split_metadata_from_descriptor(self, schema_dict: dict):
-        table_name = schema_dict['datastore']['tablename']
-        table_schema = schema_dict['datastore']['schema']
-        api_schema = schema_dict['api']['methods'][0]
-
-        return table_name, table_schema, api_schema
-
-    def work_on_schema(self, schema_dict: dict, schema_filename: str):
+    # TODO refactor this into smaller functions
+    def process_descriptor(self, schema_dict: object):
         """Operate on a schema dict for data model changes.
         Note:
@@ -402,11 +374,14 @@ def work_on_schema(self, schema_dict: dict, schema_filename: str):
         new Alembic migrations to meet these changes. The data models will then have
         to be reconstructed by each individual worker.
         """
-        self.logger.info(f"Looking at {schema_filename}")
+        schema_filename = schema_dict.file_name
+        self.logger.debug(f"Looking at {schema_filename}")

         try:
-            # Extract data from the json
-            table_name, table_schema, api_schema = self.split_metadata_from_descriptor(schema_dict)
+            # Extract data for easier use
+            table_name = schema_dict.table_name
+            table_schema = schema_dict.table_schema
+            api_schema = schema_dict.api_schema

             # calculate the checksum for this json
             model_checksum = md5(
@@ -416,22 +391,22 @@
                 ).encode('utf-8')
             ).hexdigest()

-            self.logger.info('Pre: ' + str(Base.metadata.tables.keys()))
+            self.logger.debug('Pre: ' + str(Base.metadata.tables.keys()))

             # Check if data model exists by checking if we have stored metadata about it
             if self.data_model_exists(schema_filename):
-                self.logger.info(f"{schema_filename}: Found existing.")
+                self.logger.debug(f"{schema_filename}: Found existing.")
                 # check if the cached db checksum has changed from the new file checksum
                 if not self.data_model_changed(schema_filename, model_checksum):
-                    self.logger.info(f"{schema_filename}: Unchanged.")
+                    self.logger.debug(f"{schema_filename}: Unchanged.")
                     return

-                self.logger.info(f"{schema_filename}: Found changed.")
+                self.logger.debug(f"{schema_filename}: Found changed.")

                 # Get the index for this descriptor within our local metadata
                 data_model_index = self.get_data_model_index(
                     schema_filename)

                 # Create the sql alchemy orm
                 data_model = self.orm_factory.create_orm_from_dict(
                     table_schema, table_name, api_schema)
@@ -443,13 +418,13 @@ def work_on_schema(self, schema_dict: dict, schema_filename: str):
                     table_name, model_checksum)
                 del data_model

-                self.logger.info('Post1: ' + Base.metadata.tables.keys())
+                self.logger.debug('Post1: ' + Base.metadata.tables.keys())

                 # store metadata for descriptor locally
                 self.data_model_descriptors[data_model_index].model_checksum = model_checksum

             else:
-                self.logger.info(f"{schema_filename}: Unseen before now.")
+                self.logger.debug(f"{schema_filename}: Unseen before now.")
                 # Create the metadata store for descriptor
                 data_model_descriptor = DataModelDescriptor(
                     schema_filename, table_name, model_checksum)
@@ -472,16 +447,17 @@ def work_on_schema(self, schema_dict: dict, schema_filename: str):
                 self.revision(table_name)
                 self.upgrade()
                 self.add_model_checksum(
-                    table_name, model_checksum, schema_dict)
+                    table_name, model_checksum, schema_dict.descriptor)

                 del data_model  # this can probably be removed?

-                self.logger.info('Post2: ' + str(Base.metadata.tables.keys()))
+                self.logger.debug('Post2: ' + str(Base.metadata.tables.keys()))

         except Exception as e:
             self.logger.exception(
                 f"Error loading data resource schema '{schema_filename}'")

-        self.logger.info('Post3: ' + str(Base.metadata.tables.keys()))
+        self.logger.debug('Post3: ' + str(Base.metadata.tables.keys()))


 class DataModelManager(Thread, DataModelManagerSync):
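
Change detection in process_descriptor hinges on an md5 digest of the descriptor's serialized schema, compared against the value stored via add_model_checksum. The middle of the md5(...) call is collapsed in this view, so the sketch below shows the general pattern under the assumption that the digest covers the JSON-serialized table schema.

import json
from hashlib import md5


def compute_checksum(table_schema: dict) -> str:
    # Assumption: the digest covers the JSON-serialized table schema; the
    # exact json.dumps arguments are collapsed in this diff view.
    return md5(
        json.dumps(table_schema).encode('utf-8')
    ).hexdigest()


stored = compute_checksum({'fields': [{'name': 'id'}]})
incoming = {'fields': [{'name': 'id'}, {'name': 'label'}]}
if compute_checksum(incoming) != stored:
    print('descriptor: Found changed.')  # would trigger revision() + upgrade()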