diff --git a/jobs/payment-jobs/services/data_warehouse.py b/jobs/payment-jobs/services/data_warehouse.py index 79dbeb84f..15de87eff 100644 --- a/jobs/payment-jobs/services/data_warehouse.py +++ b/jobs/payment-jobs/services/data_warehouse.py @@ -18,11 +18,12 @@ # services/data_warehouse.py +from dataclasses import dataclass + import pg8000 +from google.cloud.sql.connector import Connector from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker -from google.cloud.sql.connector import Connector -from dataclasses import dataclass @dataclass @@ -49,14 +50,14 @@ def getconn(connector: Connector, db_config: DBConfig) -> object: """ if db_config.unix_sock: # Use Unix socket connection with the Connector for deployment - instance_connection_string = db_config.unix_sock.replace('/cloudsql/', '') + instance_connection_string = db_config.unix_sock.replace("/cloudsql/", "") return connector.connect( instance_connection_string=instance_connection_string, - ip_type='private', + ip_type="private", user=db_config.user, password=db_config.password, db=db_config.database, - driver='pg8000', + driver="pg8000", ) else: conn = pg8000.connect( @@ -64,7 +65,7 @@ def getconn(connector: Connector, db_config: DBConfig) -> object: user=db_config.user, password=db_config.password, host=db_config.host, - port=db_config.port + port=db_config.port, ) return conn @@ -98,7 +99,7 @@ def init_app(self, app): max_overflow=2, pool_timeout=10, pool_recycle=1800, - connect_args={"use_native_uuid": False} + connect_args={"use_native_uuid": False}, ) app.teardown_appcontext(self.teardown) diff --git a/jobs/payment-jobs/tasks/bcol_refund_confirmation_task.py b/jobs/payment-jobs/tasks/bcol_refund_confirmation_task.py index cf1fa5677..43fd02027 100644 --- a/jobs/payment-jobs/tasks/bcol_refund_confirmation_task.py +++ b/jobs/payment-jobs/tasks/bcol_refund_confirmation_task.py @@ -72,7 +72,7 @@ def _get_data_warehouse_bcol_records_for_invoices(cls, invoice_refs: List[Invoic # Split invoice refs into groups of 5000 invoice_ref_chunks = [] for i in range(0, len(invoice_refs), 5000): - invoice_ref_chunks.append(invoice_refs[i: i + 5000]) + invoice_ref_chunks.append(invoice_refs[i : i + 5000]) bcol_refunds_all = {} current_app.logger.debug("Connecting to data_warehouse...") @@ -81,17 +81,19 @@ def _get_data_warehouse_bcol_records_for_invoices(cls, invoice_refs: List[Invoic invoice_numbers_str = ", ".join("'" + str(x.invoice_number) + "'" for x in invoice_ref_grp) current_app.logger.debug("Collecting Data Warehouse BCOL refund records...") - query = text(f""" + query = text( + f""" SELECT key, total_amt FROM colin.bconline_billing_record WHERE key IN ({invoice_numbers_str}) AND qty = -1 - """) + """ + ) results = session.execute(query).fetchall() # Convert float from the database to Decimal - bcol_refunds_all.update({row['key']: Decimal(str(row['total_amt'])) for row in results}) + bcol_refunds_all.update({row["key"]: Decimal(str(row["total_amt"])) for row in results}) # set invoice_number as the key (makes it easier map against) return bcol_refunds_all diff --git a/jobs/payment-jobs/tasks/cfs_create_invoice_task.py b/jobs/payment-jobs/tasks/cfs_create_invoice_task.py index cbd3daff5..bc2f814a6 100644 --- a/jobs/payment-jobs/tasks/cfs_create_invoice_task.py +++ b/jobs/payment-jobs/tasks/cfs_create_invoice_task.py @@ -355,7 +355,7 @@ def _create_pad_invoices(cls): # pylint: disable=too-many-locals continue # This is synced after receiving a CSV file at 9:30 AM each day. credit_remaining_total = CreditModel.find_remaining_by_account_id(account.id) - current_app.logger.info('credit_remaining_total: %s', credit_remaining_total) + current_app.logger.info("credit_remaining_total: %s", credit_remaining_total) credit_total = min(credit_remaining_total, invoice_total) additional_params = { "credit_total": float(credit_total), diff --git a/jobs/payment-jobs/tasks/ejv_partner_distribution_task.py b/jobs/payment-jobs/tasks/ejv_partner_distribution_task.py index c640c4b49..34d72ccc7 100644 --- a/jobs/payment-jobs/tasks/ejv_partner_distribution_task.py +++ b/jobs/payment-jobs/tasks/ejv_partner_distribution_task.py @@ -32,7 +32,7 @@ from pay_api.models import Receipt as ReceiptModel from pay_api.models import db from pay_api.utils.enums import DisbursementStatus, EjvFileType, EJVLinkType, InvoiceStatus, PaymentMethod -from sqlalchemy import Date, and_, cast +from sqlalchemy import Date, and_, cast, or_ from tasks.common.cgi_ejv import CgiEjv from tasks.common.dataclasses import Disbursement, DisbursementLineItem @@ -160,6 +160,15 @@ def get_disbursement_by_distribution_for_partner(partner): .filter(PartnerDisbursementsModel.partner_code == partner.code) .filter(DistributionCodeModel.stop_ejv.is_(False) | DistributionCodeModel.stop_ejv.is_(None)) .filter(~InvoiceModel.receipts.any(cast(ReceiptModel.receipt_date, Date) >= disbursement_date.date())) + .filter( + or_( + and_( + PartnerDisbursementsModel.is_reversal.is_(False), + InvoiceModel.invoice_status_code == InvoiceStatus.PAID.value, + ), + PartnerDisbursementsModel.is_reversal.is_(True), + ) + ) .order_by(DistributionCodeModel.distribution_code_id, PaymentLineItemModel.id) .all() ) @@ -311,11 +320,15 @@ def _update_disbursement_status_and_ejv_link( raise NotImplementedError("Unknown disbursement type") # Possible this could already be created, eg two PLI. - if db.session.query(EjvLinkModel).filter( - EjvLinkModel.link_id == disbursement.line_item.identifier, - EjvLinkModel.link_type == disbursement.line_item.target_type, - EjvLinkModel.ejv_header_id == ejv_header_model.id, - ).first(): + if ( + db.session.query(EjvLinkModel) + .filter( + EjvLinkModel.link_id == disbursement.line_item.identifier, + EjvLinkModel.link_type == disbursement.line_item.target_type, + EjvLinkModel.ejv_header_id == ejv_header_model.id, + ) + .first() + ): return db.session.add( diff --git a/jobs/payment-jobs/tests/services/test_data_warehouse_connection.py b/jobs/payment-jobs/tests/services/test_data_warehouse_connection.py index eeb2fa9c0..fd709d012 100644 --- a/jobs/payment-jobs/tests/services/test_data_warehouse_connection.py +++ b/jobs/payment-jobs/tests/services/test_data_warehouse_connection.py @@ -26,11 +26,11 @@ def app(): """Create a Flask app instance configured for testing.""" app = Flask(__name__) - app.config['DW_HOST'] = 'mock_host' - app.config['DW_PORT'] = 5432 - app.config['DW_NAME'] = 'mock_database' - app.config['DW_USER'] = 'mock_user' - app.config['DW_PASSWORD'] = 'mock_password' + app.config["DW_HOST"] = "mock_host" + app.config["DW_PORT"] = 5432 + app.config["DW_NAME"] = "mock_database" + app.config["DW_USER"] = "mock_user" + app.config["DW_PASSWORD"] = "mock_password" return app