Skip to content

Commit

Permalink
make streamName optional for KinesisInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeloffner committed Aug 8, 2024
1 parent 11e1d32 commit d3a9874
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 42 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.lucee</groupId>
<artifactId>kinesis-extension</artifactId>
<version>1.0.1.0-SNAPSHOT</version>
<version>1.0.1.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Kinesis Extension</name>

Expand Down
5 changes: 3 additions & 2 deletions source/fld/function.fld
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@
<argument>
<name>streamName</name>
<type>string</type>
<required>yes</required>
<description>The name of the AWS Kinesis stream from which records are to be fetched. This parameter is required to identify the target stream.</description>
<required>no</required>
<description>The name of the AWS Kinesis stream from which records are to be fetched.
If not defined the function will return info for all streams.</description>
</argument>
<argument>
<name>accessKeyId</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import software.amazon.awssdk.services.kinesis.model.DescribeLimitsResponse;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;

public class KinesisInfo extends KinesisFunction {
Expand All @@ -39,61 +41,83 @@ public static Struct call(PageContext pc, String streamName, String accessKeyId,

try {
Log log = pc.getConfig().getLog("application");

KinesisClient client = AmazonKinesisClient.get(CommonUtil.toKinesisProps(pc, accessKeyId, secretAccessKey, host, location), toTimeout(timeout), log);

Struct result = eng.getCreationUtil().createStruct();

// Describe the stream and print shard IDs
String exclusiveStartShardId = null;
Query qShards = eng.getCreationUtil().createQuery(new String[] { "shardId", "parentShardId", "adjacentParentShardId" }, 0, "shards");
result.put("shards", qShards);
int row;
do {
// describeStream
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder().streamName(streamName).exclusiveStartShardId(exclusiveStartShardId).build();
DescribeStreamResponse describeStreamResponse = client.describeStream(describeStreamRequest);
List<Shard> listShards = describeStreamResponse.streamDescription().shards();

for (Shard s: listShards) {
row = qShards.addRow();
qShards.setAtEL("shardId", row, s.shardId());
qShards.setAtEL("parentShardId", row, s.parentShardId());
qShards.setAtEL("adjacentParentShardId", row, s.adjacentParentShardId());
}

if (describeStreamResponse.streamDescription().hasMoreShards()) {
exclusiveStartShardId = describeStreamResponse.streamDescription().shards().get(describeStreamResponse.streamDescription().shards().size() - 1).shardId();
}
else {
exclusiveStartShardId = null;
}
if (!Util.isEmpty(streamName, true)) {
return getStream(eng, client, streamName);
}
while (exclusiveStartShardId != null);
else {
return getStreams(eng, client);

// describeLimits
try {
DescribeLimitsResponse limitsResponse = client.describeLimits();
result.put("shardLimit", limitsResponse.shardLimit());
}
catch (AccessDeniedException ade) {
result.put("shardLimit", ade.getMessage());
// if (throwOnAccessDenied) throw ade;
}

return result;
}
catch (Exception e) {
throw CommonUtil.toPageException(e);
}
}

private static Struct getStreams(CFMLEngine eng, KinesisClient client) throws PageException {
Struct result = eng.getCreationUtil().createStruct();

ListStreamsRequest listStreamsRequest = ListStreamsRequest.builder().build();
ListStreamsResponse listStreamsResponse = client.listStreams(listStreamsRequest);

List<String> streamNames = listStreamsResponse.streamNames();

for (String sn: streamNames) {
result.setEL(sn, getStream(eng, client, sn));
}
return result;
}

private static Struct getStream(CFMLEngine eng, KinesisClient client, String streamName) throws PageException {
Struct result = eng.getCreationUtil().createStruct();

// Describe the stream and print shard IDs
String exclusiveStartShardId = null;
Query qShards = eng.getCreationUtil().createQuery(new String[] { "shardId", "parentShardId", "adjacentParentShardId" }, 0, "shards");
result.put("shards", qShards);
int row;
do {
// describeStream
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder().streamName(streamName).exclusiveStartShardId(exclusiveStartShardId).build();
DescribeStreamResponse describeStreamResponse = client.describeStream(describeStreamRequest);
List<Shard> listShards = describeStreamResponse.streamDescription().shards();

for (Shard s: listShards) {
row = qShards.addRow();
qShards.setAtEL("shardId", row, s.shardId());
qShards.setAtEL("parentShardId", row, s.parentShardId());
qShards.setAtEL("adjacentParentShardId", row, s.adjacentParentShardId());
}

if (describeStreamResponse.streamDescription().hasMoreShards()) {
exclusiveStartShardId = describeStreamResponse.streamDescription().shards().get(describeStreamResponse.streamDescription().shards().size() - 1).shardId();
}
else {
exclusiveStartShardId = null;
}
}
while (exclusiveStartShardId != null);

// describeLimits
try {
DescribeLimitsResponse limitsResponse = client.describeLimits();
result.put("shardLimit", limitsResponse.shardLimit());
}
catch (AccessDeniedException ade) {
result.put("shardLimit", ade.getMessage());
// if (throwOnAccessDenied) throw ade;
}
return result;
}

@Override
public Object invoke(PageContext pc, Object[] args) throws PageException {
CFMLEngine engine = CFMLEngineFactory.getInstance();
Cast cast = engine.getCastUtil();

if (args.length < 1 || args.length > 6) throw engine.getExceptionUtil().createFunctionException(pc, "KinesisInfo", 1, 6, args.length);
if (args.length > 6) throw engine.getExceptionUtil().createFunctionException(pc, "KinesisInfo", 0, 6, args.length);

// streamName
String streamName = cast.toString(args[0]);
Expand Down

0 comments on commit d3a9874

Please sign in to comment.