Skip to content

Commit

Permalink
Merge pull request #19 from pnnl/api-develop
Browse files Browse the repository at this point in the history
Add methods to assist with API calls and new database_docs folder
  • Loading branch information
lymereJ authored Dec 1, 2023
2 parents ec3a396 + b759d3c commit 7b02115
Show file tree
Hide file tree
Showing 8 changed files with 1,755 additions and 1 deletion.
68 changes: 67 additions & 1 deletion building_energy_standards_data/query/fetch/database_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from building_energy_standards_data.query.util import (
_convert_list_tuple_to_list_dict,
_convert_tuple_to_dict,
_convert_list_single_tuple_to_list_str,
is_table_exist,
is_field_in_table,
)


Expand All @@ -27,6 +29,56 @@ def fetch_table(conn: sqlite3.Connection, table_name: str):
return []


def fetch_table_with_max_numbers_of_records(
conn, table_name: str, max: int | None = None
):
"""
Fetch data from a specific table limited to a max number of records
:param conn:
:param table_name: String data table
:param max: max number of records to be returned
:return: list of data or empty list
"""
table = fetch_table(conn, table_name)
if max and len(table) > max:
table = table[0:max]
return table


def fetch_columns_from_table(
conn: sqlite3.Connection, table_name: str, field_names: list | str
):
"""
Fetch specific columns from a specific table
:param conn:
:param table_name: String data table
:param field_names: list of columns to fetch
:return: list of data or empty list
"""
# Make sure the table exist
if is_field_in_table(conn, table_name, field_names):
if isinstance(field_names, list):
field_names = field_names.join(", ")
fetch_query = f"""SELECT {field_names} FROM {table_name}"""
cur = conn.execute(fetch_query)
data_header = list(map(lambda x: x[0], cur.description))

return _convert_list_tuple_to_list_dict(cur.fetchall(), data_header)
return []


def fetch_column_from_table(conn: sqlite3.Connection, table_name: str, field_name: str):
"""
Fetch specific column from a specific table
:param conn:
:param table_name: table name
:param field_name: column to fetch
:return: list of data in column
"""
column_list = fetch_columns_from_table(conn, table_name, field_name)
return [entry[field_name] for entry in column_list]


def fetch_a_record_from_table_by_id(
conn: sqlite3.Connection, table_name: str, index: int
):
Expand All @@ -48,7 +100,7 @@ def fetch_a_record_from_table_by_id(


def fetch_records_from_table_by_key_values(
conn: sqlite3.Connection, table_name: str, key_value_dict: dict
conn: sqlite3.Connection, table_name: str, key_value_dict: dict | None = None
):
"""
Fetch a data record matched by key value pairs in the dict from a specific table
Expand All @@ -59,6 +111,8 @@ def fetch_records_from_table_by_key_values(
"""
# Make sure the table exist
if is_table_exist(conn, table_name):
if not key_value_dict:
return fetch_table(conn, table_name)
condition = " AND ".join(
[f"{key} = '{key_value_dict[key]}'" for key in key_value_dict.keys()]
)
Expand All @@ -67,3 +121,15 @@ def fetch_records_from_table_by_key_values(
data_header = list(map(lambda x: x[0], cur.description))
return _convert_list_tuple_to_list_dict(cur.fetchall(), data_header)
return []


def fetch_table_names_containing_keyword(conn: sqlite3.Connection, keyword: str):
"""
Fetch a data record matched by key value pairs in the dict from a specific table
:param conn:
:param keyword: keyword to search tables for
:return: list
"""
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
cur = conn.execute(query, ("%" + keyword + "%",))
return _convert_list_single_tuple_to_list_str(cur.fetchall())
31 changes: 31 additions & 0 deletions building_energy_standards_data/query/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def _convert_tuple_to_dict(data: tuple, data_head_list: List[str]):
return dict(zip(data_head_list, data))


def _convert_list_single_tuple_to_list_str(data: List[tuple]):
return [row[0] for row in data]


def is_index_in_table(
conn: sqlite3.Connection, table_name: str | None, key: str | None, index: str | None
):
Expand Down Expand Up @@ -57,6 +61,33 @@ def is_table_exist(conn: sqlite3.Connection, table_name):
return True if list_of_tables else False


def is_field_in_table(
conn: sqlite3.Connection, table_name, fields_to_check: list | str
):
"""
Utility function to ensure the table name provided is correct and exist in the openstudio_standards data tables
:param conn:
:param table_name:
:return:
"""
cur = conn.cursor()

if not is_table_exist(conn, table_name):
return False

list_of_tables = cur.execute(f"""PRAGMA table_info({table_name})""").fetchall()

if isinstance(fields_to_check, str):
fields_to_check = [fields_to_check]

if list_of_tables:
existing_columns = [row[1] for row in list_of_tables]
fields_exist = all(field in existing_columns for field in fields_to_check)
else:
fields_exist = False
return fields_exist


def match_dict_data_by_key(primary_data: dict, secondary_data: dict):
"""
This function matches two data dictionaries (primary_data, secondary_data) and return only the matched portion of the dictionary
Expand Down
Loading

0 comments on commit 7b02115

Please sign in to comment.