From 510022ec1d1085aa6c916dcff0f2c68269ac3960 Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Wed, 7 Aug 2024 07:47:29 +0200 Subject: [PATCH] cherry-pick from release: Add custom streamer doc (#749) cherry-pick https://github.com/openvinotoolkit/openvino.genai/pull/717 --------- Co-authored-by: Zlobin Vladimir Co-authored-by: Ekaterina Aidova --- .../python/multinomial_causal_lm/README.md | 8 ++ .../multinomial_causal_lm.py | 133 ++++++++++++++++-- src/README.md | 43 ++++-- 3 files changed, 166 insertions(+), 18 deletions(-) diff --git a/samples/python/multinomial_causal_lm/README.md b/samples/python/multinomial_causal_lm/README.md index 0778868e6a..351773ec0d 100644 --- a/samples/python/multinomial_causal_lm/README.md +++ b/samples/python/multinomial_causal_lm/README.md @@ -2,6 +2,8 @@ This example showcases inference of text-generation Large Language Models (LLMs): `chatglm`, `LLaMA`, `Qwen` and other models with the same signature. The application doesn't have many configuration options to encourage the reader to explore and modify the source code. For example, change the device for inference to GPU. The sample fearures `ov::genai::LLMPipeline` and configures it to run random sampling algorithm. There is also a Jupyter [notebook](https://github.com/openvinotoolkit/openvino_notebooks/tree/latest/notebooks/llm-chatbot) which provides an example of LLM-powered Chatbot in Python. +This sample also contains example implementation of an iterable streamer with bufferisation. + ## Download and convert the model and tokenizers The `--upgrade-strategy eager` option is needed to ensure `optimum-intel` is upgraded to the latest version. @@ -22,6 +24,12 @@ Discrete GPUs (dGPUs) usually provide better performance compared to CPUs. It is See https://github.com/openvinotoolkit/openvino.genai/blob/master/src/README.md#supported-models for the list of supported models. +## Streaming + +This Python example demonstrates custom detokenization with bufferization. The streamer receives integer tokens corresponding to each word or subword, one by one. If tokens are decoded individually, the resulting text misses necessary spaces because of detokenize(tokenize(" a")) == "a". + +To address this, the detokenizer needs a larger context. We accumulate tokens in a tokens_cache buffer and decode multiple tokens together, adding the text to the streaming queue only when a complete decoded chunk is ready. We run a separate thread to print all new elements arriving in this queue from the generation pipeline. Each generated chunk of text is put into a synchronized queue, ensuring that all put and get operations are thread-safe and blocked until they can proceed. + ### Troubleshooting #### Unicode characters encoding error on Windows diff --git a/samples/python/multinomial_causal_lm/multinomial_causal_lm.py b/samples/python/multinomial_causal_lm/multinomial_causal_lm.py index 6feb8a2b85..6300320264 100755 --- a/samples/python/multinomial_causal_lm/multinomial_causal_lm.py +++ b/samples/python/multinomial_causal_lm/multinomial_causal_lm.py @@ -4,11 +4,120 @@ import argparse import openvino_genai +import queue +import threading -def streamer(subword): - print(subword, end='', flush=True) - return False +class IterableStreamer(openvino_genai.StreamerBase): + """ + A custom streamer class for handling token streaming and detokenization with buffering. + + Attributes: + tokenizer (Tokenizer): The tokenizer used for encoding and decoding tokens. + tokens_cache (list): A buffer to accumulate tokens for detokenization. + text_queue (Queue): A synchronized queue for storing decoded text chunks. + print_len (int): The length of the printed text to manage incremental decoding. + """ + + def __init__(self, tokenizer): + """ + Initializes the IterableStreamer with the given tokenizer. + + Args: + tokenizer (Tokenizer): The tokenizer to use for encoding and decoding tokens. + """ + super().__init__() + self.tokenizer = tokenizer + self.tokens_cache = [] + self.text_queue = queue.Queue() + self.print_len = 0 + + def __iter__(self): + """ + Returns the iterator object itself. + """ + return self + + def __next__(self): + """ + Returns the next value from the text queue. + + Returns: + str: The next decoded text chunk. + + Raises: + StopIteration: If there are no more elements in the queue. + """ + value = self.text_queue.get() # get() will be blocked until a token is available. + if value is None: + raise StopIteration + return value + + def get_stop_flag(self): + """ + Checks whether the generation process should be stopped. + + Returns: + bool: Always returns False in this implementation. + """ + return False + + def put_word(self, word: str): + """ + Puts a word into the text queue. + + Args: + word (str): The word to put into the queue. + """ + self.text_queue.put(word) + + def put(self, token_id: int) -> bool: + """ + Processes a token and manages the decoding buffer. Adds decoded text to the queue. + + Args: + token_id (int): The token_id to process. + + Returns: + bool: True if generation should be stopped, False otherwise. + """ + self.tokens_cache.append(token_id) + text = self.tokenizer.decode(self.tokens_cache) + + word = '' + if len(text) > self.print_len and '\n' == text[-1]: + # Flush the cache after the new line symbol. + word = text[self.print_len:] + self.tokens_cache = [] + self.print_len = 0 + elif len(text) >= 3 and text[-3:] == chr(65533): + # Don't print incomplete text. + pass + elif len(text) > self.print_len: + # It is possible to have a shorter text after adding new token. + # Print to output only if text lengh is increaesed. + word = text[self.print_len:] + self.print_len = len(text) + self.put_word(word) + + if self.get_stop_flag(): + # When generation is stopped from streamer then end is not called, need to call it here manually. + self.end() + return True # True means stop generation + else: + return False # False means continue generation + + def end(self): + """ + Flushes residual tokens from the buffer and puts a None value in the queue to signal the end. + """ + text = self.tokenizer.decode(self.tokens_cache) + if len(text) > self.print_len: + word = text[self.print_len:] + self.put_word(word) + self.tokens_cache = [] + self.print_len = 0 + self.put_word(None) def main(): @@ -19,17 +128,25 @@ def main(): device = 'CPU' # GPU can be used as well pipe = openvino_genai.LLMPipeline(args.model_dir, device) - + + text_print_streamer = IterableStreamer(pipe.get_tokenizer()) + def token_printer(): + # Getting next elements from iterable will be blocked until a new token is available. + for word in text_print_streamer: + print(word, end='', flush=True) + printer_thread = threading.Thread(target=token_printer, daemon=True) + printer_thread.start() + config = openvino_genai.GenerationConfig() config.max_new_tokens = 100 config.do_sample = True config.top_p = 0.9 config.top_k = 30 - # Since the streamer is set, the results will - # be printed each time a new token is generated. - pipe.generate(args.prompt, config, streamer) - + # Since the streamer is set, the results will be printed + # every time a new token is generated and put into the streamer queue. + pipe.generate(args.prompt, config, text_print_streamer) + printer_thread.join() if '__main__' == __name__: main() diff --git a/src/README.md b/src/README.md index 53076803c5..e54f12ee83 100644 --- a/src/README.md +++ b/src/README.md @@ -178,6 +178,8 @@ int main(int argc, char* argv[]) { ``` Streaming with a custom class: + +C++ template for a stremer. ```cpp #include "openvino/genai/streamer_base.hpp" #include "openvino/genai/llm_pipeline.hpp" @@ -186,18 +188,14 @@ Streaming with a custom class: class CustomStreamer: public ov::genai::StreamerBase { public: bool put(int64_t token) { - bool stop_flag = false; - /* - custom decoding/tokens processing code - tokens_cache.push_back(token); - std::string text = m_tokenizer.decode(tokens_cache); - ... - */ - return stop_flag; // flag whether generation should be stoped, if true generation stops. + // Custom decoding/tokens processing logic. + + // Returns a flag whether generation should be stoped, if true generation stops. + return false; }; void end() { - /* custom finalization */ + // Custom finalization logic. }; }; @@ -206,10 +204,35 @@ int main(int argc, char* argv[]) { std::string model_path = argv[1]; ov::genai::LLMPipeline pipe(model_path, "CPU"); - std::cout << pipe.generate("The Sun is yellow because", ov::genai::streamer(custom_streamer), ov::genai::max_new_tokens(200)); + std::cout << pipe.generate("The Sun is yellow because", ov::genai::max_new_tokens(15), ov::genai::streamer(custom_streamer)); } ``` +Python template for a streamer. +```py +import openvino_genai as ov_genai + +class CustomStreamer(ov_genai.StreamerBase): + def __init__(self): + super().__init__() + # Initialization logic. + + def put(self, token_id) -> bool: + # Custom decoding/tokens processing logic. + + # Returns a flag whether generation should be stoped, if true generation stops. + return False + + def end(self): + # Custom finalization logic. + +pipe = ov_genai.LLMPipeline(model_path, "CPU") +custom_streamer = CustomStreamer() + +pipe.generate("The Sun is yellow because", max_new_tokens=15, streamer=custom_streamer) +``` +For fully implemented iterable CustomStreamer please refer to [multinomial_causal_lm](https://github.com/openvinotoolkit/openvino.genai/tree/releases/2024/3/samples/python/multinomial_causal_lm/README.md) sample. + ### Performance Metrics `openvino_genai.PerfMetrics` (referred as `PerfMetrics` for simplicity) is a structure that holds performance metrics for each generate call. `PerfMetrics` holds fields with mean and standard deviations for the following metrics: