Skip to content

Commit

Permalink
Merge pull request #13 from FYP-Live-Query/writing_to_file
Browse files Browse the repository at this point in the history
Writing to file
  • Loading branch information
vinoja98 authored May 5, 2023
2 parents 6998d40 + 2e46e63 commit 68f25f2
Showing 1 changed file with 67 additions and 22 deletions.
89 changes: 67 additions & 22 deletions backend/src/main/java/com/livequery/prototype/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
import SiddhiAppComposites.Annotation.Source.LiveSource;
import SiddhiAppComposites.SiddhiApp;
import SiddhiAppComposites.SiddhiAppGenerator;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.io.FileWriter;

@RestController
public class Controller {
Expand All @@ -59,7 +65,8 @@ public class Controller {
NTPUDPClient timeClient = new NTPUDPClient();
private final PersistenceStore persistenceStore;
private final InetAddress inetAddress = InetAddress.getByName(TIME_SERVER);

private final List<Long> latencyValues = new ArrayList<>();
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private final AtomicInteger iterateID = new AtomicInteger(0);
public Controller(MeterRegistry meterRegistry) throws UnknownHostException {
this.persistenceStore = new InMemoryPersistenceStore();
Expand All @@ -71,6 +78,7 @@ public Controller(MeterRegistry meterRegistry) throws UnknownHostException {
this.trafficUsers = new HashMap<>();
this.browserUsers = new HashMap<>();
this.anyQueryUsers = new HashMap<>();
executorService.scheduleAtFixedRate(this::writeLatencyValuesToCsv, 1, 1, TimeUnit.MINUTES);
}

private static SseEmitter getSseEmitter() {
Expand Down Expand Up @@ -122,31 +130,65 @@ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
return siddhiAppRuntime;
}

private void calculateLatency(Event[] event, String initial, long[] time, String prometheus_query) throws IOException {
long current = System.currentTimeMillis();
// if (Objects.equals(initial, "false")) {
if (System.currentTimeMillis() > time[0] + 3.6e+6) {
TimeInfo timeInfo = timeClient.getTime(inetAddress);
long returnTime = timeInfo.getMessage().getTransmitTimeStamp().getTime();
long updatedTime = Long.parseLong(event[0].getData()[event[0].getData().length - 1].toString());
long traffic_latency = returnTime - updatedTime;
System.out.println("current: " + current + " sync: " + returnTime + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency);
meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency));
time[0] = System.currentTimeMillis();
} else {

String timestamp = (String) event[0].getData()[event[0].getData().length - 1];
System.out.println("Timestamp: " + timestamp);
long updatedTime = Long.parseLong(timestamp);
// System.out.println("Updated Time: " + updatedTime);
private void calculateLatency(Event[] event, String initial, long[] time, String prometheus_query,long start) throws IOException {
long current = System.currentTimeMillis();
// if (System.currentTimeMillis() > time[0] + 3.6e+6) {
// TimeInfo timeInfo = timeClient.getTime(inetAddress);
// long returnTime = timeInfo.getMessage().getTransmitTimeStamp().getTime();
// long updatedTime = Long.parseLong(event[0].getData()[event[0].getData().length - 1].toString());
// long traffic_latency = returnTime - updatedTime;
// System.out.println("current: " + current + " sync: " + returnTime + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency);
// meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency));
// time[0] = System.currentTimeMillis();
// } else {
long updatedTime = Long.parseLong(event[0].getData()[event[0].getData().length - 1].toString());
if(updatedTime>start){
long traffic_latency = current - updatedTime;
// System.out.println("Traffic Latency: " + traffic_latency);
latencyValues.add(traffic_latency);
meterRegistry.timer(prometheus_query).record(Duration.ofMillis(traffic_latency));
System.out.println("current: " + current + " updated_time: " + updatedTime + " traffic_latency: " + traffic_latency);
}
// }
}

private void writeLatencyValuesToCsv() {
try {
// Calculate average and 90th percentile of latency values
double averageLatency = latencyValues.stream()
.mapToLong(Long::longValue)
.average()
.orElse(Double.NaN);
double percentile95Latency = latencyValues.stream()
.sorted()
.skip((long) (latencyValues.size() * 0.95))
.findFirst()
.orElse(0L);
double percentile99Latency = latencyValues.stream()
.sorted()
.skip((long) (latencyValues.size() * 0.99))
.findFirst()
.orElse(0L);
// Write average and 90th percentile of latency values to CSV file
FileWriter csvWriter = new FileWriter("latency_values.csv", true);
csvWriter.append(Double.toString(averageLatency));
csvWriter.append(",");
csvWriter.append(Double.toString(percentile95Latency));
csvWriter.append(",");
csvWriter.append(Double.toString(percentile99Latency));
csvWriter.append("\n");
csvWriter.flush();
csvWriter.close();

// Clear the latency values list
latencyValues.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
public void shutdown() {
// Shutdown the executor service when the program is exiting
executorService.shutdown();
}

@PostMapping("/publish")
@CrossOrigin
Expand Down Expand Up @@ -184,6 +226,7 @@ public UserInfo setQuery(@RequestBody UserInfo userInfo) {
public SseEmitter trafficData() throws CredentialException, IOException, InterruptedException {
String userId = "ZXCVB";
StringBuilder str1 = new StringBuilder("id-");
long start =System.currentTimeMillis();
str1.append(iterateID.incrementAndGet());
String uniqueId = str1.toString();

Expand Down Expand Up @@ -216,7 +259,7 @@ public void run() {
// responses.add(event[0].getData());
String initial = "event[0].getData()[event[0].getData().length-1].toString()";
// System.out.println("Initial: " + initial);
calculateLatency(event, initial, time,uniqueId);
calculateLatency(event, initial, time,uniqueId,start);
// System.out.println("Event in Backend: " + event[0].getData());
// if (responses.size() == 5) {
// sseEmitter.send(event);
Expand Down Expand Up @@ -250,6 +293,7 @@ public void run() {
@CrossOrigin
public SseEmitter browserData(String userId) throws CredentialException, IOException, InterruptedException {
final long[] time = {System.currentTimeMillis()};
long start =System.currentTimeMillis();
LinkedBlockingQueue<Event[]> linkedBlockingQueue = new LinkedBlockingQueue<>();
SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);

Expand Down Expand Up @@ -279,7 +323,7 @@ public void run() {
responses.add(event[0].getData());
String initial = event[0].getData()[event[0].getData().length-1].toString();
System.out.println("Browser Latencies");
calculateLatency(event, initial, time,userId);
calculateLatency(event, initial, time,userId,start);
System.out.println("...............");
if (responses.size() == 4) {
sseEmitter.send(responses);
Expand Down Expand Up @@ -317,6 +361,7 @@ public void run() {
public SseEmitter anyQueryData(String userId) throws CredentialException, IOException, InterruptedException {

final long[] time = {System.currentTimeMillis()};
long start =System.currentTimeMillis();
LinkedBlockingQueue<Event[]> linkedBlockingQueue = new LinkedBlockingQueue<>();
SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
Runnable siddhiAppRunner = new Runnable() {
Expand Down Expand Up @@ -344,7 +389,7 @@ public void run() {
Event[] event = linkedBlockingQueue.take();
System.out.println("event arrived");
// String initial = event[0].getData()[event[0].getData().length-1].toString();
calculateLatency(event, "", time,"query.latency");
calculateLatency(event, "", time,"query.latency",start);

sseEmitter.send(event[0].getData());
// emitter.complete();
Expand Down

0 comments on commit 68f25f2

Please sign in to comment.