Skip to content

Commit

Permalink
Refactor fleet provisioning sample further
Browse files Browse the repository at this point in the history
  • Loading branch information
sfodagain committed Jul 12, 2024
1 parent 69e7aba commit 3dd5dfb
Showing 1 changed file with 173 additions and 130 deletions.
303 changes: 173 additions & 130 deletions samples/fleet_provisioning/fleet_provisioning/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

/**
* A sample application demonstrating usage of AWS IoT Fleet provisioning.
*
* In a real world application you probably don't want to enforce synchronous behavior. But this is a sample console
* application, so all actions, like creating a certificate or registering a thing, are performed in synchronous manner.
*/

#include <aws/crt/Api.h>
#include <aws/crt/JsonObject.h>

Expand Down Expand Up @@ -34,6 +42,15 @@ static std::string getFileData(std::string const &fileName)
return str;
}

/**
* Auxiliary structure for holding data used by MQTT connection.
*/
struct ConnectionContext
{
std::promise<bool> connectionCompletedPromise;
std::promise<void> connectionClosedPromise;
};

/**
* Auxiliary structure for holding data used when creating a certificate.
*/
Expand All @@ -46,14 +63,93 @@ struct CreateCertificateContext
String token;
};

/**
* Auxiliary structure for holding data used when registering a thing.
*/
struct RegisterThingContext
{
std::promise<void> pubAckPromise;
std::promise<void> acceptedSubAckPromise;
std::promise<void> rejectedSubAckPromise;
std::promise<void> thingCreatedPromise;
};

/**
* Create MQTT3 connection.
*/
std::shared_ptr<Mqtt::MqttConnection> createConnection(const Utils::cmdData &cmdData, ConnectionContext &ctx)
{
/**
* In a real world application you probably don't want to enforce synchronous behavior
* but this is a sample console application, so we'll just do that with a promise.
*/

// Invoked when a MQTT connect has completed or failed
auto onConnectionCompleted = [&ctx](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) {
if (errorCode)
{
fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode));
ctx.connectionCompletedPromise.set_value(false);
}
else
{
fprintf(stdout, "Connection completed with return code %d\n", returnCode);
ctx.connectionCompletedPromise.set_value(true);
}
};

// Invoked when a disconnect has been completed
auto onDisconnect = [&ctx](Mqtt::MqttConnection & /*conn*/) {
{
fprintf(stdout, "Disconnect completed\n");
ctx.connectionClosedPromise.set_value();
}
};

// Create the MQTT builder and populate it with data from cmdData.
auto clientConfigBuilder =
Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str());
clientConfigBuilder.WithEndpoint(cmdData.input_endpoint);
if (cmdData.input_ca != "")
{
clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str());
}

// Create the MQTT connection from the MQTT builder
auto clientConfig = clientConfigBuilder.Build();
if (!clientConfig)
{
fprintf(
stderr,
"Client Configuration initialization failed with error %s\n",
Aws::Crt::ErrorDebugString(clientConfig.LastError()));
exit(-1);
}
Aws::Iot::MqttClient client = Aws::Iot::MqttClient();
auto connection = client.NewConnection(clientConfig);
if (!*connection)
{
fprintf(
stderr,
"MQTT Connection Creation failed with error %s\n",
Aws::Crt::ErrorDebugString(connection->LastError()));
exit(-1);
}

connection->OnConnectionCompleted = std::move(onConnectionCompleted);
connection->OnDisconnect = std::move(onDisconnect);

return connection;
}

/**
* Keys-and-Certificate workflow.
*
* @note Subscriptions created here will be active even after the function completes. So, all variables accessed in the
* callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of
* CreateCertificateContext is used to store variables used by the callbacks.
*/
void useKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateContext &ctx)
void createKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateContext &ctx)
{
auto onKeysPublishPubAck = [&ctx](int ioErr) {
if (ioErr != AWS_OP_SUCCESS)
Expand Down Expand Up @@ -143,7 +239,7 @@ void useKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateC
* callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of
* CreateCertificateContext is used to store variables used by the callbacks.
*/
void useCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, const String &csrFile)
void createCertificateFromCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, const String &csrFile)
{
auto onCsrPublishPubAck = [&ctx](int ioErr) {
if (ioErr != AWS_OP_SUCCESS)
Expand Down Expand Up @@ -229,119 +325,22 @@ void useCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, co
ctx.tokenReceivedPromise.get_future().wait();
}

int main(int argc, char *argv[])
/**
* Provision an AWS IoT thing using a pre-defined template.
*/
void registerThing(
IotIdentityClient &identityClient,
RegisterThingContext &ctx,
const Utils::cmdData &cmdData,
const String &token)
{
/************************ Setup ****************************/

// Do the global initialization for the API
ApiHandle apiHandle;
// Variables for the sample
String csrFile;
RegisterThingResponse registerThingResponse;

/**
* cmdData is the arguments/input from the command line placed into a single struct for
* use in this sample. This handles all of the command line parsing, validating, etc.
* See the Utils/CommandLineUtils for more information.
*/
Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle);

if (cmdData.input_csrPath != "")
{
csrFile = getFileData(cmdData.input_csrPath.c_str()).c_str();
}

/**
* In a real world application you probably don't want to enforce synchronous behavior
* but this is a sample console application, so we'll just do that with a promise.
*/
std::promise<bool> connectionCompletedPromise;
std::promise<void> connectionClosedPromise;

// Invoked when a MQTT connect has completed or failed
auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) {
if (errorCode)
{
fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode));
connectionCompletedPromise.set_value(false);
}
else
{
fprintf(stdout, "Connection completed with return code %d\n", returnCode);
connectionCompletedPromise.set_value(true);
}
};

// Invoked when a disconnect has been completed
auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) {
{
fprintf(stdout, "Disconnect completed\n");
connectionClosedPromise.set_value();
}
};

// Create the MQTT builder and populate it with data from cmdData.
auto clientConfigBuilder =
Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str());
clientConfigBuilder.WithEndpoint(cmdData.input_endpoint);
if (cmdData.input_ca != "")
{
clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str());
}

// Create the MQTT connection from the MQTT builder
auto clientConfig = clientConfigBuilder.Build();
if (!clientConfig)
{
fprintf(
stderr,
"Client Configuration initialization failed with error %s\n",
Aws::Crt::ErrorDebugString(clientConfig.LastError()));
exit(-1);
}
Aws::Iot::MqttClient client = Aws::Iot::MqttClient();
auto connection = client.NewConnection(clientConfig);
if (!*connection)
{
fprintf(
stderr,
"MQTT Connection Creation failed with error %s\n",
Aws::Crt::ErrorDebugString(connection->LastError()));
exit(-1);
}

connection->OnConnectionCompleted = std::move(onConnectionCompleted);
connection->OnDisconnect = std::move(onDisconnect);

/************************ Run the sample ****************************/

fprintf(stdout, "Connecting...\n");
if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0))
{
fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError()));
exit(-1);
}

if (!connectionCompletedPromise.get_future().get())
{
return -1;
}

IotIdentityClient identityClient(connection);

std::promise<void> registerPublishPubAckCompletedPromise;
std::promise<void> registerAcceptedSubAckCompletedPromise;
std::promise<void> registerRejectedSubAckCompletedPromise;
std::promise<void> registerAcceptedCompletedPromise;

auto onRegisterAcceptedSubAck = [&](int ioErr) {
if (ioErr != AWS_OP_SUCCESS)
{
fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr));
exit(-1);
}

registerAcceptedSubAckCompletedPromise.set_value();
ctx.acceptedSubAckPromise.set_value();
};

auto onRegisterRejectedSubAck = [&](int ioErr) {
Expand All @@ -350,14 +349,14 @@ int main(int argc, char *argv[])
fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr));
exit(-1);
}
registerRejectedSubAckCompletedPromise.set_value();
ctx.rejectedSubAckPromise.set_value();
};

auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) {
if (ioErr == AWS_OP_SUCCESS)
{
fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str());
registerAcceptedCompletedPromise.set_value();
ctx.thingCreatedPromise.set_value();
}
else
{
Expand Down Expand Up @@ -389,22 +388,9 @@ int main(int argc, char *argv[])
fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr));
exit(-1);
}

registerPublishPubAckCompletedPromise.set_value();
ctx.pubAckPromise.set_value();
};

// Create certificate.
CreateCertificateContext certificateContext;
if (csrFile.empty())
{
useKeysAndCertificate(identityClient, certificateContext);
}
else
{
useCsr(identityClient, certificateContext, csrFile);
}

// After certificate is obtained, it's time to register a thing.
fprintf(stdout, "Subscribing to RegisterThing Accepted and Rejected topics\n");
RegisterThingSubscriptionRequest registerSubscriptionRequest;
registerSubscriptionRequest.TemplateName = cmdData.input_templateName;
Expand All @@ -416,8 +402,8 @@ int main(int argc, char *argv[])
registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck);

// Wait for the subscriptions to the accept and reject RegisterThing topics to be established.
registerAcceptedSubAckCompletedPromise.get_future().wait();
registerRejectedSubAckCompletedPromise.get_future().wait();
ctx.acceptedSubAckPromise.get_future().wait();
ctx.rejectedSubAckPromise.get_future().wait();

fprintf(stdout, "Publishing to RegisterThing topic\n");
RegisterThingRequest registerThingRequest;
Expand All @@ -436,18 +422,75 @@ int main(int argc, char *argv[])
registerThingRequest.Parameters = params;
// NOTE: In a real application creating multiple certificates you'll probably need to protect token var with
// a critical section. This sample makes only one request for a certificate, so no data race is possible.
registerThingRequest.CertificateOwnershipToken = certificateContext.token;
registerThingRequest.CertificateOwnershipToken = token;

identityClient.PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishPubAck);
registerPublishPubAckCompletedPromise.get_future().wait();
ctx.pubAckPromise.get_future().wait();

// Wait for registering a thing to succeed.
registerAcceptedCompletedPromise.get_future().wait();
ctx.thingCreatedPromise.get_future().wait();
}

int main(int argc, char *argv[])
{
/************************ Setup ****************************/

// Do the global initialization for the API
ApiHandle apiHandle;
// Variables for the sample
String csrFile;

/**
* cmdData is the arguments/input from the command line placed into a single struct for
* use in this sample. This handles all of the command line parsing, validating, etc.
* See the Utils/CommandLineUtils for more information.
*/
Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle);

if (cmdData.input_csrPath != "")
{
csrFile = getFileData(cmdData.input_csrPath.c_str()).c_str();
}

ConnectionContext connectionContext;
auto connection = createConnection(cmdData, connectionContext);

/************************ Run the sample ****************************/

fprintf(stdout, "Connecting...\n");
if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0))
{
fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError()));
exit(-1);
}

if (!connectionContext.connectionCompletedPromise.get_future().get())
{
exit(-1);
}

// Create fleet provisioning client.
IotIdentityClient identityClient(connection);

// Create certificate.
CreateCertificateContext certificateContext;
if (csrFile.empty())
{
createKeysAndCertificate(identityClient, certificateContext);
}
else
{
createCertificateFromCsr(identityClient, certificateContext, csrFile);
}

// After certificate is obtained, it's time to register a thing.
RegisterThingContext registerThingContext;
registerThing(identityClient, registerThingContext, cmdData, certificateContext.token);

// Disconnect
if (connection->Disconnect())
{
connectionClosedPromise.get_future().wait();
connectionContext.connectionClosedPromise.get_future().wait();
}

return 0;
Expand Down

0 comments on commit 3dd5dfb

Please sign in to comment.