Skip to content

Commit

Permalink
only set iteratorType "AT_SEQUENCE_NUMBER" when none was set
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeloffner committed Aug 8, 2024
1 parent e32e26c commit 643d7c3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.lucee</groupId>
<artifactId>kinesis-extension</artifactId>
<version>1.0.1.1-SNAPSHOT</version>
<version>1.0.1.2-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Kinesis Extension</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

public class KinesisGet extends KinesisFunction {

Expand All @@ -58,8 +59,8 @@ public class KinesisGet extends KinesisFunction {
_iteration = creator.createKey("iteration");
}

public static Query call(PageContext pc, String streamName, String shardId, String sequenceNumber, DateTime timestamp, String iteratorType, double maxrows, String accessKeyId,
String secretAccessKey, String host, String location, double timeout) throws PageException {
public static Query call(PageContext pc, String streamName, String shardId, String sequenceNumber, DateTime timestamp, String strIteratorType, double maxrows,
String accessKeyId, String secretAccessKey, String host, String location, double timeout) throws PageException {

CFMLEngine eng = CFMLEngineFactory.getInstance();
Cast caster = eng.getCastUtil();
Expand All @@ -79,46 +80,61 @@ public static Query call(PageContext pc, String streamName, String shardId, Stri
boolean sequenceNumberRequired = false;
boolean timestampRequired = false;

if (!Util.isEmpty(iteratorType, true)) {
iteratorType = iteratorType.trim().toUpperCase();
if ("TRIM_HORIZON".equalsIgnoreCase(iteratorType)) iteratorType = "TRIM_HORIZON";
else if ("LATEST".equalsIgnoreCase(iteratorType)) iteratorType = "LATEST";
else if ("AT_SEQUENCE_NUMBER".equalsIgnoreCase(iteratorType) || "AFTER_SEQUENCE_NUMBER".equalsIgnoreCase(iteratorType)) {
ShardIteratorType iteratorType = null;
if (!Util.isEmpty(strIteratorType, true)) {
strIteratorType = strIteratorType.trim().toUpperCase();
if ("TRIM_HORIZON".equalsIgnoreCase(strIteratorType)) {
iteratorType = ShardIteratorType.TRIM_HORIZON;
}
else if ("LATEST".equalsIgnoreCase(strIteratorType)) {
iteratorType = ShardIteratorType.LATEST;
}
else if ("AT_SEQUENCE_NUMBER".equalsIgnoreCase(strIteratorType)) {
iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER;
sequenceNumberRequired = true;
}
else if ("AFTER_SEQUENCE_NUMBER".equalsIgnoreCase(strIteratorType)) {
iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
sequenceNumberRequired = true;
}
else if ("AT_TIMESTAMP".equalsIgnoreCase(iteratorType)) {
else if ("AT_TIMESTAMP".equalsIgnoreCase(strIteratorType)) {
iteratorType = ShardIteratorType.AT_TIMESTAMP;
timestampRequired = true;
}
else throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 3, "iteratorType",
"invalid iteratorType [" + iteratorType + "], valid types are [TRIM_HORIZON,LATEST,AT_SEQUENCE_NUMBER,AFTER_SEQUENCE_NUMBER,AT_TIMESTAMP]", null);
else {
throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 3, "iteratorType",
"invalid iteratorType [" + strIteratorType + "], valid types are [TRIM_HORIZON,LATEST,AT_SEQUENCE_NUMBER,AFTER_SEQUENCE_NUMBER,AT_TIMESTAMP]", null);
}
}
else {
iteratorType = "TRIM_HORIZON";
iteratorType = ShardIteratorType.AT_TIMESTAMP;
}

// validate sequenceNumber
if (eng.getStringUtil().isEmpty(sequenceNumber, true)) {
sequenceNumber = null;
if (sequenceNumberRequired) throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 4, "sequenceNumber",
"when iteratorType is to [" + iteratorType + "], then the [sequenceNumber] is required", null);
"when iteratorType is to [" + strIteratorType + "], then the [sequenceNumber] is required", null);
}
else {
sequenceNumber = sequenceNumber.trim();
if (!sequenceNumberRequired) throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 4, "sequenceNumber",
"when iteratorType is [" + iteratorType + "], then the [sequenceNumber] cannot be set", null);
"when iteratorType is [" + strIteratorType + "], then the [sequenceNumber] cannot be set", null);
}

// validate timestamp
if (timestamp == null) {
if (timestampRequired) throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 5, "timestamp",
"when iteratorType is to [" + iteratorType + "], then the [timestamp] is required", null);
"when iteratorType is to [" + strIteratorType + "], then the [timestamp] is required", null);
}
else {
if (!timestampRequired) throw eng.getExceptionUtil().createFunctionException(pc, "KinesisGet", 5, "timestamp",
"when iteratorType is [" + iteratorType + "], then the [timestamp] cannot be set", null);
"when iteratorType is [" + strIteratorType + "], then the [timestamp] cannot be set", null);
}

if (sequenceNumber != null) iteratorType = "AT_SEQUENCE_NUMBER";
if (sequenceNumber != null && iteratorType == null) {
iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER;
}

// validate shardId
String shardIterator;
Expand Down

0 comments on commit 643d7c3

Please sign in to comment.