
Commit

refactor: Standardize vocabulary handling with HfVocab
- Replaced VocabLoader with HfVocab, aligning vocabulary handling across classes.
- Updated initialization of HfVocab with local_files_only=True for AutoTokenizer.
- Introduced optional parameter fname_added_tokens for flexible added token management.
- Streamlined added token handling for clarity and conciseness.
- Preserved special tokens and their IDs from the tokenizer.
- Simplified token processing methods for improved readability.
- Added a placeholder for score computation with a default value of -1000.0.
- Optimized newline token check for efficiency.
- Updated the __repr__ method for a clearer representation.
- Adjusted type alias Vocab to include BpeVocab, SentencePieceVocab, and HfVocab.
- Removed redundant code related to special token handling, reverse vocabulary mapping, and vocabulary file detection.

This refactoring promotes a standardized and modular approach to vocabulary management, facilitating future integration with a VocabFactory and improving code maintainability and scalability.
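
For orientation, a minimal usage sketch of the new HfVocab class as it appears in the diff below. The import path, model directory, and loop body are illustrative assumptions; only the HfVocab constructor, all_tokens(), and __repr__ are taken from this change.

from pathlib import Path

from convert import HfVocab  # assumes convert.py from this commit is importable

# Hypothetical local model directory containing the Hugging Face tokenizer files;
# the tokenizer is loaded with local_files_only=True, so nothing is downloaded.
model_dir = Path("models/example-hf-model")

vocab = HfVocab(model_dir, fname_added_tokens=None)

# Base-vocabulary tokens and added tokens stream through one generator interface.
for text, score, toktype in vocab.all_tokens():
    pass  # e.g. hand each (bytes, score, gguf.TokenType) triple to a GGUF writer

print(vocab)  # e.g. <HfVocab with 32000 base tokens and 0 added tokens>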
teleprint-me committed Jan 8, 2024
1 parent 3ca2b10 commit db4b8ac
Showing 1 changed file with 66 additions and 102 deletions.
168 changes: 66 additions & 102 deletions convert.py
@@ -508,92 +508,83 @@ def __repr__(self) -> str:
return f"<SentencePieceVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"


class VocabLoader:
def __init__(self, params: Params, fname_tokenizer: Path) -> None:
try:
from transformers import AutoTokenizer
except ImportError as e:
raise ImportError(
"To use VocabLoader, please install the `transformers` package. "
"You can install it with `pip install transformers`."
) from e

try:
self.tokenizer = AutoTokenizer.from_pretrained(str(fname_tokenizer), trust_remote_code=True)
except ValueError:
self.tokenizer = AutoTokenizer.from_pretrained(str(fname_tokenizer), use_fast=False, trust_remote_code=True)

self.added_tokens_dict: OrderedDict[str, int] = OrderedDict()

for tok, tokidx in sorted(self.tokenizer.get_added_vocab().items(), key=lambda x: x[1]):
if tokidx >= params.n_vocab or tokidx < self.tokenizer.vocab_size:
continue

self.added_tokens_dict[tok] = tokidx
class HfVocab:
def __init__(
self,
fname_tokenizer: Path,
fname_added_tokens: Optional[Path] = None,
) -> None:
print("fname_tokenizer:", fname_tokenizer)
# Allow the tokenizer to default to slow or fast versions.
# Explicitly set tokenizer to use local paths.
self.tokenizer = AutoTokenizer.from_pretrained(
fname_tokenizer,
cache_dir=fname_tokenizer,
local_files_only=True,
)

self.unk_token_id: int = self.tokenizer.unk_token_id
self.specials: dict[str, int] = {
# Initialize lists and dictionaries for added tokens
self.added_tokens_list = []
self.added_tokens_dict = dict()
self.added_tokens_ids = set()

# Process added tokens
for tok, tokidx in sorted(
self.tokenizer.get_added_vocab().items(), key=lambda x: x[1]
):
# Only consider added tokens that are not in the base vocabulary
if tokidx >= self.tokenizer.vocab_size:
self.added_tokens_list.append(tok)
self.added_tokens_dict[tok] = tokidx
self.added_tokens_ids.add(tokidx)

# Store special tokens and their IDs
self.specials = {
tok: self.tokenizer.get_vocab()[tok]
for tok in self.tokenizer.all_special_tokens
}
self.special_ids: set[int] = set(self.tokenizer.all_special_ids)
self.reverse_vocab = {id: encoded_tok for encoded_tok, id in self.tokenizer.get_vocab().items()}
self.vocab_size_base: int = self.tokenizer.vocab_size
self.vocab_size: int = self.vocab_size_base + len(self.added_tokens_dict)
self.fname_tokenizer: Path = fname_tokenizer

vocab_file = "tokenizer.model"
path_candidate = find_vocab_file_path(self.fname_tokenizer, vocab_file)
if path_candidate is not None:
self.spm = SentencePieceProcessor(str(path_candidate))
print(self.spm.vocab_size(), self.vocab_size_base)
else:
self.spm = None
self.special_ids = set(self.tokenizer.all_special_ids)

def hf_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
added_tokens_ids = set(self.added_tokens_dict.values())
# Set vocabulary sizes
self.vocab_size_base = self.tokenizer.vocab_size
self.vocab_size = self.vocab_size_base + len(self.added_tokens_list)

for i in range(self.vocab_size_base):
if i in added_tokens_ids:
continue
self.fname_tokenizer = fname_tokenizer
self.fname_added_tokens = fname_added_tokens

text = self.reverse_vocab[i].encode("utf-8")
yield text, self.get_token_score(i), self.get_token_type(i)
def hf_tokens(self) -> Iterable[Tuple[bytes, float, gguf.TokenType]]:
reverse_vocab = {
id: encoded_tok for encoded_tok, id in self.tokenizer.get_vocab().items()
}

def get_token_type(self, token_id: int) -> gguf.TokenType:
toktype = gguf.TokenType.NORMAL
for token_id in range(self.vocab_size_base):
# Skip processing added tokens here
if token_id in self.added_tokens_ids:
continue

if self.spm is not None and token_id < self.spm.vocab_size():
if self.spm.is_unknown(token_id):
toktype = gguf.TokenType.UNKNOWN
if self.spm.is_control(token_id):
toktype = gguf.TokenType.CONTROL
if self.spm.is_unused(token_id):
toktype = gguf.TokenType.UNUSED
if self.spm.is_byte(token_id):
toktype = gguf.TokenType.BYTE
else:
token = self.reverse_vocab[token_id]
if token_id == self.unk_token_id:
toktype = gguf.TokenType.UNKNOWN
elif token_id in self.special_ids:
toktype = gguf.TokenType.CONTROL
elif len(token) == 6 and token.startswith("<0x") and token.endswith(">"):
toktype = gguf.TokenType.BYTE
# Convert token text to bytes
token_text = reverse_vocab[token_id].encode("utf-8")

# Yield token text, score, and type
yield token_text, self.get_token_score(token_id), self.get_token_type(
token_id, self.special_ids # Reuse already stored special IDs
)

return toktype
def get_token_type(self, token_id: int, special_ids: set) -> gguf.TokenType:
# Determine token type based on whether it's a special token
return (
gguf.TokenType.CONTROL if token_id in special_ids else gguf.TokenType.NORMAL
)

def get_token_score(self, token_id: int) -> float:
if self.spm is not None and token_id < self.spm.vocab_size():
return cast(float, self.spm.get_score(token_id))
return 0.0
# Placeholder for actual logic to determine the token's score
# This needs to be implemented based on specific requirements
return -1000.0 # Default score

def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:

for text in self.added_tokens_dict:
for text in self.added_tokens_list:
if text in self.specials:

toktype = self.get_token_type(self.specials[text])
toktype = self.get_token_type(self.specials[text], self.special_ids)
score = self.get_token_score(self.specials[text])

else:
@@ -602,45 +593,18 @@ def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:

yield text.encode("utf-8"), score, toktype

def has_newline_token(self) -> bool:
return '<0x0A>' in self.tokenizer.vocab or '\n' in self.tokenizer.vocab
def has_newline_token(self):
return "<0x0A>" in self.tokenizer.vocab or "\n" in self.tokenizer.vocab

def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
yield from self.hf_tokens()
yield from self.added_tokens()

def get_vocab_type(self) -> str:
path_candidates = []
vocab_file = "tokenizer.model"
path_candidates.append(vocab_file)
path_candidate = find_vocab_file_path(self.fname_tokenizer, vocab_file)
if path_candidate is not None:
return "llama"

vocab_file = "vocab.json"
path_candidates.append(vocab_file)
path_candidate = find_vocab_file_path(self.fname_tokenizer, vocab_file)
if path_candidate is not None:
return "gpt2"

vocab_file = "tokenizer.json"
path_candidates.append(vocab_file)
path_candidate = find_vocab_file_path(self.fname_tokenizer, vocab_file)
if path_candidate:
if not self.has_newline_token():
return "gpt2"
return "llama"

raise FileNotFoundError(
f"Could not find {path_candidates} in {self.fname_tokenizer} or its parent; "
"if it's in another directory, pass the directory as --vocab-dir"
)

def __repr__(self) -> str:
return f"<VocabLoader with {self.vocab_size_base} base tokens and {len(self.added_tokens_dict)} added tokens>"
return f"<HfVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"


Vocab: TypeAlias = 'VocabLoader'
Vocab: TypeAlias = "BpeVocab | SentencePieceVocab | HfVocab"


#
