diff --git a/lorrystream/process/kinesis_cratedb_lambda.py b/lorrystream/process/kinesis_cratedb_lambda.py index c2a15ef..1d1787e 100644 --- a/lorrystream/process/kinesis_cratedb_lambda.py +++ b/lorrystream/process/kinesis_cratedb_lambda.py @@ -25,7 +25,7 @@ # requires-python = ">=3.9" # dependencies = [ # "commons-codec", -# "sqlalchemy-cratedb==0.38.0", +# "sqlalchemy-cratedb>=0.38.0", # ] # /// import base64 @@ -38,7 +38,7 @@ from commons_codec.exception import UnknownOperationError from commons_codec.model import ColumnTypeMapStore from commons_codec.transform.aws_dms import DMSTranslatorCrateDB -from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB +from commons_codec.transform.dynamodb import DynamoDBCDCTranslator from sqlalchemy.util import asbool LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO") @@ -79,7 +79,7 @@ if MESSAGE_FORMAT == "dms": cdc = DMSTranslatorCrateDB(column_types=column_types) elif MESSAGE_FORMAT == "dynamodb": - cdc = DynamoCDCTranslatorCrateDB(table_name=SINK_TABLE) + cdc = DynamoDBCDCTranslator(table_name=SINK_TABLE) # Create the database connection outside the handler to allow # connections to be re-used by subsequent function invocations. @@ -121,8 +121,8 @@ def handler(event, context): logger.debug(f"Record Data: {record_data}") # Process record. - sql = cdc.to_sql(record_data) - connection.execute(sa.text(sql)) + operation = cdc.to_sql(record_data) + connection.execute(sa.text(operation.statement), operation.parameters) connection.commit() # Bookkeeping. diff --git a/pyproject.toml b/pyproject.toml index 793ee7b..faa496a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,7 +97,7 @@ dependencies = [ "pika<1.4", "simplejson<4", "sqlalchemy==2.0.*", - "sqlalchemy-cratedb==0.38.0", + "sqlalchemy-cratedb>=0.38.0", "streamz", "tomli", "toolz", @@ -109,7 +109,7 @@ optional-dependencies.carabas = [ "aiobotocore==2.13.*", # for async-kinesis "async-kinesis<1.2", "aws-lambda-layer<0.6", - "boto3>=1.34,<1.36", # for async-kinesis + "boto3>=1.34,<1.36", # for async-kinesis "cottonformation<1.2", "localstack[base-runtime]<3.7", ]