diff --git a/transformer_ranker/datacleaner.py b/transformer_ranker/datacleaner.py
index c323b97..79a4065 100644
--- a/transformer_ranker/datacleaner.py
+++ b/transformer_ranker/datacleaner.py
@@ -1,5 +1,5 @@
 import logging
-from typing import Dict, List, Optional, Set, Tuple, Type, Union
+from typing import Dict, List, Optional, Tuple, Type, Union
 
 import datasets
 import torch
@@ -8,7 +8,7 @@
 
 from .utils import configure_logger
 
-logger = configure_logger('transformer_ranker', logging.INFO)
+logger = configure_logger("transformer_ranker", logging.INFO)
 
 
 class DatasetCleaner:
@@ -26,18 +26,19 @@ def __init__(
         text_pair_column: Optional[str] = None,
     ):
         """
-        Prepare huggingface dataset. Identify task type, find text and label columns, down-sample, merge data splits.
+        Prepare huggingface dataset. Identify task category, find text and label columns,
+        merge data splits, down-sample, prepare texts and labels.
 
-        :param pre_tokenizer: Pre-tokenizer to use, such as Whitespace from huggingface pre-tokenizers.
+        :param pre_tokenizer: Pre-tokenizer to use, such as Whitespace from huggingface.
         :param merge_data_splits: Whether to merge train, dev, and test splits into one.
-        :param change_bio_encoding: Change BIO encoding to single class labels by removing B-, I-, O- prefixes
+        :param change_bio_encoding: Convert BIO to single-class labels, removing B-, I-, O- prefix.
         :param remove_empty_sentences: Whether to remove empty sentences.
         :param dataset_downsample: Fraction to reduce the dataset size.
-        :param task_type: Task category (e.g., 'token classification', 'text classification', 'text regression').
+        :param task_type: Task category "token classification", "text classification", "text regression".
         :param text_column: Column name for texts.
         :param label_column: Column name for labels.
         :param label_map: A dictionary which maps label names to integers.
-        :param text_pair_column: Column name where the second text pair is stored (for entailment-like tasks)
+        :param text_pair_column: Column name for second text (for entailment tasks).
         """
         self.pre_tokenizer = pre_tokenizer
         self.merge_data_splits = merge_data_splits
@@ -51,17 +52,21 @@ def __init__(
         self.text_pair_column = text_pair_column
         self.dataset_size = 0
 
-    def prepare_dataset(self, dataset: Union[str, DatasetDict, Dataset]) -> Union[Dataset, DatasetDict]:
+    def prepare_dataset(
+        self, dataset: Union[str, DatasetDict, Dataset]
+    ) -> Union[Dataset, DatasetDict]:
         """Preprocess a dataset, leave only needed columns, down-sample
 
         :param dataset: dataset from huggingface.
         It can be one of the following: a DatasetDict (containing multiple splits) or a single dataset split (e.g., Dataset)
-        :return: Return clean and preprocessed dataset, that can be later used in the transformer-ranker
+        :return: Return clean and preprocessed dataset that can be used in the transformer-ranker
         """
         # Load huggingface dataset
-        dataset = datasets.load_dataset(dataset, trust_remote_code=True) if isinstance(dataset, str) else dataset
+        if isinstance(dataset, str):
+            dataset = datasets.load_dataset(dataset, trust_remote_code=True)
+        else:
+            dataset = dataset
 
-        # Ensure the dataset is either a DatasetDict (multiple splits) or a Dataset (single split)
         if not isinstance(dataset, (DatasetDict, Dataset)):
             raise ValueError(
                 "The dataset must be an instance of either DatasetDict (for multiple splits) "
@@ -69,24 +74,25 @@ def prepare_dataset(self, dataset: Union[str, DatasetDict, Dataset]) -> Union[Da
             )
 
         # Clone the dataset to avoid changing the original one
-        dataset = dataset.map(lambda x: x, load_from_cache_file=False, desc="Cloning the dataset")
+        dataset = dataset.map(lambda x: x, load_from_cache_file=False, desc="Cloning dataset")
 
         if self.merge_data_splits and isinstance(dataset, DatasetDict):
             dataset = self._merge_data_splits(dataset)
 
         # Find text and label columns
-        text_column, label_column, label_type = self._find_text_and_label_columns(dataset,
-                                                                                  self.text_column,
-                                                                                  self.label_column)
+        text_column, label_column, label_type = self._find_text_and_label_columns(
+            dataset, self.text_column, self.label_column
+        )
 
         # Find task category based on label type
-        task_type = self._find_task_type(label_column, label_type) if not self.task_type else self.task_type
+        if not self.task_type:
+            task_type = self._find_task_type(label_column, label_type)
+        else:
+            task_type = self.task_type
 
-        # Clean the dataset by removing empty sentences and empty/negative labels
         if self.remove_empty_sentences:
             dataset = self._remove_empty_rows(dataset, text_column, label_column)
 
-        # Down-sample the original dataset
         if self.dataset_downsample:
             dataset = self._downsample(dataset, ratio=self.dataset_downsample)
 
@@ -96,7 +102,9 @@ def prepare_dataset(self, dataset: Union[str, DatasetDict, Dataset]) -> Union[Da
 
         # Concatenate text columns for text-pair tasks
         if self.text_pair_column:
-            dataset, text_column = self._merge_textpairs(dataset, text_column, self.text_pair_column)
+            dataset, text_column = self._merge_textpairs(
+                dataset, text_column, self.text_pair_column
+            )
 
         # Convert string labels to integers
         if label_type == str:
@@ -108,9 +116,11 @@ def prepare_dataset(self, dataset: Union[str, DatasetDict, Dataset]) -> Union[Da
 
         # Remove BIO prefixes for ner or chunking tasks
         if task_type == "token classification" and self.change_bio_encoding:
-            dataset, self.label_map = self._change_bio_encoding(dataset, label_column, self.label_map)
+            dataset, self.label_map = self._change_bio_encoding(
+                dataset, label_column, self.label_map
+            )
 
-        logger.info(f"Label map: {self.label_map}")
+        logger.info("Label map: %s", self.label_map)
 
         # Set updated attributes and log them
         self.text_column = text_column
@@ -130,7 +140,11 @@ def prepare_labels(self, dataset: Dataset) -> torch.Tensor:
         """Prepare labels as tensors.
         Flatten labels if they contain lists (for token classification)"""
         labels = dataset[self.label_column]
-        labels = [item for sublist in labels for item in sublist] if isinstance(labels[0], list) else labels
+        labels = (
+            [item for sublist in labels for item in sublist]
+            if isinstance(labels[0], list)
+            else labels
+        )
         return torch.tensor(labels)
 
     def prepare_sentences(self, dataset: Dataset) -> List[str]:
@@ -144,49 +158,61 @@ def _downsample(dataset: Dataset, ratio: float) -> Dataset:
         return dataset
 
     @staticmethod
-    def _find_text_and_label_columns(dataset: Dataset, text_column: Optional[str] = None,
-                                     label_column: Optional[str] = None) -> Tuple[str, str, Type]:
+    def _find_text_and_label_columns(
+        dataset: Dataset, text_column: Optional[str] = None, label_column: Optional[str] = None
+    ) -> Tuple[str, str, Type]:
         """Find text and label columns in hf datasets based on common keywords"""
         text_columns = [
-            'text', 'sentence', 'token', 'tweet', 'document', 'paragraph', 'description', 'comment',
-            'utterance', 'question', 'story', 'context', 'passage',
+            "text", "sentence", "token", "tweet", "document", "paragraph", "description",
+            "comment", "utterance", "question", "story", "context", "passage",
         ]
         label_columns = [
-            'label', 'ner_tag', 'named_entities', 'entities', 'tag', 'target', 'category', 'class',
-            'sentiment', 'polarity', 'emotion', 'rating', 'stance'
+            "label", "ner_tag", "named_entities", "entities", "tag", "target", "category",
+            "class", "sentiment", "polarity", "emotion", "rating", "stance",
        ]
 
         column_names = dataset.column_names
 
         if not text_column:
             # Iterate over keywords and check if it exists in the dataset
-            text_column = next((col for keyword in text_columns for col in column_names if keyword in col), None)
+            text_column = next(
+                (col for keyword in text_columns for col in column_names if keyword in col), None
+            )
 
         if not label_column:
-            label_column = next((col for keyword in label_columns for col in column_names if keyword in col), None)
+            label_column = next(
+                (col for keyword in label_columns for col in column_names if keyword in col), None
+            )
 
         if not text_column or not label_column:
-            missing = 'text' if not text_column else 'label'
-            raise KeyError(f"Can not determine the {missing} column. Specify {missing}_column=\"...\" "
-                           f"from available columns: {column_names}.")
+            missing = "text" if not text_column else "label"
+            raise KeyError(
+                f'Cannot determine the {missing} column. Specify {missing}_column="..." '
+                f"from available columns: {column_names}."
+            )
 
         label_type = type(dataset[label_column][0])
         return text_column, label_column, label_type
 
     @staticmethod
-    def _merge_textpairs(dataset: Dataset, text_column: str, text_pair_column: str) -> Tuple[Dataset, str]:
+    def _merge_textpairs(
+        dataset: Dataset, text_column: str, text_pair_column: str
+    ) -> Tuple[Dataset, str]:
         """Concatenate text pairs into a single text using separator token"""
-        new_text_column_name = text_column + '+' + text_pair_column
-        print(dataset.column_names)
+        new_text_column_name = text_column + "+" + text_pair_column
         if text_pair_column not in dataset.column_names:
             raise ValueError(
                 f"Text pair column name '{text_pair_column}' can not be found in the dataset. "
                 f"Use one of the following names for text pair: {dataset.column_names}."
             )
+
         def merge_texts(dataset_entry: Dict[str, str]) -> Dict[str, str]:
-            dataset_entry[text_column] = dataset_entry[text_column] + " [SEP] " + dataset_entry[text_pair_column]
+            dataset_entry[text_column] = (
+                dataset_entry[text_column] + " [SEP] " + dataset_entry[text_pair_column]
+            )
             dataset_entry[new_text_column_name] = dataset_entry.pop(text_column)
             return dataset_entry
+
         dataset = dataset.map(merge_texts, num_proc=None, desc="Merging text pair columns")
         return dataset, new_text_column_name
 
@@ -206,18 +232,19 @@ def _find_task_type(label_column: str, label_type: type) -> str:
 
         raise ValueError(
             f"Cannot determine the task type for the label column '{label_column}'. "
-            f"Expected label types are {list(label_type_to_task_type.keys())}, but got {label_type}."
+            f"Label types are {list(label_type_to_task_type.keys())}, but got {label_type}."
         )
 
     @staticmethod
     def _tokenize(dataset: Dataset, pre_tokenizer: Whitespace, text_column: str) -> Dataset:
         """Tokenize a dataset using hf pre-tokenizer (e.g. Whitespace)"""
+
         def pre_tokenize(example):
             encoding = pre_tokenizer.pre_tokenize_str(example[text_column])
             example[text_column] = [token for token, _ in encoding]
             return example
 
-        dataset = dataset.map(pre_tokenize, num_proc=None, desc="Pre-tokenizing texts using Whitespace")
+        dataset = dataset.map(pre_tokenize, num_proc=None, desc="Pre-tokenizing texts with Whitespace")
         return dataset
 
     @staticmethod
@@ -227,23 +254,28 @@ def _merge_data_splits(dataset: DatasetDict) -> Dataset:
 
     @staticmethod
     def _remove_empty_rows(dataset: Dataset, text_column: str, label_column: str) -> Dataset:
-        """Filter out entries with empty or invalid text or labels."""
+        """Filter out entries with empty or noisy text or labels."""
+
        def is_valid_entry(sample) -> bool:
             text, label = sample[text_column], sample[label_column]
-            # Check if text is non-empty and does not contain emoji variation char '\uFE0F'
-            has_text = bool(text) and (not isinstance(text, list) or '\uFE0F' not in text)
+            # Check if text is non-empty and does not contain emoji variation character '\uFE0F'
+            has_text = bool(text) and (not isinstance(text, list) or "\uFE0F" not in text)
 
             # Check if label is non-null and all elements are non-negative
-            valid_label = label is not None and (all(l >= 0 for l in label) if isinstance(label, list) else label >= 0)
+            valid_label = label is not None and (
+                all(l >= 0 for l in label) if isinstance(label, list) else label >= 0
+            )
 
             return has_text and valid_label
 
-        return dataset.filter(is_valid_entry, desc="Removing empty or invalid entries")
+        return dataset.filter(is_valid_entry, desc="Removing empty rows")
 
     @staticmethod
-    def _make_labels_categorical(dataset: Dataset, label_column: str) -> Tuple[Dataset, Dict[str, int]]:
-        """Convert string labels in the dataset to categorical integer labels, for classification tasks."""
+    def _make_labels_categorical(
+        dataset: Dataset, label_column: str
+    ) -> Tuple[Dataset, Dict[str, int]]:
+        """Convert string labels to integers"""
         unique_labels = sorted(set(dataset[label_column]))
         label_map = {label: idx for idx, label in enumerate(unique_labels)}
 
@@ -257,50 +289,56 @@ def map_labels(dataset_entry):
 
     @staticmethod
     def _create_label_map(dataset: Dataset, label_column: str) -> Dict[str, int]:
         """Try to find feature names in a hf dataset."""
-        label_names = (
-            getattr(getattr(dataset.features[label_column], 'feature', None), 'names', None)
-            or getattr(dataset.features[label_column], 'names', None)
-        )
+        label_names = getattr(
+            getattr(dataset.features[label_column], "feature", None), "names", None
+        ) or getattr(dataset.features[label_column], "names", None)
 
         # If label names are missing, create them manually
         if not label_names:
             label_names = sorted(
-                {str(label) for sublist in dataset[label_column]
-                 for label in (sublist if isinstance(sublist, list) else [sublist])}
+                {
+                    str(label)
+                    for sublist in dataset[label_column]
+                    for label in (sublist if isinstance(sublist, list) else [sublist])
+                }
             )
 
         return {label: idx for idx, label in enumerate(label_names)}
 
     @staticmethod
     def _change_bio_encoding(
-        dataset: Dataset,
-        label_column: str,
-        label_map: Dict[str, int]
+        dataset: Dataset, label_column: str, label_map: Dict[str, int]
     ) -> Tuple[Dataset, Dict[str, int]]:
         """Remove BIO prefixes from NER labels, update the dataset, and create a new label map."""
 
         # Get unique labels without BIO prefixes and create new label map
-        unique_labels = set(label.split('-')[-1] for label in label_map)
+        unique_labels = set(label.split("-")[-1] for label in label_map)
         new_label_map = {label: idx for idx, label in enumerate(unique_labels)}
 
         # Map old ids to new ids
-        reverse_map = {old_idx: new_label_map[label.split('-')[-1]] for label, old_idx in label_map.items()}
+        reverse_map = {
+            old_idx: new_label_map[label.split("-")[-1]] for label, old_idx in label_map.items()
+        }
         dataset = dataset.map(
-            lambda sample: {label_column: [reverse_map[old_idx] for old_idx in sample[label_column]]},
-            desc="Removing BIO prefixes"
+            lambda sample: {
+                label_column: [reverse_map[old_idx] for old_idx in sample[label_column]]
+            },
+            desc="Removing BIO prefixes",
        )
 
         # Check if label map was changed
         if label_map == new_label_map:
-            logger.warning("Could not remove BIO prefixes for this tagging dataset. "
-                           "Please add the label map as parameter label_map: Dict[str, int] = ... manually.")
+            logger.warning(
+                "Could not remove BIO prefixes for this tagging dataset. "
+                "Please add the label map as parameter label_map: Dict[str, int] = ... manually."
+            )
 
         return dataset, new_label_map
 
     def log_dataset_info(self) -> None:
         """Log information about dataset"""
-        logger.info(f"Texts and labels: '{self.text_column}', '{self.label_column}'")
-        logger.info(f"Task category: '{self.task_type}'")
+        logger.info("Texts and labels: '%s', '%s'", self.text_column, self.label_column)
+        logger.info("Task category: '%s'", self.task_type)
         is_downsampled = self.dataset_downsample and self.dataset_downsample < 1.0
         downsample_info = f"(down-sampled to {self.dataset_downsample})" if is_downsampled else ""
-        logger.info(f"Dataset size: {self.dataset_size} texts {downsample_info}")
+        logger.info("Dataset size: %s texts %s", self.dataset_size, downsample_info)
diff --git a/transformer_ranker/embedder.py b/transformer_ranker/embedder.py
index bb9db49..5db5fcd 100644
--- a/transformer_ranker/embedder.py
+++ b/transformer_ranker/embedder.py
@@ -20,17 +20,17 @@ def __init__(
         device: Optional[str] = None,
     ):
         """
-        Embed texts using a pre-trained transformer model. This embedder works at the word level, where
-        each text is a list of word vectors. It supports different sub-word pooling and sentence pooling options.
+        Embed texts using a pre-trained transformer model. It's a word-level embedder, where
+        each text is a list of word embeddings. Supports sub-word and sentence pooling options.
         ♻️ Feel free to use it if you need a simple implementation for word or text embeddings.
 
-        :param model: Model name 'bert-base-uncased' or a model instance e.g. AutoModel.from_pretrained(...)
+        :param model: Model name 'bert-base-uncased' or a model instance loaded with AutoModel
         :param tokenizer: Optional tokenizer, either a string name or a tokenizer instance.
         :param subword_pooling: Method for pooling sub-words into word embeddings.
-        :param layer_ids: Layers to use e.g., '-1' for the top layer, '-1,-2' for multiple, or 'all'. Default is 'all'.
+        :param layer_ids: Layers to use e.g., '-1' for the top-most layer or 'all'.
         :param layer_pooling: Optional method for pooling across selected layers.
         :param use_pretokenizer: Whether to pre-tokenize texts using whitespace.
-        :param device: Device for computations, either 'cpu' or 'cuda:0'. Defaults to the available device.
+        :param device: Device option, either 'cpu' or 'cuda:0'. Defaults to the available device.
         """
         # Load transformer model
         if isinstance(model, torch.nn.Module):
@@ -54,7 +54,7 @@ def __init__(
 
         # Add padding token for models that do not have it (e.g. GPT2)
         if self.tokenizer.pad_token is None:
-            self.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
+            self.tokenizer.add_special_tokens({"pad_token": "[PAD]"})
             self.model.resize_token_embeddings(len(self.tokenizer))
 
         # Use whitespace pre-tokenizer if specified
@@ -81,14 +81,18 @@ def __init__(
     def tokenize(self, sentences):
         """Tokenize sentences using auto tokenizer"""
         # Handle tokenizers with wrong model_max_length in hf configuration
-        max_sequence_length = self.tokenizer.model_max_length if self.tokenizer.model_max_length < 1000000 else 512
+        max_sequence_length = (
+            self.tokenizer.model_max_length if self.tokenizer.model_max_length < 1000000 else 512
+        )
 
         # Pre-tokenize sentences using hf whitespace tokenizer
         if self.pre_tokenizer and isinstance(sentences[0], str):
-            sentences = [[word for word, word_offsets in self.pre_tokenizer.pre_tokenize_str(sentence)]
-                         for sentence in sentences]
+            sentences = [
+                [word for word, word_offsets in self.pre_tokenizer.pre_tokenize_str(sentence)]
+                for sentence in sentences
+            ]
 
-        is_split_into_words = False if isinstance(sentences[0], str) else True
+        is_split_into_words = not isinstance(sentences[0], str)
 
         # Use model-specific tokenizer and return output as tensors
         return self.tokenizer(
@@ -105,21 +109,21 @@ def embed(
         sentences,
         batch_size: int = 32,
         show_loading_bar: bool = True,
-        move_embeddings_to_cpu: bool = True
+        move_embeddings_to_cpu: bool = True,
     ) -> List[torch.Tensor]:
         """Split sentences into batches and embed the full dataset"""
         if not isinstance(sentences, list):
             sentences = [sentences]
 
-        batches = [sentences[i:i + batch_size] for i in range(0, len(sentences), batch_size)]
+        batches = [sentences[i : i + batch_size] for i in range(0, len(sentences), batch_size)]
         embeddings = []
-        tqdm_bar_format = '{l_bar}{bar:10}{r_bar}{bar:-10b}'
+        tqdm_bar_format = "{l_bar}{bar:10}{r_bar}{bar:-10b}"
 
         for batch in tqdm(
             batches,
             desc="Retrieving Embeddings",
             disable=not show_loading_bar,
-            bar_format=tqdm_bar_format
+            bar_format=tqdm_bar_format,
         ):
             embeddings.extend(self.embed_batch(batch, move_embeddings_to_cpu))
 
@@ -149,7 +153,7 @@ def embed_batch(self, sentences, move_embeddings_to_cpu: bool = True) -> List[to
         # Multiply embeddings by attention mask to have padded tokens as 0
         embeddings = embeddings * attention_mask.unsqueeze(-1).unsqueeze(-1)
 
-        # Extract layers defined by layer_ids, average all layers for a batch of sentences if specified
+        # Extract and average specified layers
         embeddings = self._extract_relevant_layers(embeddings)
 
         # Go through each sentence separately
@@ -163,37 +167,47 @@ def embed_batch(self, sentences, move_embeddings_to_cpu: bool = True) -> List[to
             word_embeddings = torch.stack(word_embedding_list, dim=0)
 
             # Pool word-level embeddings into a sentence embedding
-            sentence_embedding = self._pool_words(word_embeddings) if self.sentence_pooling else word_embeddings
+            sentence_embedding = (
+                self._pool_words(word_embeddings) if self.sentence_pooling else word_embeddings
+            )
             sentence_embeddings.append(sentence_embedding)
 
         if move_embeddings_to_cpu:
-            sentence_embeddings = [sentence_embedding.cpu() for sentence_embedding in sentence_embeddings]
+            sentence_embeddings = [
+                sentence_embedding.cpu() for sentence_embedding in sentence_embeddings
+            ]
 
         return sentence_embeddings
 
     def _filter_layer_ids(self, layer_ids) -> List[int]:
         """Transform a string with layer ids into a list of ints and
-         remove ids that are out of bound of the actual transformer size"""
+        remove ids that are out of bounds for the actual transformer size"""
        if layer_ids == "all":
             return [-i for i in range(1, self.num_transformer_layers + 1)]
 
         layer_ids = [int(number) for number in layer_ids.split(",")]
-        layer_ids = [layer_id for layer_id in layer_ids if self.num_transformer_layers >= abs(layer_id)]
+        layer_ids = [
+            layer_id for layer_id in layer_ids if self.num_transformer_layers >= abs(layer_id)
+        ]
 
         if not layer_ids:
-            raise ValueError(f"\"layer_ids\" are out of bounds for the model size. "
-                             f"Num layers in model {self.model_name}: {self.num_transformer_layers}")
+            raise ValueError(
+                f'"layer_ids" are out of bounds for the model size. '
+                f"Num layers in model {self.model_name}: {self.num_transformer_layers}"
+            )
 
         return layer_ids
 
     def _extract_relevant_layers(self, batched_embeddings: torch.Tensor) -> torch.Tensor:
         """Keep only relevant layers in each embedding and apply layer-wise pooling if required"""
         # Use positive layer ids ('-1 -> 23' is the last layer in a 24 layer model)
-        layer_ids = sorted((layer_id if layer_id >= 0 else self.num_transformer_layers + layer_id)
-                           for layer_id in self.layer_ids)
+        layer_ids = sorted(
+            (layer_id if layer_id >= 0 else self.num_transformer_layers + layer_id)
+            for layer_id in self.layer_ids
+        )
 
-        # A batch of embeddings is in this shape (batch_size, sequence_length, num_layers, hidden_size)
+        # Embeddings shape: (batch_size, seq_len, num_layers, hidden_size)
         batched_embeddings = batched_embeddings[:, :, layer_ids, :]  # keep only selected layers
 
         # average all layers
@@ -233,7 +247,9 @@ def _pool_subwords(self, sentence_embedding, sentence_word_ids) -> List[torch.Te
             word_embeddings = [word_embedding[-1] for word_embedding in word_embeddings]
 
         if self.subword_pooling == "mean":
-            word_embeddings = [torch.mean(word_embedding, dim=0) for word_embedding in word_embeddings]
+            word_embeddings = [
+                torch.mean(word_embedding, dim=0) for word_embedding in word_embeddings
+            ]
 
         return word_embeddings
 
@@ -242,7 +258,7 @@ def _pool_words(self, word_embeddings: torch.Tensor) -> torch.Tensor:
         Sentence pooling methods: 'first', 'last', 'mean', 'weighted_mean'"""
         sentence_embedding = torch.zeros_like(word_embeddings[0])
 
-        # Use the first word as sentence embedding: for models that use CLS token in pre-training
+        # Use first word as sentence embedding: for models that use CLS in pre-training
         if self.sentence_pooling == "first":
             sentence_embedding = word_embeddings[0]
 
diff --git a/transformer_ranker/ranker.py b/transformer_ranker/ranker.py
index ff94ff2..584d247 100644
--- a/transformer_ranker/ranker.py
+++ b/transformer_ranker/ranker.py
@@ -10,7 +10,7 @@
 from .estimators import KNN, HScore, LogME
 from .utils import Result, configure_logger
 
-logger = configure_logger('transformer_ranker', logging.INFO)
+logger = configure_logger("transformer_ranker", logging.INFO)
 
 
 class TransformerRanker:
@@ -20,23 +20,24 @@ def __init__(
         dataset_downsample: Optional[float] = None,
         text_column: Optional[str] = None,
         label_column: Optional[str] = None,
-        **kwargs
+        **kwargs,
     ):
         """
         Rank language models for different NLP tasks. Embed a part of the dataset and
         estimate embedding suitability with transferability metrics like hscore or logme.
-        Embeddings can either be averaged across all layers or selected from the best-performing layer.
+        Embeddings can be averaged across all layers or selected from the best-performing layer.
 
         :param dataset: a dataset from huggingface, containing texts and label columns.
         :param dataset_downsample: a fraction to which the dataset should be reduced.
         :param kwargs: Additional dataset-specific parameters for data cleaning.
         """
         # Clean the original dataset and keep only needed columns
-        self.data_handler = DatasetCleaner(dataset_downsample=dataset_downsample,
-                                           text_column=text_column,
-                                           label_column=label_column,
-                                           **kwargs,
-                                           )
+        self.data_handler = DatasetCleaner(
+            dataset_downsample=dataset_downsample,
+            text_column=text_column,
+            label_column=label_column,
+            **kwargs,
+        )
 
         self.dataset = self.data_handler.prepare_dataset(dataset)
 
@@ -55,20 +56,20 @@ def run(
         sentence_pooling: str = "mean",
         device: Optional[str] = None,
         gpu_estimation: bool = True,
-        **kwargs
+        **kwargs,
     ):
         """
-        The run method loads the models, gathers embeddings for each, scores them, and sorts the results to rank them.
+        Load models, get embeddings, score, and rank results.
 
         :param models: A list of model name string identifiers
         :param batch_size: The number of samples to process in each batch, defaults to 32.
-        :param estimator: A metric to assess model performance (e.g., 'hscore', 'logme', 'knn').
+        :param estimator: Transferability metric (e.g., 'hscore', 'logme', 'knn').
         :param layer_aggregator: Which layer to select (e.g., 'layermean', 'bestlayer').
-        :param sentence_pooling: Parameter for embedder class, telling how to pool words into a sentence embedding for
+        :param sentence_pooling: Embedder parameter for pooling words into a sentence embedding for
         text classification tasks. Defaults to "mean" to average all words.
-        :param device: Device used to embed, defaults to gpu if available (e.g. 'cpu', 'cuda', 'cuda:2').
-        :param gpu_estimation: If to store embeddings on gpu and run estimation using gpu for speedup.
-        :param kwargs: Additional parameters for the embedder class (e.g. subword-pooling)
+        :param device: Device for embedding, defaults to GPU if available ('cpu', 'cuda', 'cuda:2').
+        :param gpu_estimation: Store and score embeddings on GPU for speedup.
+        :param kwargs: Additional parameters for the embedder class (e.g. subword pooling)
         :return: Returns the sorted dictionary of model names and their scores
         """
         self._confirm_ranker_setup(estimator=estimator, layer_aggregator=layer_aggregator)
@@ -82,12 +83,14 @@ def run(
 
         # Iterate over each transformer model and score it
         for model in models:
-            # Select transformer layers to be used: last layer (i.e. output layer) or all of the layers
+            # Select transformer layers: last layer or all layers
             layer_ids = "-1" if layer_aggregator == "lastlayer" else "all"
             layer_pooling = "mean" if "mean" in layer_aggregator else None
 
             # Sentence pooling is only applied for text classification tasks
-            effective_sentence_pooling = None if self.task_type == "token classification" else sentence_pooling
+            effective_sentence_pooling = (
+                None if self.task_type == "token classification" else sentence_pooling
+            )
 
             embedder = Embedder(
                 model=model,
@@ -95,14 +98,14 @@ def run(
                 layer_pooling=layer_pooling,
                 sentence_pooling=effective_sentence_pooling,
                 device=device,
-                **kwargs
+                **kwargs,
             )
 
             embeddings = embedder.embed(
                 self.data_handler.prepare_sentences(self.dataset),
                 batch_size=batch_size,
                 show_loading_bar=True,
-                move_embeddings_to_cpu=False if gpu_estimation else True,
+                move_embeddings_to_cpu=not gpu_estimation,
             )
 
             # Single list of embeddings for sequence tagging tasks
@@ -122,23 +125,30 @@ def run(
 
             # Estimate scores for each layer
             layer_scores = []
-            tqdm_bar_format = '{l_bar}{bar:10}{r_bar}{bar:-10b}'
-            for layer_id in tqdm(range(num_layers), desc="Transferability Score", bar_format=tqdm_bar_format):
+            tqdm_bar_format = "{l_bar}{bar:10}{r_bar}{bar:-10b}"
+            for layer_id in tqdm(
+                range(num_layers), desc="Transferability Score", bar_format=tqdm_bar_format
+            ):
                 # Get the position of the layer index
                 layer_index = embedded_layer_ids[layer_id]
 
                 # Stack embeddings for that layer
-                layer_embeddings = torch.stack([word_embedding[layer_index] for word_embedding in embeddings])
+                layer_embeddings = torch.stack(
+                    [word_embedding[layer_index] for word_embedding in embeddings]
+                )
 
                 # Estimate score using layer embeddings and labels
-                score = self._estimate_score(estimator=estimator,
-                                             embeddings=layer_embeddings,
-                                             labels=labels,
-                                             )
+                score = self._estimate_score(
+                    estimator=estimator,
+                    embeddings=layer_embeddings,
+                    labels=labels,
+                )
                 layer_scores.append(score)
 
             # Store scores for each layer in the result dictionary
-            ranking_results.layerwise_scores[model_name] = dict(zip(embedded_layer_ids, layer_scores))
+            ranking_results.layerwise_scores[model_name] = dict(
+                zip(embedded_layer_ids, layer_scores)
+            )
 
             # Aggregate layer scores
             final_score = max(layer_scores) if layer_aggregator == "bestlayer" else layer_scores[0]
@@ -147,7 +157,7 @@ def run(
 
             # Log the final score along with scores for each layer
             result_log = f"{model_name} estimation: {final_score} ({ranking_results.metric})"
-            if layer_aggregator == 'bestlayer':
+            if layer_aggregator == "bestlayer":
                 result_log += f", layerwise scores: {ranking_results.layerwise_scores[model_name]}"
             logger.info(result_log)
 
@@ -155,7 +165,9 @@ def run(
         return ranking_results
 
     @staticmethod
-    def _preload_transformers(models: List[Union[str, torch.nn.Module]], device: Optional[str] = None) -> None:
+    def _preload_transformers(
+        models: List[Union[str, torch.nn.Module]], device: Optional[str] = None
+    ) -> None:
         """Loads all models into HuggingFace cache"""
         cached_models, download_models = [], []
         for model_name in models:
@@ -175,23 +187,32 @@ def _confirm_ranker_setup(self, estimator, layer_aggregator) -> None:
         """Validate estimator and layer selection setup"""
         valid_estimators = ["hscore", "logme", "knn"]
         if estimator not in valid_estimators:
-            raise ValueError(f"Unsupported estimation method: {estimator}. "
-                             f"Use one of the following {valid_estimators}")
+            raise ValueError(
+                f"Unsupported estimation method: {estimator}. "
+                f"Use one of the following {valid_estimators}"
+            )
 
         valid_layer_aggregators = ["layermean", "lastlayer", "bestlayer"]
         if layer_aggregator not in valid_layer_aggregators:
-            raise ValueError(f"Unsupported layer pooling: {layer_aggregator}. "
-                             f"Use one of the following {valid_layer_aggregators}")
+            raise ValueError(
+                f"Unsupported layer pooling: {layer_aggregator}. "
+                f"Use one of the following {valid_layer_aggregators}"
+            )
 
         valid_task_types = ["text classification", "token classification", "text regression"]
         if self.task_type not in valid_task_types:
-            raise ValueError("Unable to determine task type of the dataset. Please specify it as a parameter: "
-                             "task_type= \"text classification\", \"token classification\", or "
-                             "\"text regression\"")
+            raise ValueError(
+                "Unable to determine task type of the dataset. Please specify it as a parameter: "
+                'task_type="text classification", "token classification", or '
+                '"text regression"'
+            )
 
-        if self.task_type == 'text regression' and estimator == 'hscore':
-            raise ValueError(f"\"{estimator}\" does not support text regression. "
-                             f"Use one of the following estimators: {valid_estimators.remove('hscore')}")
+        if self.task_type == "text regression" and estimator == "hscore":
+            supported_estimators = [est for est in valid_estimators if est != "hscore"]
+            raise ValueError(
+                f'"{estimator}" does not support text regression. '
+                f"Use one of the following estimators: {supported_estimators}"
+            )
 
     def _estimate_score(self, estimator, embeddings: torch.Tensor, labels: torch.Tensor) -> float:
         """Use an estimator to score a transformer"""
diff --git a/transformer_ranker/utils.py b/transformer_ranker/utils.py
index 4c405d0..66b51ee 100644
--- a/transformer_ranker/utils.py
+++ b/transformer_ranker/utils.py
@@ -6,7 +6,7 @@
 from transformers import logging as transformers_logging
 
 
-def prepare_popular_models(model_size='base') -> List[str]:
+def prepare_popular_models(model_size="base") -> List[str]:
     """Two lists of language models to try out"""
     base_models = [
         # English models
@@ -52,10 +52,12 @@ def prepare_popular_models(model_size='base') -> List[str]:
         "KISTI-AI/scideberta",
     ]
 
-    return large_models if model_size == 'large' else base_models
+    return large_models if model_size == "large" else base_models
 
 
-def configure_logger(name: str, level: int = logging.INFO, log_to_console: bool = True) -> logging.Logger:
+def configure_logger(
+    name: str, level: int = logging.INFO, log_to_console: bool = True
+) -> logging.Logger:
     """
     Configure transformer-ranker logger.
 
@@ -70,7 +72,7 @@ def configure_logger(name: str, level: int = logging.INFO, log_to_console: bool
     if not logger.handlers and log_to_console:
         console_handler = logging.StreamHandler()
         console_handler.setLevel(level)
-        console_handler.setFormatter(logging.Formatter('transformer_ranker:%(message)s'))
+        console_handler.setFormatter(logging.Formatter("transformer_ranker:%(message)s"))
         logger.addHandler(console_handler)
 
     # Ignore specific warning messages from transformers and datasets libraries
@@ -79,10 +81,12 @@ def configure_logger(name: str, level: int = logging.INFO, log_to_console: bool
     transformers_logging.set_verbosity_error()
 
     # Suppress transformers warning about unused prediction head weights if the model is frozen
-    logger.addFilter(lambda record: not (
-        "Some weights of BertModel were not initialized" in record.getMessage() or
-        "You should probably TRAIN this model" in record.getMessage()
-    ))
+    logger.addFilter(
+        lambda record: not (
+            "Some weights of BertModel were not initialized" in record.getMessage()
+            or "You should probably TRAIN this model" in record.getMessage()
+        )
+    )
 
     logger.propagate = False
     return logger
@@ -113,11 +117,11 @@ def best_model(self) -> str:
     @property
     def top_three(self) -> Dict[str, float]:
         """Return three highest scoring models"""
-        return {k: self.results[k] for k in list(self.results.keys())[:min(3, len(self.results))]}
+        return {k: self.results[k] for k in list(self.results.keys())[: min(3, len(self.results))]}
 
     @property
     def best_layers(self) -> Dict[str, int]:
-        """Return a dictionary where each key is a model name and the value is the best layer's ID for that model."""
+        """Return a dictionary mapping each model name to its best layer ID."""
         best_layers_dict = {}
         for model, values in self.layerwise_scores.items():
             best_layer = max(values.items(), key=operator.itemgetter(1))[0]
@@ -132,13 +136,18 @@ def append(self, additional_results: "Result") -> None:
             self._results.update(additional_results.results)
             self.layerwise_scores.update(additional_results.layerwise_scores)
         else:
-            raise ValueError(f"Expected an instance of 'Result', but got {type(additional_results).__name__}. "
-                             f"Only 'Result' instances can be appended.")
+            raise ValueError(
+                f"Expected an instance of 'Result', but got {type(additional_results).__name__}. "
+                f"Only 'Result' instances can be appended."
+            )
 
     def _format_results(self) -> str:
         """Helper method to return sorted results as a formatted string."""
         sorted_results = sorted(self._results.items(), key=lambda item: item[1], reverse=True)
-        result_lines = [f"Rank {i + 1}. {model_name}: {score}" for i, (model_name, score) in enumerate(sorted_results)]
+        result_lines = [
+            f"Rank {i + 1}. {model_name}: {score}"
+            for i, (model_name, score) in enumerate(sorted_results)
+        ]
        return "\n".join(result_lines)
 
     def __str__(self) -> str:
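---

For review context, a minimal usage sketch of the public API this patch touches (not part of the diff). It assumes the package root re-exports TransformerRanker and prepare_popular_models; the "trec" dataset and the model slice are purely illustrative choices:

    from datasets import load_dataset
    from transformer_ranker import TransformerRanker, prepare_popular_models

    # Any hf dataset with a text column and a label column works; "trec" is illustrative
    dataset = load_dataset("trec")

    # Take a few of the suggested base-size models to keep the example run short
    models = prepare_popular_models("base")[:3]

    # Down-sample the data, embed it with each model, and score embeddings with hscore
    ranker = TransformerRanker(dataset, dataset_downsample=0.2)
    result = ranker.run(
        models=models,
        batch_size=32,
        estimator="hscore",
        layer_aggregator="layermean",
    )

    print(result.best_model)  # highest scoring model
    print(result.top_three)   # three best models with their scores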