diff --git a/.env-example b/.env-example index 8efdf5e..108d468 100755 --- a/.env-example +++ b/.env-example @@ -13,3 +13,4 @@ RQ_QUEUES="default" D3M_DB_SUBMITTER=submitter_name D3M_DB_TOKEN=token +SAVE_TO_D3M=false diff --git a/docker-compose.yml b/docker-compose.yml index 6b4ff38..0ff1bda 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,7 +7,7 @@ services: volumes: - type: bind source: '${DATA_DIR}/${REDIS_DATA_DIR}' - target: '/data' + target: /data networks: - default @@ -15,6 +15,14 @@ services: image: 'd3m-experimenter:latest' env_file: - ./.env + volumes: + - type: bind + source: '${DATASETS_DIR}' + target: /datasets + read_only: true + - type: bind + source: '${DATA_DIR}' + target: /data command: 'rq worker --url redis://${REDIS_HOST} ${RQ_QUEUES}' networks: - default diff --git a/experimenter/__init__.py b/experimenter/__init__.py old mode 100755 new mode 100644 diff --git a/experimenter/cli.py b/experimenter/cli.py index f2e513c..e87ff94 100644 --- a/experimenter/cli.py +++ b/experimenter/cli.py @@ -1,7 +1,9 @@ import argparse import typing -from experimenter import exceptions, queue + +from experimenter.modify_generator import ModifyGenerator +from experimenter import config, exceptions, queue def main(argv: typing.Sequence) -> None: @@ -21,6 +23,12 @@ def configure_parser(parser: argparse.ArgumentParser) -> None: ) configure_queue_parser(queue_parser) + generator_parser = subparsers.add_parser( + 'generator', + description='generates new pipelines and queues them to run on available datasets', + ) + configure_generator_parser(generator_parser) + def handler(arguments: argparse.Namespace, parser: argparse.ArgumentParser) -> None: experimenter_command = arguments.experimenter_command @@ -28,6 +36,8 @@ def handler(arguments: argparse.Namespace, parser: argparse.ArgumentParser) -> N if experimenter_command == 'queue': queue_handler(arguments, subparser) + elif experimenter_command == 'generator': + generator_handler(arguments, subparser) 
else: raise exceptions.InvalidStateError('Unknown experimenter command: {}'.format(experimenter_command)) @@ -40,6 +50,12 @@ def configure_queue_parser(parser: argparse.ArgumentParser) -> None: empty_parser = subparsers.add_parser('empty', help='remove all jobs from a queue') empty_parser.add_argument('-q', '--queue-name', help='the name of the queue to empty') + empty_parser.add_argument('-f', '--failed', help='remove the failed queue', action='store_true') + + #save a failed traceback parser + save_failed_parser = subparsers.add_parser('save-failed', help='save failed job error output') + save_failed_parser.add_argument('-q', '--queue-name', help='the name of the queue to empty') + save_failed_parser.add_argument('-j', '--job-num', type=int, default=0, help='the failed job number') def queue_handler(arguments: argparse.Namespace, parser: argparse.ArgumentParser) -> None: @@ -48,6 +64,127 @@ def queue_handler(arguments: argparse.Namespace, parser: argparse.ArgumentParser if queue_command == 'status': queue.status() elif queue_command == 'empty': - queue.empty(arguments.queue_name) + queue.empty(arguments.queue_name, arguments.failed) + elif queue_command == 'save-failed': + queue.save_failed_job(arguments.queue_name, arguments.job_num) else: raise exceptions.InvalidStateError('Unknown queue command: {}'.format(queue_command)) + + +def configure_generator_parser(parser: argparse.ArgumentParser) -> None: + parser.add_argument('-j', '--max-jobs', type=int, default=None, action='store', help='maximum number of jobs generated') + parser.add_argument('-t', '--job-timeout', type=int, default=None, action='store', help='maximum runtime for a single job in seconds') + + subparsers = parser.add_subparsers(dest='generator_command') + subparsers.required = True # type: ignore + + search_subparser = subparsers.add_parser( + 'search', + help='searches for new pipelines not found in the metalearning database', + ) + configure_search_parser(search_subparser) + + modify_subparser 
= subparsers.add_parser(
+        'modify',
+        help='modifies existing pipelines in the metalearning database',
+    )
+    configure_modify_parser(modify_subparser)
+
+    update_subparser = subparsers.add_parser(
+        'update',
+        help='updates existing pipeline runs in the metalearning database to use the current versions of datasets and primitives',
+    )
+    configure_update_parser(update_subparser)
+
+
+def generator_handler(arguments: argparse.Namespace, parser: argparse.ArgumentParser) -> None:
+    generator_command = arguments.generator_command  # dest of the generator subparsers
+    subparser = parser._subparsers._group_actions[0].choices[generator_command]  # type: ignore
+
+    if generator_command == 'search':
+        search_handler(arguments, subparser)
+    elif generator_command == 'modify':
+        modify_handler(arguments, subparser)
+    elif generator_command == 'update':
+        update_handler(arguments, subparser)
+    else:
+        raise exceptions.InvalidStateError('Unknown generator command: {}'.format(generator_command))
+
+
+def configure_search_parser(parser: argparse.ArgumentParser) -> None:
+    pass  # 'search' takes no arguments of its own yet
+
+
+def search_handler(arguments: argparse.Namespace, parser: argparse.ArgumentParser) -> None:
+    raise exceptions.NotImplementedError()
+
+
+def configure_modify_parser(parser: argparse.ArgumentParser) -> None:
+    #create one required subparser per type of modification; the choice is stored in 'modify_type'
+
+    #seed swapper functionality
+    subparsers = parser.add_subparsers(dest='modify_type')
+    subparsers.required = True
+    swap_seed_subparser = subparsers.add_parser(
+        'random-seed',
+        description='Uses database data to search pipelines and run functional pipelines on different random seeds',
+    )
+    #subparser arguments
+    swap_seed_subparser.add_argument(
+        '--pipeline_id',
+        help='The pipeline id to search for in the query, if none, searches all pipelines',
+        default=None,
+        type=str)
+    swap_seed_subparser.add_argument(
+        '--submitter',
+        help='The pipeline submitter to add to the query',
+        default=None,
+        type=str)
+    swap_seed_subparser.add_argument(
+        '--seed-limit',
help='The amount of random seeds that each ran pipeline will have at the end of the test', + default=2, + type=int) + swap_seed_subparser.add_argument( + '--test', + help='run the test instead of random pipeline generation', + action='store_true') + + #Primitive swapper functionality + primitive_swap_subparser = subparsers.add_parser( + 'primitive-swap', + description='Searches database for pipeline runs containing a primitive and swaps out primitive for a different given primitive') + #subparser arguments + primitive_swap_subparser.add_argument( + '--primitive_id', + help='The id of the primitive to swap out', + default=None, + type=str) + primitive_swap_subparser.add_argument( + '--limit_indeces', + help='Details for primitive swapping', + default=None) + primitive_swap_subparser.add_argument( + '--swap_primitive_id', + help='The id of the primitve to swap in', + default=None, + type=str) + + +def modify_handler(arguments: argparse.Namespace, parser: argparse.ArgumentParser) -> None: + modify_type = arguments.modify_type + modify_generator = ModifyGenerator(modify_type = modify_type, + max_jobs = arguments.max_jobs, + seed_limit = arguments.seed_limit, + submitter = arguments.submitter, + pipeline_id = arguments.pipeline_id) + #now run the enqueuer part + queue.enqueue_jobs(jobs=modify_generator, job_timeout=arguments.job_timeout) + + +def configure_update_parser(parser: argparse.ArgumentParser) -> None: + pass + + +def update_handler(arguments: argparse.Namespace, parser: argparse.ArgumentParser) -> None: + raise exceptions.NotImplementedError() diff --git a/experimenter/config.py b/experimenter/config.py index 461cb4b..04d6332 100644 --- a/experimenter/config.py +++ b/experimenter/config.py @@ -10,6 +10,7 @@ _ERROR_MESSAGE = 'environment variable not set: {}' +#parse the .env file datasets_dir: str = os.environ.get('DATASETS_DIR', None) def validate_datasets_dir(): if datasets_dir is None: @@ -27,7 +28,7 @@ def validate_data_dir(): def validate_redis_host(): if 
redis_host is None: raise exceptions.ConfigError(_ERROR_MESSAGE.format('REDIS_HOST')) - + d3m_db_submitter: str = os.environ.get('D3M_DB_SUBMITTER', None) def validate_d3m_db_submitter(): @@ -39,3 +40,46 @@ def validate_d3m_db_submitter(): def validate_d3m_db_token(): if d3m_db_token is None: raise exceptions.ConfigError(_ERROR_MESSAGE.format('D3M_DB_TOKEN')) + + +save_to_d3m: bool = os.environ.get('SAVE_TO_D3M', None) == 'true' +def validate_save(): + if save_to_d3m is None: + raise exceptions.ConfigError(_ERROR_MESSAGE.format('SAVE_TO_D3M')) + + +query_host: str = os.environ.get('QUERY_HOST', 'https://metalearning.datadrivendiscovery.org/es') +def validate_query_host(): + if query_host is None: + raise exceptions.ConfigError(_ERROR_MESSAGE.format('QUERY_HOST')) + + +query_timeout: int = int(os.environ.get('QUERY_TIMEOUT', '500')) +def validate_query_timeout(): + if query_timeout is None: + raise exceptions.ConfigError(_ERROR_MESSAGE.format('QUERY_TIMEOUT')) + + +#get the save paths for the experimenter from the point of view of the docker container +output_run_path: str = os.path.abspath(os.path.join('/data', 'pipeline_runs')) +if (not os.path.exists(output_run_path)): + #create the directory + os.makedirs(output_run_path, exist_ok=True) + + +pipelines_path: str = os.path.abspath(os.path.join('/data', 'pipelines')) +if (not os.path.exists(pipelines_path)): + #create the directory + os.makedirs(pipelines_path, exist_ok=True) + + +data_prep_pipelines_path: str = os.path.abspath(os.path.join('/data', 'data-preparation-pipelines')) +if (not os.path.exists(data_prep_pipelines_path)): + #create the directory + os.makedirs(data_prep_pipelines_path, exist_ok=True) + + +scoring_pipelines_path: str = os.path.abspath(os.path.join('/data', 'scoring-pipelines')) +if (not os.path.exists(scoring_pipelines_path)): + #create the directory + os.makedirs(scoring_pipelines_path, exist_ok=True) diff --git a/experimenter/databases/d3m_mtl.py b/experimenter/databases/d3m_mtl.py index 
f8dde8d..b63a333 100644 --- a/experimenter/databases/d3m_mtl.py +++ b/experimenter/databases/d3m_mtl.py @@ -1,5 +1,6 @@ import logging import json +import yaml import requests from d3m.primitive_interfaces.base import PrimitiveBase @@ -23,7 +24,7 @@ def __init__(self) -> None: self._post_url = D3M_MTL_DB_POST_URL # This env var allows code calling this class to be run during # unit tests without actually saving to the production DB. - self.should_save = config.SAVE_TO_D3M + self.should_save = config.save_to_d3m # A reference to a low-level elasticsearch client. This can be # used to query the D3M DB, or this classe's `search` method # can be used, and is preferred, since its API is more straightforward. @@ -31,9 +32,9 @@ def __init__(self) -> None: # certain things though. self.es = Elasticsearch(hosts=[D3M_MTL_DB_GET_URL], timeout=30) # Our submitter name. - self._submitter = config.D3M_DB_SUBMITTER + self._submitter = config.d3m_db_submitter # The secret verifying us as the submitter we say we are. 
- self._x_token = config.D3M_DB_TOKEN + self._x_token = config.d3m_db_token if self._is_identifying_as_submitter(): logger.info( f"Documents will be saved under submitter name: '{self._submitter}'" @@ -105,7 +106,15 @@ def does_pipeline_exist_in_db(self, pipeline: Pipeline) -> bool: .count() ) return num_pipeline_matches > 0 - + + def save_pipeline_runs_from_path(self, pipeline_run_path: str) -> requests.Response: + responses = list() + with open(pipeline_run_path, 'r') as pipeline_data: + pipeline_runs = yaml.safe_load_all(pipeline_data) + for pipeline_run in pipeline_runs: + responses.append(self.save_pipeline_run(pipeline_run).content) + return responses + def save_pipeline_run(self, pipeline_run: dict) -> requests.Response: return self._save(pipeline_run, "pipeline-run") @@ -156,7 +165,7 @@ def _create_no_save_response(self) -> requests.Response: response.status_code = 200 response._content = ( b'{ "result" : "No request was made to the D3M DB API to save a record, ' - b'since the SAVE_TO_D3M environment variable is not set." }' + b'since the SAVE_TO_D3M environment variable is not set to true." 
}' ) return response diff --git a/experimenter/execute_pipeline.py b/experimenter/execute_pipeline_old.py similarity index 100% rename from experimenter/execute_pipeline.py rename to experimenter/execute_pipeline_old.py diff --git a/experimenter/modify_generator.py b/experimenter/modify_generator.py new file mode 100644 index 0000000..211ac2f --- /dev/null +++ b/experimenter/modify_generator.py @@ -0,0 +1,171 @@ +from random import randint +import json +import os +import yaml + +from d3m.contrib.pipelines import K_FOLD_TABULAR_SPLIT_PIPELINE_PATH +from d3m.contrib.pipelines import SCORING_PIPELINE_PATH as scoring_file + +from experimenter import queue, utils, query +from experimenter.utils import download_from_database +from experimenter.runtime import evaluate + + +class ModifyGenerator: + """ Generator to be used for creating modified pipelines based on existing + pipelines in the database + """ + def __init__(self, modify_type: str='random-seed', + max_jobs: int=None, seed_limit = None, + submitter = None, pipeline_id = None): + #intialize commonly used variables + self.modifier_type = modify_type + self.max_jobs = max_jobs + self.seed_limit = seed_limit + self.submitter = submitter + self.pipeline_id = pipeline_id + self.num_complete = 0 + #run the query on initializing to define the query results + self._set_query_results() + + + def __iter__(self): + return self + + + def __next__(self): + #iterate through query results + job = next(self.generator) + if (self.max_jobs): + if (self.num_complete > self.max_jobs): + raise StopIteration + return job + + + def _set_query_results(self, query_results=None): + self.query_results = query_results + if query_results is None: + self.query_results = self._query() + self.generator = self._get_generator() + + + def _get_generator(self): + """ + Main generator to be used of ModifyGenerator class + Can only handle cases where there is a data preparation + pipeline in the pipeline run + """ + for query_result in 
self.query_results:
+            #iterate through modifier results
+            for pipeline, problem_path, dataset_doc, seed, data, score in self._modify(query_result):
+                #unpack the (pipeline, random seed, params) triples for data preparation and scoring
+                data_prep_pipeline, data_random_seed, data_params = data
+                scoring_pipeline, scoring_random_seed, scoring_params = score
+                #TODO - catch when there is no data preparation pipeline and pass it further to evaluate
+                #skip the job, before anything is written to disk, when a required input is missing
+                if (problem_path is None or dataset_doc is None or data_prep_pipeline is None or scoring_pipeline is None):
+                    continue
+                pipeline_path = download_from_database(pipeline, type_to_download='pipelines')
+                #a str is already a file path; anything else is a pipeline document that has to be saved first
+                if (not isinstance(data_prep_pipeline, str)):
+                    data_prep_pipeline = download_from_database(data_prep_pipeline, type_to_download='data-preparation-pipelines')
+                if (not isinstance(scoring_pipeline, str)):
+                    scoring_pipeline = download_from_database(scoring_pipeline, type_to_download='scoring-pipelines')
+                job = queue.make_job(evaluate,
+                                     pipeline=pipeline_path,
+                                     problem=problem_path,
+                                     input=dataset_doc,
+                                     random_seed=seed,
+                                     data_pipeline=data_prep_pipeline,
+                                     data_random_seed=data_random_seed,
+                                     data_params=data_params,
+                                     scoring_pipeline=scoring_pipeline,
+                                     scoring_random_seed=scoring_random_seed,
+                                     scoring_params=scoring_params)
+                self.num_complete += 1
+                yield job
+
+
+    def _query(self):
+        """Query the database according to the pipeline modification type (raises ValueError if unsupported)
+        """
+        if (self.modifier_type=='random-seed'):
+            return query_on_seeds(self.pipeline_id, self.seed_limit, self.submitter)
+        if (self.modifier_type in ('swap-primitive', 'primitive-swap')):
+            raise ValueError("No functionality for swapping primitives yet")
+        else:
+            raise ValueError("This type of modification is not yet an option")
+
+
+    def _modify(self, query_args):
+        """Handler for different types of pipeline modification tasks
+        """
+        if self.modifier_type=='random-seed':
+            return self._modify_random_seed(self.seed_limit,
query_args) + if self.modifier_type=='swap-primitive': + return self._modify_swap_primitive(self.swap_primitive_id, query_args) + else: + raise ValueError("This type of modification is not yet an option") + + + def _check_for_duplicates(self, pipeline_to_check, problem_ref_to_check): + """Pseudo function/method for duplicate checking + - This function is not complete and will be used for future generation type jobs + """ + #create the pipeline to check for duplicates from the path + pipeline_object = d3m.metadata.pipeline.Pipeline.from_json(pipeline_to_check) + #query through the database for equal pipelines + similar_pipeline_runs_in_database = query.generate_similar_pipeline_runs() + for pipeline in similar_pipeline_runs_in_database: + if (pipeline_object.equals(pipeline)): + return True + return False + + + def _modify_random_seed(self, seed_limit, query_args): + """Generates new seeds for a given pipeline, problem, and dataset + It is dependent on the seed limit for how many it will generate + """ + used_seeds = query_args['tested_seeds'] + num_run = len(used_seeds) + #run until the right number of seeds have been run + while (num_run < self.seed_limit): + new_seed = randint(1,100000) + if (new_seed in used_seeds): + continue + num_run += 1 + used_seeds.add(new_seed) + #yield the necessary job requirements + yield (query_args['pipeline'], query_args['problem_path'], query_args['dataset_doc_path'], new_seed, + (query_args['data_prep_pipeline'], query_args['data_prep_seed'], query_args['data_params']), + (query_args['scoring_pipeline'], query_args['scoring_seed'], query_args['scoring_params'])) + + + def _modify_swap_primitive(self, swap_pipeline, query_args): + raise ValueError("No functionality for swapping primitives yet") + + +def query_on_seeds(pipeline_id: str=None, limit: int=None, submitter: str='byu'): + """ + Helper function for generating jobs to be used in the random-seed swapping + generator + """ + arguments = {'id': pipeline_id, '_submitter': 
submitter} + pipeline_search = query.match_query(index='pipelines', arguments=arguments) + for pipeline in pipeline_search.scan(): + pipeline_run_query = query.scan_pipeline_runs(pipeline.id, submitter) + pipeline = pipeline.to_dict() + for run_tuple, pipeline_run_params in pipeline_run_query.items(): + #get the unqiue params from the params list + unique_run_params = query.combine_unique_params(pipeline_run_params) + #unpack values from tuple + query_arg_dict = query.unpack_run_tuple_args(run_tuple) + for params in unique_run_params: + query_args = query_arg_dict.copy() + query_args['data_params'] = params['data_params'] + query_args['scoring_params'] = params['scoring_params'] + query_args['tested_seeds'] = params['random_seeds'] + query_args['pipeline'] = pipeline + if limit and len(query_args['tested_seeds']) > limit: + continue + yield query_args diff --git a/experimenter/query.py b/experimenter/query.py new file mode 100644 index 0000000..c0e477d --- /dev/null +++ b/experimenter/query.py @@ -0,0 +1,146 @@ +from elasticsearch import Elasticsearch +from elasticsearch_dsl import Search, Q +from experimenter.utils import get_problem_path, get_dataset_doc_path, get_pipelines_from_d3m +from d3m.runtime import _get_data_and_scoring_params_from_pipeline_run as _data_score_params +from experimenter import config, exceptions + +CONNECTION = Elasticsearch(hosts=[config.query_host], timeout=config.query_timeout) + + +def match_query(index:str, arguments: dict = None, connection = CONNECTION): + #remove None arguments from the dictionary + filtered_args = {k:v for k,v in arguments.items() if v is not None} + #initialize the search + index_search = Search(using=connection, index=index) + for field, argument in filtered_args.items(): + arg_dict = dict() + arg_dict[field] = argument + index_search = index_search.query('match', **arg_dict) + return index_search + + +def unpack_run_tuple_args(run_tuple: tuple): + #unpack values from tuple + problem_id, dataset_id, 
data_prep_data, scoring_data = run_tuple
+    scoring_id, scoring_random_seed = scoring_data
+    data_prep_id, data_prep_seed = data_prep_data
+    #get preparation and scoring pipelines
+    data_prep_pipeline = get_pipeline(data_prep_id, types='Data')
+    scoring_pipeline = get_pipeline(scoring_id, types='Scoring')
+    return {'problem_path': get_problem_path(problem_id),
+            'dataset_doc_path':get_dataset_doc_path(dataset_id),
+            'data_prep_pipeline': data_prep_pipeline, 'data_prep_seed': data_prep_seed,
+            'scoring_pipeline': scoring_pipeline, 'scoring_seed': scoring_random_seed,}
+
+
+def get_pipeline(pipeline_id: str=None, types: str='Data'):
+    """
+    Returns the d3m module's entry for pipeline_id when it has one, otherwise the
+    pipeline document (dict) found in the database; None for no id or no match
+    """
+    if (pipeline_id is None):
+        return None
+    pipeline = get_pipelines_from_d3m(pipeline_id, types=types)
+    #get from database if not in d3m module
+    if (pipeline is None):
+        arguments = {'id': pipeline_id}
+        search = match_query(index='pipelines', arguments=arguments)
+        pipeline = next(search.scan(), None)  #None instead of StopIteration when nothing matches
+        pipeline = pipeline.to_dict() if pipeline is not None else None
+    return pipeline
+
+
+def check_for_data_prep(pipeline_run=None):
+    """Returns ((data_prep_id, data_prep_seed), data_params) for the data
+    preparation pipeline recorded in the pipeline run; every value is None
+    when the pipeline run has no preparation pipeline
+    """
+    try:
+        data_prep = pipeline_run.run.data_preparation
+        data_prep_seed = data_prep.random_seed
+        data_prep_id = data_prep.pipeline.id
+        data_prep = data_prep.to_dict()
+        data_params = _data_score_params(data_prep.get('steps', []))
+    except (AttributeError, KeyError):
+        #no data preparation pipeline in pipeline run (attribute access on the hit fails), return none
+        data_prep_seed = data_prep_id = data_params = None
+
+    return (data_prep_id, data_prep_seed), data_params
+
+
+def get_scoring_pipeline(pipeline_run):
+    """
+    returns the scoring pipeline from the pipeline run
+    """
+    scoring = pipeline_run.run.scoring
+    scoring_seed = scoring.random_seed
+    scoring_id = scoring.pipeline.id
+    scoring = scoring.to_dict()
+    scoring_params =
_data_score_params(scoring.get('steps', [])) + return (scoring_id, scoring_seed), scoring_params + + +def get_list_duplicates(params_list, match_item): + """ + takes in a list of params and an item to match, + returns a list of matching indeces in the list + """ + start_loc = -1 + locs = [] + while True: + try: + loc = params_list.index(match_item,start_loc+1) + except ValueError: + break + else: + locs.append(loc) + start_loc = loc + return locs + + +def combine_unique_params(param_dict_list: dict = None): + """ + reduces the param_dict_list into a list of unique paramers with + combined random seeds + """ + random_seeds_list = param_dict_list['random_seeds'] + params_list = param_dict_list['params'] + final_list = list() + location_dict = dict() #initalize dictionary for storing matchine indices + #loop through the parameter values + for it, param in enumerate(params_list): + #get matching pairs of each value + location_dict[it] = get_list_duplicates(params_list, param) + skip = set() #initialize set of locations to skip + for loc, values in location_dict.items(): + #only need to match once to match in other locations (add to skip) + if loc in skip: + continue + random_seeds = set() + for value in values: + #add matched params random seeds to same set + random_seeds.add(random_seeds_list[value]) + skip.add(value) + data_params, scoring_params = params_list[loc] + #combine matching params with aggregated set of random seeds + final_list.append({'data_params': data_params, 'scoring_params': scoring_params, 'random_seeds': random_seeds}) + return final_list + + +def scan_pipeline_runs(pipeline_id, submitter=None): + query_arguments = {'pipeline__id': pipeline_id, 'run__phase': 'PRODUCE', + 'status__state': 'SUCCESS', '_submitter': submitter} + pipeline_run_search = match_query(index='pipeline_runs', arguments=query_arguments) + query_results = dict() + for pipeline_run in pipeline_run_search.scan(): + data_prep, data_params = 
check_for_data_prep(pipeline_run=pipeline_run)
+        scoring, scoring_params = get_scoring_pipeline(pipeline_run)
+        for dataset in pipeline_run.datasets:
+            run_tuple = (pipeline_run.problem.id, dataset.id, data_prep, scoring)
+            query_results[run_tuple] = query_results.get(run_tuple, dict())
+            query_results[run_tuple]['random_seeds'] = query_results[run_tuple].get('random_seeds', list())
+            query_results[run_tuple]['params'] = query_results[run_tuple].get('params', list())
+            query_results[run_tuple]['random_seeds'].append(pipeline_run.random_seed)  #kept index-aligned with 'params'
+            query_results[run_tuple]['params'].append((data_params, scoring_params))
+    return query_results
+
diff --git a/experimenter/queue.py b/experimenter/queue.py
index 3ebdb95..6dadbbf 100644
--- a/experimenter/queue.py
+++ b/experimenter/queue.py
@@ -2,13 +2,14 @@ import redis import rq +import os from experimenter import config, exceptions _DEFAULT_QUEUE = 'default' _EMPTIED_MESSAGE = 'queue {} emptied' - +_SAVE_FAILED_MESSAGE = 'Failed job output saved to {}' def get_connection(): config.validate_redis_host()
@@ -25,24 +26,79 @@ def is_running(): def get_queue(queue_name: str = _DEFAULT_QUEUE) -> rq.Queue: return rq.Queue(queue_name, connection=get_connection())
+
+
+def get_worker_message(workers: list, queue):
+    num_workers = len(workers)
+    message = 'number of workers on queue {}: {}'.format(queue.name, num_workers)
+    for it, worker in enumerate(workers):  #one indented sub-section per worker
+        success = worker.successful_job_count
+        fail = worker.failed_job_count
+        message = message+'\n\t\t\t worker: {}'.format(it)
+        message = message+'\n\t\t\t\t number of successful jobs: {}'.format(success)
+        message = message+'\n\t\t\t\t number of failed jobs: {}'.format(fail)
+    return message
+
+
+def get_failed_job(queue_name:str = _DEFAULT_QUEUE, job_num:int = 0):
+    #look up the job_num-th id in the failed job registry of the named queue
+    reg = rq.registry.FailedJobRegistry(name = queue_name, connection = get_connection())
+    job_ids = reg.get_job_ids()
+    if (not 0 <= job_num < len(job_ids)):  #no such failed job: the "None" placeholder id is returned
+        return "None", reg
+    job_id = job_ids[job_num]
+ return job_id, reg + + +def save_failed_job(queue_name:str = _DEFAULT_QUEUE, job_num:int = 0): + if (queue_name is None): + queue_name = _DEFAULT_QUEUE + job_id, failed_queue = get_failed_job() + job = rq.job.Job.fetch(job_id, connection=get_connection()) + with open (os.path.join('/data',"failed_job_{}.txt".format(job_num)), 'w') as job_file: + job_file.write(job.exc_info) + #remove the job + failed_queue.remove(job_id, delete_job=True) + print(_SAVE_FAILED_MESSAGE.format(os.path.join('/data', + "failed_job_{}.txt".format(job_num)))) + + +def get_queue_message(queues: list): + queues_message = 'getting queues, jobs, and workers' + for queue in queues: + queues_message = queues_message + '\n\t number of jobs on queue {}: {}'.format(queue.name, len(queue)) + _, reg = get_failed_job(queue.name) + num_fails = len(reg) + queues_message = queues_message + '\n\t number of failed jobs on queue {}: {}'.format(queue.name, num_fails) + workers = rq.Worker.all(queue=queue) + queues_message = queues_message + '\n\t\t' + str(get_worker_message(workers=workers, queue=queue)) + + return queues_message def status() -> None: conn = get_connection() - print('available queues: {}'.format(rq.Queue.all(conn))) - + queues = rq.Queue.all(conn) + queues_message = get_queue_message(queues) + print('available queues: {}'.format(queues)) + print(queues_message) + def enqueue(job, queue_name: str = _DEFAULT_QUEUE, job_timeout: int = None) -> rq.job.Job: q = get_queue(queue_name) return q.enqueue(**job, job_timeout=job_timeout) -def empty(queue_name: str = None) -> None: +def empty(queue_name: str = None, empty_failed_queue: bool = False) -> None: if queue_name is None: queue_name = _DEFAULT_QUEUE - queue = get_queue(queue_name) - queue.empty() - print(_EMPTIED_MESSAGE.format(queue_name)) + #empty the failed queue or just the normal one + if (empty_failed_queue is True): + empty_failed(queue_name=queue_name) + else: + queue = get_queue(queue_name) + queue.empty() + 
print(_EMPTIED_MESSAGE.format(queue_name)) def _check_redis_connection() -> typing.Optional[Exception]: @@ -52,7 +108,19 @@ def _check_redis_connection() -> typing.Optional[Exception]: except redis.exceptions.RedisError as e: error = e return error - + + +def empty_failed(queue_name: str = None) -> None: + if queue_name is None: + queue_name = _DEFAULT_QUEUE + _, failed_queue = get_failed_job(queue_name=queue_name) + #loop through the jobs and remove them + conn = get_connection() + job_ids = failed_queue.get_job_ids() + for job_id in job_ids: + result = failed_queue.remove(job_id, delete_job=True) + print(_EMPTIED_MESSAGE.format(queue_name+str(' failed'))) + def make_job(f: typing.Callable, *args: typing.Any, **kwargs: typing.Any) -> typing.Dict[str, typing.Any]: return {'f':f, 'args': args, 'kwargs': kwargs} diff --git a/experimenter/run_pipeline.py b/experimenter/run_pipeline.py index 6d722b0..c6752f0 100644 --- a/experimenter/run_pipeline.py +++ b/experimenter/run_pipeline.py @@ -57,7 +57,7 @@ def run(self, pipeline: Pipeline, metric_names: list = None) -> list: simimlar to that of `_evaluate` in the Runtime code. The aforementioned function does not allow for returning the data, so it did not fit in the workflow. 
- + :param pipeline: the pipeline object to be run OR the path to the pipeline file to be used :param metric_names: if provided, the pipeline will be scored against this custom diff --git a/experimenter/runtime.py b/experimenter/runtime.py new file mode 100644 index 0000000..648bf29 --- /dev/null +++ b/experimenter/runtime.py @@ -0,0 +1,96 @@ +import json +import yaml +import os + +from typing import Any, List, Tuple +from experimenter import config, utils, exceptions + +from d3m import cli as d3m_cli +from d3m.contrib.pipelines import K_FOLD_TABULAR_SPLIT_PIPELINE_PATH as k_fold_split_path +from experimenter.databases.d3m_mtl import D3MMtLDB + + +def evaluate(pipeline: str=None, + problem: str=None, + input: str=None, + random_seed: int=0, + data_pipeline: str=k_fold_split_path, + data_random_seed: int=0, + data_params=None, + scoring_pipeline: str=None, + scoring_params=None, + scoring_random_seed: int=0): + """ + Evaluate pipeline on problem using d3m's runtime cli. + Wrapper function to execute d3m's runtime cli 'evaluate' command. + Arguments mirror the same arguments using the cli. + Only handles cases with a data preparation pipeline in the + pipeline run. 
+
+    Parameters
+    ----------
+    pipeline : path_like str
+        path to pipeline doc (must be an existing file)
+    problem : path_like str
+        path to problem doc
+    input : path_like str
+        path to input full data
+    random_seed : int
+        random seed to be used for the
+        pipeline run
+    data_pipeline : path_like str
+        path to data preparation pipeline
+    data_random_seed: int
+        random_seed to be used in data preparation
+    data_params:
+        parameters for data preparation
+    scoring_params:
+        parameters for scoring pipeline
+    scoring_random_seed: int
+        random seed for scoring
+    scoring_pipeline: str
+        path to scoring pipeline
+    Return:
+    -------
+    None
+
+    Raises:
+    -------
+    InvalidArgumentValueError
+        when a path parameter is missing or
+        is not an existing file
+    """
+    if (pipeline is None or not os.path.isfile(pipeline)):
+        raise exceptions.InvalidArgumentValueError('\'{}\' param not a file path'.format('pipeline'))
+
+    if (problem is None or not os.path.isfile(problem)):
+        raise exceptions.InvalidArgumentValueError('\'{}\' param not a file path'.format('problem'))
+
+    if (input is None or not os.path.isfile(input)):
+        raise exceptions.InvalidArgumentValueError('\'{}\' param not a file path'.format('input'))
+
+    if (data_pipeline is None or not os.path.isfile(data_pipeline)):
+        raise exceptions.InvalidArgumentValueError('\'{}\' param not a file path'.format('data_pipeline'))
+
+    if (scoring_pipeline is None or not os.path.isfile(scoring_pipeline)):
+        raise exceptions.InvalidArgumentValueError('\'{}\' param not a file path'.format('scoring_pipeline'))
+
+    output_run = utils.get_pipeline_run_output_path(pipeline, input, random_seed)
+    #get the runtime arguments for the d3m cli
+    args = ['d3m', 'runtime','--random-seed', str(random_seed), 'evaluate',
+            '--pipeline', pipeline, '--problem', problem, '--input', input,
+            '--output-run', output_run, '--data-pipeline', data_pipeline,
+            '--data-random-seed', str(data_random_seed),
+            '--scoring-pipeline', scoring_pipeline,
+            '--scoring-random-seed', str(scoring_random_seed)]
+    #add the data parameters to the cli arguments
+    if (data_params is not None):
+        for name, value in data_params.items():
+            args.extend(('--data-param', name,
value)) + #add the scoring parameters to the cli arguments + if (scoring_params is not None): + for name, value in scoring_params.items(): + args.extend(('--scoring-param', name, value)) + d3m_cli.main(args) + #save if proper system variable SAVE_TO_D3M is set to true + responses = D3MMtLDB().save_pipeline_runs_from_path(output_run) diff --git a/experimenter/utils.py b/experimenter/utils.py index baca8af..bec55ac 100644 --- a/experimenter/utils.py +++ b/experimenter/utils.py @@ -9,14 +9,98 @@ from d3m.metadata import problem as problem_module from d3m.utils import get_datasets_and_problems +from d3m.contrib import pipelines -from experimenter import exceptions +from experimenter import exceptions, config -DEFAULT_DATASET_DIR = "/datasets/training_datasets/LL0" +DEFAULT_DATASET_DIR = "/datasets" datasets, problems = None, None +def get_data_prep_pipelines(): + """ + Get data preparation pipelines that are already in the d3m module + """ + data_prep_dict = dict() + data_prep_id_list = list() + #save the relevant paths and ids for data preparation + data_prep_id_list.append(pipelines.NO_SPLIT_TABULAR_SPLIT_PIPELINE_ID) + data_prep_dict[pipelines.NO_SPLIT_TABULAR_SPLIT_PIPELINE_ID] = pipelines.NO_SPLIT_TABULAR_SPLIT_PIPELINE_PATH + data_prep_id_list.append(pipelines.FIXED_SPLIT_TABULAR_SPLIT_PIPELINE_ID) + data_prep_dict[pipelines.FIXED_SPLIT_TABULAR_SPLIT_PIPELINE_ID] = pipelines.FIXED_SPLIT_TABULAR_SPLIT_PIPELINE_PATH + data_prep_id_list.append(pipelines.TRAIN_TEST_TABULAR_SPLIT_PIPELINE_ID) + data_prep_dict[pipelines.TRAIN_TEST_TABULAR_SPLIT_PIPELINE_ID] = pipelines.TRAIN_TEST_TABULAR_SPLIT_PIPELINE_PATH + data_prep_id_list.append(pipelines.K_FOLD_TABULAR_SPLIT_PIPELINE_ID) + data_prep_dict[pipelines.K_FOLD_TABULAR_SPLIT_PIPELINE_ID] = pipelines.K_FOLD_TABULAR_SPLIT_PIPELINE_PATH + return data_prep_dict, data_prep_id_list + + +def get_scoring_pipelines(): + """ + Get the scoring pipelines that are already in the d3m module + """ + scoring_dict = dict() + 
scoring_id_list = list() + #save relevant paths and ids for scoring pipelines + scoring_id_list.append(pipelines.SCORING_PIPELINE_ID) + scoring_dict[pipelines.SCORING_PIPELINE_ID] = pipelines.SCORING_PIPELINE_PATH + return scoring_dict, scoring_id_list + + +def get_pipeline_run_output_path(pipeline_path: str, dataset_path: str, random_seed: int): + """ + get the output path of the pipeline run + """ + output_run_path = [] + #get the digests from the dataset and problem paths + with open(pipeline_path, 'r') as data: + pipeline = json.load(data) + output_run_path.append(pipeline['digest']) + with open(dataset_path, 'r') as data: + dataset = json.load(data) + output_run_path.append(dataset['about']['digest']) + output_run_path.append(str(random_seed)) + output_run_path = os.path.abspath(os.path.join(config.output_run_path, '_'.join(output_run_path)+'.yaml')) + return output_run_path + + +def get_pipelines_from_d3m(pipeline_id: str = None, types='Data'): + """Checks if data preparation pipeline is in d3m module, + if not, return None + """ + if (types=='Data'): + dict_ids, id_list = get_data_prep_pipelines() + elif (types=='Scoring'): + dict_ids, id_list = get_scoring_pipelines() + if (pipeline_id in id_list): + return dict_ids[pipeline_id] + return None + + +def save_to_not_exist_file(filename:str = 'dataset_dne.txt', save_id:str = None): + #create the directory + os.makedirs(os.path.join('/data','DoesNotExist'),exist_ok=True) + #get the tag to write or append + if (os.path.exists(os.path.join('/data','DoesNotExist',filename))): + tag = 'a' # append to file + else: + tag = 'w' # write and create + #append the non existing value to the file + with open(os.path.join('/data','DoesNotExist',filename),tag) as to_save: + to_save.write(save_id+'\n') + + +def download_from_database(data, type_to_download: str = 'pipeline'): + i_d = data['id'] + save_path = os.path.join('/data', type_to_download, i_d+str('.json')) + #save the file to the directory + with open(save_path, 'w') 
as to_save: + json.dump(data, to_save, indent=4) + #return the location + return save_path + + def get_dataset_doc_path(dataset_id: str, datasets_dir: str=None) -> str: """ A quick helper function to gather a dataset doc path @@ -29,7 +113,12 @@ def get_dataset_doc_path(dataset_id: str, datasets_dir: str=None) -> str: if datasets_dir is None: datasets_dir = os.getenv('DATASETS', DEFAULT_DATASET_DIR) datasets, problems = get_datasets_and_problems(datasets_dir) - return datasets[dataset_id] + try: + return datasets[dataset_id] + except: + #save to dataset id does not exist file + save_to_not_exist_file('dataset_dne.txt', dataset_id) + return None def get_dataset_doc(dataset_id: str, datasets_dir: str=None) -> dict: @@ -57,7 +146,12 @@ def get_problem_path(problem_id: str, datasets_dir: str=None) -> str: if datasets_dir is None: datasets_dir = os.getenv('DATASETS', DEFAULT_DATASET_DIR) datasets, problems = get_datasets_and_problems(datasets_dir) - return problems[problem_id] + try: + return problems[problem_id] + except: + #save to problem id does not exist file + save_to_not_exist_file('problem_dne.txt', problem_id) + return None def get_problem(problem_path: str, *, parse: bool = True) -> dict: diff --git a/setup.py b/setup.py index d57d227..fe96b85 100644 --- a/setup.py +++ b/setup.py @@ -14,5 +14,7 @@ 'redis>=3.5.0<3.6.0', 'rq>=1.7.0<1.8.0', 'rq-dashboard>=0.6.0<0.7.0', + 'elasticsearch==7.11.0', + 'elasticsearch_dsl==7.3.0' ], ) diff --git a/tests/test_modifier.py b/tests/test_modifier.py new file mode 100644 index 0000000..4206c0e --- /dev/null +++ b/tests/test_modifier.py @@ -0,0 +1,73 @@ +import unittest +import json +from experimenter import modify_generator, queue, exceptions, utils +from experimenter.databases.d3m_mtl import D3MMtLDB +from d3m.contrib.pipelines import K_FOLD_TABULAR_SPLIT_PIPELINE_PATH +from d3m.contrib.pipelines import SCORING_PIPELINE_PATH as scoring_file + + +class GeneratorModifierTestCase(unittest.TestCase): + + + def 
test_random_seed_modifier_job_count(self): + #initialize the modifier with random-seed and a given max jobs + num_test = 5 + seed_limit = 25 + modifier = modify_generator.ModifyGenerator(modify_type='random-seed', + seed_limit=seed_limit, + max_jobs=num_test) + modifier._set_query_results(self.get_seed_test_args()) + #begin the test if number of generated seed jobs is correct + self.assertEqual(len(list(modifier._modify_random_seed(seed_limit, next(modifier.query_results)))), seed_limit-2) + #reinitialize to test if total job count is right + modifier = modify_generator.ModifyGenerator(modify_type = 'random-seed', + max_jobs = num_test, + seed_limit = seed_limit) + modifier.query_results = self.get_seed_test_args() + self.assertEqual(modifier.max_jobs, num_test) + self.assertEqual(len(list(modifier)), modifier.max_jobs) + + + def test_query_random_seeds_set_size(self): + args = {'seed_limit':25, 'submitter':'byu', 'pipeline_id':None} + seed_limit = 25 + query_results = modify_generator.query_on_seeds(args['pipeline_id'], args['seed_limit'], args['submitter']) + #test 10 query results + for i in range(10): + query = next(query_results) + self.assertTrue(len(query['tested_seeds']) < seed_limit) + + + def test_d3m_interface_init(self): + init_fail = False + try: + d3m_db = D3MMtLDB() + except: + init_fail = True + self.assertFalse(init_fail, "D3M Interface Failed") + + + def get_seed_test_args(self): + """ returns args for testing modify generator random-seed + functionality purposes. 
It uses a dataset and pipeline + that is saved in the d3m-experimenter + """ + with open('experimenter/pipelines/bagging_classification.json', 'r') as pipeline_file: + pipeline = json.load(pipeline_file) + dataset_path = utils.get_dataset_doc_path('185_baseball_MIN_METADATA_dataset') + problem_path = utils.get_problem_path('185_baseball_MIN_METADATA_problem') + data_prep_seed = 0 + data_prep_seed = 0 + data_prep_pipeline = K_FOLD_TABULAR_SPLIT_PIPELINE_PATH + scoring_pipeline = scoring_file + scoring_seed = 0 + used_seeds = {2,15} + yield {'pipeline': pipeline, 'problem_path': problem_path, 'dataset_doc_path': dataset_path, + 'tested_seeds': used_seeds, 'data_prep_pipeline': data_prep_pipeline, + 'data_prep_seed': data_prep_seed, 'data_params': None, + 'scoring_pipeline': scoring_pipeline, 'scoring_seed': scoring_seed, + 'scoring_params': None} + + +if __name__ == '__main__': + unittest.main()