From e4b79f36b4f59819ec98a29d57b55db02c53ae67 Mon Sep 17 00:00:00 2001 From: cl117 Date: Tue, 27 Aug 2024 11:05:09 -0600 Subject: [PATCH] split utils into 5 classes --- flask/cluster.py | 38 +++--- flask/configManager.py | 63 +++++++++ flask/dataManager.py | 76 +++++++++++ flask/elasticsearchManager.py | 17 +++ flask/explorer.py | 86 ++++++------ flask/index.py | 48 ++++--- flask/logger.py | 39 ++++++ flask/pagerank.py | 27 ++-- flask/query.py | 31 +++-- flask/search.py | 32 +++-- flask/sequencesearch.py | 14 +- flask/utils.py | 249 ---------------------------------- flask/wor_client.py | 19 +++ 13 files changed, 369 insertions(+), 370 deletions(-) create mode 100644 flask/configManager.py create mode 100644 flask/dataManager.py create mode 100644 flask/elasticsearchManager.py create mode 100644 flask/logger.py delete mode 100644 flask/utils.py create mode 100644 flask/wor_client.py diff --git a/flask/cluster.py b/flask/cluster.py index 02da210..967a346 100644 --- a/flask/cluster.py +++ b/flask/cluster.py @@ -1,19 +1,21 @@ from xml.etree import ElementTree import subprocess -import utils +from configManager import ConfigManager +from logger import Logger import query from sys import platform - -uclust_identity = utils.get_config()['uclust_identity'] # how similar sequences in the same cluster must be +config_manager = ConfigManager() +uclust_identity = config_manager.load_config()['uclust_identity'] # how similar sequences in the same cluster must be +logger_ = Logger() sequences_filename = 'dumps/sequences.fsa' -if 'which_search' not in utils.get_config(): - explorerConfig = utils.get_config() +if 'which_search' not in config_manager.load_config(): + explorerConfig = config_manager.load_config() explorerConfig['which_search'] = 'vsearch' - utils.set_config(explorerConfig) + config_manager.load_config(explorerConfig) -whichSearch = utils.get_config()['which_search'] +whichSearch = config_manager.load_config()['which_search'] if platform == "linux" or platform == "linux2": if whichSearch == 'usearch': @@ -26,7 +28,7 @@ elif whichSearch == 'vsearch': usearch_binary_filename = 'usearch/vsearch_macos' else: - utils.log("Sorry, your OS is not supported for sequence based-search.") + logger_.log("Sorry, your OS is not supported for sequence based-search.") uclust_results_filename = 'usearch/uclust_results.uc' @@ -56,7 +58,7 @@ def run_uclust(): popen = subprocess.Popen(args, stdout=subprocess.PIPE) popen.wait() output = popen.stdout.read() - utils.log_indexing(str(output)) + logger_.log(str(output), True) def analyze_uclust(): @@ -80,11 +82,11 @@ def analyze_uclust(): hits += 1 f.close() - utils.log_indexing('parts: ' + str(total_parts)) - utils.log_indexing('hits: ' + str(hits)) + logger_.log('parts: ' + str(total_parts), True) + logger_.log('hits: ' + str(hits), True) if hits > 0: - utils.log_indexing('average hit identity: ' + str(total_identity / hits)) + logger_.log('average hit identity: ' + str(total_identity / hits), True) def uclust2uris(fileName): @@ -138,17 +140,17 @@ def uclust2clusters(): def update_clusters(): - utils.log_indexing('------------ Updating clusters ------------') - utils.log_indexing('******** Query for sequences ********') + logger_.log('------------ Updating clusters ------------', True) + logger_.log('******** Query for sequences ********', True) sequences_response = query.query_sparql(sequence_query) - utils.log_indexing('******** Query for sequences complete ********') + logger_.log('******** Query for sequences complete ********', True) 
write_fasta(sequences_response) - utils.log_indexing('******** Running uclust ********') + logger_.log('******** Running uclust ********', True) run_uclust() - utils.log_indexing('******** Running uclust complete ********') + logger_.log('******** Running uclust complete ********', True) analyze_uclust() - utils.log_indexing('------------ Successsfully updated clusters ------------\n') + logger_.log('------------ Successsfully updated clusters ------------\n', True) return uclust2clusters() diff --git a/flask/configManager.py b/flask/configManager.py new file mode 100644 index 0000000..096ff41 --- /dev/null +++ b/flask/configManager.py @@ -0,0 +1,63 @@ +import json +import datetime + +class ConfigManager: + def __init__(self, config_file='config.json'): + self.config_file = config_file + self._config = None + + def load_config(self): + """ + Gets a copy of the config file + Returns: Config file in JSON + + """ + if self._config is None: + with open(self.config_file) as f: + self._config = json.load(f) + return self._config + + def save_config(self, new_config): + """ + Overwrites the existing config with a new config file + Args: + new_config: New config file with the updated information + Returns: + """ + config = self.load_config() + config.update(new_config) + with open(self.config_file, 'w') as f: + json.dump(config, f) + + def save_time(self, attribute): + """ + Saves the current time to an attribute in the config + Args: + attribute: Config attribute to save current time to + + Returns: + + """ + config = self.load_config() + config[attribute] = datetime.datetime.now().isoformat() + self.save_config(config) + + def get_es_endpoint(self): + return self.load_config().get('elasticsearch_endpoint') + + def save_update_end_time(self): + """ + Save end time of indexing + Returns: + + """ + return self.save_time("last_update_end") + + + def save_update_start_time(self): + """ + Save start time of indexing + Returns: + + """ + return self.save_time("last_update_start") diff --git a/flask/dataManager.py b/flask/dataManager.py new file mode 100644 index 0000000..82e50e0 --- /dev/null +++ b/flask/dataManager.py @@ -0,0 +1,76 @@ +import pickle +import os +class DataManager: + def __init__(self, clusters_filename='dumps/clusters_dump', uri2rank_filename='dumps/uri2rank_dump'): + self.clusters_filename = clusters_filename + self.uri2rank_filename = uri2rank_filename + self._clusters = None + self._uri2rank = None + + def save_clusters(self, clusters): + """ + Save clusters of parts + Args: + new_clusters: Clusters to be saved + + Returns: + + """ + self._clusters = clusters + self._serialize(self._clusters, self.clusters_filename) + + def get_clusters(self): + if self._clusters is None: + self._clusters = self._deserialize(self.clusters_filename) + return self._clusters + + def save_uri2rank(self, uri2rank): + """ + Saves the pagerank of all URI's + Args: + new_uri2rank: + + Returns: + + """ + self._uri2rank = uri2rank + self._serialize(self._uri2rank, self.uri2rank_filename) + + def get_uri2rank(self): + """ + Gets all pageranks of URI's + Returns: + + """ + if self._uri2rank is None: + self._uri2rank = self._deserialize(self.uri2rank_filename) + return self._uri2rank + + @staticmethod + def _serialize(data, filename): + """ + Serializes some data to a file + Args: + data: Data to be written + filename: File to be written to + + Returns: + + """ + with open(filename, 'wb') as f: + pickle.dump(data, f) + + @staticmethod + def _deserialize(filename): + """ + Deserializes data from a serialized file 
+ Args: + filename: Serialized file + + Returns: Deserialized data from file + + """ + if os.path.exists(filename): + with open(filename, 'rb') as f: + return pickle.load(f) + return {} diff --git a/flask/elasticsearchManager.py b/flask/elasticsearchManager.py new file mode 100644 index 0000000..985dc94 --- /dev/null +++ b/flask/elasticsearchManager.py @@ -0,0 +1,17 @@ +from elasticsearch import Elasticsearch + +class ElasticsearchManager: + def __init__(self, config_manager): + self.config_manager = config_manager + self._es = None + + def get_es(self): + """ + Gets an instance of elasticsearch + Returns: The instance of elasticsearch + """ + if self._es is None: + self._es = Elasticsearch([self.config_manager.get_es_endpoint()], verify_certs=True) + if not self._es.ping(): + raise ValueError('Elasticsearch connection failed') + return self._es \ No newline at end of file diff --git a/flask/explorer.py b/flask/explorer.py index c10f8e9..93ae178 100644 --- a/flask/explorer.py +++ b/flask/explorer.py @@ -7,20 +7,28 @@ import logging import threading import time -from flask_debugtoolbar import DebugToolbarExtension -from flask_debugtoolbar_lineprofilerpanel.profile import line_profile - import cluster import pagerank import index import search -import utils import query +from configManager import ConfigManager +from dataManager import DataManager +from elasticsearchManager import ElasticsearchManager +from logger import Logger + +from flask_debugtoolbar import DebugToolbarExtension +from flask_debugtoolbar_lineprofilerpanel.profile import line_profile # Configure logging, This will affect all loggers in your application, not just the Werkzeug logger. log = logging.getLogger('werkzeug') log.setLevel(logging.ERROR) +config_manager = ConfigManager() +data_manager = DataManager() +elasticsearch_manager = ElasticsearchManager(config_manager) +logger_ = Logger() + app = Flask(__name__) app.config.update( SECRET_KEY='your-secret-key', # Required for the debug toolbar @@ -54,11 +62,11 @@ def handle_error(e): @app.before_first_request def startup(): def auto_update_index(): - update_interval = int(utils.get_config().get('updateTimeInDays', 0)) * 86400 + update_interval = int(config_manager.load_config().get('updateTimeInDays', 0)) * 86400 while True: time.sleep(update_interval) # Implement your update logic here - if utils.get_config().get('autoUpdateIndex', False): + if config_manager.load_config().get('autoUpdateIndex', False): update_index() # Start the background thread for auto-updating the index @@ -70,63 +78,61 @@ def auto_update_index(): if os.path.exists(log_file) and os.path.getsize(log_file) > 20000000: # 20 MB os.remove(log_file) - utils.log('SBOLExplorer started :)') + logger_.log('SBOLExplorer started :)') # Check and create index if necessary try: - es = utils.get_es() - index_name = utils.get_config().get('elasticsearch_index_name') + es = elasticsearch_manager.get_es() + index_name = config_manager.load_config().get('elasticsearch_index_name') if not es.indices.exists(index=index_name): - utils.log('Index not found, creating new index.') + logger_.log('Index not found, creating new index.') update_index() except Exception as e: log.error(f'Error during startup: {e}') raise def update_index(): - utils.log_indexing('============ STARTING INDEXING ============\n\n') - utils.log('============ STARTING INDEXING ============\n\n') - utils.save_update_start_time() + logger_.log('============ STARTING INDEXING ============\n\n', True) + config_manager.save_update_start_time() clusters = 
cluster.update_clusters() - utils.save_clusters(clusters) + data_manager.save_clusters(clusters) uri2rank = pagerank.update_pagerank() - utils.save_uri2rank(uri2rank) + data_manager.save_uri2rank(uri2rank) - index.update_index(utils.get_uri2rank()) + index.update_index(data_manager.get_uri2rank()) query.memoized_query_sparql.cache_clear() - utils.log_indexing('Cache cleared') + logger_.log('Cache cleared', True) - utils.save_update_end_time() - utils.log_indexing('============ INDEXING COMPLETED ============\n\n') - utils.log('============ INDEXING COMPLETED ============\n\n') + config_manager.save_update_end_time() + logger_.log('============ INDEXING COMPLETED ============\n\n', True) @app.route('/info', methods=['GET']) def info(): - utils.log('Explorer up!!! Virtuoso ' + str(query.memoized_query_sparql.cache_info())) - return utils.get_log() + logger_.log('Explorer up!!! Virtuoso ' + str(query.memoized_query_sparql.cache_info())) + return logger_.get_log() @app.route('/indexinginfo', methods=['GET']) def indexinginfo(): - return utils.get_indexing_log() + return logger_.get_indexing_log() @app.route('/config', methods=['POST', 'GET']) def config_route(): if request.method == 'POST': new_config = request.get_json() - utils.set_config(new_config) - utils.log('Successfully updated config') + config_manager.save_config(new_config) + logger_.log('Successfully updated config') - return jsonify(utils.get_config()) + return jsonify(config_manager.load_config()) @app.route('/update', methods=['GET']) def update(): try: subject = request.args.get('subject') if subject: - index.refresh_index(subject, utils.get_uri2rank()) + index.refresh_index(subject, data_manager.get_uri2rank()) success_message = f'Successfully refreshed: {subject}' else: update_index() @@ -140,9 +146,9 @@ def update(): def incremental_update(): try: updates = request.get_json() - index.incremental_update(updates, utils.get_uri2rank()) + index.incremental_update(updates, data_manager.get_uri2rank()) success_message = 'Successfully incrementally updated parts' - utils.log(success_message) + logger_.log(success_message) return success_message except Exception as e: log.error(f'Error during incremental update: {e}') @@ -154,7 +160,7 @@ def incremental_remove(): subject = request.args.get('subject') index.incremental_remove(subject) success_message = f'Successfully incrementally removed: {subject}' - utils.log(success_message) + logger_.log(success_message) return success_message except Exception as e: log.error(f'Error during incremental remove: {e}') @@ -167,7 +173,7 @@ def incremental_remove_collection(): uri_prefix = request.args.get('uriPrefix') index.incremental_remove_collection(subject, uri_prefix) success_message = f'Successfully incrementally removed collection and members: {subject}' - utils.log(success_message) + logger_.log(success_message) return success_message except Exception as e: log.error(f'Error during incremental remove collection: {e}') @@ -182,8 +188,8 @@ def SBOLExplore_test_endpoint(): @line_profile def sparql_search_endpoint(): try: - es = utils.get_es() - index_name = utils.get_config().get('elasticsearch_index_name') + es = elasticsearch_manager.get_es() + index_name = config_manager.load_config().get('elasticsearch_index_name') if not es.indices.exists(index=index_name) or es.cat.indices(format='json')[0]['health'] == 'red': abort(503, 'Elasticsearch is not working or the index does not exist.') @@ -192,15 +198,15 @@ def sparql_search_endpoint(): default_graph_uri = 
request.args.get('default-graph-uri')
             response = jsonify(search.search(
                 sparql_query,
-                utils.get_uri2rank(),
-                utils.get_clusters(),
+                data_manager.get_uri2rank(),
+                data_manager.get_clusters(),
                 default_graph_uri
             ))
             return response

         return "Welcome to SBOLExplorer! "\
             + "The available indices in Elasticsearch are shown below: "\
-            + str(utils.get_es().cat.indices(format='json'))\
+            + str(elasticsearch_manager.get_es().cat.indices(format='json'))\
             + " The config options are set to: "\
-            + str(utils.get_config())\
+            + str(config_manager.load_config())\
             + " Visit our GitHub repository!"\
             + " Any issues can be reported to our issue tracker."\
             + "
Used by SynBioHub." @@ -212,8 +218,8 @@ def sparql_search_endpoint(): @app.route('/search', methods=['GET']) def search_by_string(): try: - es = utils.get_es() - index_name = utils.get_config().get('elasticsearch_index_name') + es = elasticsearch_manager.get_es() + index_name = config_manager.load_config().get('elasticsearch_index_name') if not es.indices.exists(index=index_name) or es.cat.indices(format='json')[0]['health'] == 'red': abort(503, 'Elasticsearch is not working or the index does not exist.') @@ -225,4 +231,4 @@ def search_by_string(): raise if __name__ == "__main__": - app.run(debug=True) + app.run(debug=True) # threaded=True diff --git a/flask/index.py b/flask/index.py index cb38309..0762a72 100644 --- a/flask/index.py +++ b/flask/index.py @@ -1,7 +1,13 @@ from elasticsearch import helpers -import utils +from configManager import ConfigManager +from elasticsearchManager import ElasticsearchManager import query import json +from logger import Logger + +config_manager = ConfigManager() +elasticsearch_manager = ElasticsearchManager(config_manager) +logger_ = Logger() def add_pagerank(parts_response, uri2rank): """ @@ -94,9 +100,9 @@ def create_parts_index(index_name): index_name {String} -- Name of the new index """ - if utils.get_es().indices.exists(index_name): - utils.log_indexing('Index already exists -> deleting') - utils.get_es().indices.delete(index=index_name) + if elasticsearch_manager.get_es().indices.exists(index_name): + logger_.log('Index already exists -> deleting', True) + elasticsearch_manager.get_es().indices.delete(index=index_name) body = { 'mappings': { @@ -116,8 +122,8 @@ def create_parts_index(index_name): } } - utils.get_es().indices.create(index=index_name, body=body) - utils.log_indexing('Index created') + elasticsearch_manager.get_es().indices.create(index=index_name, body=body) + logger_.log('Index created', True) def bulk_index_parts(parts_response, index_name): @@ -143,12 +149,12 @@ def bulk_index_parts(parts_response, index_name): actions.append(action) - utils.log_indexing('Bulk indexing') + logger_.log('Bulk indexing', True) try: - stats = helpers.bulk(utils.get_es(), actions) - utils.log_indexing('Bulk indexing complete') + stats = helpers.bulk(elasticsearch_manager.get_es(), actions) + logger_.log('Bulk indexing complete', True) except: - utils.log_indexing('[ERROR] Error_messages: ' + '\n'.join(stats[1])) + logger_.log('[ERROR] Error_messages: ' + '\n'.join(stats[1]), True) raise Exception("Bulk indexing failed") def update_index(uri2rank): @@ -160,15 +166,15 @@ def update_index(uri2rank): Returns: """ - index_name = utils.get_config()['elasticsearch_index_name'] + index_name = config_manager.load_config()['elasticsearch_index_name'] - utils.log_indexing('------------ Updating index ------------') + logger_.log('------------ Updating index ------------', True) - utils.log_indexing('******** Query for parts ********') + logger_.log('******** Query for parts ********', True) parts_response = query.query_parts(indexing = True) - utils.log_indexing('******** Query for parts complete ********') + logger_.log('******** Query for parts complete ********', True) - utils.log_indexing('******** Adding parts to new index ********') + logger_.log('******** Adding parts to new index ********', True) add_pagerank(parts_response, uri2rank) add_keywords(parts_response) add_roles(parts_response) @@ -176,9 +182,9 @@ def update_index(uri2rank): create_parts_index(index_name) bulk_index_parts(parts_response, index_name) - utils.log_indexing('******** Finished 
adding ' + str(len(parts_response)) + ' parts to index ********') + logger_.log('******** Finished adding ' + str(len(parts_response)) + ' parts to index ********', True) - utils.log_indexing('------------ Successfully updated index ------------\n') + logger_.log('------------ Successfully updated index ------------\n', True) def delete_subject(subject): @@ -190,7 +196,7 @@ def delete_subject(subject): Returns: """ - index_name = utils.get_config()['elasticsearch_index_name'] + index_name = config_manager.load_config()['elasticsearch_index_name'] body = { 'query': { @@ -202,13 +208,13 @@ def delete_subject(subject): }, 'conflicts': 'proceed' } - utils.get_es().delete_by_query(index=index_name, doc_type=index_name, body=body) + elasticsearch_manager.get_es().delete_by_query(index=index_name, doc_type=index_name, body=body) def index_part(part): delete_subject(part['subject']) - index_name = utils.get_config()['elasticsearch_index_name'] - utils.get_es().index(index=index_name, doc_type=index_name, id=part['subject'], body=part) + index_name = config_manager.load_config()['elasticsearch_index_name'] + elasticsearch_manager.get_es().index(index=index_name, doc_type=index_name, id=part['subject'], body=part) def refresh_index(subject, uri2rank): diff --git a/flask/logger.py b/flask/logger.py new file mode 100644 index 0000000..00c7259 --- /dev/null +++ b/flask/logger.py @@ -0,0 +1,39 @@ +import datetime +import os +class Logger: + def __init__(self, log_file='log.txt', indexing_log_file='indexing_log.txt'): + self.log_file = log_file + self.indexing_log_file = indexing_log_file + + def log(self, message, to_indexing_log=False): + """ + Writes a message to the log + Args: + message: Message to write + + Returns: + """ + log_message = f'[{datetime.datetime.now().isoformat()}] {message}\n' + print(log_message, end='') # Output to console + + file = self.indexing_log_file if to_indexing_log else self.log_file + with open(file, 'a+') as f: + f.write(log_message) + + def get_log(self): + """ + Gets a copy of the log + Returns: Stream from the read() method + + """ + return self._read_file(self.log_file) + + def get_indexing_log(self): + return self._read_file(self.indexing_log_file) + + @staticmethod + def _read_file(filename): + if os.path.exists(filename): + with open(filename, 'r') as f: + return f.read() + return "" diff --git a/flask/pagerank.py b/flask/pagerank.py index 816c9dd..2b319f1 100644 --- a/flask/pagerank.py +++ b/flask/pagerank.py @@ -1,8 +1,11 @@ from xml.etree import ElementTree import numpy as np -import utils import query +from logger import Logger +from configManager import ConfigManager +config_manager = ConfigManager() +logger_ = Logger() link_query = ''' SELECT DISTINCT ?parent ?child @@ -116,7 +119,7 @@ def pagerank(g, s=0.85, tolerance=0.001): if n == 0: - utils.log_indexing('no iterations: empty graph') + logger_.log('no iterations: empty graph', True) return p iteration = 1 @@ -134,7 +137,7 @@ def pagerank(g, s=0.85, tolerance=0.001): new_p = v / np.sum(v) delta = np.sum(np.abs(p - new_p)) - utils.log_indexing('Iteration ' + str(iteration) + ': L1 norm delta is ' + str(delta)) + logger_.log('Iteration ' + str(iteration) + ': L1 norm delta is ' + str(delta), True) p = new_p iteration += 1 @@ -155,22 +158,22 @@ def make_uri2rank(pr_vector, uri2index): def update_pagerank(): - utils.log_indexing('------------ Updating pagerank ------------') - utils.log_indexing('******** Query for uris ********') + logger_.log('------------ Updating pagerank ------------', True) + 
logger_.log('******** Query for uris ********', True) uri_response = query.query_sparql(uri_query) - utils.log_indexing('******** Query for uris complete ********') + logger_.log('******** Query for uris complete ********', True) adjacency_list = populate_uris(uri_response) - utils.log_indexing('******** Query for links ********') + logger_.log('******** Query for links ********', True) link_response = query.query_sparql(link_query) - utils.log_indexing('******** Query for links complete ********') + logger_.log('******** Query for links complete ********', True) populate_links(link_response, adjacency_list) g = graph(adjacency_list) - utils.log_indexing('******** Running pagerank ********') - pr = pagerank(g, tolerance=float(utils.get_config()['pagerank_tolerance'])) - utils.log_indexing('******** Running pagerank complete ********') - utils.log_indexing('------------ Successfully updated pagerank ------------\n') + logger_.log('******** Running pagerank ********', True) + pr = pagerank(g, tolerance=float(config_manager.load_config()['pagerank_tolerance'])) + logger_.log('******** Running pagerank complete ********', True) + logger_.log('------------ Successfully updated pagerank ------------\n', True) pr_vector = np.squeeze(np.asarray(pr)) # after squeeze, make sure it at least has a dimension in the case that there is only one element diff --git a/flask/query.py b/flask/query.py index 75c53d1..e7a54b8 100755 --- a/flask/query.py +++ b/flask/query.py @@ -2,8 +2,15 @@ import urllib.parse from functools import lru_cache import json -import utils +from wor_client import WORClient import re +from configManager import ConfigManager +from logger import Logger + +config_manager = ConfigManager() +logger_ = Logger() +wor_client_ = WORClient() + def query_parts(_from = '', criteria = '', indexing = False): @@ -67,10 +74,10 @@ def query_sparql(query): Returns: """ - endpoints = [utils.get_config()['sparql_endpoint']] + endpoints = [config_manager.load_config()['sparql_endpoint']] - if utils.get_config()['distributed_search']: - instances = utils.get_wor() + if config_manager.load_config()['distributed_search']: + instances = wor_client_.get_wor_instance() for instance in instances: endpoints.append(instance['instanceUrl'] + '/sparql?') @@ -80,7 +87,7 @@ def query_sparql(query): try: results.extend(page_query(query, endpoint)) except: - utils.log('[ERROR] failed querying:' + endpoint) + logger_.log('[ERROR] failed querying:' + endpoint) raise Exception("Endpoint not responding") return deduplicate_results(results) @@ -113,7 +120,7 @@ def page_query(query, endpoint): Returns: List of parts """ - utils.log('Current endpoint: ' + endpoint) + logger_.log('Current endpoint: ' + endpoint) bar = [ "[ ]", @@ -136,7 +143,7 @@ def page_query(query, endpoint): ] bar_counter = 0 - if endpoint != utils.get_config()['sparql_endpoint']: + if endpoint != config_manager.load_config()['sparql_endpoint']: query = re.sub(r'''FROM.*\n''', '', query) query_prefix = ''' @@ -187,8 +194,8 @@ def send_query(query, endpoint): """ params = {'query': query} - if endpoint == utils.get_config()['sparql_endpoint']: - params['default-graph-uri'] = '' # utils.get_config()['synbiohub_public_graph'] + if endpoint == config_manager.load_config()['sparql_endpoint']: + params['default-graph-uri'] = '' # config_manager.load_config()['synbiohub_public_graph'] url = endpoint + urllib.parse.urlencode(params) headers = {'Accept': 'application/json'} @@ -196,12 +203,12 @@ def send_query(query, endpoint): try: r = requests.get(url, 
headers=headers) except Exception as e: - utils.log("[ERROR] exception when connecting: " + str(e)) + logger_.log("[ERROR] exception when connecting: " + str(e)) raise Exception("Local SynBioHub isn't responding") if r.status_code != 200: - utils.log('[ERROR] Got status code when querying: ' + str(r.status_code)) - utils.log(r.text) + logger_.log('[ERROR] Got status code when querying: ' + str(r.status_code)) + logger_.log(r.text) raise Exception(url + ' is not responding') results = [] diff --git a/flask/search.py b/flask/search.py index 4d316db..d772b8f 100644 --- a/flask/search.py +++ b/flask/search.py @@ -1,8 +1,16 @@ import re from typing import List, Dict, Tuple, Optional -import utils import query import sequencesearch +from wor_client import WORClient +from elasticsearchManager import ElasticsearchManager +from configManager import ConfigManager +from logger import Logger + +config_manager = ConfigManager() +elasticsearch_manager = ElasticsearchManager(config_manager) +logger_ = Logger() +wor_client_ = WORClient() # Compile regex patterns FROM_COUNT_PATTERN = re.compile(r'SELECT \(count\(distinct \?subject\) as \?tempcount\)\s*(.*)\s*WHERE {') @@ -53,9 +61,9 @@ def search_es(es_query: str) -> Dict: 'size': 10000 } try: - return utils.get_es().search(index=utils.get_config()['elasticsearch_index_name'], body=body) + return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body) except Exception as e: - utils.log(f"ES search failed: {e}") + logger_.log(f"ES search failed: {e}") raise def empty_search_es(offset: int, limit: int, allowed_graphs: List[str]) -> Dict: @@ -79,9 +87,9 @@ def empty_search_es(offset: int, limit: int, allowed_graphs: List[str]) -> Dict: 'size': limit } try: - return utils.get_es().search(index=utils.get_config()['elasticsearch_index_name'], body=body) + return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body) except Exception as e: - utils.log(f"ES search failed: {e}") + logger_.log(f"ES search failed: {e}") raise def search_es_allowed_subjects(es_query: str, allowed_subjects: List[str]) -> Dict: @@ -123,9 +131,9 @@ def search_es_allowed_subjects(es_query: str, allowed_subjects: List[str]) -> Di 'size': 10000 } try: - return utils.get_es().search(index=utils.get_config()['elasticsearch_index_name'], body=body) + return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body) except Exception as e: - utils.log(f"ES search failed: {e}") + logger_.log(f"ES search failed: {e}") raise def search_es_allowed_subjects_empty_string(allowed_subjects: List[str]) -> Dict: @@ -153,9 +161,9 @@ def search_es_allowed_subjects_empty_string(allowed_subjects: List[str]) -> Dict 'size': 10000 } try: - return utils.get_es().search(index=utils.get_config()['elasticsearch_index_name'], body=body) + return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body) except Exception as e: - utils.log(f"ES search failed: {e}") + logger_.log(f"ES search failed: {e}") raise def parse_sparql_query(sparql_query, is_count_query): # Find FROM clause @@ -208,8 +216,8 @@ def extract_allowed_graphs(_from: str, default_graph_uri: str) -> List[str]: Extracts the allowed graphs to search over. 
""" allowed_graphs = [default_graph_uri] if not _from else [graph.strip()[1:-1] for graph in _from.split('FROM') if graph.strip()] - if utils.get_config()['distributed_search']: - allowed_graphs.extend(instance['instanceUrl'] + '/public' for instance in utils.get_wor()) + if config_manager.load_config()['distributed_search']: + allowed_graphs.extend(instance['instanceUrl'] + '/public' for instance in wor_client_.get_wor_instance()) return allowed_graphs def is_count_query(sparql_query: str) -> bool: @@ -510,7 +518,7 @@ def search(sparql_query, uri2rank, clusters, default_graph_uri): else search_es_allowed_subjects(es_query, allowed_subjects)) bindings = create_bindings(es_allowed_subject, clusters, allowed_graphs, allowed_subjects) - utils.log('Advanced string search complete.') + logger_.log('Advanced string search complete.') bindings.sort(key=lambda b: b['order_by'], reverse=True) return create_response(len(bindings), bindings[offset:offset + limit], is_count_query(sparql_query)) diff --git a/flask/sequencesearch.py b/flask/sequencesearch.py index 6cb899b..89d6964 100644 --- a/flask/sequencesearch.py +++ b/flask/sequencesearch.py @@ -1,12 +1,14 @@ from xml.etree import ElementTree import subprocess -import utils import query import cluster import search from sys import platform import base64 import tempfile +from logger import Logger + +logger_ = Logger() # handling selection of VSEARCH binary @@ -15,7 +17,7 @@ elif platform == "darwin": vsearch_binary_filename = 'usearch/vsearch_macos' else: - utils.log("Sorry, your OS is not supported for sequence based-search.") + logger_.log("Sorry, your OS is not supported for sequence based-search.") # add valid flags to here globalFlags = {'maxaccepts': '50', 'id': '0.8', 'iddef': '2', 'maxrejects': '0', 'maxseqlength': '5000', 'minseqlength': '20'} @@ -51,7 +53,7 @@ def run_vsearch_global(fileName): popen = subprocess.Popen(args, stdout=subprocess.PIPE) popen.wait() output = popen.stdout.read() - utils.log(output) + logger_.log(output) def run_vsearch_exact(fileName): """ @@ -66,7 +68,7 @@ def run_vsearch_exact(fileName): popen = subprocess.Popen(args, stdout=subprocess.PIPE) popen.wait() output = popen.stdout.read() - utils.log(output) + logger_.log(output) def append_flags_to_args(argsList, flags): @@ -124,7 +126,7 @@ def sequence_search(userFlags, fileName): Returns: set -- search results by URI """ - utils.log('Starting sequence search') + logger_.log('Starting sequence search') if "search_exact" in userFlags: add_exact_flags(userFlags) @@ -132,7 +134,7 @@ def sequence_search(userFlags, fileName): else: add_global_flags(userFlags) run_vsearch_global(fileName) - utils.log('Sequence search complete') + logger_.log('Sequence search complete') return cluster.uclust2uris(fileName[:-4] + '.uc') diff --git a/flask/utils.py b/flask/utils.py deleted file mode 100644 index db0efc8..0000000 --- a/flask/utils.py +++ /dev/null @@ -1,249 +0,0 @@ -from elasticsearch import Elasticsearch -import json -import pickle -import requests -import datetime -import os - -config = None - -def get_config(): - """ - Gets a copy of the config file - Returns: Config file in JSON - - """ - global config - - if not config: - with open('config.json') as f: - config = json.load(f) - - return config - - -def set_config(new_config): - """ - Overwrites the existing config with a new config file - Args: - new_config: New config file with the updated information - - Returns: - - """ - global config - - config = get_config() - - for key in new_config: - if key in config: - 
config[key] = new_config[key] - - with open('config.json', 'w') as f: - json.dump(config, f) - - -def save_time(attribute): - """ - Saves the current time to an attribute in the config - Args: - attribute: Config attribute to save current time to - - Returns: - - """ - config = get_config() - - now = datetime.datetime.now() - - config[attribute] = str(now) - - set_config(config) - -def save_update_end_time(): - """ - Save end time of indexing - Returns: - - """ - save_time("last_update_end") - - -def save_update_start_time(): - """ - Save start time of indexing - Returns: - - """ - save_time("last_update_start") - - -def get_wor(): - """ - Gets all instances of SynBioHub from the Web of Registries - Returns: - - """ - try: - instances = requests.get('https://wor.synbiohub.org/instances/') - except Exception: - log('[ERROR] Web of Registries had a problem!') - return [] - - if instances.status_code != 200: - log('[ERROR] Web of Registries had a problem!') - return [] - - return instances.json() - - -def get_es(): - """ - Gets an instance of elasticsearch - Returns: The instance of elasticsearch - - """ - es = Elasticsearch([get_config()['elasticsearch_endpoint']], verify_certs=True) - - if not es.ping(): - raise ValueError('Elasticsearch connection failed') - - return es - - -def log(message): - """ - Writes a message to the log - Args: - message: Message to write - - Returns: - - """ - log_message = '[' + str(datetime.datetime.now()) + '] ' + str(message) + '\n' - print(log_message) - - with open('log.txt', 'a+') as f: - f.write(log_message) - -def log_indexing(message): - log_message = '[' + str(datetime.datetime.now()) + '] ' + str(message) + '\n' - print(log_message) - - with open('indexing_log.txt', 'a+') as f: - f.write(log_message) - -def get_log(): - """ - Gets a copy of the log - Returns: Stream from the read() method - - """ - try: - with open('log.txt', 'r') as f: - return f.read() - except: - return "" - -def get_indexing_log(): - try: - with open('indexing_log.txt', 'r') as f: - return f.read() - except: - return "" - -clusters = None -clusters_filename = 'dumps/clusters_dump' - -uri2rank = None -uri2rank_filename = 'dumps/uri2rank_dump' - - -def save_clusters(new_clusters): - """ - Save clusters of parts - Args: - new_clusters: Clusters to be saved - - Returns: - - """ - global clusters - clusters = new_clusters - serialize(clusters, clusters_filename) - - -def get_clusters(): - """ - Gets all clusters of parts - Returns: - - """ - global clusters - - if clusters is None: - clusters = deserialize(clusters_filename) - - return clusters - - -def save_uri2rank(new_uri2rank): - """ - Saves the pagerank of all URI's - Args: - new_uri2rank: - - Returns: - - """ - global uri2rank - uri2rank = new_uri2rank - serialize(uri2rank, uri2rank_filename) - - -def get_uri2rank(): - """ - Gets all pageranks of URI's - Returns: - - """ - global uri2rank - - if uri2rank is None: - uri2rank = deserialize(uri2rank_filename) - - return uri2rank - - -def serialize(data, filename): - """ - Serializes some data to a file - Args: - data: Data to be written - filename: File to be written to - - Returns: - - """ - f = open(filename, 'wb') - pickle.dump(data, f) - f.close() - - -def deserialize(filename): - """ - Deserializes data from a serialized file - Args: - filename: Serialized file - - Returns: Deserialized data from file - - """ - if not os.path.exists(filename): - return {} - - f = open(filename, 'rb') - data = pickle.load(f) - f.close() - return data - \ No newline at end of file diff --git 
a/flask/wor_client.py b/flask/wor_client.py
new file mode 100644
index 0000000..3f0f607
--- /dev/null
+++ b/flask/wor_client.py
@@ -0,0 +1,19 @@
+import requests
+from logger import Logger
+
+logger_ = Logger()
+class WORClient:
+    @staticmethod
+    def get_wor_instances():
+        """
+        Gets all instances of SynBioHub from the Web of Registries
+        Returns:
+
+        """
+        try:
+            response = requests.get('https://wor.synbiohub.org/instances/')
+            response.raise_for_status()
+            return response.json()
+        except requests.RequestException:
+            logger_.log('[ERROR] Web of Registries had a problem!')
+            return []
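
For reviewers: a minimal, illustrative smoke test of the five classes this patch introduces. It is not part of the patch. It assumes it is run from the flask/ directory with a readable config.json that contains an 'elasticsearch_endpoint', a reachable Elasticsearch instance, and outbound network access for the Web of Registries; adjust paths and keys if your deployment differs.

    from configManager import ConfigManager
    from dataManager import DataManager
    from elasticsearchManager import ElasticsearchManager
    from logger import Logger
    from wor_client import WORClient

    config_manager = ConfigManager()                   # reads config.json lazily on first load_config()
    logger_ = Logger()                                  # writes log.txt / indexing_log.txt in the cwd
    data_manager = DataManager()                        # dumps/clusters_dump and dumps/uri2rank_dump
    es_manager = ElasticsearchManager(config_manager)   # connects and pings on first get_es()
    wor_client = WORClient()

    logger_.log('config has ' + str(len(config_manager.load_config())) + ' keys')
    logger_.log('this line goes to indexing_log.txt', True)   # True routes to the indexing log
    print(len(data_manager.get_uri2rank()))              # {} until a pagerank dump has been written
    print(es_manager.get_es().info())                    # raises ValueError if Elasticsearch is unreachable
    print(len(wor_client.get_wor_instances()))           # [] if the Web of Registries is down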
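
One behavioral difference worth noting when reading the ConfigManager diff: the deleted utils.set_config() only copied keys that already existed in config.json, whereas ConfigManager.save_config() merges with dict.update(), so previously unknown keys (such as 'which_search', which cluster.py writes on first run) are now persisted as well. A small sketch, assuming config.json is writable from the cwd:

    from configManager import ConfigManager

    cm = ConfigManager()
    cfg = cm.load_config()
    cfg['which_search'] = 'vsearch'   # key need not exist in config.json yet
    cm.save_config(cfg)               # merged and written via dict.update()
    assert cm.load_config()['which_search'] == 'vsearch'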