diff --git a/ArmoniK.SDK.Client.Test/include/End2EndHandlers.h b/ArmoniK.SDK.Client.Test/include/End2EndHandlers.h index 7bbb1f9..36c4256 100644 --- a/ArmoniK.SDK.Client.Test/include/End2EndHandlers.h +++ b/ArmoniK.SDK.Client.Test/include/End2EndHandlers.h @@ -1,6 +1,8 @@ #pragma once +#include "armonik/common/logger/logger.h" #include "armonik/sdk/client/IServiceInvocationHandler.h" +#include class PythonTestWorkerHandler final : public ArmoniK::Sdk::Client::IServiceInvocationHandler { public: @@ -26,8 +28,10 @@ class AddServiceHandler : public ArmoniK::Sdk::Client::IServiceInvocationHandler class EchoServiceHandler final : public ArmoniK::Sdk::Client::IServiceInvocationHandler { public: + explicit EchoServiceHandler(armonik::api::common::logger::Logger &logger); void HandleResponse(const std::string &result_payload, const std::string &taskId) override; void HandleError(const std::exception &e, const std::string &taskId) override; bool received = false; bool is_error = false; + armonik::api::common::logger::LocalLogger logger; }; diff --git a/ArmoniK.SDK.Client.Test/src/ArmoniK.SDK.Client.Test.cpp b/ArmoniK.SDK.Client.Test/src/ArmoniK.SDK.Client.Test.cpp index 8a3058e..c6d743b 100644 --- a/ArmoniK.SDK.Client.Test/src/ArmoniK.SDK.Client.Test.cpp +++ b/ArmoniK.SDK.Client.Test/src/ArmoniK.SDK.Client.Test.cpp @@ -66,7 +66,7 @@ TEST(testSDK, testEcho) { ASSERT_EQ(args[0], 'A'); // Create the handler - auto handler = std::make_shared(); + auto handler = std::make_shared(logger); // Submit a task auto tasks = service.Submit({ArmoniK::Sdk::Common::TaskPayload("EchoService", args)}, handler); diff --git a/ArmoniK.SDK.Client.Test/src/End2EndHandlers.cpp b/ArmoniK.SDK.Client.Test/src/End2EndHandlers.cpp index 24cb714..c300003 100644 --- a/ArmoniK.SDK.Client.Test/src/End2EndHandlers.cpp +++ b/ArmoniK.SDK.Client.Test/src/End2EndHandlers.cpp @@ -44,18 +44,23 @@ void AddServiceHandler::HandleError(const std::exception &e, const std::string & std::cerr << "HANDLE ERROR : Error for task id " << taskId << " : " << e.what() << std::endl; } void EchoServiceHandler::HandleResponse(const std::string &result_payload, const std::string &taskId) { - std::cout << "HANDLE RESPONSE : Received result of size " << result_payload.size() << " for taskId " << taskId - << "\nContent : "; - std::cout.write(result_payload.data(), result_payload.size()) << "\nRaw : "; + std::stringstream ss; + ss << "HANDLE RESPONSE : Received result of size " << result_payload.size() << " for taskId " << taskId + << "\nContent : "; + ss.write(result_payload.data(), result_payload.size()) << "\nRaw : "; for (char c : result_payload) { - std::cout << static_cast(c) << ' '; + ss << static_cast(c) << ' '; } - std::cout << std::endl; + ss << std::endl; + logger.log(armonik::api::common::logger::Level::Debug, ss.str()); received = true; is_error = false; } void EchoServiceHandler::HandleError(const std::exception &e, const std::string &taskId) { - std::cerr << "HANDLE ERROR : Error for task id " << taskId << " : " << e.what() << std::endl; + std::stringstream ss; + ss << "HANDLE ERROR : Error for task id " << taskId << " : " << e.what() << std::endl; + logger.log(armonik::api::common::logger::Level::Debug, ss.str()); received = true; is_error = true; } +EchoServiceHandler::EchoServiceHandler(armonik::api::common::logger::Logger &logger) : logger(logger.local()) {} diff --git a/ArmoniK.SDK.Client.Test/src/SessionHandlingTest.cpp b/ArmoniK.SDK.Client.Test/src/SessionHandlingTest.cpp index 1e7fe53..bfc9671 100644 --- a/ArmoniK.SDK.Client.Test/src/SessionHandlingTest.cpp +++ b/ArmoniK.SDK.Client.Test/src/SessionHandlingTest.cpp @@ -99,7 +99,7 @@ TEST(SessionService, reopen_test) { response.clear_tasks(); // Send 1 task - auto handler = std::make_shared(); + auto handler = std::make_shared(logger); auto task_ids = service.Submit(generate_payloads(1), handler); ASSERT_EQ(task_ids.size(), 1); @@ -151,7 +151,7 @@ TEST(SessionService, drop_after_done_test) { const auto &session = service.getSession(); ASSERT_FALSE(session.empty()); - auto handler = std::make_shared(); + auto handler = std::make_shared(logger); auto task_ids = service.Submit(generate_payloads(1), handler); ASSERT_EQ(task_ids.size(), 1); @@ -197,7 +197,7 @@ TEST(SessionService, drop_before_done_test) { ASSERT_FALSE(session.empty()); // Submit 100 tasks - auto handler = std::make_shared(); + auto handler = std::make_shared(logger); auto task_ids = service.Submit(generate_payloads(100), handler); ASSERT_EQ(task_ids.size(), 100); @@ -225,7 +225,7 @@ TEST(SessionService, cleanup_tasks) { ASSERT_FALSE(session.empty()); // Submit 2 tasks - auto handler = std::make_shared(); + auto handler = std::make_shared(logger); auto task_ids = service.Submit(generate_payloads(2), handler); ASSERT_EQ(task_ids.size(), 2); @@ -261,9 +261,37 @@ TEST(SessionService, cleanup_tasks) { // Should be empty for the cleaned up one ASSERT_TRUE(download_response.data_chunk().empty()); } else { - // Shouldn't be empty for the non cleaned up one + // Shouldn't be empty for the non-cleaned up one ASSERT_FALSE(download_response.data_chunk().empty()); } download_response.clear_data_chunk(); } } + +TEST(WaitOption, timeout_test) { + auto p = init(); + auto properties = std::move(std::get<0>(p)); + auto logger = std::move(std::get<1>(p)); + ArmoniK::Sdk::Client::Internal::ChannelPool pool(properties, logger); + auto channel_guard = pool.GetChannel(); + + // Create service + ArmoniK::Sdk::Client::SessionService service(properties, logger); + const auto &session = service.getSession(); + ASSERT_FALSE(session.empty()); + + // Submit 500 tasks + auto handler = std::make_shared(logger); + auto task_ids = service.Submit(generate_payloads(500), handler); + ASSERT_EQ(task_ids.size(), 500); + + // Wait with a very short delay + ArmoniK::Sdk::Client::WaitOptions waitOptions; + waitOptions.timeout = 100; + service.WaitResults({}, ArmoniK::Sdk::Client::All, waitOptions); + handler->received = false; + + // Wait for the rest, should still have tasks to retrieve + service.WaitResults({}, ArmoniK::Sdk::Client::All); + ASSERT_TRUE(handler->received); +} diff --git a/ArmoniK.SDK.Client/include/armonik/sdk/client/WaitBehavior.h b/ArmoniK.SDK.Client/include/armonik/sdk/client/WaitBehavior.h index c52ddfe..bf2315d 100644 --- a/ArmoniK.SDK.Client/include/armonik/sdk/client/WaitBehavior.h +++ b/ArmoniK.SDK.Client/include/armonik/sdk/client/WaitBehavior.h @@ -1,4 +1,5 @@ #pragma once +#include namespace ArmoniK { namespace Sdk { @@ -11,6 +12,11 @@ struct WaitOptions { * @brief Time in milliseconds for result status polling */ unsigned int polling_ms = 500; + + /** + * @brief Timeout before returning in milliseconds + */ + unsigned int timeout = UINT_MAX; }; enum WaitBehavior { diff --git a/ArmoniK.SDK.Client/src/SessionServiceImpl.cpp b/ArmoniK.SDK.Client/src/SessionServiceImpl.cpp index 5b60d81..6f1087b 100644 --- a/ArmoniK.SDK.Client/src/SessionServiceImpl.cpp +++ b/ArmoniK.SDK.Client/src/SessionServiceImpl.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -127,6 +128,7 @@ std::vector SessionServiceImpl::Submit(const std::vector task_ids, WaitBehavior behavior, const WaitOptions &options) { + auto function_stop = std::chrono::steady_clock::now() + std::chrono::milliseconds(options.timeout); bool hasWaitList = !task_ids.empty(); size_t initialTaskIds_size = task_ids.size(); @@ -241,7 +243,9 @@ void SessionServiceImpl::WaitResults(std::set task_ids, WaitBehavio if ((stopOnFirst && task_ids.size() < initialTaskIds_size) || (breakOnError && hasError)) { break; } - + if (std::chrono::steady_clock::now() > function_stop) { + break; + } std::this_thread::sleep_for(std::chrono::milliseconds(options.polling_ms)); } }