fix: add schema in postgres (#122)
rivamarco authored Jul 19, 2024
1 parent 62f3852 commit e8e143c
Showing 9 changed files with 73 additions and 73 deletions.
25 changes: 17 additions & 8 deletions api/README.md
@@ -22,15 +22,14 @@
Run

```bash
poetry run ruff format
```

to format, and

```bash
poetry run ruff check
```

to check.


## Test

Please install a PostgreSQL database locally. For example, on a macOS platform, execute:
@@ -61,14 +60,16 @@
The OpenAPI specs are available at `http://localhost:9000/docs`.

We use [alembic](https://pypi.org/project/alembic/) as the engine to manage database migrations.

Database migrations are automatically managed by alembic in the docker-compose file. It connects to the database and applies the migrations defined in the [alembic](./alembic/versions/) folder.
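
For reference, applying the migrations by hand boils down to alembic's `upgrade head` command; a minimal Python sketch of the programmatic equivalent (assuming `alembic.ini` and the `alembic/` folder sit in the working directory):

```python
# Programmatic equivalent of running `alembic upgrade head` from the CLI;
# assumes alembic.ini and the alembic/ folder are in the current directory.
from alembic.config import main as alembic_main

alembic_main(argv=["upgrade", "head"])
```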

### Generate a new migration

We use [sqlalchemy](https://docs.sqlalchemy.org/en/20/) to define the database tables. All table classes are stored in the [./app/db/tables/](./app/db/tables/) folder.
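
For illustration, a minimal table class might look like the sketch below (hypothetical: `ExampleTable` is not a real table of the project, and the `BaseTable` import path is assumed from `app/db/database.py`):

```python
# Hypothetical table class in the style of the classes under app/db/tables/.
from sqlalchemy import Column, INTEGER, VARCHAR

from app.db.database import BaseTable  # assumed import path


class ExampleTable(BaseTable):
    __tablename__ = "example_table"  # hypothetical table name

    ID = Column(INTEGER, primary_key=True, autoincrement=True)
    NAME = Column(VARCHAR, nullable=False)
```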

If you have updated a class inside the above folder, you can use the docker-compose file to generate the new migration file by using the alembic command:

```bash
docker compose run migrations /bin/sh -c "alembic revision --autogenerate -m 'GIVE A NAME TO THIS REVISION'"
```
@@ -78,11 +79,19 @@
This will output a new migration file that is going to be used by the `migrations` service.

### Generate migration for a new table

If you need to create a new table in the database, you have to create a new Python class in the [./app/db/tables/](./app/db/tables/) folder and run the same command described above:

```bash
docker compose run migrations /bin/sh -c "alembic revision --autogenerate -m 'GIVE A NAME TO THIS REVISION'"
```

***Note***: Alembic only detects the BaseTable classes that are imported in the [env.py](./alembic/env.py) file. If you add a new file for a table, be sure to import its class in the env.py file.
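
For example (a sketch with a hypothetical module name):

```python
# alembic/env.py: import each table module so its class is registered
# on BaseTable.metadata before autogenerate runs.
from app.db.tables.example_table import ExampleTable  # hypothetical module
```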

### Use another PostgreSQL schema

By default, the PostgreSQL schema used by the platform and by the migration files is `public`. If you want to store data in another schema, you need to modify the schema environment variables in `docker-compose.yaml` accordingly, and either manually modify the migration scripts or re-create the migrations with the commands above.
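
The schema setting propagates through SQLAlchemy's table metadata, as in the `database.py` change below; a minimal sketch, assuming a hypothetical schema named `monitoring`:

```python
# Every BaseTable subclass inherits the schema set on this MetaData object,
# so tables resolve to "monitoring".<table> instead of public.<table>.
from sqlalchemy import MetaData
from sqlalchemy.orm import declarative_base

db_schema = "monitoring"  # hypothetical DB_SCHEMA / POSTGRES_SCHEMA value
BaseTable = declarative_base(metadata=MetaData(schema=db_schema))
```
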
16 changes: 16 additions & 0 deletions api/alembic/env.py
@@ -39,6 +39,12 @@
# my_important_option = config.get_main_option("my_important_option")
# ... etc.

+def include_name(name, type_, parent_names):
+    if type_ == "schema":
+        return name in [target_metadata.schema]
+    else:
+        return True


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.
@@ -59,9 +65,14 @@ def run_migrations_offline() -> None:
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        render_item=render_item,
+        version_table_schema=target_metadata.schema,
+        include_schemas=True,
+        include_name=include_name
    )

    with context.begin_transaction():
+        context.execute(f'create schema if not exists "{target_metadata.schema}";')
+        context.execute(f'set search_path to "{target_metadata.schema}"')
        context.run_migrations()


@@ -83,9 +94,14 @@ def run_migrations_online() -> None:
            connection=connection,
            target_metadata=target_metadata,
            render_item=render_item,
+            version_table_schema=target_metadata.schema,
+            include_schemas=True,
+            include_name=include_name
        )

        with context.begin_transaction():
+            context.execute(f'create schema if not exists "{target_metadata.schema}";')
+            context.execute(f'set search_path to "{target_metadata.schema}"')
            context.run_migrations()


42 changes: 0 additions & 42 deletions api/alembic/versions/3ec04e609ae9_set_correlation_id_optional.py

This file was deleted.

@@ -1,8 +1,8 @@
"""init_db
-Revision ID: 086f26392cc4
+Revision ID: c3795dd0d722
Revises:
-Create Date: 2024-06-12 14:50:02.128979
+Create Date: 2024-07-18 11:42:03.862912
"""
from typing import Sequence, Union, Text
@@ -12,7 +12,7 @@
from app.db.tables.commons.json_encoded_dict import JSONEncodedDict

# revision identifiers, used by Alembic.
-revision: str = '086f26392cc4'
+revision: str = 'c3795dd0d722'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
@@ -38,28 +38,31 @@ def upgrade() -> None:
    sa.Column('UPDATED_AT', sa.TIMESTAMP(timezone=True), nullable=False),
    sa.Column('DELETED', sa.BOOLEAN(), nullable=False),
    sa.PrimaryKeyConstraint('ID'),
-    sa.UniqueConstraint('UUID')
+    sa.UniqueConstraint('UUID'),
+    schema='public'
    )
    op.create_table('current_dataset',
    sa.Column('UUID', sa.UUID(), nullable=False),
    sa.Column('MODEL_UUID', sa.UUID(), nullable=False),
    sa.Column('PATH', sa.VARCHAR(), nullable=False),
    sa.Column('DATE', sa.TIMESTAMP(timezone=True), nullable=False),
-    sa.Column('CORRELATION_ID_COLUMN', sa.VARCHAR(), nullable=False),
+    sa.Column('CORRELATION_ID_COLUMN', sa.VARCHAR(), nullable=True),
    sa.Column('STATUS', sa.VARCHAR(), nullable=False),
-    sa.ForeignKeyConstraint(['MODEL_UUID'], ['model.UUID'], ),
+    sa.ForeignKeyConstraint(['MODEL_UUID'], ['public.model.UUID'], ),
    sa.PrimaryKeyConstraint('UUID'),
-    sa.UniqueConstraint('UUID')
+    sa.UniqueConstraint('UUID'),
+    schema='public'
    )
    op.create_table('reference_dataset',
    sa.Column('UUID', sa.UUID(), nullable=False),
    sa.Column('MODEL_UUID', sa.UUID(), nullable=False),
    sa.Column('PATH', sa.VARCHAR(), nullable=False),
    sa.Column('DATE', sa.TIMESTAMP(timezone=True), nullable=False),
    sa.Column('STATUS', sa.VARCHAR(), nullable=False),
-    sa.ForeignKeyConstraint(['MODEL_UUID'], ['model.UUID'], ),
+    sa.ForeignKeyConstraint(['MODEL_UUID'], ['public.model.UUID'], ),
    sa.PrimaryKeyConstraint('UUID'),
-    sa.UniqueConstraint('UUID')
+    sa.UniqueConstraint('UUID'),
+    schema='public'
    )
    op.create_table('current_dataset_metrics',
    sa.Column('UUID', sa.UUID(), nullable=False),
@@ -68,28 +71,30 @@ def upgrade() -> None:
    sa.Column('DATA_QUALITY', JSONEncodedDict(astext_type=Text()), nullable=True),
    sa.Column('DRIFT', JSONEncodedDict(astext_type=Text()), nullable=True),
    sa.Column('STATISTICS', JSONEncodedDict(astext_type=Text()), nullable=True),
-    sa.ForeignKeyConstraint(['CURRENT_UUID'], ['current_dataset.UUID'], ),
+    sa.ForeignKeyConstraint(['CURRENT_UUID'], ['public.current_dataset.UUID'], ),
    sa.PrimaryKeyConstraint('UUID'),
-    sa.UniqueConstraint('UUID')
+    sa.UniqueConstraint('UUID'),
+    schema='public'
    )
    op.create_table('reference_dataset_metrics',
    sa.Column('UUID', sa.UUID(), nullable=False),
    sa.Column('REFERENCE_UUID', sa.UUID(), nullable=False),
    sa.Column('MODEL_QUALITY', JSONEncodedDict(astext_type=Text()), nullable=True),
    sa.Column('DATA_QUALITY', JSONEncodedDict(astext_type=Text()), nullable=True),
    sa.Column('STATISTICS', JSONEncodedDict(astext_type=Text()), nullable=True),
-    sa.ForeignKeyConstraint(['REFERENCE_UUID'], ['reference_dataset.UUID'], ),
+    sa.ForeignKeyConstraint(['REFERENCE_UUID'], ['public.reference_dataset.UUID'], ),
    sa.PrimaryKeyConstraint('UUID'),
-    sa.UniqueConstraint('UUID')
+    sa.UniqueConstraint('UUID'),
+    schema='public'
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
-    op.drop_table('reference_dataset_metrics')
-    op.drop_table('current_dataset_metrics')
-    op.drop_table('reference_dataset')
-    op.drop_table('current_dataset')
-    op.drop_table('model')
+    op.drop_table('reference_dataset_metrics', schema='public')
+    op.drop_table('current_dataset_metrics', schema='public')
+    op.drop_table('reference_dataset', schema='public')
+    op.drop_table('current_dataset', schema='public')
+    op.drop_table('model', schema='public')
    # ### end Alembic commands ###
2 changes: 2 additions & 0 deletions api/app/core/config/config.py
@@ -15,6 +15,7 @@ class DBConfig(BaseSettings):
    db_user: str = 'postgres'
    db_pwd: str = 'postgres'
    db_name: str = 'postgres'
+    db_schema: str = 'public'


class FileUploadConfig(BaseSettings):
@@ -139,4 +140,5 @@ def create_secrets():
        'POSTGRES_PORT': f'{db_config.db_port}',
        'POSTGRES_USER': db_config.db_user,
        'POSTGRES_PASSWORD': db_config.db_pwd,
+        'POSTGRES_SCHEMA': db_config.db_schema,
    }
2 changes: 1 addition & 1 deletion api/app/db/database.py
@@ -15,7 +15,7 @@ class Reflected(DeferredReflection):
    __abstract__ = True


-BaseTable = declarative_base(metadata=MetaData())
+BaseTable = declarative_base(metadata=MetaData(schema=get_config().db_config.db_schema))


class Database:
3 changes: 2 additions & 1 deletion api/resources/db.conf
@@ -2,4 +2,5 @@
DB_HOST="localhost"
DB_PORT="5432"
DB_USER="postgres"
DB_PWD="postgres"
DB_NAME="postgres"
+DB_SCHEMA="public"
5 changes: 4 additions & 1 deletion docker-compose.yaml
@@ -29,6 +29,7 @@ services:
      DB_USER: "postgres"
      DB_PWD: "postgres"
      DB_NAME: "radicalbit"
+      DB_SCHEMA: "public"
      AWS_ACCESS_KEY_ID: "minio"
      AWS_SECRET_ACCESS_KEY: "minio123"
      AWS_REGION: "us-east-1"
@@ -64,6 +65,7 @@
      DB_USER: "postgres"
      DB_PWD: "postgres"
      DB_NAME: "radicalbit"
+      DB_SCHEMA: "public"
    depends_on:
      postgres:
        condition: service_healthy
@@ -136,12 +138,13 @@
      POSTGRES_DB: radicalbit
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
+      POSTGRES_SCHEMA: public
    volumes:
      - ./init-data/init_db.sql:/docker-entrypoint-initdb.d/init_db.sql
    entrypoint: >
      /bin/sh -c "
      sleep 10;
-      PGPASSWORD=postgres psql -h postgres -U postgres -d radicalbit -f /docker-entrypoint-initdb.d/init_db.sql;
+      PGPASSWORD=postgres PGOPTIONS="--search_path=$${POSTGRES_SCHEMA}" psql -h postgres -U postgres -d radicalbit -f /docker-entrypoint-initdb.d/init_db.sql;
      "
  minio-mirror:
10 changes: 8 additions & 2 deletions spark/jobs/utils/db.py
@@ -12,14 +12,20 @@
db_name = os.getenv("POSTGRES_DB")
user = os.getenv("POSTGRES_USER")
password = os.getenv("POSTGRES_PASSWORD")
+postgres_schema = os.getenv("POSTGRES_SCHEMA")

url = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}"


def update_job_status(file_uuid: str, status: str, table_name: str):
    # Use psycopg2 to update the job status
    with psycopg2.connect(
-        host=db_host, dbname=db_name, user=user, password=password, port=db_port
+        host=db_host,
+        dbname=db_name,
+        user=user,
+        password=password,
+        port=db_port,
+        options=f"-c search_path=dbo,{postgres_schema}",
    ) as conn:
        with conn.cursor() as cur:
            cur.execute(
@@ -43,4 +49,4 @@ def write_to_db(
        "stringtype", "unspecified"
    ).option("driver", "org.postgresql.Driver").option("user", user).option(
        "password", password
-    ).option("dbtable", table_name).mode("append").save()
+    ).option("dbtable", f'"{postgres_schema}"."{table_name}"').mode("append").save()
