Restore BpeVocab and SentencePieceVocab classes
- Restored the BpeVocab class for handling BPE tokenization.
- Restored the SentencePieceVocab class for SentencePiece tokenization.

These classes are needed to preserve the converter's original vocabulary-loading behavior.
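
For context, here is a minimal usage sketch of how these vocab classes are typically driven from the conversion flow. The model directory, the vocab.json / tokenizer.model file names, and the selection logic are illustrative assumptions, not part of this commit:

# Hypothetical sketch: assumes it runs inside convert.py, where BpeVocab and
# SentencePieceVocab are defined; paths below are placeholders.
from pathlib import Path

model_dir = Path("models/7B")  # hypothetical model directory
added_tokens_path = model_dir / "added_tokens.json"
added = added_tokens_path if added_tokens_path.is_file() else None

if (model_dir / "vocab.json").is_file():
    # GPT-style BPE vocabulary stored as a token -> id mapping.
    vocab = BpeVocab(model_dir / "vocab.json", added)
else:
    # LLaMA-style SentencePiece model file.
    vocab = SentencePieceVocab(model_dir / "tokenizer.model", added)

print(vocab)  # repr reports the base and added token counts
for text, score, toktype in vocab.all_tokens():
    pass  # e.g. hand each (text, score, toktype) to the GGUF writer
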
teleprint-me committed Jan 8, 2024
1 parent 15e1897 commit 3ca2b10
Showing 1 changed file with 129 additions and 0 deletions.
convert.py (129 additions, 0 deletions)
@@ -379,6 +379,135 @@ def load(model_plus: ModelPlus) -> "Params":
        return params


class BpeVocab:  # GPT
    def __init__(
        self, fname_tokenizer: Path, fname_added_tokens: Optional[Path]
    ) -> None:
        self.bpe_tokenizer = json.loads(
            open(str(fname_tokenizer), encoding="utf-8").read()
        )
        added_tokens: dict[str, int]
        if fname_added_tokens is not None:
            # FIXME: Verify that added tokens here _cannot_ overlap with the main vocab.
            added_tokens = json.load(open(fname_added_tokens, encoding="utf-8"))
        else:
            # Fall back to trying to find the added tokens in tokenizer.json
            tokenizer_json_file = fname_tokenizer.parent / "tokenizer.json"
            if not tokenizer_json_file.is_file():
                added_tokens = {}
            else:
                tokenizer_json = json.load(open(tokenizer_json_file, encoding="utf-8"))
                added_tokens = dict(
                    (item["content"], item["id"])
                    for item in tokenizer_json.get("added_tokens", [])
                    # Added tokens here can be duplicates of the main vocabulary.
                    if item["content"] not in self.bpe_tokenizer
                )

        vocab_size: int = len(self.bpe_tokenizer)
        expected_ids = list(range(vocab_size, vocab_size + len(added_tokens)))
        actual_ids = sorted(added_tokens.values())
        if expected_ids != actual_ids:
            expected_end_id = vocab_size + len(actual_ids) - 1
            raise Exception(
                f"Expected the {len(actual_ids)} added token ID(s) to be sequential in the range {vocab_size} - {expected_end_id}; got {actual_ids}"
            )

        items = sorted(added_tokens.items(), key=lambda text_idx: text_idx[1])
        self.added_tokens_list = [text for (text, idx) in items]
        self.vocab_size_base: int = vocab_size
        self.vocab_size: int = self.vocab_size_base + len(self.added_tokens_list)
        self.fname_tokenizer = fname_tokenizer
        self.fname_added_tokens = fname_added_tokens

    def bpe_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
        tokenizer = self.bpe_tokenizer
        reverse_vocab = {id: encoded_tok for encoded_tok, id in tokenizer.items()}

        for i, _ in enumerate(tokenizer):
            yield reverse_vocab[i], 0.0, gguf.TokenType.NORMAL

    def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
        for text in self.added_tokens_list:
            score = -1000.0
            yield text.encode("utf-8"), score, gguf.TokenType.CONTROL

    def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
        yield from self.bpe_tokens()
        yield from self.added_tokens()

    def __repr__(self) -> str:
        return f"<BpeVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"


class SentencePieceVocab:  # LlaMa
    def __init__(
        self, fname_tokenizer: Path, fname_added_tokens: Optional[Path]
    ) -> None:
        self.sentencepiece_tokenizer = SentencePieceProcessor(str(fname_tokenizer))
        added_tokens: dict[str, int]
        if fname_added_tokens is not None:
            added_tokens = json.load(open(fname_added_tokens, encoding="utf-8"))
        else:
            added_tokens = {}

        vocab_size: int = self.sentencepiece_tokenizer.vocab_size()

        new_tokens = {
            id: piece for piece, id in added_tokens.items() if id >= vocab_size
        }
        expected_new_ids = list(range(vocab_size, vocab_size + len(new_tokens)))
        actual_new_ids = sorted(new_tokens.keys())

        if expected_new_ids != actual_new_ids:
            raise ValueError(
                f"Expected new token IDs {expected_new_ids} to be sequential; got {actual_new_ids}"
            )

        # Token pieces that were added to the base vocabulary.
        self.added_tokens_list = [new_tokens[id] for id in actual_new_ids]
        self.vocab_size_base = vocab_size
        self.vocab_size = self.vocab_size_base + len(self.added_tokens_list)
        self.fname_tokenizer = fname_tokenizer
        self.fname_added_tokens = fname_added_tokens

    def sentencepiece_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
        tokenizer = self.sentencepiece_tokenizer
        for i in range(tokenizer.vocab_size()):
            piece = tokenizer.id_to_piece(i)
            text: bytes = piece.encode("utf-8")
            score: float = tokenizer.get_score(i)

            toktype = gguf.TokenType.NORMAL
            if tokenizer.is_unknown(i):
                toktype = gguf.TokenType.UNKNOWN
            if tokenizer.is_control(i):
                toktype = gguf.TokenType.CONTROL

            # NOTE: I think added_tokens are user defined.
            # ref: https://github.com/google/sentencepiece/blob/master/src/sentencepiece_model.proto
            # if tokenizer.is_user_defined(i): toktype = gguf.TokenType.USER_DEFINED

            if tokenizer.is_unused(i):
                toktype = gguf.TokenType.UNUSED
            if tokenizer.is_byte(i):
                toktype = gguf.TokenType.BYTE

            yield text, score, toktype

    def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
        for text in self.added_tokens_list:
            score = -1000.0
            yield text.encode("utf-8"), score, gguf.TokenType.USER_DEFINED

    def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
        yield from self.sentencepiece_tokens()
        yield from self.added_tokens()

    def __repr__(self) -> str:
        return f"<SentencePieceVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"


class VocabLoader:
    def __init__(self, params: Params, fname_tokenizer: Path) -> None:
        try:
