From 6998d40c435c6492b2b2ce6156f867c985ce34b8 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Sat, 22 Apr 2023 10:36:24 +0000 Subject: [PATCH] Change the host ip address as the vm address --- .../com/livequery/prototype/Controller.java | 54 +++++++++++-------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/backend/src/main/java/com/livequery/prototype/Controller.java b/backend/src/main/java/com/livequery/prototype/Controller.java index 2c7e327..709ee95 100644 --- a/backend/src/main/java/com/livequery/prototype/Controller.java +++ b/backend/src/main/java/com/livequery/prototype/Controller.java @@ -90,7 +90,7 @@ private SiddhiAppRuntime getSiddhiAppRuntime(LinkedBlockingQueue linked siddhiAppName, query, new SiddhiAppComposites.Annotation.Source.LiveSource() - .addSourceComposite(new KeyValue<>("host.name","10.8.100.246:9092")) + .addSourceComposite(new KeyValue<>("host.name","20.171.27.19:9092")) .addSourceComposite(new KeyValue<>("api.key","Tu_TZ0W2cR92-sr1j-l7ACA.newone.9pej9tihskpx2vYZaxubGW3sFCJLzxe55NRh7T0uk1JMYiRmHdiQsWh5JhRXXT6c418385")), new JsonMap() .addMapComposite(new KeyValue<>("fail.on.missing.attribute","false")) @@ -108,13 +108,13 @@ private SiddhiAppRuntime getSiddhiAppRuntime(LinkedBlockingQueue linked siddhiAppRuntime.addCallback("SQL-SiddhiQL-dev-test", new QueryCallback() { @Override public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { - final long[] time = {System.currentTimeMillis()}; - try { - calculateLatency(inEvents, "initial", time, "id-1"); - } catch (IOException e) { - throw new RuntimeException(e); - } - EventPrinter.print(timeStamp, inEvents, removeEvents); +// final long[] time = {System.currentTimeMillis()}; +// try { +// calculateLatency(inEvents, "initial", time, "id-1"); +// } catch (IOException e) { +// throw new RuntimeException(e); +// } +// EventPrinter.print(timeStamp, inEvents, removeEvents); linkedBlockingQueue.add(inEvents); // System.out.println("Event in Siddhi: " + inEvents.length + " , queueSize: " + linkedBlockingQueue.size()); } @@ -123,20 +123,27 @@ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { } private void calculateLatency(Event[] event, String initial, long[] time, String prometheus_query) throws IOException { - long current = System.currentTimeMillis(); -// if (System.currentTimeMillis() > time[0] + 3.6e+6) { -// TimeInfo timeInfo = timeClient.getTime(inetAddress); -// long returnTime = timeInfo.getMessage().getTransmitTimeStamp().getTime(); -// long updatedTime = Long.parseLong(event[0].getData()[event[0].getData().length - 1].toString()); -// long traffic_latency = returnTime - updatedTime; -// System.out.println("current: " + current + " sync: " + returnTime + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency); -// meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency)); -// time[0] = System.currentTimeMillis(); -// } else { - long updatedTime = Long.parseLong(event[0].getData()[event[0].getData().length - 1].toString()); - long traffic_latency = current - updatedTime; - meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency)); - System.out.println("current: " + current + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency); + long current = System.currentTimeMillis(); +// if (Objects.equals(initial, "false")) { + if (System.currentTimeMillis() > time[0] + 3.6e+6) { + TimeInfo timeInfo = timeClient.getTime(inetAddress); + long returnTime = timeInfo.getMessage().getTransmitTimeStamp().getTime(); + long updatedTime = Long.parseLong(event[0].getData()[event[0].getData().length - 1].toString()); + long traffic_latency = returnTime - updatedTime; + System.out.println("current: " + current + " sync: " + returnTime + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency); + meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency)); + time[0] = System.currentTimeMillis(); + } else { + + String timestamp = (String) event[0].getData()[event[0].getData().length - 1]; + System.out.println("Timestamp: " + timestamp); + long updatedTime = Long.parseLong(timestamp); +// System.out.println("Updated Time: " + updatedTime); + long traffic_latency = current - updatedTime; +// System.out.println("Traffic Latency: " + traffic_latency); + meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency)); + System.out.println("current: " + current + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency); + } // } } @@ -208,7 +215,8 @@ public void run() { Event[] event = linkedBlockingQueue.take(); // responses.add(event[0].getData()); String initial = "event[0].getData()[event[0].getData().length-1].toString()"; -// calculateLatency(event, initial, time,uniqueId); +// System.out.println("Initial: " + initial); + calculateLatency(event, initial, time,uniqueId); // System.out.println("Event in Backend: " + event[0].getData()); // if (responses.size() == 5) { // sseEmitter.send(event);