Skip to content

Commit

Permalink
feat: Added timeout to wait options (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddiakiteaneo authored Oct 9, 2023
2 parents f572dd5 + 0390077 commit e187295
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 13 deletions.
4 changes: 4 additions & 0 deletions ArmoniK.SDK.Client.Test/include/End2EndHandlers.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include "armonik/common/logger/logger.h"
#include "armonik/sdk/client/IServiceInvocationHandler.h"
#include <memory>

class PythonTestWorkerHandler final : public ArmoniK::Sdk::Client::IServiceInvocationHandler {
public:
Expand All @@ -26,8 +28,10 @@ class AddServiceHandler : public ArmoniK::Sdk::Client::IServiceInvocationHandler

class EchoServiceHandler final : public ArmoniK::Sdk::Client::IServiceInvocationHandler {
public:
explicit EchoServiceHandler(armonik::api::common::logger::Logger &logger);
void HandleResponse(const std::string &result_payload, const std::string &taskId) override;
void HandleError(const std::exception &e, const std::string &taskId) override;
bool received = false;
bool is_error = false;
armonik::api::common::logger::LocalLogger logger;
};
2 changes: 1 addition & 1 deletion ArmoniK.SDK.Client.Test/src/ArmoniK.SDK.Client.Test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ TEST(testSDK, testEcho) {
ASSERT_EQ(args[0], 'A');

// Create the handler
auto handler = std::make_shared<EchoServiceHandler>();
auto handler = std::make_shared<EchoServiceHandler>(logger);

// Submit a task
auto tasks = service.Submit({ArmoniK::Sdk::Common::TaskPayload("EchoService", args)}, handler);
Expand Down
17 changes: 11 additions & 6 deletions ArmoniK.SDK.Client.Test/src/End2EndHandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,23 @@ void AddServiceHandler::HandleError(const std::exception &e, const std::string &
std::cerr << "HANDLE ERROR : Error for task id " << taskId << " : " << e.what() << std::endl;
}
void EchoServiceHandler::HandleResponse(const std::string &result_payload, const std::string &taskId) {
std::cout << "HANDLE RESPONSE : Received result of size " << result_payload.size() << " for taskId " << taskId
<< "\nContent : ";
std::cout.write(result_payload.data(), result_payload.size()) << "\nRaw : ";
std::stringstream ss;
ss << "HANDLE RESPONSE : Received result of size " << result_payload.size() << " for taskId " << taskId
<< "\nContent : ";
ss.write(result_payload.data(), result_payload.size()) << "\nRaw : ";
for (char c : result_payload) {
std::cout << static_cast<int>(c) << ' ';
ss << static_cast<int>(c) << ' ';
}
std::cout << std::endl;
ss << std::endl;
logger.log(armonik::api::common::logger::Level::Debug, ss.str());
received = true;
is_error = false;
}
void EchoServiceHandler::HandleError(const std::exception &e, const std::string &taskId) {
std::cerr << "HANDLE ERROR : Error for task id " << taskId << " : " << e.what() << std::endl;
std::stringstream ss;
ss << "HANDLE ERROR : Error for task id " << taskId << " : " << e.what() << std::endl;
logger.log(armonik::api::common::logger::Level::Debug, ss.str());
received = true;
is_error = true;
}
EchoServiceHandler::EchoServiceHandler(armonik::api::common::logger::Logger &logger) : logger(logger.local()) {}
38 changes: 33 additions & 5 deletions ArmoniK.SDK.Client.Test/src/SessionHandlingTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ TEST(SessionService, reopen_test) {
response.clear_tasks();

// Send 1 task
auto handler = std::make_shared<EchoServiceHandler>();
auto handler = std::make_shared<EchoServiceHandler>(logger);
auto task_ids = service.Submit(generate_payloads(1), handler);
ASSERT_EQ(task_ids.size(), 1);

Expand Down Expand Up @@ -151,7 +151,7 @@ TEST(SessionService, drop_after_done_test) {
const auto &session = service.getSession();
ASSERT_FALSE(session.empty());

auto handler = std::make_shared<EchoServiceHandler>();
auto handler = std::make_shared<EchoServiceHandler>(logger);
auto task_ids = service.Submit(generate_payloads(1), handler);
ASSERT_EQ(task_ids.size(), 1);

Expand Down Expand Up @@ -197,7 +197,7 @@ TEST(SessionService, drop_before_done_test) {
ASSERT_FALSE(session.empty());

// Submit 100 tasks
auto handler = std::make_shared<EchoServiceHandler>();
auto handler = std::make_shared<EchoServiceHandler>(logger);
auto task_ids = service.Submit(generate_payloads(100), handler);
ASSERT_EQ(task_ids.size(), 100);

Expand Down Expand Up @@ -225,7 +225,7 @@ TEST(SessionService, cleanup_tasks) {
ASSERT_FALSE(session.empty());

// Submit 2 tasks
auto handler = std::make_shared<EchoServiceHandler>();
auto handler = std::make_shared<EchoServiceHandler>(logger);
auto task_ids = service.Submit(generate_payloads(2), handler);
ASSERT_EQ(task_ids.size(), 2);

Expand Down Expand Up @@ -261,9 +261,37 @@ TEST(SessionService, cleanup_tasks) {
// Should be empty for the cleaned up one
ASSERT_TRUE(download_response.data_chunk().empty());
} else {
// Shouldn't be empty for the non cleaned up one
// Shouldn't be empty for the non-cleaned up one
ASSERT_FALSE(download_response.data_chunk().empty());
}
download_response.clear_data_chunk();
}
}

TEST(WaitOption, timeout_test) {
auto p = init();
auto properties = std::move(std::get<0>(p));
auto logger = std::move(std::get<1>(p));
ArmoniK::Sdk::Client::Internal::ChannelPool pool(properties, logger);
auto channel_guard = pool.GetChannel();

// Create service
ArmoniK::Sdk::Client::SessionService service(properties, logger);
const auto &session = service.getSession();
ASSERT_FALSE(session.empty());

// Submit 500 tasks
auto handler = std::make_shared<EchoServiceHandler>(logger);
auto task_ids = service.Submit(generate_payloads(500), handler);
ASSERT_EQ(task_ids.size(), 500);

// Wait with a very short delay
ArmoniK::Sdk::Client::WaitOptions waitOptions;
waitOptions.timeout = 100;
service.WaitResults({}, ArmoniK::Sdk::Client::All, waitOptions);
handler->received = false;

// Wait for the rest, should still have tasks to retrieve
service.WaitResults({}, ArmoniK::Sdk::Client::All);
ASSERT_TRUE(handler->received);
}
6 changes: 6 additions & 0 deletions ArmoniK.SDK.Client/include/armonik/sdk/client/WaitBehavior.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <climits>

namespace ArmoniK {
namespace Sdk {
Expand All @@ -11,6 +12,11 @@ struct WaitOptions {
* @brief Time in milliseconds for result status polling
*/
unsigned int polling_ms = 500;

/**
* @brief Timeout before returning in milliseconds
*/
unsigned int timeout = UINT_MAX;
};

enum WaitBehavior {
Expand Down
6 changes: 5 additions & 1 deletion ArmoniK.SDK.Client/src/SessionServiceImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <armonik/sdk/common/ArmoniKSdkException.h>
#include <armonik/sdk/common/Properties.h>
#include <armonik/sdk/common/TaskPayload.h>
#include <chrono>
#include <grpcpp/client_context.h>
#include <submitter_service.grpc.pb.h>
#include <thread>
Expand Down Expand Up @@ -127,6 +128,7 @@ std::vector<std::string> SessionServiceImpl::Submit(const std::vector<Common::Ta

void SessionServiceImpl::WaitResults(std::set<std::string> task_ids, WaitBehavior behavior,
const WaitOptions &options) {
auto function_stop = std::chrono::steady_clock::now() + std::chrono::milliseconds(options.timeout);

bool hasWaitList = !task_ids.empty();
size_t initialTaskIds_size = task_ids.size();
Expand Down Expand Up @@ -241,7 +243,9 @@ void SessionServiceImpl::WaitResults(std::set<std::string> task_ids, WaitBehavio
if ((stopOnFirst && task_ids.size() < initialTaskIds_size) || (breakOnError && hasError)) {
break;
}

if (std::chrono::steady_clock::now() > function_stop) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(options.polling_ms));
}
}
Expand Down

0 comments on commit e187295

Please sign in to comment.