Skip to content

Commit

Permalink
Refactor jobs test
Browse files Browse the repository at this point in the history
  • Loading branch information
sfodagain committed Nov 22, 2023
1 parent 8ee62a5 commit ca3ccab
Showing 1 changed file with 92 additions and 110 deletions.
202 changes: 92 additions & 110 deletions servicetests/tests/jobs_execution/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,145 +116,127 @@ async function on_start_next_pending_job_execution_accepted(error? : iotjobs.Iot
}
}

async function get_available_jobs(jobs_client: iotjobs.IotJobsClient, argv: Args) {
// Subscribe to necessary topics and get pending jobs
try {
var pending_subscription_request : iotjobs.model.GetPendingJobExecutionsSubscriptionRequest = {
thingName: argv.thing_name
};
await jobs_client.subscribeToGetPendingJobExecutionsAccepted(pending_subscription_request, mqtt.QoS.AtLeastOnce, on_get_pending_job_execution_accepted);
await jobs_client.subscribeToGetPendingJobExecutionsRejected(pending_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error);

var pending_publish_request : iotjobs.model.GetPendingJobExecutionsRequest = {
thingName: argv.thing_name
};
await jobs_client.publishGetPendingJobExecutions(pending_publish_request, mqtt.QoS.AtLeastOnce);

await sleep(500);
} catch (error) {
console.log(error);
process.exit(-1)
}
}

async function describe_job(jobs_client: iotjobs.IotJobsClient, job_id: string, argv: Args) {
var description_subscription_request : iotjobs.model.DescribeJobExecutionRequest = {
thingName: argv.thing_name,
jobId: job_id
}
await jobs_client.subscribeToDescribeJobExecutionAccepted(description_subscription_request, mqtt.QoS.AtLeastOnce, on_describe_job_execution_accepted);
await jobs_client.subscribeToDescribeJobExecutionRejected(description_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error);

var description_publish_request : iotjobs.model.DescribeJobExecutionRequest = {
thingName: argv.thing_name,
jobId: job_id,
includeJobDocument: true,
executionNumber: 1
}
await jobs_client.publishDescribeJobExecution(description_publish_request, mqtt.QoS.AtLeastOnce);
}

async function start_next_pending_job(jobs_client: iotjobs.IotJobsClient, argv: Args) {
var start_next_subscription_request : iotjobs.model.StartNextPendingJobExecutionSubscriptionRequest = {
thingName: argv.thing_name
}

await jobs_client.subscribeToStartNextPendingJobExecutionAccepted(
start_next_subscription_request,
mqtt.QoS.AtLeastOnce,
on_start_next_pending_job_execution_accepted);
await jobs_client.subscribeToStartNextPendingJobExecutionRejected(
start_next_subscription_request,
mqtt.QoS.AtLeastOnce,
on_rejected_error);

var start_next_publish_request : iotjobs.model.StartNextPendingJobExecutionRequest = {
thingName: argv.thing_name,
stepTimeoutInMinutes: 15
}
await jobs_client.publishStartNextPendingJobExecution(start_next_publish_request, mqtt.QoS.AtLeastOnce);
}

async function update_current_job_status(jobs_client: iotjobs.IotJobsClient, status: iotjobs.model.JobStatus, argv: Args) {
var executing_subscription_request : iotjobs.model.UpdateJobExecutionSubscriptionRequest = {
thingName: argv.thing_name,
jobId: jobs_data.current_job_id
}
await jobs_client.subscribeToUpdateJobExecutionAccepted(executing_subscription_request, mqtt.QoS.AtLeastOnce,
(error?: iotjobs.IotJobsError, response?: iotjobs.model.UpdateJobExecutionResponse) => {
console.log("Marked job " + jobs_data.current_job_id + " " + status);
});
await jobs_client.subscribeToUpdateJobExecutionRejected(executing_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error);

var executing_publish_request : iotjobs.model.UpdateJobExecutionRequest = {
thingName: argv.thing_name,
jobId: jobs_data.current_job_id,
executionNumber: jobs_data.current_execution_number,
status: status,
expectedVersion: jobs_data.current_version_number++
};
await jobs_client.publishUpdateJobExecution(executing_publish_request, mqtt.QoS.AtLeastOnce);
}

async function main(argv: Args) {
common_args.apply_sample_arguments(argv);

var connection;
var client5;
var jobs;
let connection;
let client5;
let jobs_client;

console.log("Connecting...");
if (argv.mqtt_version == 5) {
client5 = common_args.build_mqtt5_client_from_cli_args(argv);
jobs = iotjobs.IotJobsClient.newFromMqtt5Client(client5);
jobs_client = iotjobs.IotJobsClient.newFromMqtt5Client(client5);

const connectionSuccess = once(client5, "connectionSuccess");
client5.start();
await connectionSuccess;
console.log("Connected with Mqtt5 Client!");
} else {
connection = common_args.build_connection_from_cli_args(argv);
jobs = new iotjobs.IotJobsClient(connection);
jobs_client = new iotjobs.IotJobsClient(connection);

await connection.connect()
console.log("Connected with Mqtt3 Client!");
}

// Subscribe to necessary topics and get pending jobs
try {
var pending_subscription_request : iotjobs.model.GetPendingJobExecutionsSubscriptionRequest = {
thingName: argv.thing_name
};
await jobs.subscribeToGetPendingJobExecutionsAccepted(pending_subscription_request, mqtt.QoS.AtLeastOnce, on_get_pending_job_execution_accepted);
await jobs.subscribeToGetPendingJobExecutionsRejected(pending_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error);

var pending_publish_request : iotjobs.model.GetPendingJobExecutionsRequest = {
thingName: argv.thing_name
};
await jobs.publishGetPendingJobExecutions(pending_publish_request, mqtt.QoS.AtLeastOnce);
await get_available_jobs(jobs_client, argv);

await sleep(500); // wait half a second
} catch (error) {
console.log(error);
process.exit(-1)
}

// Check if there are jobs to do
try {
if (available_jobs.length <= 0) {
if (available_jobs.length == 0) {
console.log("ERROR: No jobs queued in CI! At least one job should be queued!");
process.exit(-1);
}
} catch (error) {
console.log(error);
process.exit(-1)
}

// Get descriptions of each job
try {
for (var i = 0; i < available_jobs.length; ++i) {
var description_subscription_request : iotjobs.model.DescribeJobExecutionRequest = {
thingName: argv.thing_name,
jobId: available_jobs[i]
}
await jobs.subscribeToDescribeJobExecutionAccepted(description_subscription_request, mqtt.QoS.AtLeastOnce, on_describe_job_execution_accepted);
await jobs.subscribeToDescribeJobExecutionRejected(description_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error);

var description_publish_request : iotjobs.model.DescribeJobExecutionRequest = {
thingName: argv.thing_name,
jobId: available_jobs[i],
includeJobDocument: true,
executionNumber: 1
}
await jobs.publishDescribeJobExecution(description_publish_request, mqtt.QoS.AtLeastOnce);
await describe_job(jobs_client, available_jobs[i], argv);
}
} catch (error) {
console.log(error);
process.exit(-1)
}

// Pretend to do each job
try {
for (var job_idx = 0; job_idx < available_jobs.length; ++job_idx) {

// Start the next pending job
// ==================================================
var start_next_subscription_request : iotjobs.model.StartNextPendingJobExecutionSubscriptionRequest = {
thingName: argv.thing_name
}

await jobs.subscribeToStartNextPendingJobExecutionAccepted(start_next_subscription_request, mqtt.QoS.AtLeastOnce, on_start_next_pending_job_execution_accepted);
await jobs.subscribeToStartNextPendingJobExecutionRejected(start_next_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error);

var start_next_publish_request : iotjobs.model.StartNextPendingJobExecutionRequest = {
thingName: argv.thing_name,
stepTimeoutInMinutes: 15
}
await jobs.publishStartNextPendingJobExecution(start_next_publish_request, mqtt.QoS.AtLeastOnce);
// ==================================================

// Update the service to let it know we're executing
// ==================================================
var executing_subscription_request : iotjobs.model.UpdateJobExecutionSubscriptionRequest = {
thingName: argv.thing_name,
jobId: jobs_data.current_job_id
}
await jobs.subscribeToUpdateJobExecutionAccepted(executing_subscription_request, mqtt.QoS.AtLeastOnce,
(error?: iotjobs.IotJobsError, response?: iotjobs.model.UpdateJobExecutionResponse) => {
console.log("Marked job " + jobs_data.current_job_id + " IN_PROGRESS");
});
await jobs.subscribeToUpdateJobExecutionRejected(executing_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error);

var executing_publish_request : iotjobs.model.UpdateJobExecutionRequest = {
thingName: argv.thing_name,
jobId: jobs_data.current_job_id,
executionNumber: jobs_data.current_execution_number,
status: iotjobs.model.JobStatus.IN_PROGRESS,
expectedVersion: jobs_data.current_version_number++
};
await jobs.publishUpdateJobExecution(executing_publish_request, mqtt.QoS.AtLeastOnce);

// Update the service to let it know we are done
// ==================================================
var done_subscription_request : iotjobs.model.UpdateJobExecutionSubscriptionRequest = {
thingName: argv.thing_name,
jobId: jobs_data.current_job_id
}
await jobs.subscribeToUpdateJobExecutionAccepted(done_subscription_request, mqtt.QoS.AtLeastOnce,
(error?: iotjobs.IotJobsError, response?: iotjobs.model.UpdateJobExecutionResponse) => {
console.log("Marked job " + jobs_data.current_job_id + " SUCCEEDED");
});
await jobs.subscribeToUpdateJobExecutionRejected(done_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error);

var done_publish_request : iotjobs.model.UpdateJobExecutionRequest = {
thingName: argv.thing_name,
jobId: jobs_data.current_job_id,
executionNumber: jobs_data.current_execution_number,
status: iotjobs.model.JobStatus.SUCCEEDED,
expectedVersion: jobs_data.current_version_number++
};
await jobs.publishUpdateJobExecution(done_publish_request, mqtt.QoS.AtLeastOnce);
// ==================================================
await start_next_pending_job(jobs_client, argv);
await update_current_job_status(jobs_client, iotjobs.model.JobStatus.IN_PROGRESS, argv);
await sleep(1000);
await update_current_job_status(jobs_client, iotjobs.model.JobStatus.SUCCEEDED, argv);
}
} catch (error) {
console.log(error);
Expand Down

0 comments on commit ca3ccab

Please sign in to comment.