From eebac1feb1f09901f95ec152cd384082cf0d8e0e Mon Sep 17 00:00:00 2001
From: Anastasiia Pnevskaia
Date: Tue, 24 Dec 2024 13:57:49 +0100
Subject: [PATCH] Applied comments.

---
 src/cpp/src/block_manager.hpp            |  16 +--
 src/cpp/src/cache_manager.hpp            | 126 ++++++++++-------------
 src/cpp/src/continuous_batching_impl.cpp |   3 -
 src/cpp/src/continuous_batching_impl.hpp |   9 --
 src/cpp/src/scheduler.hpp                |  61 ++++++-----
 tests/cpp/cache_manager.cpp              |   5 -
 6 files changed, 90 insertions(+), 130 deletions(-)

diff --git a/src/cpp/src/block_manager.hpp b/src/cpp/src/block_manager.hpp
index d964032db0..4ca263777b 100644
--- a/src/cpp/src/block_manager.hpp
+++ b/src/cpp/src/block_manager.hpp
@@ -195,7 +195,6 @@ class BlockAllocator {
     size_t m_num_layers;
     bool m_enable_prefix_caching;
     ov::genai::OverwritableBlocksHashStore m_overwriteable_blocks;
-    bool m_initialized = false;
 public:
     /**
      * Constructs the BlockAllocator.
@@ -216,7 +215,9 @@ class BlockAllocator {
                     per_layer_block_list.push_back(std::make_shared<KVCacheBlock>(block_id));
                 }
             }
-            m_initialized = true;
+        }
+        else {
+            m_free_blocks_num = std::vector<size_t>(m_num_layers, 0);
         }
     }
 
@@ -227,10 +228,6 @@ class BlockAllocator {
 
     void increase_kv_blocks_number(size_t new_kv_blocks_count) {
         OPENVINO_ASSERT(new_kv_blocks_count > m_total_num_blocks, "New blocks number should be more than previous blocks number.");
-        if (!m_initialized) {
-            m_free_blocks_num = std::vector<size_t>(m_num_layers, 0);
-            m_initialized = true;
-        }
         size_t added_blocks = new_kv_blocks_count - m_total_num_blocks;
         for (auto idx = 0; idx < m_free_blocks_num.size(); idx++) {
             m_free_blocks_num[idx] += added_blocks;
@@ -243,9 +240,6 @@ class BlockAllocator {
         m_total_num_blocks = new_kv_blocks_count;
     }
 
-    bool is_inilialized() const {
-        return m_initialized;
-    }
 
     /**
      * Returns the number of free blocks for a given layer.
@@ -665,10 +659,6 @@ class BlockManager {
         return m_allocator.num_free_blocks(0); // relying on the invariant that all layers have identical number of blocks
     }
 
-    bool block_allocator_initialized() const {
-        return m_allocator.is_inilialized();
-    }
-
     /**
     * @param num_blocks A number of KV cache blocks
     * @return Whether this number of KV cache blocks may be assigned to new sequences.
diff --git a/src/cpp/src/cache_manager.hpp b/src/cpp/src/cache_manager.hpp
index 7358a4574d..0c04823f4f 100644
--- a/src/cpp/src/cache_manager.hpp
+++ b/src/cpp/src/cache_manager.hpp
@@ -44,111 +44,91 @@ class CacheManager {
         if (m_num_allocated_kv_blocks >= num_kv_blocks) {
             return;
         }
-        if (m_num_allocated_kv_blocks > 0) {
-            increase_cache(num_kv_blocks);
-            return;
-        }
+        OPENVINO_ASSERT(m_key_cache.size() == m_value_cache.size());
         m_num_allocated_kv_blocks = num_kv_blocks;
         ov::Shape value_cache_shape = set_first_dim_and_make_static(m_device_config.get_value_cache_shape(), num_kv_blocks);
         ov::Shape key_cache_shape = set_first_dim_and_make_static(m_device_config.get_key_cache_shape(), num_kv_blocks);
         const std::string device_name = m_device_config.get_device();
+        ov::Coordinate start_key{0,0,0,0};
+        ov::Coordinate start_value{0,0,0,0};
+
         if (device_name.find("GPU") == std::string::npos) {// Allocate KV caches
             for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
                 ov::Tensor key_cache(m_device_config.get_cache_precision(), key_cache_shape);
                 ov::Tensor value_cache(m_device_config.get_cache_precision(), value_cache_shape);
-                // Some optimizations like AVX2, AVX512, AMX require a minimal shape and
-                // perform multiplying by zero on the excess data. Uninitialized tensor data contain NAN's,
-                // so NAN * 0 returns non-zero invalid data.
-                // So we need to set zeros to all newly allocated tensors data.
-                std::memset(key_cache.data(), 0, key_cache.get_byte_size());
-                std::memset(value_cache.data(), 0, value_cache.get_byte_size());
-
-                m_key_cache.emplace_back(key_cache);
-                m_value_cache.emplace_back(value_cache);
-
-                update_request_tensor(decoder_layer_id);
-            }
-        } else {
-            auto remote_context = m_core.get_default_context(device_name);
-            for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
-                ov::Tensor key_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
-                                                                    key_cache_shape);
-                ov::Tensor value_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
-                                                                      value_cache_shape);
-
-                m_key_cache.emplace_back(key_cache);
-                m_value_cache.emplace_back(value_cache);
-
-                update_request_tensor(decoder_layer_id);
-            }
-        }
-    }
+                auto key_cache_roi_end = static_cast<unsigned char*>(key_cache.data());
+                auto value_cache_roi_end = static_cast<unsigned char*>(value_cache.data());
+                size_t key_roi_size_byte = 0;
+                size_t value_roi_size_byte = 0;
-    void increase_cache(size_t num_kv_blocks) {
-        OPENVINO_ASSERT(num_kv_blocks > m_num_allocated_kv_blocks);
-        ov::Shape new_value_cache_shape = set_first_dim_and_make_static(m_device_config.get_value_cache_shape(), num_kv_blocks);
-        ov::Shape new_key_cache_shape = set_first_dim_and_make_static(m_device_config.get_key_cache_shape(), num_kv_blocks);
+                if (m_key_cache.size() > decoder_layer_id) {
+                    ov::Coordinate end_key = m_key_cache[decoder_layer_id].get_shape();
+                    ov::Coordinate end_value = m_value_cache[decoder_layer_id].get_shape();
-        const std::string device_name = m_device_config.get_device();
-        ov::Coordinate start_key{0,0,0,0};
-        ov::Coordinate start_value{0,0,0,0};
+                    key_roi_size_byte = m_key_cache[decoder_layer_id].get_byte_size();
+                    value_roi_size_byte = m_value_cache[decoder_layer_id].get_byte_size();
+                    key_cache_roi_end = static_cast<unsigned char*>(key_cache.data()) + key_roi_size_byte;
+                    value_cache_roi_end = static_cast<unsigned char*>(value_cache.data()) + value_roi_size_byte;
+
+                    // copy current cache data
+                    ov::Tensor dst_key_roi(key_cache, start_key, end_key);
+                    ov::Tensor dst_value_roi(value_cache, start_value, end_value);
-        if (device_name.find("GPU") == std::string::npos) {
-            for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
-                ov::Coordinate end_key(m_key_cache[decoder_layer_id].get_shape());
-                ov::Coordinate end_value(m_value_cache[decoder_layer_id].get_shape());
+                    m_key_cache[decoder_layer_id].copy_to(dst_key_roi);
+                    m_value_cache[decoder_layer_id].copy_to(dst_value_roi);
-                ov::Tensor key_cache(m_device_config.get_cache_precision(), new_key_cache_shape);
-                ov::Tensor value_cache(m_device_config.get_cache_precision(), new_value_cache_shape);
-
-                // copy current cache data
-                ov::Tensor dst_key_roi(key_cache, start_key, end_key);
-                ov::Tensor dst_value_roi(value_cache, start_value, end_value);
-                m_key_cache[decoder_layer_id].copy_to(dst_key_roi);
-                m_value_cache[decoder_layer_id].copy_to(dst_value_roi);
+                }
                 // Some optimizations like AVX2, AVX512, AMX require a minimal shape and
                 // perform multiplying by zero on the excess data. Uninitialized tensor data contain NAN's,
                 // so NAN * 0 returns non-zero invalid data.
                 // So we need to set zeros to all newly allocated tensors data.
-                auto key_cache_roi_end = static_cast<unsigned char*>(key_cache.data()) + dst_key_roi.get_byte_size();
-                auto value_cache_roi_end = static_cast<unsigned char*>(value_cache.data()) + dst_value_roi.get_byte_size();
-                std::memset(key_cache_roi_end, 0, key_cache.get_byte_size() - dst_key_roi.get_byte_size());
-                std::memset(value_cache_roi_end, 0, value_cache.get_byte_size() - dst_value_roi.get_byte_size());
+                std::memset(key_cache_roi_end, 0, key_cache.get_byte_size() - key_roi_size_byte);
+                std::memset(value_cache_roi_end, 0, value_cache.get_byte_size() - value_roi_size_byte);
                 // set new cache tensors
-                m_key_cache[decoder_layer_id] = key_cache;
-                m_value_cache[decoder_layer_id] = value_cache;
+                if (m_key_cache.size() > decoder_layer_id) {
+                    m_key_cache[decoder_layer_id] = key_cache;
+                    m_value_cache[decoder_layer_id] = value_cache;
+                }
+                else {
+                    m_key_cache.emplace_back(key_cache);
+                    m_value_cache.emplace_back(value_cache);
+                }
 
                 update_request_tensor(decoder_layer_id);
             }
         } else {
             auto remote_context = m_core.get_default_context(device_name);
             for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
-                ov::Coordinate end_key(m_key_cache[decoder_layer_id].get_shape());
-                ov::Coordinate end_value(m_value_cache[decoder_layer_id].get_shape());
-
-                ov::RemoteTensor key_cache = remote_context.create_tensor(m_device_config.get_cache_precision(), new_key_cache_shape);
-                ov::RemoteTensor value_cache = remote_context.create_tensor(m_device_config.get_cache_precision(), new_value_cache_shape);
-
-                // copy current cache data
-                ov::RemoteTensor dst_key_roi(key_cache, start_key, end_key);
-                ov::RemoteTensor dst_value_roi(value_cache, start_value, end_value);
-
-                dst_key_roi.copy_from(m_key_cache[decoder_layer_id]);
-                dst_value_roi.copy_from(m_value_cache[decoder_layer_id]);
-
-                // set new cache tensors
-                m_key_cache[decoder_layer_id] = key_cache;
-                m_value_cache[decoder_layer_id] = value_cache;
+                ov::Tensor key_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
+                                                                    key_cache_shape);
+                ov::Tensor value_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
+                                                                      value_cache_shape);
+                if (m_key_cache.size() > decoder_layer_id) {
+                    ov::Coordinate end_key = m_key_cache[decoder_layer_id].get_shape();
+                    ov::Coordinate end_value = m_value_cache[decoder_layer_id].get_shape();
+
+                    // copy current cache data
+                    ov::RemoteTensor dst_key_roi(key_cache, start_key, end_key);
+                    ov::RemoteTensor dst_value_roi(value_cache, start_value, end_value);
+                    dst_key_roi.copy_from(m_key_cache[decoder_layer_id]);
+                    dst_value_roi.copy_from(m_value_cache[decoder_layer_id]);
+
+                    m_key_cache[decoder_layer_id] = key_cache;
+                    m_value_cache[decoder_layer_id] = value_cache;
+                }
+                else {
+                    m_key_cache.emplace_back(key_cache);
+                    m_value_cache.emplace_back(value_cache);
+                }
 
                 update_request_tensor(decoder_layer_id);
             }
         }
-        m_num_allocated_kv_blocks = num_kv_blocks;
     }
 
     ov::Tensor get_key_cache(size_t decoder_layer_id) const {
diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp
index a5eaa6e49b..52ec6a8302 100644
--- a/src/cpp/src/continuous_batching_impl.cpp
+++ b/src/cpp/src/continuous_batching_impl.cpp
@@ -32,7 +32,6 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl(
     bool is_need_per_layer_cache_control = scheduler_config.use_cache_eviction;
     utils::apply_paged_attention_transformations(model, device_config, is_need_per_layer_cache_control);
 
-    m_core = std::make_shared<ov::Core>(core);
     init(model, scheduler_config, compile_properties, device_config, core);
 }
 
@@ -78,8 +77,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::init(
 
     // If eos_token_id was not provided, take value
     if (m_generation_config.eos_token_id == -1)
         m_generation_config.set_eos_token_id(m_tokenizer.get_eos_token_id());
-
-    m_device_config = std::make_shared<DeviceConfig>(device_config);
 };
 
diff --git a/src/cpp/src/continuous_batching_impl.hpp b/src/cpp/src/continuous_batching_impl.hpp
index ad0000ee68..8da05c6dfa 100644
--- a/src/cpp/src/continuous_batching_impl.hpp
+++ b/src/cpp/src/continuous_batching_impl.hpp
@@ -14,7 +14,6 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc
     std::shared_ptr<CacheManager> m_cache_manager;
     std::shared_ptr<ModelRunner> m_model_runner;
     std::shared_ptr<Sampler> m_sampler;
-    std::shared_ptr<DeviceConfig> m_device_config;
 
     // current requests to process
     std::vector<SequenceGroup::Ptr> m_requests;
@@ -31,14 +30,6 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc
     // flag to enable validation mode for sampler
     bool m_is_validation_mode_enabled = false;
 
-    // dynamic kv-cache allocation params
-    const size_t m_kv_blocks_initial_multiplier = 2;
-    const float m_cache_growth_factor = 2; // commmon values 1.5 or 2
-    const float m_percentage_threshold_for_cache_increase = 100;
-
-    bool m_dynamic_memory_allocation = false;
-    std::shared_ptr<ov::Core> m_core;
-
 #ifdef DEBUG_CACHE_STATE_DUMP
     size_t step_count = 0;
 #endif
diff --git a/src/cpp/src/scheduler.hpp b/src/cpp/src/scheduler.hpp
index cbd8ef64a2..7ae7a40a51 100644
--- a/src/cpp/src/scheduler.hpp
+++ b/src/cpp/src/scheduler.hpp
@@ -51,17 +51,12 @@ class Scheduler {
             m_config(config),
             m_block_manager(m_config.num_kv_blocks, m_config.enable_prefix_caching, block_size, num_layers) {
-        // allocate kv-cache if the number of kv blocks is determined,
-        // otherwise cache will be allocated dynamically
-        if (m_block_manager.block_allocator_initialized()) {
-            m_cache_manager->allocate_cache_if_needed(m_block_manager.get_total_number_of_kv_blocks());
-        }
         OPENVINO_ASSERT(num_layers != 0, "num_layers must be non-zero");
     }
 
     Output schedule(std::vector<SequenceGroup::Ptr>& sequence_groups) {
         Output scheduler_output;
 
-        if (!m_block_manager.block_allocator_initialized()) {
+        if (m_block_manager.get_total_number_of_kv_blocks() == 0) {
             _initialize_cache(sequence_groups);
         }
 
@@ -83,6 +78,7 @@ class Scheduler {
             }
         }
 
+        m_cache_manager->allocate_cache_if_needed(m_block_manager.get_total_number_of_kv_blocks());
         _clear_waiting_sequences(sequence_groups);
         scheduler_output.m_cache_usage = m_block_manager.get_used_percentage();
         return scheduler_output;
@@ -255,8 +251,10 @@ class Scheduler {
             size_t available_slots = currently_allocated_token_slots - occupied_token_slots,
                    required_slots = num_scheduled_tokens > available_slots ? num_scheduled_tokens - available_slots : 0;
             size_t num_required_blocks = (required_slots + block_size - 1) / block_size, num_free_blocks = m_block_manager.num_free_blocks();
-            if (num_free_blocks == 0) {
-                _try_increase_cache();
+            while (num_required_blocks > num_free_blocks) {
+                if (!_try_increase_cache()) {
+                    break;
+                }
             }
             size_t num_scheduled_blocks = std::min(num_required_blocks, num_free_blocks);
             // some scheduled blocks can be no fully occupied, so we need to take min between num_scheduled_blocks
@@ -310,15 +308,18 @@ class Scheduler {
                 size_t num_scheduled_tokens_per_seq = std::min(available_tokens_per_seq_in_megabatch, num_available_tokens_per_seq);
                 sequence_group->schedule_tokens(num_scheduled_tokens_per_seq);
 
+                while (!m_block_manager.can_append_slots(sequence_group)){
+                    if (!_try_increase_cache()) {
+                        break;
+                    }
+                }
+
                 _apply_preemption(sequence_group_id, sequence_groups);
 
                 // if we can't preemt any more sequences, clear scheduled tokens and move to next sequence
-                if (!m_block_manager.can_append_slots(sequence_group)){
-                    _try_increase_cache();
-                    if (!m_block_manager.can_append_slots(sequence_group)) {
-                        sequence_group->clear_scheduled_tokens();
-                        continue;
-                    }
+                if (!m_block_manager.can_append_slots(sequence_group)) {
+                    sequence_group->clear_scheduled_tokens();
+                    continue;
                 }
 
                 // allocate new slots
@@ -394,11 +395,13 @@ class Scheduler {
             // apply KV cache limitations
             size_t block_size = get_block_size();
             const size_t num_required_blocks = (sequence_len + block_size - 1) / block_size;
-            if (!m_block_manager.can_allocate_blocks(num_required_blocks)) {
-                _try_increase_cache();
-                if (!m_block_manager.can_allocate_blocks(num_required_blocks))
+            while (!m_block_manager.can_allocate_blocks(num_required_blocks)){
+                if (!_try_increase_cache()) {
                     break;
+                }
             }
+            if (!m_block_manager.can_allocate_blocks(num_required_blocks))
+                break;
 
             // add scheduling information
             {
@@ -465,25 +468,26 @@ class Scheduler {
     }
 
     void _initialize_cache(const std::vector<SequenceGroup::Ptr>& sequence_groups) {
-        size_t seq_length_sum = 0;
+        size_t blocks_sum = 0;
         for (auto idx = 0; idx < sequence_groups.size(); idx++) {
             auto seq_length = sequence_groups[idx]->get_prompt_len() * m_kv_blocks_initial_multiplier;
             auto gen_config = sequence_groups[idx]->get_sampling_parameters();
             seq_length = std::min(seq_length, sequence_groups[idx]->get_prompt_len() + gen_config.get_max_new_tokens(sequence_groups[idx]->get_prompt_len()));
-            if (sequence_groups[idx]->get_sampling_parameters().is_beam_search()) {
-
-                seq_length *= sequence_groups[idx]->get_sampling_parameters().num_beams;
+            size_t blocks_num = std::ceil((float)seq_length / m_block_manager.get_block_size());
+            if (gen_config.do_sample && gen_config.is_beam_search()) {
+                blocks_num *= gen_config.num_beams;
+            } else if (gen_config.do_sample && gen_config.is_multinomial()) {
+                blocks_num *= gen_config.num_return_sequences;
             }
-            seq_length_sum += seq_length;
+            blocks_sum += blocks_num;
         }
-        m_block_manager.increase_kv_blocks_number(seq_length_sum);
-        m_cache_manager->allocate_cache_if_needed(m_block_manager.get_total_number_of_kv_blocks());
+        m_block_manager.increase_kv_blocks_number(blocks_sum);
         m_dynamic_memory_allocation = true;
     }
 
-    void _try_increase_cache() {
+    bool _try_increase_cache() {
        if (!m_dynamic_memory_allocation) {
-            return;
+            return false;
        }
        auto device_config = m_cache_manager->get_device_config();
        auto device = device_config->get_device();
@@ -503,9 +507,12 @@ class Scheduler {
                 if (possible_blocks_to_add > 0) {
                     m_block_manager.increase_kv_blocks_number(current_num_of_kv_blocks + possible_blocks_to_add);
                 }
+                else {
+                    return false;
+                }
             }
         }
-        m_cache_manager->allocate_cache_if_needed(m_block_manager.get_total_number_of_kv_blocks());
+        return true;
     }
 };
 
diff --git a/tests/cpp/cache_manager.cpp b/tests/cpp/cache_manager.cpp
index a3be00b226..7f07980389 100644
--- a/tests/cpp/cache_manager.cpp
+++ b/tests/cpp/cache_manager.cpp
@@ -59,7 +59,6 @@ TEST(TestCacheManager, test_cache_size_param) {
     ov::InferRequest request = core.compile_model(get_dummy_model(num_decoder_layers)).create_infer_request();
     auto cache_manager = std::make_shared<ov::genai::CacheManager>(device_config, request, core);
     auto block_manager = BlockManager(device_config.get_num_kv_blocks(), false, device_config.get_block_size(), device_config.get_num_layers());
-    OPENVINO_ASSERT(block_manager.block_allocator_initialized());
     cache_manager->allocate_cache_if_needed(block_manager.get_total_number_of_kv_blocks());
 
     ASSERT_EQ(get_total_allocated_bytes(cache_manager, num_decoder_layers), 2146959360);
@@ -82,7 +81,6 @@ TEST(TestCacheManager, test_kv_blocks_param) {
     ov::InferRequest request = core.compile_model(get_dummy_model(num_decoder_layers)).create_infer_request();
     auto cache_manager = std::make_shared<ov::genai::CacheManager>(device_config, request, core);
     auto block_manager = BlockManager(device_config.get_num_kv_blocks(), false, device_config.get_block_size(), device_config.get_num_layers());
-    OPENVINO_ASSERT(block_manager.block_allocator_initialized());
     OPENVINO_ASSERT(block_manager.get_total_number_of_kv_blocks(), scheduler_config.num_kv_blocks);
 }
 
@@ -107,11 +105,9 @@ TEST(TestCacheManager, test_dynamic_cache_increase) {
     ov::InferRequest request = core.compile_model(get_dummy_model(num_decoder_layers)).create_infer_request();
     auto cache_manager = std::make_shared<ov::genai::CacheManager>(device_config, request, core);
     auto block_manager = BlockManager(device_config.get_num_kv_blocks(), false, device_config.get_block_size(), device_config.get_num_layers());
-    OPENVINO_ASSERT(!block_manager.block_allocator_initialized());
 
     // check initial cache allocation
     block_manager.increase_kv_blocks_number(100);
-    OPENVINO_ASSERT(block_manager.block_allocator_initialized());
     OPENVINO_ASSERT(block_manager.get_total_number_of_kv_blocks(), 100);
     cache_manager->allocate_cache_if_needed(block_manager.get_total_number_of_kv_blocks());
 
@@ -120,7 +116,6 @@ TEST(TestCacheManager, test_dynamic_cache_increase) {
 
     // check cache increase
     block_manager.increase_kv_blocks_number(200);
-    OPENVINO_ASSERT(block_manager.block_allocator_initialized());
     OPENVINO_ASSERT(block_manager.get_total_number_of_kv_blocks(), 200);
     cache_manager->allocate_cache_if_needed(block_manager.get_total_number_of_kv_blocks());
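
Editor's note (not part of the patch): the scheduler changes above replace one-shot cache growth with a retry loop — the scheduler keeps asking for more KV blocks until the request fits or _try_increase_cache() reports that no further growth is possible. The following is a minimal standalone sketch of that pattern only; BlockPool, try_grow and the numeric limits are hypothetical stand-ins, not the openvino.genai BlockManager/Scheduler API.

```cpp
// Illustrative sketch of "grow until it fits or growth fails".
// BlockPool/try_grow are hypothetical, not the real API.
#include <cstddef>
#include <iostream>

struct BlockPool {
    size_t total_blocks = 0;
    size_t max_blocks = 0;      // e.g. derived from available device memory
    double growth_factor = 2.0; // geometric growth, as in the patch's growth factor

    bool can_allocate(size_t required) const { return required <= total_blocks; }

    // Grow geometrically, clamp to the limit, report whether anything was added.
    bool try_grow() {
        size_t target = static_cast<size_t>(total_blocks * growth_factor);
        if (target <= total_blocks)
            target = total_blocks + 1;
        if (target > max_blocks)
            target = max_blocks;
        if (target <= total_blocks)
            return false; // no headroom left
        total_blocks = target;
        return true;
    }
};

int main() {
    BlockPool pool{/*total_blocks=*/4, /*max_blocks=*/64};
    size_t required_blocks = 37;

    // Same loop shape as the scheduler code in the patch:
    while (!pool.can_allocate(required_blocks)) {
        if (!pool.try_grow())
            break;
    }

    if (pool.can_allocate(required_blocks))
        std::cout << "scheduled with " << pool.total_blocks << " blocks\n";
    else
        std::cout << "request does not fit; defer or preempt\n";
}
```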