
Added tests and fix around incorrect offset storage & Removed the TPP references I could from the Confluent Archive (#16)
rufusfnash authored and pegerto committed Feb 25, 2019
1 parent c8cd0c4 commit 3dff0f2
Showing 3 changed files with 41 additions and 23 deletions.
14 changes: 8 additions & 6 deletions pom.xml
@@ -6,7 +6,7 @@
<artifactId>kafka-connect-venafi-tpp</artifactId>
<version>0.9.4-SNAPSHOT</version>
<name>kafka-connect-venafi-tpp</name>
<description>Kafka connector for Venafi Trust Protection Platform Security Events</description>
<description>Kafka connector for Venafi Security Events</description>

<scm>
<url>https://github.com/opencredo/kafka-connect-venafi-tpp.git</url>
@@ -182,6 +182,9 @@
<exclude>*:xml-apis</exclude>
<exclude>org.apache.maven:lib:tests</exclude>
<exclude>log4j:log4j:jar:</exclude>
<exclude>io.confluent.*</exclude>
<exclude>org.apache.kafka.connect.*</exclude>
<exclude>org.apache.kafka:connect-api</exclude>
</excludes>
</artifactSet>
</configuration>
@@ -198,15 +201,14 @@
<goal>kafka-connect</goal>
</goals>
<configuration>
<title>Kafka Connect Venafi TPP</title>
<title>Kafka Connect Venafi</title>
<documentationUrl>
https://github.com/opencredo/kafka-connect-venafi-tpp/blob/master/README.md
</documentationUrl>
<description>
This connector connects via HTTP to your instance of the Venafi Trust Protection
Platform ( which shall be referred to as TPP from here on) and pulls your Log events
into Kafka, allowing you to do any filtering/transforming/processing you'd like to do
within a comfortable Kafka environment.
This connector connects via HTTP to your instance of the Venafi Platform and pulls
your Log events into Kafka, allowing you to do any filtering/transforming/processing
you'd like to do within a comfortable Kafka environment.

N.B. Currently the connector starts from the beginning of time (i.e. processes all past
events first), a future release will allow the option of starting from now (i.e.
@@ -90,20 +90,17 @@ public List<SourceRecord> poll() {

private List<SourceRecord> getTppLogsAsSourceRecords(String token) {

long loopOffset = 0;

List<EventLog> jsonLogs = getTppLogs(token, fromDate, apiOffset);
ArrayList<SourceRecord> records = new ArrayList<>();
for (EventLog eventLog : jsonLogs) {

String newFromDate = eventLog.getClientTimestamp().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
loopOffset = calculateLoopOffset(loopOffset, newFromDate, fromDate);
apiOffset = calculateApiOffset(apiOffset, newFromDate, fromDate);
fromDate = newFromDate;

log.debug(" The fromDate is now {}.", fromDate);
records.add(buildSourceRecord(eventLog, fromDate, apiOffset));
}
apiOffset = calculateApiOffset(loopOffset, jsonLogs);

return records;
}
@@ -114,20 +111,13 @@ private SourceRecord buildSourceRecord(EventLog eventLog, String lastRead, Long
return new SourceRecord(sourcePartition, sourceOffset, topic, EventLog.TppLogSchema(), eventLog.toStruct());
}

private long calculateLoopOffset(long currentLoopOffset, String newFromDate, String oldFromDate) {
private long calculateApiOffset(long currentLoopOffset, String newFromDate, String oldFromDate) {
if (newFromDate.equals(oldFromDate)) {
return ++currentLoopOffset;
}
return 1L;
}

private long calculateApiOffset(long currentLoopOffset, List<EventLog> jsonLogs) {
if (jsonLogs.size() == currentLoopOffset) {
return apiOffset + currentLoopOffset;
}
return currentLoopOffset;
}

private Map<String, Object> buildSourcePartition() {
return Collections.singletonMap(URL, baseUrl);
}
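The effect of the change above is easiest to see with the offset rule pulled out on its own. The standalone sketch below is illustrative only (OffsetRuleSketch and the T1/T2 timestamps are made up, not part of the connector); it replays the renamed calculateApiOffset rule over a hypothetical batch: while consecutive events share a ClientTimestamp the apiOffset keeps counting up, and it resets to 1 as soon as the timestamp advances, so each record stores the correct offset at the moment it is built rather than a value fixed up after the loop.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch only -- not the connector's real classes. It replays the
// calculateApiOffset rule from the diff above on a hypothetical batch of
// client timestamps (T1, T1, T1, T2, T2) to show the offsets each record
// would now carry.
public class OffsetRuleSketch {

    // Same rule as the renamed calculateApiOffset above: keep counting while
    // the timestamp repeats, restart at 1 once it moves forward.
    private static long calculateApiOffset(long current, String newFromDate, String oldFromDate) {
        if (newFromDate.equals(oldFromDate)) {
            return current + 1;
        }
        return 1L;
    }

    public static void main(String[] args) {
        List<String> timestamps = Arrays.asList("T1", "T1", "T1", "T2", "T2");
        String fromDate = "";   // last timestamp carried over from the previous poll
        long apiOffset = 0L;    // offset carried over from the previous poll
        List<String> stored = new ArrayList<>();
        for (String ts : timestamps) {
            // offset is updated per record, mirroring the loop in getTppLogsAsSourceRecords
            apiOffset = calculateApiOffset(apiOffset, ts, fromDate);
            fromDate = ts;
            stored.add(fromDate + "/" + apiOffset);
        }
        System.out.println(stored); // [T1/1, T1/2, T1/3, T2/1, T2/2]
    }
}

Previously buildSourceRecord was called with the apiOffset held over from before the loop and the real value was only reconciled afterwards, so every record in a page could be persisted with a stale offset; that is the incorrect offset storage the commit message refers to, and the per-offset assertions added to the tests below pin the new behaviour down.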
@@ -127,6 +127,7 @@ public void as_a_task_I_want_to_make_only_one_logs_call_per_the_poll_interval()

List<SourceRecord> logs = when_the_task_is_polled(task);
then_the_logs_are_of_size(logs, 2);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(logs, 2, 1L);
List<SourceRecord> logs2 = when_the_task_is_polled(task);
then_the_logs_are_of_size(logs2, 0);
wireMockServer.verify(1, postRequestedFor(urlPathMatching(AUTHORIZE_API_REGEX_PATH)));
@@ -174,9 +175,13 @@ public void as_a_task_I_want_a_valid_context() {

List<SourceRecord> sourceRecords1 = when_the_task_is_polled(task);
then_the_logs_are_of_size(sourceRecords1, 3);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(sourceRecords1, 2, 1L);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(sourceRecords1, 1, 2L);


List<SourceRecord> sourceRecords2 = when_the_task_is_polled(task);
then_the_logs_are_of_size(sourceRecords2, 1);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(sourceRecords2, 1, 3L);
}

@Test
@@ -190,9 +195,13 @@ public void as_a_task_I_want_to_handle_an_empty_context() {

List<SourceRecord> sourceRecords1 = when_the_task_is_polled(task);
then_the_logs_are_of_size(sourceRecords1, 3);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(sourceRecords1, 2, 1L);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(sourceRecords1, 1, 2L);


List<SourceRecord> sourceRecords2 = when_the_task_is_polled(task);
then_the_logs_are_of_size(sourceRecords2, 1);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(sourceRecords2, 1, 2L);
}

@Test
@@ -205,6 +214,7 @@ public void as_a_client_I_want_some_logs() {

List<SourceRecord> logs = when_the_task_is_polled(task);
then_the_logs_are_of_size(logs, 2);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(logs, 2, 1L);
}

@Test
@@ -228,17 +238,33 @@ public void as_a_client_I_want_to_paginate_logs() {

List<SourceRecord> page_1_of_logs = when_the_task_is_polled(task);
then_the_logs_are_of_size(page_1_of_logs, 5);

then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(page_1_of_logs, 5, 1L);
//get next page
List<SourceRecord> page_2_of_logs = when_the_task_is_polled(task);
then_the_logs_are_of_size(page_2_of_logs, 5);
then_the_number_of_logs_with_timestamp_is(5, page_2_of_logs, getTodayPlus(5));
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(page_2_of_logs, 1, 2L);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(page_2_of_logs, 1, 3L);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(page_2_of_logs, 1, 4L);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(page_2_of_logs, 1, 5L);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(page_2_of_logs, 1, 6L);
//get next Page
List<SourceRecord> page_3_of_logs = when_the_task_is_polled(task);
then_the_logs_are_of_size(page_3_of_logs, 4);
then_the_number_of_logs_with_timestamp_is(2, page_3_of_logs, getTodayPlus(6));
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(page_3_of_logs, 1, 7L);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(page_3_of_logs, 2, 1L);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(page_3_of_logs, 1, 2L);

List<SourceRecord> page_4_of_logs = when_the_task_is_polled(task);
then_the_logs_are_of_size(page_4_of_logs, 2);
then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(page_4_of_logs, 2, 1L);

}

private void then_the_logs_will_have_an_expected_number_of_records_with_a_specific_apioffset(List<SourceRecord> logs, int expectedNumberOfRecords, long specificApiOffset) {
assertEquals(expectedNumberOfRecords, logs.stream().filter(sourceRecord -> specificApiOffset == (Long) sourceRecord.sourceOffset().get(LAST_API_OFFSET)).count());
}

@Test
Expand Down Expand Up @@ -283,13 +309,13 @@ private void then_the_number_of_logs_with_timestamp_is(int count, List<SourceRec
assertEquals(count, getCountOfLogsWithLastRead(todayPlus, page_of_logs));
}

private void then_the_logs_are_of_size(List<SourceRecord> page_3_of_logs, int i) {
assertNotNull(page_3_of_logs);
assertEquals(i, page_3_of_logs.size());
private void then_the_logs_are_of_size(List<SourceRecord> logs, int i) {
assertNotNull(logs);
assertEquals(i, logs.size());
}

private long getCountOfLogsWithLastRead(ZonedDateTime date, List<SourceRecord> page_2_of_logs) {
return page_2_of_logs.stream().filter(sourceRecord -> date.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME).equals(sourceRecord.sourceOffset().get(LAST_READ))).count();
private long getCountOfLogsWithLastRead(ZonedDateTime date, List<SourceRecord> logs) {
return logs.stream().filter(sourceRecord -> date.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME).equals(sourceRecord.sourceOffset().get(LAST_READ))).count();
}

private String when_a_token_is_got(TppLogSourceTask task) {
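The new assertion helper above reads two entries out of each record's source offset map. As a rough sketch of the shape those assertions assume (the real builder in TppLogSourceTask is not part of this diff, and the literal key strings below are placeholders for the LAST_READ and LAST_API_OFFSET constants the tests reference):

import java.util.HashMap;
import java.util.Map;

// Sketch of the assumed source-offset shape; the actual builder is not shown
// in this diff and the key strings below are placeholders for the LAST_READ
// and LAST_API_OFFSET constants used by the tests.
final class SourceOffsetSketch {
    static Map<String, Object> buildSourceOffset(String lastRead, long apiOffset) {
        Map<String, Object> offset = new HashMap<>();
        offset.put("last_read", lastRead);        // ISO-8601 ClientTimestamp of the last event read
        offset.put("last_api_offset", apiOffset); // per-timestamp offset from calculateApiOffset
        return offset;
    }
}

Kafka Connect persists this map against the source partition (a single URL entry built by buildSourcePartition) and hands it back to the task on restart, which is why the tests check both the timestamp and the apiOffset for every page of results.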
