Skip to content

Commit

Permalink
Change the host ip address as the vm address
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuvindu committed Apr 22, 2023
1 parent 2bd5ed4 commit 6998d40
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions backend/src/main/java/com/livequery/prototype/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private SiddhiAppRuntime getSiddhiAppRuntime(LinkedBlockingQueue<Event[]> linked
siddhiAppName,
query,
new SiddhiAppComposites.Annotation.Source.LiveSource()
.addSourceComposite(new KeyValue<>("host.name","10.8.100.246:9092"))
.addSourceComposite(new KeyValue<>("host.name","20.171.27.19:9092"))
.addSourceComposite(new KeyValue<>("api.key","Tu_TZ0W2cR92-sr1j-l7ACA.newone.9pej9tihskpx2vYZaxubGW3sFCJLzxe55NRh7T0uk1JMYiRmHdiQsWh5JhRXXT6c418385")),
new JsonMap()
.addMapComposite(new KeyValue<>("fail.on.missing.attribute","false"))
Expand All @@ -108,13 +108,13 @@ private SiddhiAppRuntime getSiddhiAppRuntime(LinkedBlockingQueue<Event[]> linked
siddhiAppRuntime.addCallback("SQL-SiddhiQL-dev-test", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
final long[] time = {System.currentTimeMillis()};
try {
calculateLatency(inEvents, "initial", time, "id-1");
} catch (IOException e) {
throw new RuntimeException(e);
}
EventPrinter.print(timeStamp, inEvents, removeEvents);
// final long[] time = {System.currentTimeMillis()};
// try {
// calculateLatency(inEvents, "initial", time, "id-1");
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// EventPrinter.print(timeStamp, inEvents, removeEvents);
linkedBlockingQueue.add(inEvents);
// System.out.println("Event in Siddhi: " + inEvents.length + " , queueSize: " + linkedBlockingQueue.size());
}
Expand All @@ -123,20 +123,27 @@ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
}

private void calculateLatency(Event[] event, String initial, long[] time, String prometheus_query) throws IOException {
long current = System.currentTimeMillis();
// if (System.currentTimeMillis() > time[0] + 3.6e+6) {
// TimeInfo timeInfo = timeClient.getTime(inetAddress);
// long returnTime = timeInfo.getMessage().getTransmitTimeStamp().getTime();
// long updatedTime = Long.parseLong(event[0].getData()[event[0].getData().length - 1].toString());
// long traffic_latency = returnTime - updatedTime;
// System.out.println("current: " + current + " sync: " + returnTime + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency);
// meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency));
// time[0] = System.currentTimeMillis();
// } else {
long updatedTime = Long.parseLong(event[0].getData()[event[0].getData().length - 1].toString());
long traffic_latency = current - updatedTime;
meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency));
System.out.println("current: " + current + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency);
long current = System.currentTimeMillis();
// if (Objects.equals(initial, "false")) {
if (System.currentTimeMillis() > time[0] + 3.6e+6) {
TimeInfo timeInfo = timeClient.getTime(inetAddress);
long returnTime = timeInfo.getMessage().getTransmitTimeStamp().getTime();
long updatedTime = Long.parseLong(event[0].getData()[event[0].getData().length - 1].toString());
long traffic_latency = returnTime - updatedTime;
System.out.println("current: " + current + " sync: " + returnTime + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency);
meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency));
time[0] = System.currentTimeMillis();
} else {

String timestamp = (String) event[0].getData()[event[0].getData().length - 1];
System.out.println("Timestamp: " + timestamp);
long updatedTime = Long.parseLong(timestamp);
// System.out.println("Updated Time: " + updatedTime);
long traffic_latency = current - updatedTime;
// System.out.println("Traffic Latency: " + traffic_latency);
meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency));
System.out.println("current: " + current + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency);
}
// }
}

Expand Down Expand Up @@ -208,7 +215,8 @@ public void run() {
Event[] event = linkedBlockingQueue.take();
// responses.add(event[0].getData());
String initial = "event[0].getData()[event[0].getData().length-1].toString()";
// calculateLatency(event, initial, time,uniqueId);
// System.out.println("Initial: " + initial);
calculateLatency(event, initial, time,uniqueId);
// System.out.println("Event in Backend: " + event[0].getData());
// if (responses.size() == 5) {
// sseEmitter.send(event);
Expand Down

0 comments on commit 6998d40

Please sign in to comment.