diff --git a/conf/tem/config.yaml b/conf/tem/config.yaml
deleted file mode 100644
index 70dab6a..0000000
--- a/conf/tem/config.yaml
+++ /dev/null
@@ -1,47 +0,0 @@
-#database:
-#  dbms: 'postgresql'
-#  host: '35.235.95.37'  # Replace with your DB host
-#  port: 5439  # Replace with your DB port
-#  user: 'jize'  # Replace with your DB username
-#  password: 'jz@uscdash23'  # Replace with your DB password
-
-database_number: 2
-
-database1:
-  nickname: 'local db'  # will shows in the selectbox
-  dbms: 'postgresql'
-  host: 'db'  # Replace with your DB host
-  port: 5432  # Replace with your DB port
-  user: 'admin'  # Replace with your DB username
-  password: 'admin'
-
-database2:
-  nickname: 'gcp'
-  dbms: 'postgresql'
-  host: '35.235.95.37'  # Replace with your DB host
-  port: 5439  # Replace with your DB port
-  user: 'jize'  # Replace with your DB username
-  password: 'jz@uscdash23'  # Replace with your DB password
-
-mapping:
-  columns:
-    user_id: 'user_id'
-    timestamp: 'timestamp'
-    value: 'value'
-  tables:
-    user_table:
-      name: 'geomts_users'
-      columns:
-        user_id: String(50)
-        device: String(50)
-        location: String(50)
-    time_series:
-      - heart_rates
-      - calories
-      - mets
-      - distances
-      - steps
-      - sleep
-      - weight
-    geo:
-      - locations
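This diff deletes the old config but does not show its replacement, while the new code paths below read `config['mapping']['tables']['user_table']['attributes']` and `config['mapping']['data_type_mappings']`. A minimal sketch of what the successor config might look like; every attribute name, description, and type key here is an assumption, not taken from this PR:

```yaml
# Hypothetical successor config (not part of this diff).
mapping:
  columns:
    user_id: 'user_id'
    timestamp: 'timestamp'
    value: 'value'
  # Maps config type names to SQLAlchemy type expressions; create_tables()
  # below eval()s these strings, so they must be valid SQLAlchemy constructors.
  data_type_mappings:
    string: 'String(50)'
    integer: 'Integer'
    float: 'Float'
    boolean: 'Boolean'
  tables:
    user_table:
      name: 'geomts_users'
      attributes:
        - name: 'user_id'
          description: 'Unique identifier of the subject'
          type: 'string'
        - name: 'device'
          description: 'Wearable device used by the subject'
          type: 'string'
    time_series:
      - heart_rates
      - calories
    geo:
      - locations
```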
diff --git a/script/import_hub_main.py b/script/import_hub_main.py
index e2371d1..07cbf92 100644
--- a/script/import_hub_main.py
+++ b/script/import_hub_main.py
@@ -125,61 +125,72 @@ def import_page():
     # option to populate subject table or feature time series tables
     is_subjects_populated = st.checkbox("Populate subject table?")
 
-    if uploaded_file and is_subjects_populated:
-        # set subject table name
-        subject_tbl_name = st.text_input("Enter subject table name", value="subjects")
-        st.success("File uploaded!")
+    if uploaded_file:
+        st.success(f"File `{uploaded_file.name}` uploaded!")
         df = pd.read_csv(uploaded_file)
         st.write("Columns in your CSV:")
         st.write(df.columns)
-        if st.button("Populate Database"):
-            populate_subject_table(df, selected_db, config_path, user_tbl_name=subject_tbl_name)
-            st.success("Database populated!")
-
-
-    if uploaded_file and not is_subjects_populated:
-        st.success("File uploaded!")
-        df = pd.read_csv(uploaded_file)
-        df = generate_mets_by_calories(df)
-        st.write("Columns in your CSV:")
-        st.write(df.columns)
-
-        # Map columns
-        st.subheader("Mapping")
-
-        # Default selections based on column name similarity
-        default_timestamp = find_closest_name(df.columns, 'time timestamp date start_time end_time')
-        default_user_id = find_closest_name(df.columns, 'user id email patient')
-
-        timestamp_col = st.selectbox("**Select Timestamp Column**", df.columns, index=df.columns.get_loc(default_timestamp))
-        user_id_col = st.selectbox("**Select User ID Column**", df.columns, index=df.columns.get_loc(default_user_id))
-
-        # Foldable block for optional mappings
-        mappings = {
-            config['mapping']['columns']['timestamp']: timestamp_col,
-            config["mapping"]['columns']['user_id']: user_id_col,
-        }
-        table_mappings = {}
-        with st.expander("**Map Features to W4H Tables**", expanded=True):
-            st.write("Map your CSV columns to corresponding W4H tables.")
-
-            choices = ["None"] + list(df.columns)
-            for target_table_name in config['mapping']['tables']['time_series'] + config['mapping']['tables']['geo']:
-                target_table_label = ' '.join([label.capitalize() for label in target_table_name.replace('_', ' ').split()])
-                st.subheader(target_table_label)
-                def_choice = find_closest_name(choices, target_table_label)
-                mapped_col = st.selectbox("Select Corresponding Column", choices,
-                                          key=target_table_name, index=choices.index(def_choice))
-                table_mappings[target_table_name] = mapped_col if mapped_col != "None" else None
-
-        # Once mappings are set, allow the user to populate the database
-        if st.button("Populate Database"):
-            mappings = {**mappings, **table_mappings}
-
-            populate_db(df, selected_db, mappings, config_path)
-            st.success("Database populated!")
-
+        # create a mapping between the csv columns and the database tables
+        st.subheader("Mapping")
+        choices = ["None"] + list(df.columns)
+        mappings = {}
+        # if subject table is populated, populate subject table
+        if is_subjects_populated:
+            user_tbl_name = config['mapping']['tables']['user_table']['name']
+            with st.expander(f"**Map subject attributes to the W4H `{user_tbl_name}` table attributes**", expanded=True):
+                st.write(f"Map your CSV columns to corresponding W4H `{user_tbl_name}` table attributes.")
+                for target_attribute in config['mapping']['tables']['user_table']['attributes']:
+                    target_attribute_label = ' '.join([label.capitalize() for label in target_attribute['name'].replace('_', ' ').split()])
+                    st.write(f"**{target_attribute_label}**")
+                    # write the description of the attribute
+                    st.write(f'**Description:** {target_attribute["description"]}')
+                    # write the data type of the attribute
+                    st.write(f'**Data Type:** {target_attribute["type"]}')
+                    def_choice = find_closest_name(choices, target_attribute_label)
+                    mapped_col = st.selectbox("Select Corresponding Column", choices,
+                                              key=target_attribute['name'], index=choices.index(def_choice))
+                    mappings[target_attribute['name']] = mapped_col if mapped_col != "None" else None
+                    st.markdown("""---""")
+            # Once mappings are set, allow the user to populate the database
+            if st.button("Populate Database"):
+                populate_subject_table(df, selected_db, mappings, config_path)
+                st.success(f"Subject table `{user_tbl_name}` populated!")
+
+
+        # else, populate feature time series tables
+        else:
+            # Default selections based on column name similarity
+            default_timestamp = find_closest_name(df.columns, 'time timestamp date start_time end_time')
+            default_user_id = find_closest_name(df.columns, 'user id email patient')
+
+            timestamp_col = st.selectbox("**Select Timestamp Column**", df.columns, index=df.columns.get_loc(default_timestamp))
+            user_id_col = st.selectbox("**Select User ID Column**", df.columns, index=df.columns.get_loc(default_user_id))
+
+            # Foldable block for optional mappings
+            mappings = {
+                config['mapping']['columns']['timestamp']: timestamp_col,
+                config["mapping"]['columns']['user_id']: user_id_col,
+            }
+            table_mappings = {}
+            with st.expander("**Map Features to W4H Tables**", expanded=True):
+                st.write("Map your CSV columns to corresponding W4H tables.")
+
+                choices = ["None"] + list(df.columns)
+                for target_table_name in config['mapping']['tables']['time_series'] + config['mapping']['tables']['geo']:
+                    target_table_label = ' '.join([label.capitalize() for label in target_table_name.replace('_', ' ').split()])
+                    st.subheader(target_table_label)
+                    def_choice = find_closest_name(choices, target_table_label)
+                    mapped_col = st.selectbox("Select Corresponding Column", choices,
+                                              key=target_table_name, index=choices.index(def_choice))
+                    table_mappings[target_table_name] = mapped_col if mapped_col != "None" else None
+
+            # Once mappings are set, allow the user to populate the database
+            if st.button("Populate Database"):
+                mappings = {**mappings, **table_mappings}
+
+                populate_db(df, selected_db, mappings, config_path)
+                st.success("GeoMTS tables populated!")
 
 # if __name__ == "__main__":
 #     main()
\ No newline at end of file
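The two branches above build differently shaped `mappings` dicts before handing them to `populate_subject_table()` or `populate_db()`. An illustrative sketch, with hypothetical CSV column names (`id`, `dev`, `date`, `hr`):

```python
# Subject branch: W4H user-table attribute name -> CSV column (or None).
subject_mappings = {"user_id": "id", "device": "dev", "location": None}

# Time-series branch: the configured user_id/timestamp column names plus
# one entry per W4H time-series/geo table; a "None" selection becomes None.
geomts_mappings = {
    "user_id": "id",
    "timestamp": "date",
    "heart_rates": "hr",
    "locations": None,
}
```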
__name__ == "__main__": # main() \ No newline at end of file diff --git a/script/w4h_db_utils.py b/script/w4h_db_utils.py index 32c1a6e..a61bfa9 100644 --- a/script/w4h_db_utils.py +++ b/script/w4h_db_utils.py @@ -6,7 +6,7 @@ from loguru import logger import pandas as pd -from sqlalchemy import create_engine, text, MetaData, Table, Column, String, ForeignKey, DateTime, REAL +from sqlalchemy import create_engine, text, MetaData, Table, Column, String, ForeignKey, DateTime, REAL, Integer, Float, Boolean from sqlalchemy.orm import sessionmaker from sqlalchemy_utils import database_exists, create_database @@ -26,35 +26,36 @@ def create_tables(db_server_nickname:str, db_name: str, config_file='conf/config metadata = MetaData() config = load_config(config_file=config_file) db_engine = get_db_engine(config_file, db_server_nickname=db_server_nickname, db_name=db_name) - try: - columns_config = config["mapping"]["columns"] - - # Create the user table - user_table_config = config["mapping"]["tables"]["user_table"] - user_columns = [eval(f'Column("{col_name}", {col_dtype}, primary_key={col_name == columns_config["user_id"]})') for col_name, col_dtype in user_table_config["columns"].items()] # Convert string to actual SQLAlchemy type - user_table = Table(user_table_config["name"], metadata, *user_columns) - - - # Create time series tables - for table_name in config["mapping"]["tables"]["time_series"]: - table = Table(table_name, metadata, - Column(columns_config["user_id"], ForeignKey(user_table_config["name"] + '.' + columns_config["user_id"]), primary_key=True), - Column(columns_config["timestamp"], DateTime, primary_key=True), - Column(columns_config["value"], REAL), - ) - - # Create geo tables - for table_name in config["mapping"]["tables"]["geo"]: - table = Table(table_name, metadata, - Column(columns_config["user_id"], ForeignKey(user_table_config["name"] + '.' + columns_config["user_id"]), primary_key=True), - Column(columns_config["timestamp"], DateTime, primary_key=True), - Column(columns_config["value"], Geometry('POINT')) - ) - - metadata.create_all(db_engine) - except Exception as err: - db_engine.dispose() - logger.error(err) + # try: + columns_config = config["mapping"]["columns"] + + # Create the user table + user_table_config = config["mapping"]["tables"]["user_table"] + dtype_mappings = config['mapping']['data_type_mappings'] + user_columns = [eval(f'Column("{col_attribute["name"]}", {dtype_mappings[col_attribute["type"]]}, primary_key={col_attribute["name"] == columns_config["user_id"]})') for col_attribute in user_table_config["attributes"]] # Convert string to actual SQLAlchemy type + user_table = Table(user_table_config["name"], metadata, *user_columns) + + + # Create time series tables + for table_name in config["mapping"]["tables"]["time_series"]: + table = Table(table_name, metadata, + Column(columns_config["user_id"], ForeignKey(user_table_config["name"] + '.' + columns_config["user_id"]), primary_key=True), + Column(columns_config["timestamp"], DateTime, primary_key=True), + Column(columns_config["value"], REAL), + ) + + # Create geo tables + for table_name in config["mapping"]["tables"]["geo"]: + table = Table(table_name, metadata, + Column(columns_config["user_id"], ForeignKey(user_table_config["name"] + '.' 
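`create_tables()` above builds each user-table column by `eval()`ing a formatted string, which executes arbitrary code from YAML values. A minimal eval-free sketch of the same construction using a lookup table; the type names and config fragments below are assumptions standing in for the real config, not part of this PR:

```python
from sqlalchemy import Column, String, Integer, Float, Boolean

# Hypothetical stand-ins for config["mapping"]["columns"] and
# config["mapping"]["tables"]["user_table"].
columns_config = {"user_id": "user_id"}
user_table_config = {
    "name": "geomts_users",
    "attributes": [
        {"name": "user_id", "type": "string"},
        {"name": "age", "type": "integer"},
    ],
}

# Resolve config type names to SQLAlchemy type instances instead of eval().
SQLA_TYPES = {"string": String(50), "integer": Integer(), "float": Float(), "boolean": Boolean()}

user_columns = [
    Column(attr["name"], SQLA_TYPES[attr["type"]],
           primary_key=(attr["name"] == columns_config["user_id"]))
    for attr in user_table_config["attributes"]
]
```

This keeps the schema data-driven while avoiding code execution from config values.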
+ columns_config["user_id"]), primary_key=True), + Column(columns_config["timestamp"], DateTime, primary_key=True), + Column(columns_config["value"], Geometry('POINT')) + ) + + metadata.create_all(db_engine) + # except Exception as err: + # db_engine.dispose() + # logger.error(err) @@ -204,13 +205,14 @@ def populate_tables(df: pd.DataFrame, db_name: str, mappings: dict, config_path= engine.dispose() -def populate_subject_table(df: pd.DataFrame, db_name: str, config_path='conf/config.yaml', user_tbl_name=None): - """Populate the W4H subject table in the given database with the data from the given dataframe based on - the given subject table name in the config file. +def populate_subject_table(df: pd.DataFrame, db_name: str, mappings: dict, config_path='conf/config.yaml'): + """Populate the W4H tables in the given database with the data from the given dataframe based on + the mappings between the CSV columns and the database tables. Args: - df (pd.DataFrame): Dataframe containing the subject data to be inserted into the database - db_name (str): Name of the subject database to insert the data into + df (pd.DataFrame): Dataframe containing the data to be inserted into the database + db_name (str): Name of the database to insert the data into + mappings (dict): Dictionary containing the mappings between the CSV columns and the database tables config_path (str, optional): Path to the config file. Defaults to 'conf/config.yaml'. """ # Load the config @@ -219,12 +221,21 @@ def populate_subject_table(df: pd.DataFrame, db_name: str, config_path='conf/con # Create a session engine = get_db_engine(config_path, mixed_db_name=db_name) + # create a user table dataframe using the mappings + user_tbl_name = config['mapping']['tables']['user_table']['name'] + user_df = pd.DataFrame() + for k, v in mappings.items(): + if v is not None: + user_df[k] = df[v] # populate the user table (directly push df to table), if already exists, append new users - df.to_sql(user_tbl_name, engine, if_exists='append', index=False) + # if columns don't exist, ignore + user_df.to_sql(user_tbl_name, engine, if_exists='append', index=False) # Commit the remaining changes and close the session engine.dispose() + + def getCurrentDbByUsername(username): with sqlite3.connect('user.db') as conn: cursor = conn.cursor()
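The new `populate_subject_table()` builds `user_df` one column at a time. For clarity, an equivalent pandas projection-and-rename, shown with hypothetical data:

```python
import pandas as pd

# Hypothetical CSV frame and mappings mirroring the per-key loop above.
df = pd.DataFrame({"id": ["u1", "u2"], "dev": ["fitbit", "garmin"]})
mappings = {"user_id": "id", "device": "dev", "location": None}  # None = unmapped

# Keep only mapped CSV columns and rename them to W4H attribute names.
selected = {k: v for k, v in mappings.items() if v is not None}
user_df = df[list(selected.values())].rename(columns={v: k for k, v in selected.items()})
print(user_df)  # columns: user_id, device
```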