From 654ea83729197b930236463512850f1224010f57 Mon Sep 17 00:00:00 2001 From: Lukas Garbas Date: Sun, 1 Dec 2024 22:32:53 +0100 Subject: [PATCH] Add abstract class for transferability metrics --- tests/estimators/test_knn.py | 6 +- transformer_ranker/estimators/__init__.py | 4 +- transformer_ranker/estimators/base.py | 20 ++++ transformer_ranker/estimators/hscore.py | 23 +++-- transformer_ranker/estimators/logme.py | 16 ++-- .../estimators/nearestneighbors.py | 44 ++++++--- transformer_ranker/ranker.py | 95 +++++++------------ 7 files changed, 113 insertions(+), 95 deletions(-) create mode 100644 transformer_ranker/estimators/base.py diff --git a/tests/estimators/test_knn.py b/tests/estimators/test_knn.py index f2d4029..53c4934 100644 --- a/tests/estimators/test_knn.py +++ b/tests/estimators/test_knn.py @@ -3,7 +3,7 @@ import pytest import torch -from transformer_ranker.estimators import KNN +from transformer_ranker.estimators import NearestNeighbors def sample_data( @@ -40,7 +40,7 @@ def sample_data( @pytest.mark.parametrize("k,dim", [(6, 1024), (10, 100), (100, 256), (1024, 16)]) def test_knn_on_constructed_data(k, dim): features, labels, expected_accuracy = sample_data(k=k, dim=dim) - estimator = KNN(k) + estimator = NearestNeighbors(k=k) accuracy = estimator.fit(features, labels) @@ -57,6 +57,6 @@ def test_knn_on_constructed_data(k, dim): ], ) def test_knn_on_iris(iris_dataset, k, expected_accuracy): - e = KNN(k) + e = NearestNeighbors(k=k) score = e.fit(iris_dataset["data"], iris_dataset["target"]) assert score == pytest.approx(expected_accuracy) diff --git a/transformer_ranker/estimators/__init__.py b/transformer_ranker/estimators/__init__.py index 76cfe3a..a221397 100644 --- a/transformer_ranker/estimators/__init__.py +++ b/transformer_ranker/estimators/__init__.py @@ -1,3 +1,5 @@ from .hscore import HScore from .logme import LogME -from .nearestneighbors import KNN +from .nearestneighbors import NearestNeighbors + +__all__ = ["HScore", "LogME", 
"NearestNeighbors"] diff --git a/transformer_ranker/estimators/base.py b/transformer_ranker/estimators/base.py new file mode 100644 index 0000000..7754f70 --- /dev/null +++ b/transformer_ranker/estimators/base.py @@ -0,0 +1,20 @@ +from abc import ABC, abstractmethod +from typing import Optional + +import torch + + +class Estimator(ABC): + """Abstract base class for transferability metrics.""" + def __init__(self, regression: bool, **kwargs): + self.regression: bool = regression + self.score: Optional[float] = None + + @abstractmethod + def fit(self, *, embeddings: torch.tensor, labels: torch.tensor, **kwargs) -> float: + """Compute score given embeddings and labels. + + :param embeddings: Embedding tensor (num_samples, num_features) + :param labels: label tensor (num_samples,) + """ + pass diff --git a/transformer_ranker/estimators/hscore.py b/transformer_ranker/estimators/hscore.py index 5ec3ccf..ab8c3d2 100644 --- a/transformer_ranker/estimators/hscore.py +++ b/transformer_ranker/estimators/hscore.py @@ -1,23 +1,30 @@ +import warnings + import torch +from .base import Estimator + -class HScore: - def __init__(self): +class HScore(Estimator): + def __init__(self, regression: bool = False): """ Regularized H-Score estimator. - Original H-score paper: https://arxiv.org/abs/2212.10082 + Paper: https://arxiv.org/abs/2212.10082 Shrinkage-based (regularized) H-Score: https://openreview.net/pdf?id=iz_Wwmfquno """ - self.score = None + if regression: + warnings.warn("HScore is not suitable for regression tasks.", UserWarning) + + super().__init__(regression=regression) def fit(self, embeddings: torch.Tensor, labels: torch.Tensor) -> float: """ - H-score intuition: Higher variance between embeddings of different classes + H-score intuition: higher variance between embeddings of different classes (mean vectors for each class) and lower feature redundancy (i.e. inverse of the covariance matrix for all data points) lead to better transferability. 
- :param embeddings: Embedding matrix of shape (num_samples, hidden_size) - :param labels: Label vector of shape (num_samples,) + :param embeddings: Embedding tensor (num_samples, hidden_size) + :param labels: Label tensor (num_samples,) :return: H-score, where higher is better. """ # Center all embeddings @@ -26,7 +33,7 @@ def fit(self, embeddings: torch.Tensor, labels: torch.Tensor) -> float: # Number of samples, hidden size (i.e. embedding length), number of classes num_samples, hidden_size = embeddings.size() - classes, class_counts = torch.unique(labels, return_counts=True) + classes, _ = torch.unique(labels, return_counts=True) num_classes = len(classes) # Feature covariance matrix (hidden_size x hidden_size) diff --git a/transformer_ranker/estimators/logme.py b/transformer_ranker/estimators/logme.py index e0de6c5..084ab7d 100644 --- a/transformer_ranker/estimators/logme.py +++ b/transformer_ranker/estimators/logme.py @@ -1,17 +1,15 @@ -from typing import Optional import torch +from .base import Estimator -class LogME: + +class LogME(Estimator): def __init__(self, regression: bool = False): """ LogME (Log of Maximum Evidence) estimator. Paper: https://arxiv.org/abs/2102.11005 - - :param regression: Boolean flag if the task is regression. """ - self.regression = regression - self.score: Optional[float] = None + super().__init__(regression=regression) def fit( self, @@ -27,8 +25,8 @@ def fit( the prior (alpha) and likelihood (beta), projecting the target labels onto the singular vectors of the feature matrix. 
- :param embeddings: Embedding matrix of shape (num_samples, hidden_dim) - :param labels: Label vector of shape (num_samples,) + :param embeddings: Embedding tensor (num_samples, hidden_dim) + :param labels: Label tensor (num_samples,) :param initial_alpha: Initial precision of the prior (controls the regularization strength) :param initial_beta: Initial precision of the likelihood (controls the noise in the data) :param tol: Tolerance for the optimization convergence @@ -44,7 +42,7 @@ def fit( # Get the number of samples, number of classes, and the hidden size num_samples, hidden_size = embeddings.shape - class_names, counts = torch.unique(labels, return_counts=True) + class_names, _ = torch.unique(labels, return_counts=True) num_classes = labels.shape[1] if self.regression else len(class_names) # SVD on the features diff --git a/transformer_ranker/estimators/nearestneighbors.py b/transformer_ranker/estimators/nearestneighbors.py index 3eda18c..4b4bbfe 100644 --- a/transformer_ranker/estimators/nearestneighbors.py +++ b/transformer_ranker/estimators/nearestneighbors.py @@ -1,13 +1,18 @@ -from typing import Union, Optional + +from typing import Optional, Union + import torch from torchmetrics.classification import BinaryF1Score, MulticlassF1Score +from torch.nn.functional import cosine_similarity +from .base import Estimator -class KNN: + +class NearestNeighbors(Estimator): def __init__( self, - k: int = 3, regression: bool = False, + k: int = 3, ): """ K-Nearest Neighbors estimator. @@ -15,29 +20,42 @@ def __init__( :param k: Number of nearest neighbors to consider. :param regression: Boolean flag if the task is regression. 
""" - self.k = k - self.regression = regression - self.score: Optional[float] = None + super().__init__(regression=regression) + + self.k = k # number of neighbors + self.distance_metrics = { + 'euclidean': lambda x, y: torch.cdist(x, y, p=2), + 'cosine': lambda x, y: 1 - cosine_similarity(x[:, None, :], y[None, :, :], dim=-1) + } - def fit(self, embeddings: torch.Tensor, labels: torch.Tensor, batch_size: int = 1024) -> float: + def fit( + self, + embeddings: torch.Tensor, + labels: torch.Tensor, + batch_size: int = 1024, + distance_metric: str = 'euclidean', + ) -> float: """ - Estimate embedding suitability for classification or regression using nearest neighbors + Evaluate embeddings using kNN. Distance and topk computations are done in batches. - :param embeddings: Embedding matrix of shape (n_samples, hidden_size) - :param labels: Label vector of shape (n_samples,) + :param embeddings: Embedding tensor (n_samples, hidden_size) + :param labels: Label tensor (n_samples,) :param batch_size: Batch size for distance and top-k computation in chunks - :return: Score (F1 score for classification or Pearson correlation for regression) + :param distance_metric: Metric to use for distance computation 'euclidean', 'cosine' + :return: F1-micro score (for classification) or Pearson correlation (for regression) """ num_samples = embeddings.size(0) num_classes = len(torch.unique(labels)) knn_indices = torch.zeros((num_samples, self.k), dtype=torch.long, device=embeddings.device) + distance_func = self.distance_metrics.get(distance_metric) + for start in range(0, num_samples, batch_size): end = min(start + batch_size, num_samples) batch_features = embeddings[start:end] - # Euclidean distances between the batch and all other features - dists = torch.cdist(batch_features, embeddings, p=2) + # Distances between the batch and all other features + dists = distance_func(batch_features, embeddings) # Exclude self-distances by setting diagonal to a large number diag_indices = 
torch.arange(start, end, device=embeddings.device) diff --git a/transformer_ranker/ranker.py b/transformer_ranker/ranker.py index 37503a6..7b9a458 100644 --- a/transformer_ranker/ranker.py +++ b/transformer_ranker/ranker.py @@ -7,7 +7,7 @@ from .datacleaner import DatasetCleaner, TaskCategory from .embedder import Embedder -from .estimators import KNN, HScore, LogME +from .estimators import LogME, HScore, NearestNeighbors from .utils import Result, configure_logger logger = configure_logger("transformer_ranker", logging.INFO) @@ -23,15 +23,13 @@ def __init__( **kwargs: Any ): """ - Rank language models for different NLP tasks. Embed a part of the dataset and - estimate embedding suitability with transferability metrics like hscore or logme. - Embeddings can be averaged across all layers or selected from the best-performing layer. + Prepare dataset and transferability metrics. - :param dataset: a dataset from huggingface, containing texts and label columns. + :param dataset: a dataset from huggingface with texts and labels. :param dataset_downsample: a fraction to which the dataset should be reduced. - :param kwargs: Additional dataset-specific parameters for data cleaning. + :param kwargs: additional dataset-specific parameters for data cleaning. """ - # Clean the original dataset and keep only needed columns + # Prepare dataset, downsample it datacleaner = DatasetCleaner( dataset_downsample=dataset_downsample, text_column=text_column, @@ -45,6 +43,13 @@ def __init__( self.texts, self.labels, self.task_category = datacleaner.prepare_dataset(dataset) + # Supported metrics + self.transferability_metrics = { + 'logme': LogME, + 'hscore': HScore, + 'knn': NearestNeighbors, + } + def run( self, models: list[Union[str, torch.nn.Module]], @@ -57,9 +62,10 @@ def run( **kwargs: Any ): """ - Load models, get embeddings, score, and rank results. + Load models, iterate through each to gather embeddings and score them. 
+ Embeddings can be averaged across all layers or selected from the best scoring layer. - :param models: A list of model names string identifiers + :param models: A list of model names :param batch_size: The number of samples to process in each batch, defaults to 32. :param estimator: Transferability metric (e.g., 'hscore', 'logme', 'knn'). :param layer_aggregator: Which layer to select (e.g., 'layermean', 'bestlayer'). @@ -81,22 +87,25 @@ def run( if gpu_estimation: self.labels = self.labels.to(device) + # Set transferability metric + regression = self.task_category == TaskCategory.TEXT_REGRESSION + metric = self.transferability_metrics[estimator](regression=regression) + # Store all results in a dictionary ranking_results = Result(metric=estimator) - # Iterate over each transformer model and score it + # Iterate over each model and score it for model in models: - # Select transformer layers: last layer or all layers + # Select model layers: last layer or all layers layer_ids = "-1" if layer_aggregator == "lastlayer" else "all" layer_pooling = "mean" if "mean" in layer_aggregator else None - # Set effective sentence pooling as parameter effective_sentence_pooling = ( None if self.task_category == TaskCategory.TOKEN_CLASSIFICATION else sentence_pooling ) - # Prepare the transformer embedder with word, sentence, and layer pooling + # Prepare embedder with word, sentence, and layer pooling embedder = Embedder( model=model, layer_ids=layer_ids, @@ -106,7 +115,7 @@ def run( **kwargs, ) - # Gather models embeddings for the dataset + # Gather embeddings embeddings = embedder.embed( self.texts, batch_size=batch_size, @@ -114,7 +123,7 @@ def run( move_embeddings_to_cpu=not gpu_estimation, ) - # Transferability for word tasks: all word embeddings in one list + # Prepare all embeddings in one list if self.task_category == TaskCategory.TOKEN_CLASSIFICATION: embeddings = [word for sentence in embeddings for word in sentence] @@ -122,7 +131,7 @@ def run( embedded_layer_ids = 
embedder.layer_ids num_layers = embeddings[0].size(0) - # Remove transformer model from memory after embeddings are extracted + # Remove model from memory del embedder torch.cuda.empty_cache() @@ -132,7 +141,7 @@ def run( for layer_id in tqdm( range(num_layers), desc="Transferability Score", bar_format=tqdm_bar_format ): - # Get the position of the layer index + # Get the position of layer index layer_index = embedded_layer_ids[layer_id] # Stack embeddings for that layer @@ -140,13 +149,9 @@ def run( [word_embedding[layer_index] for word_embedding in embeddings] ) - # Estimate score using layer embeddings and labels - score = self._estimate_score( - estimator=estimator, - embeddings=layer_embeddings, - labels=self.labels, - ) - layer_scores.append(score) + # Estimate transferability + score = metric.fit(embeddings=layer_embeddings, labels=self.labels) + layer_scores.append(round(score, 4)) # Store scores for each layer in the result dictionary ranking_results.layerwise_scores[model_name] = dict( @@ -159,8 +164,6 @@ def run( # Log the final score along with scores for each layer result_log = f"{model_name} estimation: {final_score} ({ranking_results.metric})" - - # Log scores for layer ranking if layer_aggregator == "bestlayer": result_log += f", scores for each layer: {ranking_results.layerwise_scores[model_name]}" @@ -172,7 +175,7 @@ def run( def _preload_transformers( models: list[Union[str, torch.nn.Module]], device: Optional[str] = None ) -> None: - """Loads all models into HuggingFace cache""" + """Load models to HuggingFace cache""" cached_models, download_models = [], [] for model_name in models: try: @@ -188,47 +191,17 @@ def _preload_transformers( Embedder(model_name, device=device) def _confirm_ranker_setup(self, estimator, layer_aggregator) -> None: - """Validate estimator and layer selection setup""" - valid_estimators = ["hscore", "logme", "knn"] + """Validate estimator and layer pooling""" + valid_estimators = self.transferability_metrics.keys() if 
estimator not in valid_estimators: raise ValueError( f"Unsupported estimation method: {estimator}. " f"Use one of the following {valid_estimators}" ) - valid_layer_aggregators = ["layermean", "lastlayer", "bestlayer"] - if layer_aggregator not in valid_layer_aggregators: + valid_layer_pooling = ["layermean", "lastlayer", "bestlayer"] + if layer_aggregator not in valid_layer_pooling: raise ValueError( f"Unsupported layer pooling: {layer_aggregator}. " - f"Use one of the following {valid_layer_aggregators}" + f"Use one of the following {valid_layer_pooling}" ) - - valid_task_categories = ["text classification", "token classification", "text regression"] - if self.task_category not in valid_task_categories: - raise ValueError( - "Unable to determine task type of the dataset. Please specify it as a parameter: " - 'task_category= "text classification", "token classification", or ' - '"text regression"' - ) - - if self.task_category == TaskCategory.TEXT_REGRESSION and estimator == "hscore": - supported_estimators = [est for est in valid_estimators if est != "hscore"] - raise ValueError( - f'"{estimator}" does not support text regression. ' - f"Use one of the following estimators: {supported_estimators}" - ) - - def _estimate_score(self, estimator, embeddings: torch.Tensor, labels: torch.Tensor) -> float: - """Use an estimator to score a transformer""" - regression = self.task_category == TaskCategory.TEXT_REGRESSION - - estimator_classes = { - "knn": KNN(k=3, regression=regression), - "logme": LogME(regression=regression), - "hscore": HScore(), - } - - estimator = estimator_classes[estimator] - score = estimator.fit(embeddings=embeddings, labels=labels) - - return round(score, 4)