Skip to content

Commit

Permalink
fix flake8 errors
Browse files Browse the repository at this point in the history
  • Loading branch information
vikineema committed Feb 22, 2024
1 parent cafc1d2 commit 69e6e82
Showing 1 changed file with 30 additions and 6 deletions.
36 changes: 30 additions & 6 deletions deafrica_conflux/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ def get_engine_waterbodies() -> Engine:
return create_engine(database_url, future=True)


def get_engine_waterbodies_dev_sandbox(password: str, pool_size: int = 5, max_overflow:int = 10) -> Engine:
def get_engine_waterbodies_dev_sandbox(
password: str, pool_size: int = 5, max_overflow: int = 10
) -> Engine:
"""Get the DEV Waterbodies database engine."""
username = "waterbodies_writer"
host = "db-writer"
port = 5432
port = 5432
database_name = "waterbodies"

dialect = "postgresql"
Expand All @@ -95,6 +97,28 @@ def get_engine_waterbodies_dev_sandbox(password: str, pool_size: int = 5, max_ov
return create_engine(database_url, future=True, pool_size=pool_size, max_overflow=max_overflow)


def list_schemas(engine: Engine) -> list[str]:
"""
List the schemas present in the database.
Parameters
----------
engine: sqlalchemy.engine.Engine
Database engine.
"""
# Create an inspector
inspector = inspect(engine)

# List schemas in the database
schemas = inspector.get_schema_names()

if schemas:
_log.info(f"Schemas in the database: {', '.join(schemas)}")
else:
_log.info("No schemas found in database")

return schemas


def list_tables(engine: Engine) -> list[str]:
"""List the tables in present.
Expand Down Expand Up @@ -122,7 +146,7 @@ def list_tables(engine: Engine) -> list[str]:
def get_table(engine: Engine, table_name: str) -> Table:
"""Get a table using the table name."""
# Create a metadata object
metadata = MetaData()
metadata = MetaData(bind=engine)

# Reflect the table from the database
_log.info(f"Finding {table_name} table...")
Expand Down Expand Up @@ -523,7 +547,7 @@ def add_waterbody_observations_table_to_db(
table = create_waterbody_obs_table(engine)

# Note: Doing it this way because drill outputs can be millions of rows.
session = Session() # Create a new session for this task
session = Session() # Create a new session for this task
obs_ids_to_check = [f"{task_id_string}_{i}" for i in df.index.to_list()]
obs_ids_exist = session.scalars(select(table).where(table.c.obs_id.in_(obs_ids_to_check))).all()
session.close() # Close the session to return the connection to the pool
Expand Down Expand Up @@ -567,7 +591,7 @@ def add_waterbody_observations_table_to_db(

if update_statements:
_log.info(f"Updating {len(update_statements)} observations in the {table.name} table")
session = Session() # Create a new session for this task
session = Session() # Create a new session for this task
for statement in update_statements:
try:
session.execute(statement)
Expand All @@ -582,7 +606,7 @@ def add_waterbody_observations_table_to_db(
_log.error(f"No observations to update in the {table.name} table")

if insert_objects_list:
session = Session() # Create a new session for this task
session = Session() # Create a new session for this task
try:
_log.info(f"Adding {len(insert_objects_list)} observations to the {table.name} table")
session.execute(insert(table), insert_objects_list)
Expand Down

0 comments on commit 69e6e82

Please sign in to comment.