From 8a1d7ec5936461a436e714f6eaf0154dc699be4a Mon Sep 17 00:00:00 2001
From: Pavel Esir
Date: Wed, 31 Jul 2024 16:22:38 +0200
Subject: [PATCH 1/7] add doc for Python CustomStreamer with buffer

---
 src/README.md | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 64 insertions(+)

diff --git a/src/README.md b/src/README.md
index 53076803c5..d4235f22c5 100644
--- a/src/README.md
+++ b/src/README.md
@@ -210,6 +210,70 @@ int main(int argc, char* argv[]) {
 }
 ```
+This Python example demonstrates custom detokenization with bufferisation. The streamer receives
+integer tokens corresponding to each word or subword, one by one. If tokens are decoded individually,
+subwords will not be concatenated correctly, and the resulting text will lack appropriate spaces.
+To address this, we accumulate tokens in a tokens_cache buffer and decode multiple tokens together,
+returning the text only when a complete decoded chunk is ready.
+
+```py
+import openvino_genai as ov_genai
+
+class TextPrintStreamer(ov_genai.StreamerBase):
+    def __init__(self, tokenizer):
+        super().__init__()
+        self.tokenizer = tokenizer
+        self.tokens_cache = []
+        self.print_len = 0
+
+    def get_stop_flag(self):
+        return False
+
+    def process_word(self, word: str):
+        print(word, end='', flush=True)
+
+    def put(self, token_id):
+        self.tokens_cache.append(token_id)
+        text = self.tokenizer.decode(self.tokens_cache)
+
+        word = ''
+        if len(text) > self.print_len and '\n' == text[-1]:
+            # Flush the cache after the new line symbol.
+            word = text[self.print_len:]
+            self.tokens_cache = []
+            self.print_len = 0
+        elif len(text) >= 3 and text[-3:] == "�":
+            # Don't print incomplete text.
+            pass
+        elif len(text) > self.print_len:
+            # It is possible to have a shorter text after adding a new token.
+            # Print to output only if the text length is increased.
+            word = text[self.print_len:]
+            self.print_len = len(text)
+        self.process_word(word)
+
+        if self.get_stop_flag():
+            # When generation is stopped from the streamer, end() is not called, so call it here manually.
+            self.end()
+            return True # True means stop generation
+        else:
+            return False # False means continue generation
+
+    def end(self):
+        # Flush residual tokens from the buffer.
+        text = self.tokenizer.decode(self.tokens_cache)
+        if len(text) > self.print_len:
+            word = text[self.print_len:]
+            self.process_word(word)
+            self.tokens_cache = []
+            self.print_len = 0
+
+pipe = ov_genai.LLMPipeline(model_path, "CPU")
+text_print_streamer = TextPrintStreamer(pipe.get_tokenizer())
+
+pipe.generate("The Sun is yellow because", max_new_tokens=15, streamer=text_print_streamer)
+```
+
 ### Performance Metrics
 `openvino_genai.PerfMetrics` (referred to as `PerfMetrics` for simplicity) is a structure that holds performance metrics for each generate call.
 `PerfMetrics` holds fields with mean and standard deviations for the following metrics:

From 960bf0ac547cc260eb150a312da6c3850723bc0d Mon Sep 17 00:00:00 2001
From: Pavel Esir
Date: Mon, 5 Aug 2024 13:25:43 +0200
Subject: [PATCH 2/7] make streamer iterable, move to multinomial sample

---
 .../python/multinomial_causal_lm/README.md |  8 ++
 .../multinomial_causal_lm.py               | 83 +++++++++++++++++--
 src/README.md                              | 71 ++++++----------
 3 files changed, 108 insertions(+), 54 deletions(-)

diff --git a/samples/python/multinomial_causal_lm/README.md b/samples/python/multinomial_causal_lm/README.md
index 0778868e6a..8d2018358b 100644
--- a/samples/python/multinomial_causal_lm/README.md
+++ b/samples/python/multinomial_causal_lm/README.md
@@ -2,6 +2,8 @@
 This example showcases inference of text-generation Large Language Models (LLMs): `chatglm`, `LLaMA`, `Qwen` and other models with the same signature. The application doesn't have many configuration options to encourage the reader to explore and modify the source code. For example, change the device for inference to GPU. The sample features `ov::genai::LLMPipeline` and configures it to run random sampling algorithm. There is also a Jupyter [notebook](https://github.com/openvinotoolkit/openvino_notebooks/tree/latest/notebooks/llm-chatbot) which provides an example of LLM-powered Chatbot in Python.
 
+This sample also contains an example implementation of an iterable streamer with bufferisation.
+
 ## Download and convert the model and tokenizers
 
 The `--upgrade-strategy eager` option is needed to ensure `optimum-intel` is upgraded to the latest version.
@@ -22,6 +24,12 @@ Discrete GPUs (dGPUs) usually provide better performance compared to CPUs. It is
 See https://github.com/openvinotoolkit/openvino.genai/blob/master/src/README.md#supported-models for the list of supported models.
 
+## Streaming
+
+This Python example demonstrates custom detokenization with bufferization. The streamer receives integer tokens corresponding to each word or subword, one by one. If tokens are decoded individually, because of detokenize(tokenize(" a")) == "a" the resulting text will miss necessary spaces.
+
+To address this, the detokenizer needs a larger context. We accumulate tokens in a tokens_cache buffer and decode multiple tokens together, adding the text to the streaming queue only when a complete decoded chunk is ready. We run a separate thread to print all new elements arriving in this queue from the generation pipeline. Each generated chunk of text is put into a synchronized queue, ensuring that all put and get operations are thread-safe and blocked until they can proceed.
+
 ### Troubleshooting
 
 #### Unicode characters encoding error on Windows
diff --git a/samples/python/multinomial_causal_lm/multinomial_causal_lm.py b/samples/python/multinomial_causal_lm/multinomial_causal_lm.py
index 6feb8a2b85..6f6f615eda 100755
--- a/samples/python/multinomial_causal_lm/multinomial_causal_lm.py
+++ b/samples/python/multinomial_causal_lm/multinomial_causal_lm.py
@@ -4,11 +4,70 @@
 import argparse
 import openvino_genai
+import queue
+import threading
-def streamer(subword):
-    print(subword, end='', flush=True)
-    return False
+class IterableStreamer(openvino_genai.StreamerBase):
+    def __init__(self, tokenizer):
+        super().__init__()
+        self.tokenizer = tokenizer
+        self.tokens_cache = []
+        self.text_queue = queue.Queue()
+        self.print_len = 0
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        # get() will be blocked until a token is available.
+        value = self.text_queue.get()
+        if value is None:
+            raise StopIteration
+        return value
+
+    def get_stop_flag(self):
+        return False
+
+    def put_word(self, word: str):
+        self.text_queue.put(word)
+
+    def put(self, token_id: int) -> bool:
+        self.tokens_cache.append(token_id)
+        text = self.tokenizer.decode(self.tokens_cache)
+
+        word = ''
+        if len(text) > self.print_len and '\n' == text[-1]:
+            # Flush the cache after the new line symbol.
+            word = text[self.print_len:]
+            self.tokens_cache = []
+            self.print_len = 0
+        elif len(text) >= 3 and text[-3:] == "�":
+            # Don't print incomplete text.
+            pass
+        elif len(text) > self.print_len:
+            # It is possible to have a shorter text after adding a new token.
+            # Print to output only if the text length is increased.
+            word = text[self.print_len:]
+            self.print_len = len(text)
+        self.put_word(word)
+
+        if self.get_stop_flag():
+            # When generation is stopped from the streamer, end() is not called, so call it here manually.
+            self.end()
+            return True # True means stop generation
+        else:
+            return False # False means continue generation
+
+    def end(self):
+        # Flush residual tokens from the buffer.
+        text = self.tokenizer.decode(self.tokens_cache)
+        if len(text) > self.print_len:
+            word = text[self.print_len:]
+            self.put_word(word)
+            self.tokens_cache = []
+            self.print_len = 0
+        self.put_word(None)
 
 def main():
@@ -19,17 +78,25 @@ def main():
     device = 'CPU' # GPU can be used as well
     pipe = openvino_genai.LLMPipeline(args.model_dir, device)
-
+
+    text_print_streamer = IterableStreamer(pipe.get_tokenizer())
+    def token_printer():
+        # Getting next elements from iterable will be blocked until a new token is available.
+        for word in text_print_streamer:
+            print(word, end='', flush=True)
+    printer_thread = threading.Thread(target=token_printer, daemon=True)
+    printer_thread.start()
+
     config = openvino_genai.GenerationConfig()
     config.max_new_tokens = 100
     config.do_sample = True
     config.top_p = 0.9
     config.top_k = 30
-    # Since the streamer is set, the results will
-    # be printed each time a new token is generated.
-    pipe.generate(args.prompt, config, streamer)
-
+    # Since the streamer is set, the results will be printed
+    # every time a new token is generated and put into the streamer queue.
+    pipe.generate(args.prompt, config, text_print_streamer)
+    printer_thread.join()
 
 if '__main__' == __name__:
     main()
diff --git a/src/README.md b/src/README.md
index d4235f22c5..1b86145d54 100644
--- a/src/README.md
+++ b/src/README.md
@@ -178,6 +178,8 @@ int main(int argc, char* argv[]) {
 ```
 
 Streaming with a custom class:
+
+C++ template for a streamer.
 ```cpp
 #include "openvino/genai/streamer_base.hpp"
 #include "openvino/genai/llm_pipeline.hpp"
@@ -210,69 +212,46 @@ int main(int argc, char* argv[]) {
 }
 ```
-This Python example demonstrates custom detokenization with bufferisation. The streamer receives
-integer tokens corresponding to each word or subword, one by one. If tokens are decoded individually,
-subwords will not be concatenated correctly, and the resulting text will lack appropriate spaces.
-To address this, we accumulate tokens in a tokens_cache buffer and decode multiple tokens together,
-returning the text only when a complete decoded chunk is ready.
-
+Python template for a streamer.
 ```py
 import openvino_genai as ov_genai
 
-class TextPrintStreamer(ov_genai.StreamerBase):
+class CustomStreamer(ov_genai.StreamerBase):
     def __init__(self, tokenizer):
         super().__init__()
         self.tokenizer = tokenizer
+        # Initialize a cache to store tokens
         self.tokens_cache = []
-        self.print_len = 0
-
-    def get_stop_flag(self):
-        return False
-
-    def process_word(self, word: str):
-        print(word, end='', flush=True)
-
-    def put(self, token_id):
+    def put(self, token_id) -> bool:
+        # Process a token ID and determine if the generation should stop.
+        # Return a boolean flag indicating whether the generation should stop.
+        stop_flag = False
+
+        # Add the token to the cache and decode the tokens to get the text
         self.tokens_cache.append(token_id)
         text = self.tokenizer.decode(self.tokens_cache)
-        word = ''
-        if len(text) > self.print_len and '\n' == text[-1]:
-            # Flush the cache after the new line symbol.
-            word = text[self.print_len:]
-            self.tokens_cache = []
-            self.print_len = 0
-        elif len(text) >= 3 and text[-3:] == "�":
-            # Don't print incomplete text.
-            pass
-        elif len(text) > self.print_len:
-            # It is possible to have a shorter text after adding a new token.
-            # Print to output only if the text length is increased.
-            word = text[self.print_len:]
-            self.print_len = len(text)
-        self.process_word(word)
-
-        if self.get_stop_flag():
-            # When generation is stopped from the streamer, end() is not called, so call it here manually.
-            self.end()
-            return True # True means stop generation
-        else:
-            return False # False means continue generation
+        # Custom processing logic (if any)
+        # For example, you might want to stop generation if a certain condition is met
+        if some_condition:
+            stop_flag = True
+
+        return stop_flag
 
     def end(self):
-        # Flush residual tokens from the buffer.
-        text = self.tokenizer.decode(self.tokens_cache)
-        if len(text) > self.print_len:
-            word = text[self.print_len:]
-            self.process_word(word)
-            self.tokens_cache = []
-            self.print_len = 0
+        # Custom finalization logic (if any)
+        # For example, you might want to process the final text or clear the cache
+        final_text = self.tokenizer.decode(self.tokens_cache)
+        self.tokens_cache = []
+
 pipe = ov_genai.LLMPipeline(model_path, "CPU")
-text_print_streamer = TextPrintStreamer(pipe.get_tokenizer())
+custom_streamer = TextPrintStreamer(pipe.get_tokenizer())
 
-pipe.generate("The Sun is yellow because", max_new_tokens=15, streamer=text_print_streamer)
+pipe.generate("The Sun is yellow because", max_new_tokens=15, streamer=custom_streamer)
 ```
+For a fully implemented iterable CustomStreamer, please refer to the [multinomial_causal_lm](https://github.com/openvinotoolkit/openvino.genai/tree/releases/2024/3/samples/python/multinomial_causal_lm/README.md) sample.
 
 ### Performance Metrics

From dac164fafa9fab7143b1a7700eb6ff1ceff71ce3 Mon Sep 17 00:00:00 2001
From: Pavel Esir
Date: Mon, 5 Aug 2024 13:48:25 +0200
Subject: [PATCH 3/7] add docstrings

---
 .../multinomial_causal_lm.py | 56 ++++++++++++++++++-
 1 file changed, 53 insertions(+), 3 deletions(-)

diff --git a/samples/python/multinomial_causal_lm/multinomial_causal_lm.py b/samples/python/multinomial_causal_lm/multinomial_causal_lm.py
index 6f6f615eda..3df7206a54 100755
--- a/samples/python/multinomial_causal_lm/multinomial_causal_lm.py
+++ b/samples/python/multinomial_causal_lm/multinomial_causal_lm.py
@@ -9,7 +9,23 @@
 class IterableStreamer(openvino_genai.StreamerBase):
+    """
+    A custom streamer class for handling token streaming and detokenization with buffering.
+
+    Attributes:
+        tokenizer (Tokenizer): The tokenizer used for encoding and decoding tokens.
+        tokens_cache (list): A buffer to accumulate tokens for detokenization.
+        text_queue (Queue): A synchronized queue for storing decoded text chunks.
+        print_len (int): The length of the printed text to manage incremental decoding.
+    """
+
     def __init__(self, tokenizer):
+        """
+        Initializes the IterableStreamer with the given tokenizer.
+
+        Args:
+            tokenizer (Tokenizer): The tokenizer to use for encoding and decoding tokens.
+        """
         super().__init__()
         self.tokenizer = tokenizer
         self.tokens_cache = []
@@ -17,22 +33,54 @@ def __init__(self, tokenizer):
         self.print_len = 0
 
     def __iter__(self):
+        """
+        Returns the iterator object itself.
+        """
         return self
 
     def __next__(self):
+        """
+        Returns the next value from the text queue.
+
+        Returns:
+            str: The next decoded text chunk.
+
+        Raises:
+            StopIteration: If there are no more elements in the queue.
+        """
+        value = self.text_queue.get()  # get() will be blocked until a token is available.
-        value = self.text_queue.get()
         if value is None:
             raise StopIteration
         return value
 
     def get_stop_flag(self):
+        """
+        Checks whether the generation process should be stopped.
+
+        Returns:
+            bool: Always returns False in this implementation.
+        """
         return False
 
     def put_word(self, word: str):
+        """
+        Puts a word into the text queue.
+
+        Args:
+            word (str): The word to put into the queue.
+        """
         self.text_queue.put(word)
 
     def put(self, token_id: int) -> bool:
+        """
+        Processes a token and manages the decoding buffer. Adds decoded text to the queue.
+
+        Args:
+            token_id (int): The token_id to process.
+
+        Returns:
+            bool: True if generation should be stopped, False otherwise.
+        """
         self.tokens_cache.append(token_id)
         text = self.tokenizer.decode(self.tokens_cache)
 
@@ -60,7 +108,9 @@ def put(self, token_id: int) -> bool:
         return False # False means continue generation
 
     def end(self):
-        # Flush residual tokens from the buffer.
+        """
+        Flushes residual tokens from the buffer and puts a None value in the queue to signal the end.
+        """
         text = self.tokenizer.decode(self.tokens_cache)
         if len(text) > self.print_len:
             word = text[self.print_len:]

From f137ba9da9b3860b576efdb19b982cd429753393 Mon Sep 17 00:00:00 2001
From: Pavel Esir
Date: Mon, 5 Aug 2024 14:22:34 +0200
Subject: [PATCH 4/7] cleanup src/Readme

---
 src/README.md | 48 ++++++++++++++----------------------------------
 1 file changed, 14 insertions(+), 34 deletions(-)

diff --git a/src/README.md b/src/README.md
index 1b86145d54..15cc4a3fb5 100644
--- a/src/README.md
+++ b/src/README.md
@@ -188,18 +188,14 @@ C++ template for a streamer.
 class CustomStreamer: public ov::genai::StreamerBase {
 public:
     bool put(int64_t token) {
-        bool stop_flag = false;
-        /*
-        custom decoding/tokens processing code
-        tokens_cache.push_back(token);
-        std::string text = m_tokenizer.decode(tokens_cache);
-        ...
-        */
-        return stop_flag; // flag whether generation should be stopped, if true generation stops.
+        // Custom decoding/tokens processing logic.
+
+        // Returns a flag whether generation should be stopped, if true generation stops.
+        return false;
     };
 
     void end() {
-        /* custom finalization */
+        // Custom finalization logic.
     };
 };
 
 int main(int argc, char* argv[]) {
     std::string model_path = argv[1];
 
     ov::genai::LLMPipeline pipe(model_path, "CPU");
-    std::cout << pipe.generate("The Sun is yellow because", ov::genai::streamer(custom_streamer), ov::genai::max_new_tokens(200));
+    std::cout << pipe.generate("The Sun is yellow because", , ov::genai::max_new_tokens(15), ov::genai::streamer(custom_streamer));
 }
 ```
 
 Python template for a streamer.
 ```py
 import openvino_genai as ov_genai
 
 class CustomStreamer(ov_genai.StreamerBase):
-    def __init__(self, tokenizer):
+    def __init__(self):
         super().__init__()
-        self.tokenizer = tokenizer
-        # Initialize a cache to store tokens
-        self.tokens_cache = []
+        # Initialization logic.
 
     def put(self, token_id) -> bool:
-        # Process a token ID and determine if the generation should stop.
-        # Return a boolean flag indicating whether the generation should stop.
-        stop_flag = False
-
-        # Add the token to the cache and decode the tokens to get the text
-        self.tokens_cache.append(token_id)
-        text = self.tokenizer.decode(self.tokens_cache)
-
-        # Custom processing logic (if any)
-        # For example, you might want to stop generation if a certain condition is met
-        if some_condition:
-            stop_flag = True
-
-        return stop_flag
+        # Custom decoding/tokens processing logic.
+
+        # Returns a flag whether generation should be stopped, if true generation stops.
+        return False
 
     def end(self):
-        # Custom finalization logic (if any)
-        # For example, you might want to process the final text or clear the cache
-        final_text = self.tokenizer.decode(self.tokens_cache)
-        self.tokens_cache = []
-
+        # Custom finalization logic.
 
 pipe = ov_genai.LLMPipeline(model_path, "CPU")
-custom_streamer = TextPrintStreamer(pipe.get_tokenizer())
+custom_streamer = CustomStreamer()
 
 pipe.generate("The Sun is yellow because", max_new_tokens=15, streamer=custom_streamer)
 ```

From 05b681273e6e6df3dd196bfa3d7de5b217181e62 Mon Sep 17 00:00:00 2001
From: Pavel Esir
Date: Mon, 5 Aug 2024 14:24:24 +0200
Subject: [PATCH 5/7] Apply suggestions from code review

---
 src/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/README.md b/src/README.md
index 15cc4a3fb5..e54f12ee83 100644
--- a/src/README.md
+++ b/src/README.md
@@ -204,7 +204,7 @@ int main(int argc, char* argv[]) {
     std::string model_path = argv[1];
 
     ov::genai::LLMPipeline pipe(model_path, "CPU");
-    std::cout << pipe.generate("The Sun is yellow because", , ov::genai::max_new_tokens(15), ov::genai::streamer(custom_streamer));
+    std::cout << pipe.generate("The Sun is yellow because", ov::genai::max_new_tokens(15), ov::genai::streamer(custom_streamer));
 }
 ```

From 8c4cec9c2edae0d34e86633ee40ae82d72d6665a Mon Sep 17 00:00:00 2001
From: Pavel Esir
Date: Mon, 5 Aug 2024 15:36:43 +0200
Subject: [PATCH 6/7] Update samples/python/multinomial_causal_lm/README.md

Co-authored-by: Zlobin Vladimir
---
 samples/python/multinomial_causal_lm/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/samples/python/multinomial_causal_lm/README.md b/samples/python/multinomial_causal_lm/README.md
index 8d2018358b..351773ec0d 100644
--- a/samples/python/multinomial_causal_lm/README.md
+++ b/samples/python/multinomial_causal_lm/README.md
@@ -26,7 +26,7 @@ See https://github.com/openvinotoolkit/openvino.genai/blob/master/src/README.md#
 
 ## Streaming
 
-This Python example demonstrates custom detokenization with bufferization. The streamer receives integer tokens corresponding to each word or subword, one by one. If tokens are decoded individually, because of detokenize(tokenize(" a")) == "a" the resulting text will miss necessary spaces.
+This Python example demonstrates custom detokenization with bufferization. The streamer receives integer tokens corresponding to each word or subword, one by one. If tokens are decoded individually, the resulting text misses necessary spaces because of detokenize(tokenize(" a")) == "a".
 
 To address this, the detokenizer needs a larger context. We accumulate tokens in a tokens_cache buffer and decode multiple tokens together, adding the text to the streaming queue only when a complete decoded chunk is ready. We run a separate thread to print all new elements arriving in this queue from the generation pipeline. Each generated chunk of text is put into a synchronized queue, ensuring that all put and get operations are thread-safe and blocked until they can proceed.

From 90f3bc30d01c396cd007ebe618003d678fbca366 Mon Sep 17 00:00:00 2001
From: Pavel Esir
Date: Tue, 6 Aug 2024 10:38:46 +0200
Subject: [PATCH 7/7] Update samples/python/multinomial_causal_lm/multinomial_causal_lm.py

Co-authored-by: Ekaterina Aidova
---
 samples/python/multinomial_causal_lm/multinomial_causal_lm.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/samples/python/multinomial_causal_lm/multinomial_causal_lm.py b/samples/python/multinomial_causal_lm/multinomial_causal_lm.py
index 3df7206a54..6300320264 100755
--- a/samples/python/multinomial_causal_lm/multinomial_causal_lm.py
+++ b/samples/python/multinomial_causal_lm/multinomial_causal_lm.py
@@ -90,7 +90,7 @@ def put(self, token_id: int) -> bool:
             word = text[self.print_len:]
             self.tokens_cache = []
             self.print_len = 0
-        elif len(text) >= 3 and text[-3:] == "�":
+        elif len(text) >= 3 and text[-3:] == chr(65533):
             # Don't print incomplete text.
            pass
         elif len(text) > self.print_len:
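
For reference, `chr(65533)` in the final hunk is the Unicode replacement character U+FFFD — the same character as the "�" literal it replaces — which a detokenizer yields while the bytes of a multi-byte character are still incomplete; the switch to `chr(65533)` presumably avoids keeping a non-ASCII literal in the sample source. The short sketch below is a standalone illustration of the behavior the streamer guards against and is not part of the patch series:

```py
# Standalone illustration (not from the patches): why the streamer withholds output
# while the decoded text ends with the replacement character.
assert chr(65533) == "\ufffd"  # U+FFFD, printed as "�"

# The first three bytes of a four-byte UTF-8 character decode to the replacement
# character; once the final byte arrives, the real character appears instead.
incomplete = b"\xf0\x9f\x98"
print(incomplete.decode("utf-8", errors="replace"))  # replacement character
print((incomplete + b"\x80").decode("utf-8"))        # complete character
```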