
Commit eebac1f
Applied comments.
popovaan committed Dec 24, 2024
1 parent f04c06d commit eebac1f
Showing 6 changed files with 90 additions and 130 deletions.
16 changes: 3 additions & 13 deletions src/cpp/src/block_manager.hpp
@@ -195,7 +195,6 @@ class BlockAllocator {
size_t m_num_layers;
bool m_enable_prefix_caching;
ov::genai::OverwritableBlocksHashStore m_overwriteable_blocks;
-        bool m_initialized = false;
public:
/**
* Constructs the BlockAllocator.
@@ -216,7 +215,9 @@ class BlockAllocator {
per_layer_block_list.push_back(std::make_shared<KVCacheBlock>(block_id));
}
}
-        m_initialized = true;
+        else {
+            m_free_blocks_num = std::vector<size_t>(m_num_layers, 0);
+        }
}

@@ -227,10 +228,6 @@

void increase_kv_blocks_number(size_t new_kv_blocks_count) {
OPENVINO_ASSERT(new_kv_blocks_count > m_total_num_blocks, "New blocks number should be more than previous blocks number.");
-        if (!m_initialized) {
-            m_free_blocks_num = std::vector<size_t>(m_num_layers, 0);
-            m_initialized = true;
-        }
size_t added_blocks = new_kv_blocks_count - m_total_num_blocks;
for (auto idx = 0; idx < m_free_blocks_num.size(); idx++) {
m_free_blocks_num[idx] += added_blocks;
@@ -243,9 +240,6 @@
m_total_num_blocks = new_kv_blocks_count;
}

-        bool is_inilialized() const {
-            return m_initialized;
-        }

/**
* Returns the number of free blocks for a given layer.
@@ -665,10 +659,6 @@ class BlockManager {
return m_allocator.num_free_blocks(0); // relying on the invariant that all layers have an identical number of blocks
}

-        bool block_allocator_initialized() const {
-            return m_allocator.is_inilialized();
-        }

/**
* @param num_blocks A number of KV cache blocks
* @return Whether this number of KV cache blocks may be assigned to new sequences.
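Net effect of the block_manager.hpp change: the allocator drops its separate m_initialized flag, and "not yet sized" is represented by per-layer free-block counters that start at zero and grow later. A minimal self-contained sketch of that behavior (a simplified stand-in, not the repository class):

```cpp
#include <cstddef>
#include <stdexcept>
#include <vector>

// Simplified stand-in for BlockAllocator after this commit: no m_initialized
// flag; "uninitialized" is just a pool with zero blocks per layer.
class ToyBlockAllocator {
    std::vector<size_t> m_free_blocks_num;
    size_t m_total_num_blocks;
public:
    ToyBlockAllocator(size_t num_blocks, size_t num_layers)
        : m_free_blocks_num(num_layers, num_blocks), m_total_num_blocks(num_blocks) {}

    void increase_kv_blocks_number(size_t new_total) {
        if (new_total <= m_total_num_blocks)
            throw std::invalid_argument("new blocks number should exceed the previous one");
        const size_t added = new_total - m_total_num_blocks;
        for (auto& free_count : m_free_blocks_num)
            free_count += added;        // newly added blocks start out free
        m_total_num_blocks = new_total;
    }

    size_t total_blocks() const { return m_total_num_blocks; }
};
```

Callers that previously asked block_allocator_initialized() can instead test whether the pool still holds zero blocks, which is exactly what the scheduler.hpp hunk below switches to.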
126 changes: 53 additions & 73 deletions src/cpp/src/cache_manager.hpp
@@ -44,111 +44,91 @@ class CacheManager {
         if (m_num_allocated_kv_blocks >= num_kv_blocks) {
             return;
         }
-        if (m_num_allocated_kv_blocks > 0) {
-            increase_cache(num_kv_blocks);
-            return;
-        }
         OPENVINO_ASSERT(m_key_cache.size() == m_value_cache.size());
-        m_num_allocated_kv_blocks = num_kv_blocks;
         ov::Shape value_cache_shape = set_first_dim_and_make_static(m_device_config.get_value_cache_shape(), num_kv_blocks);
         ov::Shape key_cache_shape = set_first_dim_and_make_static(m_device_config.get_key_cache_shape(), num_kv_blocks);

         const std::string device_name = m_device_config.get_device();

+        ov::Coordinate start_key{0,0,0,0};
+        ov::Coordinate start_value{0,0,0,0};
+
         if (device_name.find("GPU") == std::string::npos) { // Allocate KV caches
             for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
                 ov::Tensor key_cache(m_device_config.get_cache_precision(), key_cache_shape);
                 ov::Tensor value_cache(m_device_config.get_cache_precision(), value_cache_shape);

+                auto key_cache_roi_end = static_cast<unsigned char*>(key_cache.data());
+                auto value_cache_roi_end = static_cast<unsigned char*>(value_cache.data());
+                size_t key_roi_size_byte = 0;
+                size_t value_roi_size_byte = 0;
+
+                if (m_key_cache.size() > decoder_layer_id) {
+                    ov::Coordinate end_key = m_key_cache[decoder_layer_id].get_shape();
+                    ov::Coordinate end_value = m_value_cache[decoder_layer_id].get_shape();
+
+                    key_roi_size_byte = m_key_cache[decoder_layer_id].get_byte_size();
+                    value_roi_size_byte = m_value_cache[decoder_layer_id].get_byte_size();
+                    key_cache_roi_end = static_cast<unsigned char*>(key_cache.data()) + key_roi_size_byte;
+                    value_cache_roi_end = static_cast<unsigned char*>(value_cache.data()) + value_roi_size_byte;
+
+                    // copy current cache data
+                    ov::Tensor dst_key_roi(key_cache, start_key, end_key);
+                    ov::Tensor dst_value_roi(value_cache, start_value, end_value);
+
+                    m_key_cache[decoder_layer_id].copy_to(dst_key_roi);
+                    m_value_cache[decoder_layer_id].copy_to(dst_value_roi);
+                }
+
                 // Some optimizations like AVX2, AVX512, AMX require a minimal shape and
                 // perform multiplying by zero on the excess data. Uninitialized tensor data contain NaNs,
                 // so NaN * 0 returns non-zero invalid data.
                 // So we need to zero all newly allocated tensor data.
-                std::memset(key_cache.data(), 0, key_cache.get_byte_size());
-                std::memset(value_cache.data(), 0, value_cache.get_byte_size());
+                std::memset(key_cache_roi_end, 0, key_cache.get_byte_size() - key_roi_size_byte);
+                std::memset(value_cache_roi_end, 0, value_cache.get_byte_size() - value_roi_size_byte);

-                m_key_cache.emplace_back(key_cache);
-                m_value_cache.emplace_back(value_cache);
+                // set new cache tensors
+                if (m_key_cache.size() > decoder_layer_id) {
+                    m_key_cache[decoder_layer_id] = key_cache;
+                    m_value_cache[decoder_layer_id] = value_cache;
+                }
+                else {
+                    m_key_cache.emplace_back(key_cache);
+                    m_value_cache.emplace_back(value_cache);
+                }

                 update_request_tensor(decoder_layer_id);
             }
         } else {
             auto remote_context = m_core.get_default_context(device_name);
             for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
                 ov::Tensor key_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
                                                                     key_cache_shape);
                 ov::Tensor value_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
                                                                       value_cache_shape);

-                m_key_cache.emplace_back(key_cache);
-                m_value_cache.emplace_back(value_cache);
+                if (m_key_cache.size() > decoder_layer_id) {
+                    ov::Coordinate end_key = m_key_cache[decoder_layer_id].get_shape();
+                    ov::Coordinate end_value = m_value_cache[decoder_layer_id].get_shape();

+                    // copy current cache data
+                    ov::RemoteTensor dst_key_roi(key_cache, start_key, end_key);
+                    ov::RemoteTensor dst_value_roi(value_cache, start_value, end_value);
+                    dst_key_roi.copy_from(m_key_cache[decoder_layer_id]);
+                    dst_value_roi.copy_from(m_value_cache[decoder_layer_id]);
+
+                    m_key_cache[decoder_layer_id] = key_cache;
+                    m_value_cache[decoder_layer_id] = value_cache;
+                }
+                else {
+                    m_key_cache.emplace_back(key_cache);
+                    m_value_cache.emplace_back(value_cache);
+                }
                 update_request_tensor(decoder_layer_id);
             }
         }
+        m_num_allocated_kv_blocks = num_kv_blocks;
     }

-    void increase_cache(size_t num_kv_blocks) {
-        OPENVINO_ASSERT(num_kv_blocks > m_num_allocated_kv_blocks);
-        ov::Shape new_value_cache_shape = set_first_dim_and_make_static(m_device_config.get_value_cache_shape(), num_kv_blocks);
-        ov::Shape new_key_cache_shape = set_first_dim_and_make_static(m_device_config.get_key_cache_shape(), num_kv_blocks);
-
-        const std::string device_name = m_device_config.get_device();
-        ov::Coordinate start_key{0,0,0,0};
-        ov::Coordinate start_value{0,0,0,0};
-
-        if (device_name.find("GPU") == std::string::npos) {
-            for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
-                ov::Coordinate end_key(m_key_cache[decoder_layer_id].get_shape());
-                ov::Coordinate end_value(m_value_cache[decoder_layer_id].get_shape());
-
-                ov::Tensor key_cache(m_device_config.get_cache_precision(), new_key_cache_shape);
-                ov::Tensor value_cache(m_device_config.get_cache_precision(), new_value_cache_shape);
-
-                // copy current cache data
-                ov::Tensor dst_key_roi(key_cache, start_key, end_key);
-                ov::Tensor dst_value_roi(value_cache, start_value, end_value);
-                m_key_cache[decoder_layer_id].copy_to(dst_key_roi);
-                m_value_cache[decoder_layer_id].copy_to(dst_value_roi);
-
-                // Some optimizations like AVX2, AVX512, AMX require a minimal shape and
-                // perform multiplying by zero on the excess data. Uninitialized tensor data contain NaNs,
-                // so NaN * 0 returns non-zero invalid data.
-                // So we need to zero all newly allocated tensor data.
-                auto key_cache_roi_end = static_cast<unsigned char*>(key_cache.data()) + dst_key_roi.get_byte_size();
-                auto value_cache_roi_end = static_cast<unsigned char*>(value_cache.data()) + dst_value_roi.get_byte_size();
-                std::memset(key_cache_roi_end, 0, key_cache.get_byte_size() - dst_key_roi.get_byte_size());
-                std::memset(value_cache_roi_end, 0, value_cache.get_byte_size() - dst_value_roi.get_byte_size());
-
-                // set new cache tensors
-                m_key_cache[decoder_layer_id] = key_cache;
-                m_value_cache[decoder_layer_id] = value_cache;
-
-                update_request_tensor(decoder_layer_id);
-            }
-        } else {
-            auto remote_context = m_core.get_default_context(device_name);
-            for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
-                ov::Coordinate end_key(m_key_cache[decoder_layer_id].get_shape());
-                ov::Coordinate end_value(m_value_cache[decoder_layer_id].get_shape());
-
-                ov::RemoteTensor key_cache = remote_context.create_tensor(m_device_config.get_cache_precision(), new_key_cache_shape);
-                ov::RemoteTensor value_cache = remote_context.create_tensor(m_device_config.get_cache_precision(), new_value_cache_shape);
-
-                // copy current cache data
-                ov::RemoteTensor dst_key_roi(key_cache, start_key, end_key);
-                ov::RemoteTensor dst_value_roi(value_cache, start_value, end_value);
-
-                dst_key_roi.copy_from(m_key_cache[decoder_layer_id]);
-                dst_value_roi.copy_from(m_value_cache[decoder_layer_id]);
-
-                // set new cache tensors
-                m_key_cache[decoder_layer_id] = key_cache;
-                m_value_cache[decoder_layer_id] = value_cache;
-
-                update_request_tensor(decoder_layer_id);
-            }
-        }
-    }

ov::Tensor get_key_cache(size_t decoder_layer_id) const {
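The merged allocate_cache_if_needed() above is a grow-and-copy routine: allocate per-layer tensors at the new size, copy the previously used region of interest, and zero only the newly added tail so AVX2/AVX512/AMX kernels never multiply uninitialized NaNs. A minimal sketch of the same pattern on a raw byte buffer, assuming only the C standard library (the helper name is illustrative, not repository code):

```cpp
#include <cstdlib>
#include <cstring>

// Grow a buffer from old_bytes to new_bytes (new_bytes >= old_bytes assumed),
// preserving the live prefix and zero-filling only the fresh tail -- the
// per-layer ROI copy + memset pattern used above.
unsigned char* grow_zeroed(unsigned char* buf, size_t old_bytes, size_t new_bytes) {
    auto* bigger = static_cast<unsigned char*>(std::malloc(new_bytes));
    if (bigger == nullptr)
        return nullptr;                                  // caller keeps the old buffer
    if (buf != nullptr)
        std::memcpy(bigger, buf, old_bytes);             // preserve existing cache content
    std::memset(bigger + old_bytes, 0, new_bytes - old_bytes);  // zero only the new region
    std::free(buf);
    return bigger;
}
```

Zeroing just the tail keeps each growth step proportional to the added bytes rather than to the whole cache.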
3 changes: 0 additions & 3 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -32,7 +32,6 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl(

bool is_need_per_layer_cache_control = scheduler_config.use_cache_eviction;
utils::apply_paged_attention_transformations(model, device_config, is_need_per_layer_cache_control);
-    m_core = std::make_shared<Core>(core);

init(model, scheduler_config, compile_properties, device_config, core);
}
@@ -78,8 +77,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::init(
// If eos_token_id was not provided, take value
if (m_generation_config.eos_token_id == -1)
m_generation_config.set_eos_token_id(m_tokenizer.get_eos_token_id());

-    m_device_config = std::make_shared<DeviceConfig>(device_config);
};


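With m_core and m_device_config gone from the pipeline, the cache manager remains the owner of both (its diff above calls m_core.get_default_context() and m_device_config.get_device(), and the scheduler below calls m_cache_manager->get_device_config()). A sketch of the resulting ownership shape, with a stub type standing in for the real DeviceConfig:

```cpp
#include <memory>
#include <string>
#include <utility>

// Stub standing in for ov::genai::DeviceConfig (illustrative only).
struct DeviceConfigStub {
    std::string device;
    const std::string& get_device() const { return device; }
};

// Ownership after this commit, sketched: the pipeline no longer keeps its own
// Core/DeviceConfig copies; the cache manager carries what (re)allocation and
// the scheduler need, and exposes the device config via an accessor.
class ToyCacheManager {
    std::shared_ptr<DeviceConfigStub> m_device_config;
public:
    explicit ToyCacheManager(std::shared_ptr<DeviceConfigStub> cfg)
        : m_device_config(std::move(cfg)) {}
    std::shared_ptr<DeviceConfigStub> get_device_config() const { return m_device_config; }
};
```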
9 changes: 0 additions & 9 deletions src/cpp/src/continuous_batching_impl.hpp
@@ -14,7 +14,6 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc
std::shared_ptr<CacheManager> m_cache_manager;
std::shared_ptr<ModelRunner> m_model_runner;
std::shared_ptr<Sampler> m_sampler;
-    std::shared_ptr<DeviceConfig> m_device_config;

// current requests to process
std::vector<SequenceGroup::Ptr> m_requests;
@@ -31,14 +30,6 @@
// flag to enable validation mode for sampler
bool m_is_validation_mode_enabled = false;

-    // dynamic kv-cache allocation params
-    const size_t m_kv_blocks_initial_multiplier = 2;
-    const float m_cache_growth_factor = 2; // common values 1.5 or 2
-    const float m_percentage_threshold_for_cache_increase = 100;
-
-    bool m_dynamic_memory_allocation = false;
-    std::shared_ptr<Core> m_core;

#ifdef DEBUG_CACHE_STATE_DUMP
size_t step_count = 0;
#endif
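These knobs do not disappear: the scheduler.hpp diff below still reads m_kv_blocks_initial_multiplier and m_dynamic_memory_allocation, so they now live in Scheduler along with the growth logic. A sketch of the geometric growth policy the removed constants imply (the struct and method names are illustrative assumptions):

```cpp
#include <cstddef>

// Geometric KV-cache growth policy implied by the removed constants.
struct CacheGrowthPolicy {
    size_t kv_blocks_initial_multiplier = 2;  // initial cache sized ~2x the prompt
    float cache_growth_factor = 2.0f;         // common values are 1.5 or 2

    size_t grown_block_count(size_t current_blocks) const {
        // grow geometrically, but always make progress by at least one block
        const auto next = static_cast<size_t>(current_blocks * cache_growth_factor);
        return next > current_blocks ? next : current_blocks + 1;
    }
};
```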
61 changes: 34 additions & 27 deletions src/cpp/src/scheduler.hpp
@@ -51,17 +51,12 @@ class Scheduler {
m_config(config),
m_block_manager(m_config.num_kv_blocks, m_config.enable_prefix_caching, block_size, num_layers) {

-        // allocate kv-cache if the number of kv blocks is determined,
-        // otherwise cache will be allocated dynamically
-        if (m_block_manager.block_allocator_initialized()) {
-            m_cache_manager->allocate_cache_if_needed(m_block_manager.get_total_number_of_kv_blocks());
-        }
OPENVINO_ASSERT(num_layers != 0, "num_layers must be non-zero");
}

Output schedule(std::vector<SequenceGroup::Ptr>& sequence_groups) {
Output scheduler_output;
-        if (!m_block_manager.block_allocator_initialized()) {
+        if (m_block_manager.get_total_number_of_kv_blocks() == 0) {
_initialize_cache(sequence_groups);
}

@@ -83,6 +78,7 @@
}
}

+        m_cache_manager->allocate_cache_if_needed(m_block_manager.get_total_number_of_kv_blocks());
_clear_waiting_sequences(sequence_groups);
scheduler_output.m_cache_usage = m_block_manager.get_used_percentage();
return scheduler_output;
@@ -255,8 +251,10 @@
size_t available_slots = currently_allocated_token_slots - occupied_token_slots,
required_slots = num_scheduled_tokens > available_slots ? num_scheduled_tokens - available_slots : 0;
size_t num_required_blocks = (required_slots + block_size - 1) / block_size, num_free_blocks = m_block_manager.num_free_blocks();
-            if (num_free_blocks == 0) {
-                _try_increase_cache();
+            while (num_required_blocks > num_free_blocks) {
+                if (!_try_increase_cache()) {
+                    break;
+                }
}
size_t num_scheduled_blocks = std::min(num_required_blocks, num_free_blocks);
// some scheduled blocks can be not fully occupied, so we need to take min between num_scheduled_blocks
@@ -310,15 +308,18 @@
size_t num_scheduled_tokens_per_seq = std::min(available_tokens_per_seq_in_megabatch, num_available_tokens_per_seq);
sequence_group->schedule_tokens(num_scheduled_tokens_per_seq);

+            while (!m_block_manager.can_append_slots(sequence_group)) {
+                if (!_try_increase_cache()) {
+                    break;
+                }
+            }

_apply_preemption(sequence_group_id, sequence_groups);

// if we can't preempt any more sequences, clear scheduled tokens and move to next sequence
-            if (!m_block_manager.can_append_slots(sequence_group)){
-                _try_increase_cache();
-                if (!m_block_manager.can_append_slots(sequence_group)) {
-                    sequence_group->clear_scheduled_tokens();
-                    continue;
-                }
+            if (!m_block_manager.can_append_slots(sequence_group)) {
+                sequence_group->clear_scheduled_tokens();
+                continue;
             }

// allocate new slots
@@ -394,11 +395,13 @@
// apply KV cache limitations
size_t block_size = get_block_size();
const size_t num_required_blocks = (sequence_len + block_size - 1) / block_size;
-            if (!m_block_manager.can_allocate_blocks(num_required_blocks)) {
-                _try_increase_cache();
-            }
+            while (!m_block_manager.can_allocate_blocks(num_required_blocks)) {
+                if (!_try_increase_cache()) {
+                    break;
+                }
+            }
             if (!m_block_manager.can_allocate_blocks(num_required_blocks))
                 break;

// add scheduling information
{
@@ -465,25 +468,26 @@
}

void _initialize_cache(const std::vector<SequenceGroup::Ptr>& sequence_groups) {
-        size_t seq_length_sum = 0;
+        size_t blocks_sum = 0;
         for (auto idx = 0; idx < sequence_groups.size(); idx++) {
             auto seq_length = sequence_groups[idx]->get_prompt_len() * m_kv_blocks_initial_multiplier;
             auto gen_config = sequence_groups[idx]->get_sampling_parameters();
             seq_length = std::min(seq_length, sequence_groups[idx]->get_prompt_len() + gen_config.get_max_new_tokens(sequence_groups[idx]->get_prompt_len()));
-            if (sequence_groups[idx]->get_sampling_parameters().is_beam_search()) {
-                seq_length *= sequence_groups[idx]->get_sampling_parameters().num_beams;
+            size_t blocks_num = std::ceil((float)seq_length / m_block_manager.get_block_size());
+            if (gen_config.do_sample && gen_config.is_beam_search()) {
+                blocks_num *= gen_config.num_beams;
+            } else if (gen_config.do_sample && gen_config.is_multinomial()) {
+                blocks_num *= gen_config.num_return_sequences;
             }
-            seq_length_sum += seq_length;
+            blocks_sum += blocks_num;
         }
-        m_block_manager.increase_kv_blocks_number(seq_length_sum);
-        m_cache_manager->allocate_cache_if_needed(m_block_manager.get_total_number_of_kv_blocks());
+        m_block_manager.increase_kv_blocks_number(blocks_sum);
m_dynamic_memory_allocation = true;
}

-    void _try_increase_cache() {
+    bool _try_increase_cache() {
         if (!m_dynamic_memory_allocation) {
-            return;
+            return false;
        }
}
auto device_config = m_cache_manager->get_device_config();
auto device = device_config->get_device();
@@ -503,9 +507,12 @@
if (possible_blocks_to_add > 0) {
m_block_manager.increase_kv_blocks_number(current_num_of_kv_blocks + possible_blocks_to_add);
}
+            else {
+                return false;
+            }
}
}
-        m_cache_manager->allocate_cache_if_needed(m_block_manager.get_total_number_of_kv_blocks());
+        return true;
}

};
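Taken together, the scheduler changes establish one contract: _try_increase_cache() reports whether the logical block pool could grow, every call site loops until the request fits or growth fails, and the physical allocation happens once per schedule() via allocate_cache_if_needed(). A condensed sketch of that retry pattern (can_fit and try_grow are hypothetical stand-ins for the m_block_manager checks and Scheduler::_try_increase_cache):

```cpp
#include <functional>

// Retry pattern used at each scheduling site after this commit: grow the
// logical block pool until the request fits or the cache cannot grow further.
bool ensure_capacity(const std::function<bool()>& can_fit,
                     const std::function<bool()>& try_grow) {
    while (!can_fit()) {
        if (!try_grow())
            return false;  // capped: caller clears scheduled tokens and moves on
    }
    return true;
}
```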
