diff --git a/pom.xml b/pom.xml
index 1f47138..c720dc4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
4.0.0
org.lucee
kinesis-extension
- 1.0.1.0-SNAPSHOT
+ 1.0.1.1-SNAPSHOT
pom
Kinesis Extension
diff --git a/source/fld/function.fld b/source/fld/function.fld
index fcc8846..1e9663e 100755
--- a/source/fld/function.fld
+++ b/source/fld/function.fld
@@ -198,8 +198,9 @@
streamName
string
- yes
- The name of the AWS Kinesis stream from which records are to be fetched. This parameter is required to identify the target stream.
+ no
+ The name of the AWS Kinesis stream from which records are to be fetched.
+ If not defined the function will return info for all streams.
accessKeyId
diff --git a/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisInfo.java b/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisInfo.java
index f4b3451..1585979 100644
--- a/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisInfo.java
+++ b/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisInfo.java
@@ -19,6 +19,8 @@
import software.amazon.awssdk.services.kinesis.model.DescribeLimitsResponse;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;
public class KinesisInfo extends KinesisFunction {
@@ -39,61 +41,83 @@ public static Struct call(PageContext pc, String streamName, String accessKeyId,
try {
Log log = pc.getConfig().getLog("application");
-
KinesisClient client = AmazonKinesisClient.get(CommonUtil.toKinesisProps(pc, accessKeyId, secretAccessKey, host, location), toTimeout(timeout), log);
-
- Struct result = eng.getCreationUtil().createStruct();
-
- // Describe the stream and print shard IDs
- String exclusiveStartShardId = null;
- Query qShards = eng.getCreationUtil().createQuery(new String[] { "shardId", "parentShardId", "adjacentParentShardId" }, 0, "shards");
- result.put("shards", qShards);
- int row;
- do {
- // describeStream
- DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder().streamName(streamName).exclusiveStartShardId(exclusiveStartShardId).build();
- DescribeStreamResponse describeStreamResponse = client.describeStream(describeStreamRequest);
- List listShards = describeStreamResponse.streamDescription().shards();
-
- for (Shard s: listShards) {
- row = qShards.addRow();
- qShards.setAtEL("shardId", row, s.shardId());
- qShards.setAtEL("parentShardId", row, s.parentShardId());
- qShards.setAtEL("adjacentParentShardId", row, s.adjacentParentShardId());
- }
-
- if (describeStreamResponse.streamDescription().hasMoreShards()) {
- exclusiveStartShardId = describeStreamResponse.streamDescription().shards().get(describeStreamResponse.streamDescription().shards().size() - 1).shardId();
- }
- else {
- exclusiveStartShardId = null;
- }
+ if (!Util.isEmpty(streamName, true)) {
+ return getStream(eng, client, streamName);
}
- while (exclusiveStartShardId != null);
+ else {
+ return getStreams(eng, client);
- // describeLimits
- try {
- DescribeLimitsResponse limitsResponse = client.describeLimits();
- result.put("shardLimit", limitsResponse.shardLimit());
- }
- catch (AccessDeniedException ade) {
- result.put("shardLimit", ade.getMessage());
- // if (throwOnAccessDenied) throw ade;
}
- return result;
}
catch (Exception e) {
throw CommonUtil.toPageException(e);
}
}
+ private static Struct getStreams(CFMLEngine eng, KinesisClient client) throws PageException {
+ Struct result = eng.getCreationUtil().createStruct();
+
+ ListStreamsRequest listStreamsRequest = ListStreamsRequest.builder().build();
+ ListStreamsResponse listStreamsResponse = client.listStreams(listStreamsRequest);
+
+ List streamNames = listStreamsResponse.streamNames();
+
+ for (String sn: streamNames) {
+ result.setEL(sn, getStream(eng, client, sn));
+ }
+ return result;
+ }
+
+ private static Struct getStream(CFMLEngine eng, KinesisClient client, String streamName) throws PageException {
+ Struct result = eng.getCreationUtil().createStruct();
+
+ // Describe the stream and print shard IDs
+ String exclusiveStartShardId = null;
+ Query qShards = eng.getCreationUtil().createQuery(new String[] { "shardId", "parentShardId", "adjacentParentShardId" }, 0, "shards");
+ result.put("shards", qShards);
+ int row;
+ do {
+ // describeStream
+ DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder().streamName(streamName).exclusiveStartShardId(exclusiveStartShardId).build();
+ DescribeStreamResponse describeStreamResponse = client.describeStream(describeStreamRequest);
+ List listShards = describeStreamResponse.streamDescription().shards();
+
+ for (Shard s: listShards) {
+ row = qShards.addRow();
+ qShards.setAtEL("shardId", row, s.shardId());
+ qShards.setAtEL("parentShardId", row, s.parentShardId());
+ qShards.setAtEL("adjacentParentShardId", row, s.adjacentParentShardId());
+ }
+
+ if (describeStreamResponse.streamDescription().hasMoreShards()) {
+ exclusiveStartShardId = describeStreamResponse.streamDescription().shards().get(describeStreamResponse.streamDescription().shards().size() - 1).shardId();
+ }
+ else {
+ exclusiveStartShardId = null;
+ }
+ }
+ while (exclusiveStartShardId != null);
+
+ // describeLimits
+ try {
+ DescribeLimitsResponse limitsResponse = client.describeLimits();
+ result.put("shardLimit", limitsResponse.shardLimit());
+ }
+ catch (AccessDeniedException ade) {
+ result.put("shardLimit", ade.getMessage());
+ // if (throwOnAccessDenied) throw ade;
+ }
+ return result;
+ }
+
@Override
public Object invoke(PageContext pc, Object[] args) throws PageException {
CFMLEngine engine = CFMLEngineFactory.getInstance();
Cast cast = engine.getCastUtil();
- if (args.length < 1 || args.length > 6) throw engine.getExceptionUtil().createFunctionException(pc, "KinesisInfo", 1, 6, args.length);
+ if (args.length > 6) throw engine.getExceptionUtil().createFunctionException(pc, "KinesisInfo", 0, 6, args.length);
// streamName
String streamName = cast.toString(args[0]);