Skip to content

Commit

Permalink
misc job fixes etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
seeker25 committed Nov 5, 2024
1 parent 28cab6d commit 6ed2a5a
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 23 deletions.
15 changes: 8 additions & 7 deletions jobs/payment-jobs/services/data_warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

# services/data_warehouse.py

from dataclasses import dataclass

import pg8000
from google.cloud.sql.connector import Connector
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from google.cloud.sql.connector import Connector
from dataclasses import dataclass


@dataclass
Expand All @@ -49,22 +50,22 @@ def getconn(connector: Connector, db_config: DBConfig) -> object:
"""
if db_config.unix_sock:
# Use Unix socket connection with the Connector for deployment
instance_connection_string = db_config.unix_sock.replace('/cloudsql/', '')
instance_connection_string = db_config.unix_sock.replace("/cloudsql/", "")
return connector.connect(
instance_connection_string=instance_connection_string,
ip_type='private',
ip_type="private",
user=db_config.user,
password=db_config.password,
db=db_config.database,
driver='pg8000',
driver="pg8000",
)
else:
conn = pg8000.connect(
database=db_config.database,
user=db_config.user,
password=db_config.password,
host=db_config.host,
port=db_config.port
port=db_config.port,
)
return conn

Expand Down Expand Up @@ -98,7 +99,7 @@ def init_app(self, app):
max_overflow=2,
pool_timeout=10,
pool_recycle=1800,
connect_args={"use_native_uuid": False}
connect_args={"use_native_uuid": False},
)

app.teardown_appcontext(self.teardown)
Expand Down
10 changes: 6 additions & 4 deletions jobs/payment-jobs/tasks/bcol_refund_confirmation_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _get_data_warehouse_bcol_records_for_invoices(cls, invoice_refs: List[Invoic
# Split invoice refs into groups of 5000
invoice_ref_chunks = []
for i in range(0, len(invoice_refs), 5000):
invoice_ref_chunks.append(invoice_refs[i: i + 5000])
invoice_ref_chunks.append(invoice_refs[i : i + 5000])

bcol_refunds_all = {}
current_app.logger.debug("Connecting to data_warehouse...")
Expand All @@ -81,17 +81,19 @@ def _get_data_warehouse_bcol_records_for_invoices(cls, invoice_refs: List[Invoic
invoice_numbers_str = ", ".join("'" + str(x.invoice_number) + "'" for x in invoice_ref_grp)

current_app.logger.debug("Collecting Data Warehouse BCOL refund records...")
query = text(f"""
query = text(
f"""
SELECT key, total_amt
FROM colin.bconline_billing_record
WHERE key IN ({invoice_numbers_str})
AND qty = -1
""")
"""
)

results = session.execute(query).fetchall()

# Convert float from the database to Decimal
bcol_refunds_all.update({row['key']: Decimal(str(row['total_amt'])) for row in results})
bcol_refunds_all.update({row["key"]: Decimal(str(row["total_amt"])) for row in results})
# set invoice_number as the key (makes it easier map against)
return bcol_refunds_all

Expand Down
2 changes: 1 addition & 1 deletion jobs/payment-jobs/tasks/cfs_create_invoice_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def _create_pad_invoices(cls): # pylint: disable=too-many-locals
continue
# This is synced after receiving a CSV file at 9:30 AM each day.
credit_remaining_total = CreditModel.find_remaining_by_account_id(account.id)
current_app.logger.info('credit_remaining_total: %s', credit_remaining_total)
current_app.logger.info("credit_remaining_total: %s", credit_remaining_total)
credit_total = min(credit_remaining_total, invoice_total)
additional_params = {
"credit_total": float(credit_total),
Expand Down
25 changes: 19 additions & 6 deletions jobs/payment-jobs/tasks/ejv_partner_distribution_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from pay_api.models import Receipt as ReceiptModel
from pay_api.models import db
from pay_api.utils.enums import DisbursementStatus, EjvFileType, EJVLinkType, InvoiceStatus, PaymentMethod
from sqlalchemy import Date, and_, cast
from sqlalchemy import Date, and_, cast, or_

from tasks.common.cgi_ejv import CgiEjv
from tasks.common.dataclasses import Disbursement, DisbursementLineItem
Expand Down Expand Up @@ -160,6 +160,15 @@ def get_disbursement_by_distribution_for_partner(partner):
.filter(PartnerDisbursementsModel.partner_code == partner.code)
.filter(DistributionCodeModel.stop_ejv.is_(False) | DistributionCodeModel.stop_ejv.is_(None))
.filter(~InvoiceModel.receipts.any(cast(ReceiptModel.receipt_date, Date) >= disbursement_date.date()))
.filter(
or_(
and_(
PartnerDisbursementsModel.is_reversal.is_(False),
InvoiceModel.invoice_status_code == InvoiceStatus.PAID.value,
),
PartnerDisbursementsModel.is_reversal.is_(True),
)
)
.order_by(DistributionCodeModel.distribution_code_id, PaymentLineItemModel.id)
.all()
)
Expand Down Expand Up @@ -311,11 +320,15 @@ def _update_disbursement_status_and_ejv_link(
raise NotImplementedError("Unknown disbursement type")

# Possible this could already be created, eg two PLI.
if db.session.query(EjvLinkModel).filter(
EjvLinkModel.link_id == disbursement.line_item.identifier,
EjvLinkModel.link_type == disbursement.line_item.target_type,
EjvLinkModel.ejv_header_id == ejv_header_model.id,
).first():
if (
db.session.query(EjvLinkModel)
.filter(
EjvLinkModel.link_id == disbursement.line_item.identifier,
EjvLinkModel.link_type == disbursement.line_item.target_type,
EjvLinkModel.ejv_header_id == ejv_header_model.id,
)
.first()
):
return

db.session.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
def app():
"""Create a Flask app instance configured for testing."""
app = Flask(__name__)
app.config['DW_HOST'] = 'mock_host'
app.config['DW_PORT'] = 5432
app.config['DW_NAME'] = 'mock_database'
app.config['DW_USER'] = 'mock_user'
app.config['DW_PASSWORD'] = 'mock_password'
app.config["DW_HOST"] = "mock_host"
app.config["DW_PORT"] = 5432
app.config["DW_NAME"] = "mock_database"
app.config["DW_USER"] = "mock_user"
app.config["DW_PASSWORD"] = "mock_password"
return app


Expand Down

0 comments on commit 6ed2a5a

Please sign in to comment.