diff --git a/pom.xml b/pom.xml
index b636b75..8df1c99 100644
--- a/pom.xml
+++ b/pom.xml
@@ -271,6 +271,48 @@
org.eclipse.paho.client.mqttv3
1.2.1
+q
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${org.junit.jupiter.version}
+ test
+
+
+ io.rest-assured
+ rest-assured
+ 4.3.1
+ test
+
+
+ io.moquette
+ moquette-broker
+ 0.13
+ test
+
+
+ org.slf4j
+ slf4j-api
+ 2.0.0-alpha1
+
+
+ org.slf4j
+ slf4j-log4j12
+ 2.0.0-alpha1
+ test
+
+
+ org.eclipse.jetty.http2
+ http2-client
+ 9.4.35.v20201120
+ test
+
+
+ org.eclipse.jetty.http2
+ http2-http-client-transport
+ 9.4.35.v20201120
+ test
+
@@ -327,4 +369,11 @@
+
+
+ bintray
+ http://dl.bintray.com/andsel/maven/
+
+
+
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpDataTest.java b/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpDataTest.java
new file mode 100644
index 0000000..388b4ee
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpDataTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.http;
+
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class HttpDataTest extends HttpTestBase {
+
+ @Test
+ void checkHttpData() throws ConsumerException, IOException {
+ super.runServer();
+
+ createHttpClientWithPayload("payload", port, "text/plain");
+ httpClient.execute(httpPost);
+
+ HttpData data = (HttpData) storage.get();
+ HttpMeta meta = data.getMeta();
+
+ assertEquals(server.getUUID(), meta.getUUID());
+
+ LocalDateTime timestamp = meta.getTimestamp();
+ assertTrue(timestamp.compareTo(LocalDateTime.now().minusSeconds(3)) >= 0);
+
+ assertEquals("POST", meta.getRequestMethod());
+ assertEquals("/endpoint", meta.getEndpoint());
+ assertEquals("HTTP/1.1", meta.getProtocolVersion());
+
+ Map headers = meta.getHeaders();
+ assertEquals("text/plain", headers.get("Content-type"));
+
+ Map> queryParams = meta.getQueryParams();
+ assertEquals("value1", queryParams.get("key1").get(0));
+ assertEquals("value2", queryParams.get("key1").get(1));
+ assertEquals("value3", queryParams.get("key2").get(0));
+
+ assertArrayEquals("payload".getBytes(), data.getPayload());
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpIncompleteRequestTest.java b/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpIncompleteRequestTest.java
new file mode 100644
index 0000000..89533e2
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpIncompleteRequestTest.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.http;
+
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.io.*;
+import java.net.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class HttpIncompleteRequestTest extends HttpTestBase {
+
+ public void sendIncompleteRequest() {
+ try {
+ String hostname = "localhost";
+ InetAddress addr = InetAddress.getByName(hostname);
+ Socket socket = new Socket(addr, 8080);
+
+ String payload = "Payload";
+ String httpMessage = "POST / HTTP/1.0\r\nAccept: */*\r\n"
+ + "Host: " + hostname + ":" + port + "\r\n"
+ + "Content-Type: text/plain\r\n"
+ + "Content-Length: " + payload.length() + "\r\n"
+ + "\r\n\r\n" +payload + "\r\n\r\n";
+
+ if(socket.isConnected()) {
+ System.out.println("Socket is connected to: " + socket.getInetAddress() + " on port: " + socket.getPort());
+
+ OutputStream os = socket.getOutputStream();
+ os.write(httpMessage.getBytes("ASCII"));
+ os.flush();
+
+ os.close();
+ }
+ socket.close();
+ System.out.println("Socket closed");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void timeoutRequest() throws IOException {
+ String hostname = "localhost";
+ URL url = new URL("http://" + hostname +":8080");
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+
+ String payload = "Payload";
+
+ connection.setRequestMethod("POST");
+ connection.setReadTimeout(3000);
+ connection.setDoOutput(true);
+
+ try (DataOutputStream writer = new DataOutputStream(connection.getOutputStream())) {
+ for (byte ch: payload.getBytes("ASCII")) {
+ writer.write(ch);
+ Thread.sleep(1000);
+ }
+ writer.flush();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ connection.disconnect();
+ }
+ }
+
+ @Test
+ @Disabled
+ public void incompleteRequest() throws ConsumerException, IOException {
+ runServer();
+ sendIncompleteRequest();
+
+ HttpData data = (HttpData) storage.get();
+ HttpMeta meta = data.getMeta();
+
+ assertArrayEquals("Payload".getBytes(), data.getPayload());
+ }
+
+ @Test
+ public void requestTimeoutTest() throws ConsumerException, IOException {
+ runServer();
+ timeoutRequest();
+
+ assertNull(storage.get());
+ }
+}
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpJsonTest.java b/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpJsonTest.java
new file mode 100644
index 0000000..69e9d92
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpJsonTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.http;
+
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import io.restassured.http.ContentType;
+import org.json.JSONObject;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static io.restassured.RestAssured.given;
+
+public class HttpJsonTest extends HttpTestBase {
+
+ HttpData data;
+ JSONObject request;
+
+ @Test
+ void jsonPost() throws ConsumerException, IOException {
+ super.runServer();
+
+ request = new JSONObject();
+
+ request.put("ID", 1);
+ request.put("name", "Johny");
+
+ given().
+ contentType(ContentType.JSON).
+ header("Content-Type", "application/json").
+ body(request.toString()).
+ when().
+ post("http://localhost:8080").
+ then().
+ statusCode(200).
+ statusLine("HTTP/1.1 200 OK");
+
+ data = (HttpData) storage.get();
+ }
+}
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpTestBase.java b/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpTestBase.java
new file mode 100644
index 0000000..4e27e32
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpTestBase.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.http;
+
+import io.patriot_framework.generator.device.consumer.Storage;
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClients;
+import org.junit.jupiter.api.AfterEach;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+public abstract class HttpTestBase {
+
+ Server server;
+ static int port = 8080;
+ Storage storage = new Storage();
+ HttpClient httpClient;
+ HttpPost httpPost;
+
+ void createHttpClientWithPayload(String payload, int port, String contentType) throws UnsupportedEncodingException {
+ httpClient = HttpClients.createDefault();
+
+ httpPost = new HttpPost("http://localhost:" + port + "/endpoint?key1=value1&key1=value2&key2=value3");
+ httpPost.setHeader("Content-type", contentType);
+ httpPost.setEntity(new StringEntity(payload));
+ }
+
+ // TODO ? @mijaros -> @BeforeAll
+ void runServer() throws ConsumerException, IOException {
+ storage = new Storage();
+ server = new Server("localhost", port, storage);
+ server.start();
+ }
+
+ void runServer(SSLInit sslInit) throws ConsumerException, IOException {
+ storage = new Storage();
+ server = new Server("localhost", port, storage, sslInit);
+ server.start();
+ }
+
+ // TODO ? @mijaros -> @AfterAll
+ @AfterEach
+ void closeServer() {
+ server.stop();
+ }
+}
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpsTest.java b/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpsTest.java
new file mode 100644
index 0000000..fdb81cd
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/http/HttpsTest.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.http;
+
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static io.restassured.RestAssured.given;
+
+public class HttpsTest extends HttpTestBase {
+
+ @BeforeEach
+ void runServer() throws ConsumerException, IOException {
+ String passphrase = "Patriot";
+ SSLInit sslInit = new BasicSSLInit("src/test/resources/sslcerts/server_keystore.jks", passphrase,
+ "src/test/resources/sslcerts/server_truststore.jks", passphrase);
+ super.runServer(sslInit);
+ }
+
+ @Test
+ void statusCode200(){
+ given().relaxedHTTPSValidation("TLS").when().post("https://localhost:" + port).then().statusCode(200);
+ }
+}
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/http/ServerExceptionTest.java b/src/test/java/io/patriot_framework/generator/device/consumer/http/ServerExceptionTest.java
new file mode 100644
index 0000000..6421d0f
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/http/ServerExceptionTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.http;
+
+import io.patriot_framework.generator.device.consumer.Storage;
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.BindException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ServerExceptionTest extends HttpTestBase {
+
+ @Test
+ void throwsConsumerException() {
+ server = new Server("hostlocal", port, storage);
+
+ assertThrows(ConsumerException.class, server::start);
+ }
+
+ @Test
+ void invalidHostname() {
+ server = new Server("hostlocal", port, storage);
+
+ ConsumerException exception = assertThrows(ConsumerException.class, server::start);
+ assertEquals("Invalid hostname", exception.getMessage());
+ }
+
+ @Test
+ void alreadyRunning() throws ConsumerException, IOException {
+ runServer();
+
+ ConsumerException exception = assertThrows(ConsumerException.class, server::start);
+ assertEquals("Server is already running", exception.getMessage());
+ }
+
+ @Test
+ void occupiedPort() throws ConsumerException {
+ server = new Server("localhost", 8080, new Storage());
+ server.start();
+
+ Server server2 = new Server("localhost", 8080, new Storage());
+
+ BindException exception = assertThrows(BindException.class, server2::start);
+ assertEquals("Address already in use", exception.getMessage());
+ }
+}
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/http/ServerTest.java b/src/test/java/io/patriot_framework/generator/device/consumer/http/ServerTest.java
new file mode 100644
index 0000000..a5494aa
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/http/ServerTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.http;
+
+import io.patriot_framework.generator.device.consumer.ConsumerData;
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class ServerTest extends HttpTestBase {
+
+ @BeforeEach
+ void runServer() throws ConsumerException, IOException {
+ super.runServer();
+ }
+
+ @Test
+ void statusCode200(){
+ given().when().get("http://localhost:8080").then().statusCode(200);
+ }
+
+ @Test
+ public void responseTime() {
+ given().when().get("http://localhost:8080").then().time(lessThan(1000L));
+ }
+
+ @Test
+ void FIFO() throws IOException {
+ for (int i = 1; i <= 3; i++) {
+ createHttpClientWithPayload("payload-" + i, port, "text/plain");
+ httpClient.execute(httpPost);
+ }
+
+ assertArrayEquals("payload-1".getBytes(), storage.get().getPayload());
+ }
+
+ @Test
+ void storageContents() throws IOException {
+ for (int i = 1; i <= 5; i++) {
+ createHttpClientWithPayload("payload-" + i, port, "text/plain");
+ httpClient.execute(httpPost);
+ }
+
+ List expected = Stream.of("1", "2", "3", "4", "5")
+ .map(s -> s.getBytes()[0])
+ .collect(Collectors.toList());
+
+ List result = storage.get(5).stream()
+ .map(ConsumerData::getPayload)
+ .map(payload -> payload[payload.length - 1])
+ .collect(Collectors.toList());
+
+ assertEquals(expected, result);
+ }
+}
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/http/StorageTest.java b/src/test/java/io/patriot_framework/generator/device/consumer/http/StorageTest.java
new file mode 100644
index 0000000..464f6d2
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/http/StorageTest.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.http;
+
+import io.patriot_framework.generator.device.consumer.ConsumerData;
+import io.patriot_framework.generator.device.consumer.Storage;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class StorageTest {
+ Storage storage;
+ ConsumerData invalidConsumerData = new HttpData(new HttpMetaImpl(UUID.randomUUID(), "host", 0,
+ "requestMethod", "endpoint", "protocolVersion",
+ new HashMap<>(), new HashMap<>()),
+ "payload".getBytes());
+
+ @BeforeEach
+ void initVariables() {
+ storage = new Storage();
+ }
+
+ @Test
+ void writeNull() {
+ assertDoesNotThrow(() -> storage.write(null));
+ }
+
+ @Test
+ void writeOne() {
+ assertDoesNotThrow(() -> storage.write(invalidConsumerData));
+ }
+
+ @Test
+ void getEmpty() {
+ assertNull(storage.get());
+ }
+
+ @Test
+ void getOne() {
+ storage.write(invalidConsumerData);
+ assertEquals(invalidConsumerData, storage.get());
+ }
+
+ @Test
+ void getTwo() {
+ storage.write(invalidConsumerData);
+ storage.write(invalidConsumerData);
+ assertEquals(Arrays.asList(invalidConsumerData, invalidConsumerData), storage.get(2));
+ }
+
+ @Test
+ void emptyTheStorage() {
+ storage.write(invalidConsumerData);
+ assertEquals(Collections.singletonList(invalidConsumerData), storage.get(2));
+ }
+
+ @Test
+ void checkSize() {
+ for (int expected = 1; expected <= 10; expected++) {
+ storage.write(invalidConsumerData);
+ assertEquals(expected, storage.size());
+ }
+ }
+
+ @Test
+ void checkSizeAfterEmptying(){
+ storage.write(invalidConsumerData);
+ storage.write(invalidConsumerData);
+ storage.get(7);
+ assertEquals(0, storage.size());
+ }
+}
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttBroker.java b/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttBroker.java
new file mode 100644
index 0000000..1b64bb9
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttBroker.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.mqtt;
+
+import io.moquette.broker.Server;
+import io.moquette.broker.config.ClasspathResourceLoader;
+import io.moquette.broker.config.IConfig;
+import io.moquette.broker.config.IResourceLoader;
+import io.moquette.broker.config.ResourceLoaderConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class MqttBroker {
+
+ public Server mqttBroker;
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(MqttBroker.class);
+
+ public void startMqttBroker() throws IOException {
+ IResourceLoader classpathLoader = new ClasspathResourceLoader();
+ final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);
+
+ mqttBroker = new Server();
+ mqttBroker.startServer(classPathConfig);
+ LOGGER.info(String.format("Started MQTT broker on port %d", mqttBroker.getPort()));
+ }
+
+ public void stopMqttBroker() {
+ mqttBroker.stopServer();
+ }
+}
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttConsumerTest.java b/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttConsumerTest.java
new file mode 100644
index 0000000..27bcc46
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttConsumerTest.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.mqtt;
+
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class MqttConsumerTest extends MqttTestBase {
+
+ @Test
+ void simpleMessage() throws MqttException, ConsumerException {
+ startSubscriber("front-door");
+ summonPublisher();
+ publish("front-door", "patriot");
+
+ assertArrayEquals("patriot".getBytes(), storage.get().getPayload());
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttDataTest.java b/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttDataTest.java
new file mode 100644
index 0000000..3e7c1ee
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttDataTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.mqtt;
+
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class MqttDataTest extends MqttTestBase {
+
+ @Test
+ void basicDataTest() throws MqttException, ConsumerException {
+ startSubscriber("front-door");
+ summonPublisher();
+ publish("front-door", "test message");
+
+ MqttData data = (MqttData) storage.get();
+ MqttMeta meta = data.getMeta();
+
+ assertEquals(subscriber.getUUID(), meta.getUUID());
+
+ LocalDateTime timestamp = meta.getTimestamp();
+ assertTrue(timestamp.compareTo(LocalDateTime.now().minusSeconds(3)) >= 0);
+
+ assertEquals("front-door", meta.getTopic());
+ assertEquals(1, meta.getQos());
+
+ assertFalse(meta.isDuplicate());
+ assertFalse(meta.isRetained());
+
+ assertArrayEquals("test message".getBytes(), data.getPayload());
+ }
+
+ @Test
+ @Disabled
+ void multiplePayloadsTest() throws MqttException, ConsumerException {
+ startSubscriber("temperature-meter");
+ summonPublisher();
+
+ for (int i = 1; i <= 10; i++) {
+ publish("temperature-meter", i + "°C");
+ }
+
+ for (int i = 1; i <= 10; i++) {
+ String temperature = i + "°C";
+ assertArrayEquals(temperature.getBytes(), storage.get().getPayload());
+ }
+ }
+
+ @Test
+ void bigPayload() throws MqttException, ConsumerException {
+ byte[] message = new byte[500000];
+ java.util.Arrays.fill(message, (byte) 's');
+
+ startSubscriber("big-messages");
+ summonPublisher();
+ publish("big-messages", message);
+
+ assertArrayEquals(message, storage.get().getPayload());
+ }
+}
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttExceptionTest.java b/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttExceptionTest.java
new file mode 100644
index 0000000..f1ef4b4
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttExceptionTest.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.mqtt;
+
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class MqttExceptionTest extends MqttTestBase {
+
+ @Test
+ void unknownHost() {
+ assertThrows(ConsumerException.class, () -> startSubscriber("tcp://hostlocal:8883", "front-door"));
+ }
+}
diff --git a/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttTestBase.java b/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttTestBase.java
new file mode 100644
index 0000000..610cd41
--- /dev/null
+++ b/src/test/java/io/patriot_framework/generator/device/consumer/mqtt/MqttTestBase.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2021 Patriot project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.patriot_framework.generator.device.consumer.mqtt;
+
+import io.patriot_framework.generator.device.consumer.Storage;
+import io.patriot_framework.generator.device.consumer.exceptions.ConsumerException;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+public abstract class MqttTestBase {
+
+ String brokerURI = "tcp://localhost:8883";
+ MqttBroker broker;
+ Storage storage = new Storage();
+ MqttConsumer subscriber;
+ MqttClient publisher;
+
+ void startSubscriber(String topic) throws ConsumerException {
+ storage = new Storage();
+ subscriber = new MqttConsumer(brokerURI, topic, "patriot-subscriber", 2, storage);
+ subscriber.start();
+ }
+
+ void startSubscriber(String broker, String topic) throws ConsumerException {
+ storage = new Storage();
+ subscriber = new MqttConsumer(broker, topic, "patriot-subscriber", 2, storage);
+ subscriber.start();
+ }
+
+ void summonPublisher() throws MqttException {
+ publisher = new MqttClient(brokerURI, "patriot-publisher");
+ publisher.connect();
+ }
+
+ void publish(String topic, String message) throws MqttException {
+ publisher.publish(topic, new MqttMessage(message.getBytes()));
+ }
+
+ void publish(String topic, byte[] message) throws MqttException {
+ publisher.publish(topic, new MqttMessage(message));
+ }
+
+ @BeforeEach
+ void createBroker() throws Exception {
+ broker = new MqttBroker();
+ broker.startMqttBroker();
+ }
+
+ @AfterEach
+ void close() throws MqttException {
+ if (publisher != null) {
+ publisher.disconnect();
+ }
+ subscriber.stop();
+ broker.stopMqttBroker();
+ }
+}
diff --git a/src/test/resources/config/moquette.conf b/src/test/resources/config/moquette.conf
new file mode 100644
index 0000000..5523d50
--- /dev/null
+++ b/src/test/resources/config/moquette.conf
@@ -0,0 +1,4 @@
+port 8883
+host 127.0.0.1
+allow_anonymous true
+netty.mqtt.message_size 500016
\ No newline at end of file
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
new file mode 100644
index 0000000..271e001
--- /dev/null
+++ b/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Copyright 2020 Patriot project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=INFO, DEFAULT
+
+log4j.logger.io.moquette=WARN
+
+log4j.appender.DEFAULT=org.apache.log4j.ConsoleAppender
+log4j.appender.DEFAULT.layout=org.apache.log4j.PatternLayout
+log4j.appender.DEFAULT.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/src/test/resources/sslcerts/server_keystore.jks b/src/test/resources/sslcerts/server_keystore.jks
new file mode 100644
index 0000000..08b2000
Binary files /dev/null and b/src/test/resources/sslcerts/server_keystore.jks differ
diff --git a/src/test/resources/sslcerts/server_truststore.jks b/src/test/resources/sslcerts/server_truststore.jks
new file mode 100644
index 0000000..ae663a3
Binary files /dev/null and b/src/test/resources/sslcerts/server_truststore.jks differ