Merge pull request #3 from popovaan/preemtion_alg

Preemption algorithm finalization

ilya-lavrenov authored May 7, 2024
2 parents e4700f4 + 2488765 commit 17fdc12
Showing 7 changed files with 659 additions and 62 deletions.
@@ -56,3 +56,18 @@ install(TARGETS ${TARGET_NAME}
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})

install(DIRECTORY include/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} FILES_MATCHING PATTERN "*.hpp")


# gtest
FetchContent_Declare(
googletest
URL https://github.com/google/googletest/archive/03597a01ee50ed33e9dfd640b249b4be3799d395.zip
)
FetchContent_MakeAvailable(googletest)


set(TEST_TARGET_NAME "tests_continuous_batching")
add_executable(${TEST_TARGET_NAME} "src/tests/scheduler.cpp")
target_link_libraries(${TEST_TARGET_NAME} PUBLIC ${TARGET_NAME} openvino::runtime gtest_main)
target_include_directories(${TEST_TARGET_NAME} PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/src/"
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include")
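Because the test target links gtest_main, a test source only needs the GoogleTest macros; no main() is required. Below is a minimal, purely illustrative sketch (editor's example; the real cases in src/tests/scheduler.cpp exercise the Scheduler and BlockManager and look different):

#include <gtest/gtest.h>

// Illustrative smoke test only; it does not reflect the actual contents of
// src/tests/scheduler.cpp.
TEST(TestsContinuousBatching, smoke) {
    EXPECT_EQ(2 + 2, 4);
}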
@@ -39,6 +39,10 @@ class KVCacheBlock {
bool copy_on_write() const {
return m_ref_count > 1;
}

int get_references_count() const {
return m_ref_count;
}
};


@@ -106,6 +110,10 @@ class BlockManager {
return m_block_table[seq_id];
}

bool has_block_table(uint64_t seq_id) const {
return m_block_table.count(seq_id) > 0;
}

size_t num_free_blocks() const {
return m_allocator.num_free_blocks();
}
@@ -141,16 +149,81 @@ class BlockManager {
OPENVINO_ASSERT(m_block_table.erase(seq_id) == 1);
}

bool can_append_slot(SequenceGroup::CPtr seq_group) {
// TODO: optimize this HEURISTIC
// it assumes that all sequences require new block, but maybe some of them
// don't share the same block
// let's count actual number of sequences, where last_block_id is the same
return seq_group->num_running_seqs() <= m_allocator.num_free_blocks();
void free_sequence_partially(size_t seq_id, size_t block_num) {
// currently this method is applicable only for groups with single sequences
// TODO: support for groups with multiple sequences
auto& block_table = m_block_table[seq_id];

if (block_num == block_table.size())
return free_sequence(seq_id);

OPENVINO_ASSERT(block_table.size() >= block_num);
for (size_t idx = 0; idx < block_num; idx++) {
m_allocator.free(block_table.back());
OPENVINO_ASSERT(block_table.back()->is_free());
block_table.pop_back();
}

if (block_table.empty()) {
OPENVINO_ASSERT(m_block_table.erase(seq_id) == 1);
}
}

bool can_append_slots(SequenceGroup::CPtr seq_group) {
return required_blocks_count(seq_group) <= m_allocator.num_free_blocks();
}

std::map<size_t, std::list<size_t>> append_slot(SequenceGroup::CPtr seq_group) {
OPENVINO_ASSERT(can_append_slot(seq_group));
size_t required_blocks_count(SequenceGroup::CPtr seq_group) {
std::vector<Sequence::CPtr> running_sequences = seq_group->get_running_sequences();
size_t blocks_count = 0; // total number of blocks needed for the sequence group
std::set<size_t> last_block_ids; // unique last block indices

for (auto seq: running_sequences) {
auto seq_id = seq->get_id();
if (m_block_table.find(seq_id) == m_block_table.end()) {
// the block table is empty, so we need to allocate the number of blocks equal to the number of logical blocks
blocks_count += seq_group->get_num_logical_blocks();
continue;
}
auto& block_table = m_block_table[seq_id];
size_t num_physical_blocks = block_table.size();
OPENVINO_ASSERT(num_physical_blocks > 0);

if (num_physical_blocks > seq_group->get_num_logical_blocks())
// new blocks are not required
continue;

size_t last_block_id = block_table.back()->get_index();

if (last_block_ids.find(last_block_id) != last_block_ids.end())
// this block was already processed
continue;

size_t needed_blocks_per_sequence = seq_group->get_num_logical_blocks() - num_physical_blocks;

KVCacheBlock::Ptr last_block = block_table.back();
if (last_block->copy_on_write()) {
// block is shared by multiple sequences
auto references_count = last_block->get_references_count();

if (needed_blocks_per_sequence == 0) {
// case when the last block is not completely filled and needs to be copied n - 1 times, where n is the reference count
blocks_count += references_count - 1;
}
else {
blocks_count += needed_blocks_per_sequence * references_count;
}
}
else {
// block is used only by one sequence
blocks_count += needed_blocks_per_sequence;
}
}
return blocks_count;
}

std::map<size_t, std::list<size_t>> append_slots(SequenceGroup::CPtr seq_group) {

size_t num_logical_blocks = seq_group->get_num_logical_blocks();
std::vector<Sequence::CPtr> running_sequences = seq_group->get_running_sequences();

@@ -162,8 +235,8 @@ class BlockManager {
size_t num_physical_blocks = block_table.size();

if (num_logical_blocks > num_physical_blocks) {
// we require to allocate a new physical block
block_table.push_back(m_allocator.allocate_block());
OPENVINO_ASSERT(can_allocate_blocks(num_logical_blocks - num_physical_blocks));
allocate(seq_id, num_logical_blocks - num_physical_blocks);
} else {
OPENVINO_ASSERT(num_logical_blocks == num_physical_blocks, "A number of physical and logic blocks must be the same in this code path");
KVCacheBlock::Ptr last_block = block_table.back();
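To illustrate the counting rules in required_blocks_count above, here is a small standalone sketch (editor's example with assumed numbers, not part of the change). Three running sequences of a beam-search group share one copy-on-write last block (reference count 3) and each owns 4 physical blocks; the group has grown to 5 logical blocks, so the shared last block is processed once and contributes needed_blocks_per_sequence * references_count new blocks:

#include <cstddef>
#include <iostream>

int main() {
    // Assumed scenario (illustrative numbers only).
    size_t num_logical_blocks = 5;   // blocks the group logically needs now
    size_t num_physical_blocks = 4;  // blocks already allocated per sequence
    size_t references_count = 3;     // running sequences sharing the last block

    size_t needed_blocks_per_sequence = num_logical_blocks - num_physical_blocks; // 1
    size_t blocks_count = 0;
    if (needed_blocks_per_sequence == 0) {
        // the last block is not completely filled: copy it n - 1 times
        blocks_count += references_count - 1;
    } else {
        blocks_count += needed_blocks_per_sequence * references_count;
    }
    std::cout << "required blocks: " << blocks_count << std::endl; // prints 3
    return 0;
}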
@@ -260,6 +260,15 @@ SamplerOutput Sampler::sample(std::vector<SequenceGroup::Ptr> & sequence_groups,
if (m_beam_search_info.find(request_id) == m_beam_search_info.end()) {
m_beam_search_info.emplace(request_id, GroupBeamSearcher(sequence_group));
}
else {
// the sequence group can be empty if it was returned after preemption
if (sequence_group->is_empty()) {
// clear beam search info
m_beam_search_info.erase(request_id);
m_beam_search_info.emplace(request_id, GroupBeamSearcher(sequence_group));
}
}


// current algorithm already adds new tokens to running sequences and
m_beam_search_info.at(request_id).select_next_tokens(sequence_group_logits, sampler_output);
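The beam-search bookkeeping above follows a simple pattern: per-request state is dropped and re-created when a preempted group comes back empty. Below is a standalone sketch of that pattern (State and get_state are hypothetical stand-ins for GroupBeamSearcher and the map lookup, not the library's API):

#include <cstdint>
#include <map>

struct State { size_t step = 0; };  // hypothetical stand-in for GroupBeamSearcher

State& get_state(std::map<uint64_t, State>& infos, uint64_t request_id, bool group_is_empty) {
    auto it = infos.find(request_id);
    if (it == infos.end() || group_is_empty) {
        // either the request is new or its group was fully preempted:
        // drop any stale state and start from scratch
        infos.erase(request_id);
        it = infos.emplace(request_id, State{}).first;
    }
    return it->second;
}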
@@ -37,9 +37,6 @@ class Scheduler {
Output schedule(std::vector<SequenceGroup::Ptr>& sequence_groups) {
Output scheduler_output;

// 1. perform balancing of running groups first to ensure we have some free KV blocks for generation
_apply_preemption(sequence_groups);

if (m_config.dynamic_split_fuse) {
// deepspeed-mii case
// generation phase is always scheduled first
@@ -66,6 +63,10 @@
return m_block_manager.get_block_table(seq.get_id());
}

bool has_block_table(uint64_t seq_id) const {
return m_block_manager.has_block_table(seq_id);
}

void free_sequence(uint64_t seq_id) {
m_block_manager.free_sequence(seq_id);
}
@@ -89,52 +90,99 @@
return num_running;
}

void _preempt_by_recompute(SequenceGroup::Ptr sequence_group) {
// currently, we support only preemption by (TODO: implement "partial") recompute
for (size_t s = 0; s < sequence_group->num_running_seqs(); ++s) {
// so, let's fully drop a sequence(s) from block_manager
m_block_manager.free_sequence((*sequence_group)[s]->get_id());

bool _preempt_by_recompute(SequenceGroup::Ptr sequence_group, size_t blocks_needed) {
size_t total_num_released_blocks = 0;
size_t processed_tokens = sequence_group->get_num_processed_tokens();
size_t block_size = m_config.block_size;
size_t prev_blocks_count = m_block_manager.num_free_blocks();
size_t num_running_sequences = sequence_group->num_running_seqs();
size_t preempted_tokens = 0;

if (num_running_sequences > 1) {
for (size_t s = 0; s < sequence_group->num_running_seqs(); ++s) {
auto seq_id = (*sequence_group)[s]->get_id();
m_block_manager.free_sequence(seq_id);
}
sequence_group->reset();
return m_block_manager.num_free_blocks() > prev_blocks_count;
}

// currently partial preemption is enabled only for the single running sequence case
// TODO: implement partial preemption for the case with multiple sequences in a group
for (size_t s = 0; s < num_running_sequences; ++s) {
auto seq_id = (*sequence_group)[s]->get_id();
if (!m_block_manager.has_block_table(seq_id)) {
// no blocks are allocated for this sequence, so it can't be preempted
return false;
}
auto block_table = m_block_manager.get_block_table(seq_id);
size_t required_blocks = blocks_needed - total_num_released_blocks;
if (required_blocks >= block_table.size()) {
// fully drop a sequence(s) from block_manager
m_block_manager.free_sequence(seq_id);
}
else {
m_block_manager.free_sequence_partially(seq_id, required_blocks);
}

// calculate the number of released blocks
auto released_blocks = m_block_manager.num_free_blocks() - prev_blocks_count;
total_num_released_blocks += released_blocks;
prev_blocks_count = m_block_manager.num_free_blocks();


// calculate the number of preempted tokens
auto tokens_in_last_block = processed_tokens % block_size;
if (tokens_in_last_block == 0) {
tokens_in_last_block = block_size;
}

preempted_tokens += tokens_in_last_block + (released_blocks > 0 ? released_blocks - 1 : 0) * block_size;
if (m_block_manager.num_free_blocks() >= blocks_needed) {
break;
}
}
// case when preemption requires preempting prompt tokens
if (!m_config.dynamic_split_fuse && processed_tokens - preempted_tokens < sequence_group->get_prompt_len()) {
// preempt the prompt fully so as not to leave a partially processed prompt
preempted_tokens = processed_tokens;
auto seq_id = (*sequence_group)[0]->get_id();
m_block_manager.free_sequence(seq_id);
}
// update computed part of each sequence
sequence_group->preempt_tokens(sequence_group->get_num_processed_tokens());
sequence_group->preempt_tokens(preempted_tokens);
return total_num_released_blocks > 0;
}

static size_t _get_low_priority_sequence_group_id(const std::vector<SequenceGroup::Ptr>& sequence_groups) {
for (size_t seq_group_id = 0, num_groups = sequence_groups.size(); seq_group_id < num_groups; ++seq_group_id) {
SequenceGroup::CPtr sequence_group = sequence_groups[num_groups - seq_group_id - 1];
size_t group_idx = num_groups - seq_group_id - 1;
SequenceGroup::CPtr sequence_group = sequence_groups[group_idx];
if (sequence_group->get_num_processed_tokens() > 0) {
// we are here, because current sequence group has some reserved KV blocks in block manager
// which can be freed
return seq_group_id;
return group_idx;
}
}

return std::numeric_limits<size_t>::max();
}

// the current function iterates over all sequence groups and determines whether some KV blocks of existing
// sequences (either in the generate or prompt phase) need to be freed for the generation phase of higher-priority
// sequence groups (priority here is the index within the vector of sequence groups)
void _apply_preemption(std::vector<SequenceGroup::Ptr>& sequence_groups) {
for (size_t sequence_group_id = 0; sequence_group_id < sequence_groups.size(); ++sequence_group_id) {
SequenceGroup::Ptr high_priority_sequence_group = sequence_groups[sequence_group_id];

// we don't consider sequence which is not in "generation" or "preempted" phases as high priority
if (!high_priority_sequence_group->can_generate_tokens())
continue;

// check whether current sequence requires a new slot / block
// TODO: we use simple HEURISTIC like in vLLM (see impl. of "can_append_slot"), but it can be implemented more precise
while (!m_block_manager.can_append_slot(high_priority_sequence_group)) {
// let's run a sequence for eviction
size_t evicted_sequence_group_id = _get_low_priority_sequence_group_id(sequence_groups);

if (evicted_sequence_group_id >= sequence_group_id) {
// we have a cycle when current group need to evict itself to be in a running state
return;
}

_preempt_by_recompute(sequence_groups[evicted_sequence_group_id]);
void _apply_preemption(size_t sequence_group_id, const std::vector<SequenceGroup::Ptr>& sequence_groups) {
SequenceGroup::Ptr sequence_group = sequence_groups[sequence_group_id];

// check whether current sequence requires a new slot / block
while (!m_block_manager.can_append_slots(sequence_group)) {
// let's select a sequence group for eviction
size_t evicted_sequence_group_id = _get_low_priority_sequence_group_id(sequence_groups);

if (evicted_sequence_group_id <= sequence_group_id) {
// we have a cycle when the current group needs to evict itself to stay in a running state
break;
}
size_t blocks_needed = m_block_manager.required_blocks_count(sequence_group);
if (!_preempt_by_recompute(sequence_groups[evicted_sequence_group_id], blocks_needed)) {
break;
}
}
}
@@ -219,15 +267,16 @@ class Scheduler {
size_t num_scheduled_tokens_per_seq = std::min(available_tokens_per_seq_in_megabatch, num_available_tokens_per_seq);
sequence_group->schedule_tokens(num_scheduled_tokens_per_seq);

// TODO: below functions can_append_slot / append_slot can allocate just a single slot, while we require multiple ones in generic case
// (generic case is speculative deconding, where we can check multiple tokens at once)
// So, let's state this as current limitation of scheduler logic
OPENVINO_ASSERT(num_scheduled_tokens_per_seq == 1);
_apply_preemption(sequence_group_id, sequence_groups);

// we can ensure that preemption stage freed some KV blocks to allocate new slots for us
OPENVINO_ASSERT(m_block_manager.can_append_slot(sequence_group));
// if we can't preempt any more sequences, clear the scheduled tokens and move to the next sequence group
if (!m_block_manager.can_append_slots(sequence_group)) {
sequence_group->clear_scheduled_tokens();
continue;
}

// allocate new slots
std::map<size_t, std::list<size_t>> copy_blocks_map = m_block_manager.append_slot(sequence_group);
std::map<size_t, std::list<size_t>> copy_blocks_map = m_block_manager.append_slots(sequence_group);

// add information to scheduler_output
{
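As a worked example of the preempted-token arithmetic in _preempt_by_recompute (editor's sketch with assumed numbers): with block_size = 16 and 35 processed tokens, the last block holds 35 % 16 = 3 tokens, so releasing 2 blocks preempts 3 + (2 - 1) * 16 = 19 tokens:

#include <cstddef>
#include <iostream>

int main() {
    // Assumed numbers for illustration only.
    size_t block_size = 16;
    size_t processed_tokens = 35;
    size_t released_blocks = 2;

    size_t tokens_in_last_block = processed_tokens % block_size; // 3
    if (tokens_in_last_block == 0)
        tokens_in_last_block = block_size; // the last block is completely full

    size_t preempted_tokens =
        tokens_in_last_block + (released_blocks > 0 ? released_blocks - 1 : 0) * block_size;

    std::cout << "preempted tokens: " << preempted_tokens << std::endl; // prints 19
    return 0;
}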