From e8e143c77203da15887529c73522b27f0c37cd9b Mon Sep 17 00:00:00 2001
From: rivamarco
Date: Fri, 19 Jul 2024 09:32:27 +0200
Subject: [PATCH] fix: add schema in postgres (#122)

---
 api/README.md                                 | 25 +++++++----
 api/alembic/env.py                            | 16 +++++++
 ...ec04e609ae9_set_correlation_id_optional.py | 42 -------------------
 ...cc4_init_db.py => c3795dd0d722_init_db.py} | 41 ++++++++++--------
 api/app/core/config/config.py                 |  2 +
 api/app/db/database.py                        |  2 +-
 api/resources/db.conf                         |  3 +-
 docker-compose.yaml                           |  5 ++-
 spark/jobs/utils/db.py                        | 10 ++++-
 9 files changed, 73 insertions(+), 73 deletions(-)
 delete mode 100644 api/alembic/versions/3ec04e609ae9_set_correlation_id_optional.py
 rename api/alembic/versions/{086f26392cc4_init_db.py => c3795dd0d722_init_db.py} (77%)

diff --git a/api/README.md b/api/README.md
index 463b1153..dd4db5bf 100644
--- a/api/README.md
+++ b/api/README.md
@@ -22,7 +22,7 @@ Run
 poetry run ruff format
 ```
 
-to format, and 
+to format, and
 
 ```bash
 poetry run ruff check
@@ -30,7 +30,6 @@ poetry run ruff check
 
 to check.
 
-
 ## Test
 
 Please install a PostgreSQL database locally. For example, on a macOS platform, execute:
@@ -61,14 +60,16 @@ The OpenAPI specs are available under `http://localhost:9000/docs`
 
 We use [alembic](https://pypi.org/project/alembic/) as the engine to manage database migrations.
 
-Database migrations are automatically managed by alembic in docker-compose file. It will connect to the database and apply the migrations defined in [alembic](./alembic/versions/) folder.
+Database migrations are automatically managed by alembic in the docker-compose file. It will connect to the database and
+apply the migrations defined in the [alembic](./alembic/versions/) folder.
 
 ### Generate a new migration
 
-We use [sqlalchemy](https://docs.sqlalchemy.org/en/20/) to define the database tables. All table clas are stored in [./app/db/tables/](./app/db/tables/) folder.
-
+We use [sqlalchemy](https://docs.sqlalchemy.org/en/20/) to define the database tables. All table classes are stored
+in the [./app/db/tables/](./app/db/tables/) folder.
 
-If you have updated a class inside the above folder, you can use the docker-compose file to generate the new migration file by using alembic command:
+If you have updated a class inside the above folder, you can use the docker-compose file to generate the new migration
+file using the alembic command:
 
 ```bash
 docker compose run migrations /bin/sh -c "alembic revision --autogenerate -m 'GIVE A NAME TO THIS REVISION'"
@@ -78,11 +79,19 @@ this will output a new migration file that is going to be used by `migrations`
 
 ### Generate migration for a new Table
 
-If you need to create a new table in the database, you have to create a new python class in the [./app/db/tables/](./app/db/tables/) and you need to run the same command described above:
+If you need to create a new table in the database, you have to create a new python class in
+the [./app/db/tables/](./app/db/tables/) folder and run the same command described above:
 
 ```bash
 docker compose run migrations /bin/sh -c "alembic revision --autogenerate -m 'GIVE A NAME TO THIS REVISION'"
 ```
 
-***Notes***: Alembic watches for all BaseTable classes imported in the [env.py](./alembic/env.py) file. If you add a new file for the table, be sure to import the class in the env.py file. 
+***Notes***: Alembic watches for all BaseTable classes imported in the [env.py](./alembic/env.py) file. If you add a
+new file for the table, be sure to import the class in the env.py file.
+
+### Use another PostgreSQL schema
+By default, the PostgreSQL schema used by the platform and by the migration files is `public`. If you want to
+store data in another schema, you need to modify the schema environment variables in
+the `docker-compose.yaml` accordingly, and you have to either manually edit the migration scripts or re-create the
+migrations with the above commands.
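The new README section tells users to switch schemas via environment variables. As a quick sanity check after doing so, the query below lists where the platform tables actually landed. This is a minimal sketch, not part of the PR: connection values are the local defaults from `api/resources/db.conf` and are assumptions (the docker-compose setup uses the `radicalbit` database instead).

```python
# Minimal sketch: confirm which schema the platform tables were created in.
# Host, port, credentials, and database name are assumed local defaults
# taken from api/resources/db.conf; adjust them to your environment.
import psycopg2

with psycopg2.connect(
    host="localhost", port=5432, dbname="postgres",
    user="postgres", password="postgres",
) as conn:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT table_schema, table_name "
            "FROM information_schema.tables "
            "WHERE table_name IN ('model', 'current_dataset', 'reference_dataset')"
        )
        for schema, table in cur.fetchall():
            print(f"{schema}.{table}")  # e.g. public.model
```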
diff --git a/api/alembic/env.py b/api/alembic/env.py
index a62dd0c1..2f9c06f2 100644
--- a/api/alembic/env.py
+++ b/api/alembic/env.py
@@ -39,6 +39,12 @@
 # my_important_option = config.get_main_option("my_important_option")
 # ... etc.
 
+def include_name(name, type_, parent_names):
+    if type_ == "schema":
+        return name in [target_metadata.schema]
+    else:
+        return True
+
 
 def run_migrations_offline() -> None:
     """Run migrations in 'offline' mode.
@@ -59,9 +65,14 @@ def run_migrations_offline() -> None:
         literal_binds=True,
         dialect_opts={"paramstyle": "named"},
         render_item=render_item,
+        version_table_schema=target_metadata.schema,
+        include_schemas=True,
+        include_name=include_name
     )
 
     with context.begin_transaction():
+        context.execute(f'create schema if not exists "{target_metadata.schema}";')
+        context.execute(f'set search_path to "{target_metadata.schema}"')
         context.run_migrations()
 
 
@@ -83,9 +94,14 @@ def run_migrations_online() -> None:
             connection=connection,
             target_metadata=target_metadata,
             render_item=render_item,
+            version_table_schema=target_metadata.schema,
+            include_schemas=True,
+            include_name=include_name
         )
 
         with context.begin_transaction():
+            context.execute(f'create schema if not exists "{target_metadata.schema}";')
+            context.execute(f'set search_path to "{target_metadata.schema}"')
             context.run_migrations()
 
 
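The `include_name` hook added to `env.py` is what keeps autogenerate scoped to the configured schema once `include_schemas=True` is set: alembic consults the hook for every reflected name, and only names of type `"schema"` get filtered. A standalone sketch of that behavior (the `_Meta` class is a hypothetical stand-in for the real `target_metadata`, and `"monitoring"` is an illustrative schema name):

```python
# Standalone illustration of the include_name filtering above; _Meta is a
# hypothetical stand-in for the MetaData object configured in env.py.
class _Meta:
    schema = "monitoring"

target_metadata = _Meta()

def include_name(name, type_, parent_names):
    if type_ == "schema":
        return name in [target_metadata.schema]
    else:
        return True

assert include_name("monitoring", "schema", {})   # configured schema: kept
assert not include_name("public", "schema", {})   # any other schema: skipped
assert include_name("model", "table", {})         # non-schema names: pass through
```

The `version_table_schema` argument makes the `alembic_version` bookkeeping table follow the same schema, and the two `context.execute` calls create the schema and set `search_path` before migrations run, so this also works on a fresh database.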
diff --git a/api/alembic/versions/3ec04e609ae9_set_correlation_id_optional.py b/api/alembic/versions/3ec04e609ae9_set_correlation_id_optional.py
deleted file mode 100644
index feab27f6..00000000
--- a/api/alembic/versions/3ec04e609ae9_set_correlation_id_optional.py
+++ /dev/null
@@ -1,42 +0,0 @@
-"""set_correlation_id_optional
-
-Revision ID: 3ec04e609ae9
-Revises: 086f26392cc4
-Create Date: 2024-07-08 10:28:35.068312
-
-"""
-from typing import Sequence, Union, Text
-
-from alembic import op
-import sqlalchemy as sa
-
-
-# revision identifiers, used by Alembic.
-revision: str = '3ec04e609ae9'
-down_revision: Union[str, None] = '086f26392cc4'
-branch_labels: Union[str, Sequence[str], None] = None
-depends_on: Union[str, Sequence[str], None] = None
-
-
-def upgrade() -> None:
-    # ### commands auto generated by Alembic - please adjust! ###
-    op.alter_column('current_dataset', 'CORRELATION_ID_COLUMN',
-               existing_type=sa.VARCHAR(),
-               nullable=True)
-    op.create_unique_constraint(None, 'current_dataset', ['UUID'])
-    op.create_unique_constraint(None, 'current_dataset_metrics', ['UUID'])
-    op.create_unique_constraint(None, 'reference_dataset', ['UUID'])
-    op.create_unique_constraint(None, 'reference_dataset_metrics', ['UUID'])
-    # ### end Alembic commands ###
-
-
-def downgrade() -> None:
-    # ### commands auto generated by Alembic - please adjust! ###
-    op.drop_constraint(None, 'reference_dataset_metrics', type_='unique')
-    op.drop_constraint(None, 'reference_dataset', type_='unique')
-    op.drop_constraint(None, 'current_dataset_metrics', type_='unique')
-    op.drop_constraint(None, 'current_dataset', type_='unique')
-    op.alter_column('current_dataset', 'CORRELATION_ID_COLUMN',
-               existing_type=sa.VARCHAR(),
-               nullable=False)
-    # ### end Alembic commands ###
diff --git a/api/alembic/versions/086f26392cc4_init_db.py b/api/alembic/versions/c3795dd0d722_init_db.py
similarity index 77%
rename from api/alembic/versions/086f26392cc4_init_db.py
rename to api/alembic/versions/c3795dd0d722_init_db.py
index ed6c7030..2a8eed8b 100644
--- a/api/alembic/versions/086f26392cc4_init_db.py
+++ b/api/alembic/versions/c3795dd0d722_init_db.py
@@ -1,8 +1,8 @@
 """init_db
 
-Revision ID: 086f26392cc4
+Revision ID: c3795dd0d722
 Revises: 
-Create Date: 2024-06-12 14:50:02.128979
+Create Date: 2024-07-18 11:42:03.862912
 
 """
 from typing import Sequence, Union, Text
@@ -12,7 +12,7 @@
 from app.db.tables.commons.json_encoded_dict import JSONEncodedDict
 
 # revision identifiers, used by Alembic.
-revision: str = '086f26392cc4'
+revision: str = 'c3795dd0d722'
 down_revision: Union[str, None] = None
 branch_labels: Union[str, Sequence[str], None] = None
 depends_on: Union[str, Sequence[str], None] = None
@@ -38,18 +38,20 @@ def upgrade() -> None:
     sa.Column('UPDATED_AT', sa.TIMESTAMP(timezone=True), nullable=False),
     sa.Column('DELETED', sa.BOOLEAN(), nullable=False),
     sa.PrimaryKeyConstraint('ID'),
-    sa.UniqueConstraint('UUID')
+    sa.UniqueConstraint('UUID'),
+    schema='public'
     )
     op.create_table('current_dataset',
     sa.Column('UUID', sa.UUID(), nullable=False),
     sa.Column('MODEL_UUID', sa.UUID(), nullable=False),
     sa.Column('PATH', sa.VARCHAR(), nullable=False),
     sa.Column('DATE', sa.TIMESTAMP(timezone=True), nullable=False),
-    sa.Column('CORRELATION_ID_COLUMN', sa.VARCHAR(), nullable=False),
+    sa.Column('CORRELATION_ID_COLUMN', sa.VARCHAR(), nullable=True),
     sa.Column('STATUS', sa.VARCHAR(), nullable=False),
-    sa.ForeignKeyConstraint(['MODEL_UUID'], ['model.UUID'], ),
+    sa.ForeignKeyConstraint(['MODEL_UUID'], ['public.model.UUID'], ),
     sa.PrimaryKeyConstraint('UUID'),
-    sa.UniqueConstraint('UUID')
+    sa.UniqueConstraint('UUID'),
+    schema='public'
     )
     op.create_table('reference_dataset',
     sa.Column('UUID', sa.UUID(), nullable=False),
@@ -57,9 +59,10 @@ def upgrade() -> None:
     sa.Column('PATH', sa.VARCHAR(), nullable=False),
     sa.Column('DATE', sa.TIMESTAMP(timezone=True), nullable=False),
     sa.Column('STATUS', sa.VARCHAR(), nullable=False),
-    sa.ForeignKeyConstraint(['MODEL_UUID'], ['model.UUID'], ),
+    sa.ForeignKeyConstraint(['MODEL_UUID'], ['public.model.UUID'], ),
     sa.PrimaryKeyConstraint('UUID'),
-    sa.UniqueConstraint('UUID')
+    sa.UniqueConstraint('UUID'),
+    schema='public'
     )
     op.create_table('current_dataset_metrics',
     sa.Column('UUID', sa.UUID(), nullable=False),
@@ -68,9 +71,10 @@ def upgrade() -> None:
     sa.Column('DATA_QUALITY', JSONEncodedDict(astext_type=Text()), nullable=True),
     sa.Column('DRIFT', JSONEncodedDict(astext_type=Text()), nullable=True),
     sa.Column('STATISTICS', JSONEncodedDict(astext_type=Text()), nullable=True),
-    sa.ForeignKeyConstraint(['CURRENT_UUID'], ['current_dataset.UUID'], ),
+    sa.ForeignKeyConstraint(['CURRENT_UUID'], ['public.current_dataset.UUID'], ),
     sa.PrimaryKeyConstraint('UUID'),
-    sa.UniqueConstraint('UUID')
+    sa.UniqueConstraint('UUID'),
+    schema='public'
    )
     op.create_table('reference_dataset_metrics',
     sa.Column('UUID', sa.UUID(), nullable=False),
@@ -78,18 +82,19 @@ def upgrade() -> None:
     sa.Column('MODEL_QUALITY', JSONEncodedDict(astext_type=Text()), nullable=True),
     sa.Column('DATA_QUALITY', JSONEncodedDict(astext_type=Text()), nullable=True),
     sa.Column('STATISTICS', JSONEncodedDict(astext_type=Text()), nullable=True),
-    sa.ForeignKeyConstraint(['REFERENCE_UUID'], ['reference_dataset.UUID'], ),
+    sa.ForeignKeyConstraint(['REFERENCE_UUID'], ['public.reference_dataset.UUID'], ),
     sa.PrimaryKeyConstraint('UUID'),
-    sa.UniqueConstraint('UUID')
+    sa.UniqueConstraint('UUID'),
+    schema='public'
     )
     # ### end Alembic commands ###
 
 
 def downgrade() -> None:
     # ### commands auto generated by Alembic - please adjust! ###
-    op.drop_table('reference_dataset_metrics')
-    op.drop_table('current_dataset_metrics')
-    op.drop_table('reference_dataset')
-    op.drop_table('current_dataset')
-    op.drop_table('model')
+    op.drop_table('reference_dataset_metrics', schema='public')
+    op.drop_table('current_dataset_metrics', schema='public')
+    op.drop_table('reference_dataset', schema='public')
+    op.drop_table('current_dataset', schema='public')
+    op.drop_table('model', schema='public')
     # ### end Alembic commands ###
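In the regenerated migration, foreign-key targets are now spelled with the schema prefix (`public.model.UUID`), i.e. `schema.table.column`. A small sketch of how SQLAlchemy resolves such a reference; the table shapes here are trimmed-down assumptions for illustration, not the full definitions above:

```python
# Sketch: schema-qualified foreign-key targets in SQLAlchemy, mirroring the
# 'public.model.UUID' references in the migration; columns are trimmed down.
import sqlalchemy as sa

metadata = sa.MetaData(schema="public")
model = sa.Table(
    "model", metadata,
    sa.Column("UUID", sa.UUID(), primary_key=True),
)
current_dataset = sa.Table(
    "current_dataset", metadata,
    sa.Column("UUID", sa.UUID(), primary_key=True),
    sa.Column("MODEL_UUID", sa.UUID(), sa.ForeignKey("public.model.UUID")),
)
fk = next(iter(current_dataset.foreign_keys))
print(fk.target_fullname)  # -> public.model.UUID
```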
diff --git a/api/app/core/config/config.py b/api/app/core/config/config.py
index ef74ea5c..30c099a4 100644
--- a/api/app/core/config/config.py
+++ b/api/app/core/config/config.py
@@ -15,6 +15,7 @@ class DBConfig(BaseSettings):
     db_user: str = 'postgres'
     db_pwd: str = 'postgres'
     db_name: str = 'postgres'
+    db_schema: str = 'public'
 
 
 class FileUploadConfig(BaseSettings):
@@ -139,4 +140,5 @@ def create_secrets():
         'POSTGRES_PORT': f'{db_config.db_port}',
         'POSTGRES_USER': db_config.db_user,
         'POSTGRES_PASSWORD': db_config.db_pwd,
+        'POSTGRES_SCHEMA': db_config.db_schema,
     }
diff --git a/api/app/db/database.py b/api/app/db/database.py
index 572ec219..a7cc851b 100644
--- a/api/app/db/database.py
+++ b/api/app/db/database.py
@@ -15,7 +15,7 @@ class Reflected(DeferredReflection):
     __abstract__ = True
 
 
-BaseTable = declarative_base(metadata=MetaData())
+BaseTable = declarative_base(metadata=MetaData(schema=get_config().db_config.db_schema))
 
 
 class Database:
diff --git a/api/resources/db.conf b/api/resources/db.conf
index 808e9f3f..7fc77ef0 100644
--- a/api/resources/db.conf
+++ b/api/resources/db.conf
@@ -2,4 +2,5 @@ DB_HOST="localhost"
 DB_PORT="5432"
 DB_USER="postgres"
 DB_PWD="postgres"
-DB_NAME="postgres"
\ No newline at end of file
+DB_NAME="postgres"
+DB_SCHEMA="public"
\ No newline at end of file
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 2b2e1896..85cdf947 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -29,6 +29,7 @@ services:
       DB_USER: "postgres"
       DB_PWD: "postgres"
       DB_NAME: "radicalbit"
+      DB_SCHEMA: "public"
       AWS_ACCESS_KEY_ID: "minio"
       AWS_SECRET_ACCESS_KEY: "minio123"
       AWS_REGION: "us-east-1"
@@ -64,6 +65,7 @@ services:
       DB_USER: "postgres"
       DB_PWD: "postgres"
       DB_NAME: "radicalbit"
+      DB_SCHEMA: "public"
     depends_on:
       postgres:
         condition: service_healthy
@@ -136,12 +138,13 @@ services:
       POSTGRES_DB: radicalbit
       POSTGRES_USER: postgres
       POSTGRES_PASSWORD: postgres
+      POSTGRES_SCHEMA: public
     volumes:
       - ./init-data/init_db.sql:/docker-entrypoint-initdb.d/init_db.sql
     entrypoint: >
       /bin/sh -c "
       sleep 10;
-      PGPASSWORD=postgres psql -h postgres -U postgres -d radicalbit -f /docker-entrypoint-initdb.d/init_db.sql;
+      PGPASSWORD=postgres PGOPTIONS="--search_path=$${POSTGRES_SCHEMA}" psql -h postgres -U postgres -d radicalbit -f /docker-entrypoint-initdb.d/init_db.sql;
       "
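These four changes form one chain: `DB_SCHEMA` in `docker-compose.yaml` (or `db.conf`) feeds `DBConfig.db_schema`, which seeds the `MetaData` behind `BaseTable`, so every table class and every autogenerated migration inherits the schema. A rough sketch of that chain under stated assumptions: `DBConfig` is re-declared here with only the relevant field so the snippet runs standalone, the schema name is illustrative, and whether `BaseSettings` imports from `pydantic` or `pydantic_settings` depends on the pydantic version in use.

```python
# Rough sketch of the DB_SCHEMA -> DBConfig -> MetaData chain; DBConfig is
# re-declared here (only the relevant field) so the snippet runs on its own.
import os
from pydantic_settings import BaseSettings  # or: from pydantic import BaseSettings
from sqlalchemy import MetaData
from sqlalchemy.orm import declarative_base

class DBConfig(BaseSettings):
    db_schema: str = 'public'

os.environ['DB_SCHEMA'] = 'monitoring'  # hypothetical value, as set in docker-compose.yaml

BaseTable = declarative_base(metadata=MetaData(schema=DBConfig().db_schema))
print(BaseTable.metadata.schema)  # -> monitoring
```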
diff --git a/spark/jobs/utils/db.py b/spark/jobs/utils/db.py
index 006260bd..495cd509 100644
--- a/spark/jobs/utils/db.py
+++ b/spark/jobs/utils/db.py
@@ -12,6 +12,7 @@
 db_name = os.getenv("POSTGRES_DB")
 user = os.getenv("POSTGRES_USER")
 password = os.getenv("POSTGRES_PASSWORD")
+postgres_schema = os.getenv("POSTGRES_SCHEMA")
 
 url = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}"
 
@@ -19,7 +20,12 @@
 def update_job_status(file_uuid: str, status: str, table_name: str):
     # Use psycopg2 to update the job status
     with psycopg2.connect(
-        host=db_host, dbname=db_name, user=user, password=password, port=db_port
+        host=db_host,
+        dbname=db_name,
+        user=user,
+        password=password,
+        port=db_port,
+        options=f"-c search_path=dbo,{postgres_schema}",
     ) as conn:
         with conn.cursor() as cur:
             cur.execute(
@@ -43,4 +49,4 @@ def write_to_db(
         "stringtype", "unspecified"
     ).option("driver", "org.postgresql.Driver").option("user", user).option(
         "password", password
-    ).option("dbtable", table_name).mode("append").save()
+    ).option("dbtable", f'"{postgres_schema}"."{table_name}"').mode("append").save()
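Two details in the Spark job change are easy to miss. First, the psycopg2 connection receives its schema through the `options` parameter, which passes `-c search_path=...` to the server, so the unqualified table name inside `update_job_status` resolves against the configured schema. Second, the JDBC writer quotes schema and table separately, because to PostgreSQL `"schema"."table"` is a qualified name while `"schema.table"` would be a single identifier containing a dot. A small sketch of the first point; the connection values are placeholder assumptions for a local PostgreSQL, and the `options` string mirrors the job's setting verbatim:

```python
# Sketch: how the options parameter sets the per-connection search_path;
# host and credentials are placeholder assumptions for a local PostgreSQL.
import psycopg2

postgres_schema = "monitoring"  # hypothetical non-default schema
with psycopg2.connect(
    host="localhost", port=5432, dbname="postgres",
    user="postgres", password="postgres",
    options=f"-c search_path=dbo,{postgres_schema}",  # same string the job builds
) as conn:
    with conn.cursor() as cur:
        cur.execute("SHOW search_path")
        print(cur.fetchone()[0])  # -> dbo, monitoring
```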