diff --git a/pom.xml b/pom.xml index c720dc4..4a42f92 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.lucee kinesis-extension - 1.0.1.1-SNAPSHOT + 1.0.1.2-SNAPSHOT pom Kinesis Extension diff --git a/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisGet.java b/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisGet.java index e2f4593..5780fa2 100644 --- a/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisGet.java +++ b/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisGet.java @@ -32,6 +32,7 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisGet extends KinesisFunction { @@ -58,8 +59,8 @@ public class KinesisGet extends KinesisFunction { _iteration = creator.createKey("iteration"); } - public static Query call(PageContext pc, String streamName, String shardId, String sequenceNumber, DateTime timestamp, String iteratorType, double maxrows, String accessKeyId, - String secretAccessKey, String host, String location, double timeout) throws PageException { + public static Query call(PageContext pc, String streamName, String shardId, String sequenceNumber, DateTime timestamp, String strIteratorType, double maxrows, + String accessKeyId, String secretAccessKey, String host, String location, double timeout) throws PageException { CFMLEngine eng = CFMLEngineFactory.getInstance(); Cast caster = eng.getCastUtil(); @@ -79,46 +80,61 @@ public static Query call(PageContext pc, String streamName, String shardId, Stri boolean sequenceNumberRequired = false; boolean timestampRequired = false; - if (!Util.isEmpty(iteratorType, true)) { - iteratorType = iteratorType.trim().toUpperCase(); - if ("TRIM_HORIZON".equalsIgnoreCase(iteratorType)) iteratorType = "TRIM_HORIZON"; - else if ("LATEST".equalsIgnoreCase(iteratorType)) iteratorType = "LATEST"; - else if ("AT_SEQUENCE_NUMBER".equalsIgnoreCase(iteratorType) || "AFTER_SEQUENCE_NUMBER".equalsIgnoreCase(iteratorType)) { + ShardIteratorType iteratorType = null; + if (!Util.isEmpty(strIteratorType, true)) { + strIteratorType = strIteratorType.trim().toUpperCase(); + if ("TRIM_HORIZON".equalsIgnoreCase(strIteratorType)) { + iteratorType = ShardIteratorType.TRIM_HORIZON; + } + else if ("LATEST".equalsIgnoreCase(strIteratorType)) { + iteratorType = ShardIteratorType.LATEST; + } + else if ("AT_SEQUENCE_NUMBER".equalsIgnoreCase(strIteratorType)) { + iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER; + sequenceNumberRequired = true; + } + else if ("AFTER_SEQUENCE_NUMBER".equalsIgnoreCase(strIteratorType)) { + iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER; sequenceNumberRequired = true; } - else if ("AT_TIMESTAMP".equalsIgnoreCase(iteratorType)) { + else if ("AT_TIMESTAMP".equalsIgnoreCase(strIteratorType)) { + iteratorType = ShardIteratorType.AT_TIMESTAMP; timestampRequired = true; } - else throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 3, "iteratorType", - "invalid iteratorType [" + iteratorType + "], valid types are [TRIM_HORIZON,LATEST,AT_SEQUENCE_NUMBER,AFTER_SEQUENCE_NUMBER,AT_TIMESTAMP]", null); + else { + throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 3, "iteratorType", + "invalid iteratorType [" + strIteratorType + "], valid types are [TRIM_HORIZON,LATEST,AT_SEQUENCE_NUMBER,AFTER_SEQUENCE_NUMBER,AT_TIMESTAMP]", null); + } } else { - iteratorType = "TRIM_HORIZON"; + iteratorType = ShardIteratorType.AT_TIMESTAMP; } // validate sequenceNumber if (eng.getStringUtil().isEmpty(sequenceNumber, true)) { sequenceNumber = null; if (sequenceNumberRequired) throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 4, "sequenceNumber", - "when iteratorType is to [" + iteratorType + "], then the [sequenceNumber] is required", null); + "when iteratorType is to [" + strIteratorType + "], then the [sequenceNumber] is required", null); } else { sequenceNumber = sequenceNumber.trim(); if (!sequenceNumberRequired) throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 4, "sequenceNumber", - "when iteratorType is [" + iteratorType + "], then the [sequenceNumber] cannot be set", null); + "when iteratorType is [" + strIteratorType + "], then the [sequenceNumber] cannot be set", null); } // validate timestamp if (timestamp == null) { if (timestampRequired) throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 5, "timestamp", - "when iteratorType is to [" + iteratorType + "], then the [timestamp] is required", null); + "when iteratorType is to [" + strIteratorType + "], then the [timestamp] is required", null); } else { if (!timestampRequired) throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 5, "timestamp", - "when iteratorType is [" + iteratorType + "], then the [timestamp] cannot be set", null); + "when iteratorType is [" + strIteratorType + "], then the [timestamp] cannot be set", null); } - if (sequenceNumber != null) iteratorType = "AT_SEQUENCE_NUMBER"; + if (sequenceNumber != null && iteratorType == null) { + iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER; + } // validate shardId String shardIterator;