Skip to content

Commit

Permalink
AIRAVATA-3594: added getCpuUsages implementation in RegistryServerHan…
Browse files Browse the repository at this point in the history
…dler
  • Loading branch information
Saurav Kumar Jha committed Aug 25, 2023
1 parent c3e87e3 commit b0f1af5
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
*/
package org.apache.airavata.registry.core.repositories.expcatalog;

import org.apache.airavata.model.experiment.CpuUsage;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.registry.core.entities.expcatalog.JobEntity;
import org.apache.airavata.registry.core.entities.expcatalog.JobPK;
import org.apache.airavata.registry.core.entities.expcatalog.JobStatusEntity;
import org.apache.airavata.registry.core.entities.expcatalog.JobStatusPK;
Expand All @@ -36,6 +39,7 @@
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -113,4 +117,51 @@ public List<JobStatus> getDistinctListofJobStatus(String gatewayId,String status
return jobStatusRepository.selectWithNativeQuery(QueryConstants.FIND_JOB_COUNT_NATIVE_QUERY,
gatewayId,status,String.valueOf(time));
}

public List<CpuUsage> getCpuUsages(String gatewayId, long fromTime, long toTime) {
JobStatusRepository jobStatusRepository = new JobStatusRepository();
List<CpuUsage> cpuUsages = new ArrayList<>();
List<JobStatus> startedJobStatusList = jobStatusRepository.selectWithNativeQuery(
QueryConstants.FIND_JOB_STATUS_STARTED_BEFORE_TIME, gatewayId, JobState.ACTIVE.name(),
String.valueOf(toTime));
;
List<JobStatus> finishedJobStatusList = jobStatusRepository.selectWithNativeQuery(
QueryConstants.FIND_JOB_STATUS_FINISHED_AFTER_TIME, gatewayId, JobState.COMPLETE.name(),
JobState.FAILED.name(), JobState.CANCELED.name(), String.valueOf(fromTime));
Mapper mapper = ObjectMapperSingleton.getInstance();
for (JobStatus startedJobStatus : startedJobStatusList) {
for (JobStatus finishedJobStatus : finishedJobStatusList) {
JobStatusEntity startedJobStatusEntity = mapper.map(startedJobStatus, JobStatusEntity.class);
JobStatusEntity finishedJobStatusEntity = mapper.map(finishedJobStatus, JobStatusEntity.class);
if (startedJobStatusEntity.getJobId() == finishedJobStatusEntity.getJobId()) {
String jobId = startedJobStatusEntity.getJobId();
List<ExperimentModel> experiments = (new ExperimentRepository())
.selectWithNativeQuery(QueryConstants.FIND_EXPERIMENT_WITH_JOB_ID, jobId);
if (!experiments.isEmpty()) {
ExperimentModel experiment = experiments.get(0);
List<Integer> res = (new ExperimentRepository().selectWithNativeQuery(
QueryConstants.FIND_TOTAL_CPU_COUNT_WITH_EXPERIMENT_ID, experiment.getExperimentId()));
if (!res.isEmpty()) {
CpuUsage cpuUsage = new CpuUsage();
cpuUsage.setExperimentId(experiment.getExperimentId());
cpuUsage.setExecutionId(experiment.getExecutionId());
cpuUsage.setUserName(experiment.getUserName());
Timestamp fromTimestamp = new Timestamp(fromTime);
Timestamp toTimestamp = new Timestamp(toTime);
Timestamp startTime = startedJobStatusEntity.getTimeOfStateChange().after(fromTimestamp)
? startedJobStatusEntity.getTimeOfStateChange()
: fromTimestamp;
Timestamp finishTime = finishedJobStatusEntity.getTimeOfStateChange().before(toTimestamp)
? finishedJobStatusEntity.getTimeOfStateChange()
: toTimestamp;
long duration = finishTime.getTime() - startTime.getTime(); // milliseconds
int totalCPUCount = res.get(0);
cpuUsage.setCpuHours(duration * totalCPUCount / (1000 * 60 * 60));
}
}
}
}
}
return cpuUsages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,23 @@ public interface QueryConstants {
"(SELECT E.EXPERIMENT_ID FROM EXPERIMENT E where E.GATEWAY_ID= ?1))) " +
"AND JS.STATE = ?2 and JS.TIME_OF_STATE_CHANGE > now() - interval ?3 minute";

String FIND_JOB_STATUS_STARTED_BEFORE_TIME = "SELECT * FROM JOB_STATUS JS WHERE JS.JOB_ID IN " +
"(SELECT J.JOB_ID FROM JOB J where J.PROCESS_ID IN " +
"(SELECT P.PROCESS_ID FROM PROCESS P where P.EXPERIMENT_ID IN " +
"(SELECT E.EXPERIMENT_ID FROM EXPERIMENT E where E.GATEWAY_ID= ?1))) " +
"AND JS.STATE = ?2 and JS.TIME_OF_STATE_CHANGE <= ?3";

String FIND_JOB_STATUS_FINISHED_AFTER_TIME = "SELECT * FROM JOB_STATUS JS WHERE JS.JOB_ID IN " +
"(SELECT J.JOB_ID FROM JOB J where J.PROCESS_ID IN " +
"(SELECT P.PROCESS_ID FROM PROCESS P where P.EXPERIMENT_ID IN " +
"(SELECT E.EXPERIMENT_ID FROM EXPERIMENT E where E.GATEWAY_ID= ?1))) " +
"AND (JS.STATE = ?2 OR JS.STATE = ?3 OR JS.STATE = ?4) and JS.TIME_OF_STATE_CHANGE >= ?5";

String FIND_EXPERIMENT_WITH_JOB_ID = "SELECT * FROM EXPERIMENT E WHERE E.EXPERIMENT_ID IN " +
"(SELECT P.EXPERIMENT_ID FROM JOB J JOIN J.task T JOIN T.process P WHERE J.JOB_ID = ?1)";

String FIND_TOTAL_CPU_COUNT_WITH_EXPERIMENT_ID = "SELECT U.TOTAL_CPU_COUNT FROM EXPERIMENT E JOIN E.userConfigurationData U WHERE E.EXPERIMENT_ID = ?1";

String FIND_AVG_TIME_UPTO_METASCHEDULER_NATIVE_QUERY = "SELECT AVG(difference) FROM(select es.TIME_OF_STATE_CHANGE AS esTime1, ps.TIME_OF_STATE_CHANGE as psTime1, " +
" TIMESTAMPDIFF(MICROSECOND, es.TIME_OF_STATE_CHANGE, ps.TIME_OF_STATE_CHANGE) AS difference FROM EXPERIMENT_STATUS es, " +
" EXPERIMENT_STATUS ps WHERE es.EXPERIMENT_ID IN (select EXPERIMENT_ID FROM EXPERIMENT WHERE GATEWAY_ID= ?1) " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@
import org.apache.airavata.registry.core.repositories.replicacatalog.DataReplicaLocationRepository;
import org.apache.airavata.registry.core.repositories.workflowcatalog.WorkflowRepository;
import org.apache.airavata.registry.core.utils.DBConstants;
import org.apache.airavata.registry.core.utils.QueryConstants;
import org.apache.airavata.registry.cpi.*;
import org.apache.airavata.registry.cpi.utils.Constants;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.*;

public class RegistryServerHandler implements RegistryService.Iface {
Expand Down Expand Up @@ -1263,8 +1265,14 @@ public List<CpuUsage> getCpuUsages(String gatewayId, long fromTime, long toTime)
throw new AiravataSystemException(AiravataErrorType.INTERNAL_ERROR);
}
try {
// TODO: implementation to be done, temporarily returning empty list
List<CpuUsage> cpuUsages = new ArrayList<>();
Timestamp fromTimeStamp = new Timestamp(fromTime);
Timestamp toTimestamp = new Timestamp(toTime);
if(fromTimeStamp.after(toTimestamp)) {
logger.error("fromTime must not be after toTime");
return cpuUsages;
}
cpuUsages = jobStatusRepository.getCpuUsages(gatewayId, fromTime, toTime);
return cpuUsages;

} catch (Exception e) {
Expand Down

0 comments on commit b0f1af5

Please sign in to comment.