From ca3ccabfbc62602bd60d6d327d3e67427a823611 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Wed, 22 Nov 2023 15:25:06 -0800 Subject: [PATCH] Refactor jobs test --- servicetests/tests/jobs_execution/index.ts | 202 ++++++++++----------- 1 file changed, 92 insertions(+), 110 deletions(-) diff --git a/servicetests/tests/jobs_execution/index.ts b/servicetests/tests/jobs_execution/index.ts index 53624c9c..31ad4479 100644 --- a/servicetests/tests/jobs_execution/index.ts +++ b/servicetests/tests/jobs_execution/index.ts @@ -116,17 +116,97 @@ async function on_start_next_pending_job_execution_accepted(error? : iotjobs.Iot } } +async function get_available_jobs(jobs_client: iotjobs.IotJobsClient, argv: Args) { + // Subscribe to necessary topics and get pending jobs + try { + var pending_subscription_request : iotjobs.model.GetPendingJobExecutionsSubscriptionRequest = { + thingName: argv.thing_name + }; + await jobs_client.subscribeToGetPendingJobExecutionsAccepted(pending_subscription_request, mqtt.QoS.AtLeastOnce, on_get_pending_job_execution_accepted); + await jobs_client.subscribeToGetPendingJobExecutionsRejected(pending_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error); + + var pending_publish_request : iotjobs.model.GetPendingJobExecutionsRequest = { + thingName: argv.thing_name + }; + await jobs_client.publishGetPendingJobExecutions(pending_publish_request, mqtt.QoS.AtLeastOnce); + + await sleep(500); + } catch (error) { + console.log(error); + process.exit(-1) + } +} + +async function describe_job(jobs_client: iotjobs.IotJobsClient, job_id: string, argv: Args) { + var description_subscription_request : iotjobs.model.DescribeJobExecutionRequest = { + thingName: argv.thing_name, + jobId: job_id + } + await jobs_client.subscribeToDescribeJobExecutionAccepted(description_subscription_request, mqtt.QoS.AtLeastOnce, on_describe_job_execution_accepted); + await jobs_client.subscribeToDescribeJobExecutionRejected(description_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error); + + var description_publish_request : iotjobs.model.DescribeJobExecutionRequest = { + thingName: argv.thing_name, + jobId: job_id, + includeJobDocument: true, + executionNumber: 1 + } + await jobs_client.publishDescribeJobExecution(description_publish_request, mqtt.QoS.AtLeastOnce); +} + +async function start_next_pending_job(jobs_client: iotjobs.IotJobsClient, argv: Args) { + var start_next_subscription_request : iotjobs.model.StartNextPendingJobExecutionSubscriptionRequest = { + thingName: argv.thing_name + } + + await jobs_client.subscribeToStartNextPendingJobExecutionAccepted( + start_next_subscription_request, + mqtt.QoS.AtLeastOnce, + on_start_next_pending_job_execution_accepted); + await jobs_client.subscribeToStartNextPendingJobExecutionRejected( + start_next_subscription_request, + mqtt.QoS.AtLeastOnce, + on_rejected_error); + + var start_next_publish_request : iotjobs.model.StartNextPendingJobExecutionRequest = { + thingName: argv.thing_name, + stepTimeoutInMinutes: 15 + } + await jobs_client.publishStartNextPendingJobExecution(start_next_publish_request, mqtt.QoS.AtLeastOnce); +} + +async function update_current_job_status(jobs_client: iotjobs.IotJobsClient, status: iotjobs.model.JobStatus, argv: Args) { + var executing_subscription_request : iotjobs.model.UpdateJobExecutionSubscriptionRequest = { + thingName: argv.thing_name, + jobId: jobs_data.current_job_id + } + await jobs_client.subscribeToUpdateJobExecutionAccepted(executing_subscription_request, mqtt.QoS.AtLeastOnce, + (error?: iotjobs.IotJobsError, response?: iotjobs.model.UpdateJobExecutionResponse) => { + console.log("Marked job " + jobs_data.current_job_id + " " + status); + }); + await jobs_client.subscribeToUpdateJobExecutionRejected(executing_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error); + + var executing_publish_request : iotjobs.model.UpdateJobExecutionRequest = { + thingName: argv.thing_name, + jobId: jobs_data.current_job_id, + executionNumber: jobs_data.current_execution_number, + status: status, + expectedVersion: jobs_data.current_version_number++ + }; + await jobs_client.publishUpdateJobExecution(executing_publish_request, mqtt.QoS.AtLeastOnce); +} + async function main(argv: Args) { common_args.apply_sample_arguments(argv); - var connection; - var client5; - var jobs; + let connection; + let client5; + let jobs_client; console.log("Connecting..."); if (argv.mqtt_version == 5) { client5 = common_args.build_mqtt5_client_from_cli_args(argv); - jobs = iotjobs.IotJobsClient.newFromMqtt5Client(client5); + jobs_client = iotjobs.IotJobsClient.newFromMqtt5Client(client5); const connectionSuccess = once(client5, "connectionSuccess"); client5.start(); @@ -134,127 +214,29 @@ async function main(argv: Args) { console.log("Connected with Mqtt5 Client!"); } else { connection = common_args.build_connection_from_cli_args(argv); - jobs = new iotjobs.IotJobsClient(connection); + jobs_client = new iotjobs.IotJobsClient(connection); await connection.connect() console.log("Connected with Mqtt3 Client!"); } - // Subscribe to necessary topics and get pending jobs try { - var pending_subscription_request : iotjobs.model.GetPendingJobExecutionsSubscriptionRequest = { - thingName: argv.thing_name - }; - await jobs.subscribeToGetPendingJobExecutionsAccepted(pending_subscription_request, mqtt.QoS.AtLeastOnce, on_get_pending_job_execution_accepted); - await jobs.subscribeToGetPendingJobExecutionsRejected(pending_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error); - - var pending_publish_request : iotjobs.model.GetPendingJobExecutionsRequest = { - thingName: argv.thing_name - }; - await jobs.publishGetPendingJobExecutions(pending_publish_request, mqtt.QoS.AtLeastOnce); + await get_available_jobs(jobs_client, argv); - await sleep(500); // wait half a second - } catch (error) { - console.log(error); - process.exit(-1) - } - - // Check if there are jobs to do - try { - if (available_jobs.length <= 0) { + if (available_jobs.length == 0) { console.log("ERROR: No jobs queued in CI! At least one job should be queued!"); process.exit(-1); } - } catch (error) { - console.log(error); - process.exit(-1) - } - // Get descriptions of each job - try { for (var i = 0; i < available_jobs.length; ++i) { - var description_subscription_request : iotjobs.model.DescribeJobExecutionRequest = { - thingName: argv.thing_name, - jobId: available_jobs[i] - } - await jobs.subscribeToDescribeJobExecutionAccepted(description_subscription_request, mqtt.QoS.AtLeastOnce, on_describe_job_execution_accepted); - await jobs.subscribeToDescribeJobExecutionRejected(description_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error); - - var description_publish_request : iotjobs.model.DescribeJobExecutionRequest = { - thingName: argv.thing_name, - jobId: available_jobs[i], - includeJobDocument: true, - executionNumber: 1 - } - await jobs.publishDescribeJobExecution(description_publish_request, mqtt.QoS.AtLeastOnce); + await describe_job(jobs_client, available_jobs[i], argv); } - } catch (error) { - console.log(error); - process.exit(-1) - } - // Pretend to do each job - try { for (var job_idx = 0; job_idx < available_jobs.length; ++job_idx) { - - // Start the next pending job - // ================================================== - var start_next_subscription_request : iotjobs.model.StartNextPendingJobExecutionSubscriptionRequest = { - thingName: argv.thing_name - } - - await jobs.subscribeToStartNextPendingJobExecutionAccepted(start_next_subscription_request, mqtt.QoS.AtLeastOnce, on_start_next_pending_job_execution_accepted); - await jobs.subscribeToStartNextPendingJobExecutionRejected(start_next_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error); - - var start_next_publish_request : iotjobs.model.StartNextPendingJobExecutionRequest = { - thingName: argv.thing_name, - stepTimeoutInMinutes: 15 - } - await jobs.publishStartNextPendingJobExecution(start_next_publish_request, mqtt.QoS.AtLeastOnce); - // ================================================== - - // Update the service to let it know we're executing - // ================================================== - var executing_subscription_request : iotjobs.model.UpdateJobExecutionSubscriptionRequest = { - thingName: argv.thing_name, - jobId: jobs_data.current_job_id - } - await jobs.subscribeToUpdateJobExecutionAccepted(executing_subscription_request, mqtt.QoS.AtLeastOnce, - (error?: iotjobs.IotJobsError, response?: iotjobs.model.UpdateJobExecutionResponse) => { - console.log("Marked job " + jobs_data.current_job_id + " IN_PROGRESS"); - }); - await jobs.subscribeToUpdateJobExecutionRejected(executing_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error); - - var executing_publish_request : iotjobs.model.UpdateJobExecutionRequest = { - thingName: argv.thing_name, - jobId: jobs_data.current_job_id, - executionNumber: jobs_data.current_execution_number, - status: iotjobs.model.JobStatus.IN_PROGRESS, - expectedVersion: jobs_data.current_version_number++ - }; - await jobs.publishUpdateJobExecution(executing_publish_request, mqtt.QoS.AtLeastOnce); - - // Update the service to let it know we are done - // ================================================== - var done_subscription_request : iotjobs.model.UpdateJobExecutionSubscriptionRequest = { - thingName: argv.thing_name, - jobId: jobs_data.current_job_id - } - await jobs.subscribeToUpdateJobExecutionAccepted(done_subscription_request, mqtt.QoS.AtLeastOnce, - (error?: iotjobs.IotJobsError, response?: iotjobs.model.UpdateJobExecutionResponse) => { - console.log("Marked job " + jobs_data.current_job_id + " SUCCEEDED"); - }); - await jobs.subscribeToUpdateJobExecutionRejected(done_subscription_request, mqtt.QoS.AtLeastOnce, on_rejected_error); - - var done_publish_request : iotjobs.model.UpdateJobExecutionRequest = { - thingName: argv.thing_name, - jobId: jobs_data.current_job_id, - executionNumber: jobs_data.current_execution_number, - status: iotjobs.model.JobStatus.SUCCEEDED, - expectedVersion: jobs_data.current_version_number++ - }; - await jobs.publishUpdateJobExecution(done_publish_request, mqtt.QoS.AtLeastOnce); - // ================================================== + await start_next_pending_job(jobs_client, argv); + await update_current_job_status(jobs_client, iotjobs.model.JobStatus.IN_PROGRESS, argv); + await sleep(1000); + await update_current_job_status(jobs_client, iotjobs.model.JobStatus.SUCCEEDED, argv); } } catch (error) { console.log(error);