# Simple LinkML types other than "string"; ranges made up solely of these
# are left untouched by fix_tsv_files.
SIMPLE_TYPES_NOSTR = ["integer", "float", "boolean", "date", "datetime"]


def _is_explicit_true(value):
    """Return True only for an explicitly true cell value.

    Uses ``==`` rather than ``is`` because pandas yields ``numpy.bool_``
    for boolean TSV columns and ``np.bool_(True) is True`` is False.
    NaN compares unequal to True, so missing cells count as False.
    """
    return bool(value == True)  # noqa: E712 -- deliberate, see docstring


def fix_tsv_files(tsv_files, inlined=False, ref_by_ind=True):
    """Fix schema TSV sheets before feeding them to schemasheets.

    For each input file this: URL-encodes spaces in every ``*mapping``
    column, inserts ``any_of`` and ``exactly_one_of`` helper columns next
    to the ``range`` column, optionally adds an ``inlined`` column, and
    moves multi-valued ranges out of ``range`` into ``any_of`` (or into
    ``exactly_one_of`` when the row is multivalued and flagged
    exactlyOneOf).  Fixed copies are written to a ``fixed_sheets``
    subdirectory next to the first input file.

    :param tsv_files: list of TSV files; each must have 3 header rows
    :param inlined: if True, an ``inlined`` column is added and set to True
        for slots whose range contains complex (non-simple, non-string) types
    :param ref_by_ind: if True (and ``inlined`` is True), ``string`` is
        appended to complex ranges so objects can also be referenced by index
    :return: list of paths (str) of the fixed TSV files
    """
    tsv_file_fixed_list = []
    dir_fixed = Path(tsv_files[0]).parent / "fixed_sheets"
    dir_fixed.mkdir(exist_ok=True)

    for tsv_file in list(tsv_files):
        # TODO: check if the file indeed has 3 lines of headers
        tsv_file_fixed = dir_fixed / Path(tsv_file).name
        tsv_file_fixed_list.append(str(tsv_file_fixed))

        with open(tsv_file, 'r', newline='') as file:
            # cleaned of any ^M characters
            content = file.read().replace('\r', '')
        # convert the cleaned content back to a file-like object
        data = StringIO(content)
        # read the file-like object to a pandas dataframe (3 header rows)
        df = pd.read_csv(data, header=[0, 1, 2], delimiter='\t')

        # URL-encode spaces in every mapping column (matched on the
        # second header row)
        mapping_cols = [col for col in df.columns if "mapping" in col[1].lower()]
        for col in mapping_cols:
            df[col] = df[col].str.replace(" ", "%20")

        # finding the range column, and other columns that are relevant
        # for the following changes
        range_ind, range_col = None, None
        multival_col, exactlyone_col, valset_col = None, None, None
        for ind, col in enumerate(df.columns):
            if "range" in col[1].lower().strip():
                range_ind = ind
                range_col = col
            elif "multivalued" in col[0].lower().strip():
                multival_col = col
            elif "exactlyoneof" in col[0].lower().strip():
                exactlyone_col = col
            elif "permissible" in col[0].lower().strip():
                valset_col = col

        if range_ind is not None:
            any_of_col = (f"{range_col[0]}: any_of", "any_of", "inner_key: range")
            exactly_one_of_col = (f"{range_col[0]}: exactly_one_of",
                                  "exactly_one_of", "inner_key: range")
            if inlined:
                inline_col = ("inlined", "inlined", "")
            else:  # ignoring the column if inlined is set to False
                inline_col = ("inlined", "ignore", "")
            df.insert(range_ind + 1, any_of_col, None)
            df.insert(range_ind + 2, exactly_one_of_col, None)
            df.insert(range_ind + 3, inline_col, None)

            def fix_range(row):
                """Move multi-valued ranges to any_of / exactly_one_of.

                Also sets the inlined flag and appends ``string`` to the
                range, depending on ``inlined`` and ``ref_by_ind``.
                """
                if pd.isna(row[range_col]):
                    return row
                # do not add string to range if range already has string
                # or all the elements are simple types
                elif "string" in row[range_col] or all(
                        el in SIMPLE_TYPES_NOSTR for el in row[range_col].split("|")):
                    pass
                # checking if the range is not a value set
                # (TODO: in the future might need modification)
                elif valset_col is not None and row[valset_col]:
                    pass
                elif inlined:  # setting inlined to True for ranges with complex types
                    row[inline_col] = True
                    if ref_by_ind:  # adding string to allow referencing by index
                        row[range_col] = row[range_col] + "|string"

                # checking if range has multiple values, and if it should be
                # treated as any_of or exactly_one_of
                if "|" in row[range_col]:
                    # guard multival_col against missing "multivalued" column
                    # (the unguarded lookup used to raise KeyError)
                    if (multival_col is not None
                            and _is_explicit_true(row[multival_col])
                            and exactlyone_col is not None
                            and _is_explicit_true(row[exactlyone_col])):
                        row[exactly_one_of_col] = row[range_col]
                    else:
                        row[any_of_col] = row[range_col]
                    row[range_col] = None
                return row

            df = df.apply(fix_range, axis=1)

        df.to_csv(tsv_file_fixed, sep='\t', index=False)

        # clean the "Unnamed: ..." placeholders pandas writes for empty
        # header cells; strip the newline first so a non-placeholder last
        # cell does not end up followed by a duplicated newline
        with open(tsv_file_fixed, 'r') as file:
            lines = file.readlines()
        for ii in (1, 2):
            cells = lines[ii].rstrip("\n").split("\t")
            lines[ii] = "\t".join(
                "" if el.startswith("Unnamed") else el for el in cells) + "\n"
        with open(tsv_file_fixed, 'w') as file:
            file.writelines(lines)

    return tsv_file_fixed_list
import csv
from pathlib import Path

import click

from linkml_runtime.utils.schemaview import SchemaView

# Simple LinkML types other than "string"; used to decide whether the
# auxiliary "string" alternative in an any_of range should be hidden.
SIMPLE_TYPES_NOSTR = ["integer", "float", "boolean", "date", "datetime"]

CLASS_HEADERS = [
    # header, linkml_header, linkml_header_minor
    ("Class Name", "> class", ">"),
    ("Inheritance: is_a", "is_a", ""),
    ("Inheritance: mixin", "mixins", 'internal_separator: "|"'),
    ("Subsets", "in_subset", 'internal_separator: "|"'),
    ("Description", "description", ""),
    ("NIMP Terminology NHash", "exact_mappings: {curie_prefix: NIMP}", "")
]

SLOTS_HEADERS = [
    # header, linkml_header, linkml_header_minor
    ("Proposed BICAN Field", "> alias", ">"),
    ("LinkML Slot or Attribute Name", "attribute", ""),
    ("BICAN UUID", "slot_uri: {curie_prefix: bican}", ""),
    ("SubGroup/LinkML Class Name", "class", ""),
    ("Definition", "description", ""),
    ("Required (True/False)", "is_required", ""),
    ("Multivalued (True/False)", "multivalued", ""),
    ("Data Type/Value Set", "range", ""),
    ("Data Examples", "ignore", ""),
    ("Min Value", "ignore", ""),
    ("Max Value", "ignore", ""),
    ("Unit", "ignore", ""),
    ("Statistical Type", "ignore", ""),
    ("Subsets", "in_subset", 'internal_separator: "|"'),
    ("Notes", "ignore", ""),
    ("NIMP Category", "ignore", ""),
    ("NIMP Terminology NHash", "exact_mappings: {curie_prefix: NIMP}", ""),
    ("Local Variable Name (e.g. NIMP)", "local_names", "inner_key: local_name_value"),
    ("Local Variable Source (e.g. NIMP)", "local_names", "inner_key: local_name_source")
]

ENUM_HEADERS = [
    # header, linkml_header, linkml_header_minor
    ("Value Set Name", "> enum", ">"),
    ("Permissible Value", "permissible_value", ""),
    ("Description", "description", ""),
    ("NIMP Terminology NHash", "meaning: {curie_prefix: NIMP}", "")
]

PREFIXES_HEADERS = [
    # header, linkml_header
    ("Schema Name", "> schema"),
    ("Title", "title"),
    ("Description", "description"),
    ("ID", "id"),
    ("Default Prefix", "default_prefix"),
    ("Imports", "imports"),
    ("Prefix", "prefix"),
    ("Prefix URI", "prefix_reference"),
]


def _header_rows(header_spec):
    """Transpose a per-column header spec into spreadsheet header rows.

    Each element of *header_spec* is a tuple describing one column
    (human header, linkml header, optional linkml minor header); the
    result is one row per tuple position, ready to seed a csv table.
    """
    return [list(row) for row in zip(*header_spec)]


def create_classes_slots_cvs(classes: dict, output_dir: Path):
    """Write ``classes.csv`` and ``slots.csv`` for the given LinkML classes.

    :param classes: mapping of class name -> ClassDefinition
    :param output_dir: directory where the csv files are written
    """
    classes_cvs = _header_rows(CLASS_HEADERS)
    slots_cvs = _header_rows(SLOTS_HEADERS)
    for class_name, class_d in classes.items():
        # NamedThing is a generic base class and is not exported
        # (must be an exact match, not a substring test)
        if class_name == "NamedThing":
            continue
        classes_cvs.append([class_name, class_d.is_a, "|".join(class_d.mixins),
                            "|".join(class_d.in_subset), class_d.description, ""])
        # merge attributes and slot_usage (slot_usage wins) without
        # mutating the schema's attribute dictionary
        class_attr_dict = {**class_d.attributes, **class_d.slot_usage}
        for slot_name, slot_obj in class_attr_dict.items():
            if slot_obj.range:
                slot_range = slot_obj.range
            elif slot_obj.any_of:
                # removing the auxiliary "string" type from the range list
                slot_range = "|".join(_removing_str_type(slot_obj.any_of))
            else:
                slot_range = "string"  # default range
            slots_cvs.append(["", slot_name, slot_obj.slot_uri, class_name,
                              slot_obj.description, slot_obj.required,
                              slot_obj.multivalued, slot_range,
                              "", "", "", "", "", "", "", "", "", "", ""])
    _write_cvs(Path(output_dir / "classes.csv"), classes_cvs)
    _write_cvs(Path(output_dir / "slots.csv"), slots_cvs)


def create_enums_cvs(enums: dict, output_dir: Path):
    """Write ``enums.csv`` with one row per permissible value.

    :param enums: mapping of enum name -> EnumDefinition
    :param output_dir: directory where the csv file is written
    """
    enums_cvs = _header_rows(ENUM_HEADERS)
    if enums:
        for enum_name, enum in enums.items():
            for value_nm, value_obj in enum.permissible_values.items():
                # NOTE(review): the Description column is filled from the
                # value's title (not description) — confirm this is intended
                enums_cvs.append([enum_name, value_nm, value_obj.title, value_obj.meaning])
    _write_cvs(Path(output_dir / "enums.csv"), enums_cvs)


def create_prefix_headers_csv(schema, output_dir: Path):
    """Write ``prefixes.csv`` with schema metadata, imports and prefix map.

    :param schema: a SchemaDefinition (e.g. ``SchemaView(...).schema``)
    :param output_dir: directory where the csv file is written
    """
    prefixes_cvs = _header_rows(PREFIXES_HEADERS)
    prefixes_cvs.append([schema.name, schema.title, schema.description,
                         schema.id, schema.default_prefix, "", "", ""])
    for imp in schema.imports:
        if imp != "linkml:types":  # this is imported by default
            prefixes_cvs.append(["", "", "", "", "", imp, "", ""])
    for prefix in schema.prefixes.values():
        prefixes_cvs.append(["", "", "", "", "", "",
                             prefix.prefix_prefix, prefix.prefix_reference])
    _write_cvs(Path(output_dir / "prefixes.csv"), prefixes_cvs)


def _removing_str_type(any_of_list: list):
    """If the range list contains only more complex types, remove "string".

    "string" is used in these cases only as an additional type to allow
    referring by id, so it is not shown in the Google sheet.  When the list
    contains any simple type, "string" is a genuine alternative and is kept.
    """
    range_list = [el.range for el in any_of_list]
    if "string" in range_list and not any(el in SIMPLE_TYPES_NOSTR for el in range_list):
        range_list.remove("string")
    return range_list


def _write_cvs(filename, data):
    """Write the given rows to a csv file."""
    with open(filename, 'w', newline='') as file:
        csv.writer(file).writerows(data)


@click.command()
@click.option('-o', '--output_dir',
              type=click.Path(),
              default="output_dir_cvs",
              show_default=True,
              help="Path to the output directory, where csv files will be stored.")
@click.argument('yaml_model', type=click.Path(exists=True))
def yaml2cvs(yaml_model, output_dir):
    """
    This converter creates csv files from the yaml model.
    The csv files can be used to create a Google Spreadsheet (automation TODO).
    Takes a path to the yaml model as an input.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    schema = SchemaView(yaml_model)
    create_prefix_headers_csv(schema.schema, output_dir)
    create_enums_cvs(schema.all_enums(), output_dir)
    create_classes_slots_cvs(schema.all_classes(), output_dir)


if __name__ == "__main__":
    yaml2cvs()