import logging
from typing import Any

import torch
from transformers import PreTrainedModel

from .parametrized_model import ParametrizedModel, ParametrizedModelConfig

logger = logging.getLogger(__name__)


class ACIPModelConfig(ParametrizedModelConfig):
    """
    Configuration for `ACIPModel`. Same functionality as `ParametrizedModelConfig`.

    See Also:
        - `ParametrizedModelConfig`
        - `ACIPModel`
    """

    model_type = "acip_model"


class ACIPModel(ParametrizedModel):
    """
    This class extends `ParametrizedModel` with additional functionality required for ACIP.
    It manages a `score_map` that stores the scores of the parametrized modules' target parameters,
    which are updated during tuning by the ACIP method.
    Moreover, it provides `prune_model_by_score`, which prunes the target parameters of the model
    according to their scores to achieve any given size ratio.

    Notes: The `score_map` is managed in float32 internally because a lower precision may lead to unexpected
        numerical inaccuracies in the resulting parameter ranking. Fortunately, its memory consumption is
        negligible compared to the model weights themselves.

    See Also:
        - `ParametrizedModel`
        - `ACIPModelConfig`
    """

    config_class = ACIPModelConfig

    def __init__(self, config: ACIPModelConfig, base_model: PreTrainedModel | None = None, **_: Any):
        super().__init__(config, base_model)
        self.config = config
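
        # Scores live in per-module buffers; the flat `score_map` view is rendered
        # lazily and cached in `_score_map` (see `_update_score_map`).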
        self._score_map: dict[str, torch.Tensor] | None = None

        self._init_score_map_buffers()

    def _init_score_map_buffers(self):
        """
        Register and initialize score map buffers in the parametrized modules (initialized with ones).
        Each target parameter "p_name" is associated with a buffer "p_name_score" that stores its score vector.
        """
        for module in self.parametrized_modules.values():
            for p_name, param in module.parametrization.get_target_params().items():
                module.parametrization.register_buffer(p_name + "_score", torch.ones_like(param.data).float())

    def _update_score_map(self):
        """Render `score_map` from the parametrized modules' score buffers."""
        self._score_map = {}
        for m_name, module in self.parametrized_modules.items():
            for p_name in module.parametrization.get_target_params().keys():
                self._score_map[f"{m_name}.parametrization.{p_name}"] = module.parametrization.get_buffer(
                    p_name + "_score"
                )

    @property
    def score_map(self) -> dict[str, torch.Tensor]:
        """Returns the score map as a Tensor dictionary whose keys match those of `self.get_target_params`."""
        if self._score_map is None:
            self._update_score_map()
        return self._score_map

    @score_map.setter
    def score_map(self, score_map: dict[str, torch.Tensor]) -> None:
        """
        Updates `score_map` and the corresponding parametrized modules' score buffers.

        Args:
            score_map: Dictionary whose keys should match (a subset of) `self.get_target_params`.
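
        Example (illustrative; the parameter name is made up and depends on the
        concrete parametrization)::

            name = "model.layers.0.mlp.parametrization.weight"
            model.score_map = {name: torch.rand_like(model.score_map[name])}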
        """
        if self._score_map is None:
            self._update_score_map()

        for p_name, score in score_map.items():
            buffer = self.model.get_buffer(p_name + "_score")
            if buffer.shape != score.shape:
                raise ValueError(
                    f"Score map for '{p_name}' has incorrect shape: expected {buffer.shape}, got {score.shape}"
                )

            buffer.copy_(score.detach().float())
            self._score_map[p_name] = buffer

    def _predict_size_ratio_by_score(self, k: int, full: bool = False) -> tuple[float, dict[str, torch.Tensor]]:
        """
        Helper function that checks what would happen if the k smallest target parameters were pruned
        according to the global score map ranking. It returns the resulting size ratio
        and the corresponding parameter masks.

        Args:
            k: Number of target parameters to prune.
            full: Whether to count the number of parameters of the entire model or only the parametrized modules.
                See also `ParametrizedModel.get_num_params`.

        Returns: Tuple of size ratio and parameter masks. The masks indicate which parameters to keep.
        """
        score_map_cat = torch.cat([param.flatten() for param in self.score_map.values()])
        threshold = torch.kthvalue(score_map_cat, k).values.item()

        param_masks = {}
        for p_name, score in self.score_map.items():
            param_masks[p_name] = (score > threshold).to(dtype=score.dtype)

        size_ratio = self.get_size_ratio(full=full, target_params=param_masks)
        return size_ratio, param_masks

    def _get_param_masks(self, size_ratio: float, full: bool = False) -> dict[str, torch.Tensor]:
        """
        Helper function that determines which parameters to keep to reach a target size ratio.
        Instead of looping over `k -> _predict_size_ratio_by_score(k)`, a binary search can be used because
        the size ratio is monotonically decreasing in k.

        Args:
            size_ratio: Target size ratio.
            full: Whether to count the number of parameters of the entire model or only the parametrized modules.
                See also `ParametrizedModel.get_num_params`.

        Returns: Parameter masks indicating which parameters to keep to reach the target size ratio.
        """
        if size_ratio == 1.0:
            return {p_name: torch.ones_like(score) for p_name, score in self.score_map.items()}
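
        # Binary search over k, the number of pruned parameters: the predicted size
        # ratio shrinks as k grows, so we look for the largest k whose ratio still
        # exceeds the target.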
        k_lo, k_hi = 1, sum(score.numel() for score in self.score_map.values())
        while k_lo < k_hi:
            k_mid = (k_lo + k_hi + 1) // 2
            ratio, _ = self._predict_size_ratio_by_score(k=k_mid, full=full)
            if ratio > size_ratio:
                k_lo = k_mid
            else:
                k_hi = k_mid - 1
        k = k_lo

        return self._predict_size_ratio_by_score(k=k, full=full)[1]

    def prune_model_by_score(
        self,
        size_ratio: float | None = None,
        compression_rate: float | None = None,
        full: bool = False,
    ) -> None:
        """
        This method prunes the target parameters of the model according to their scores to achieve
        a given size ratio.

        This can be efficiently implemented by a simple binary search strategy:
        we find the largest number of parameters to prune according to the score map ranking
        such that the resulting size ratio is still at least the target `size_ratio`.

        Args:
            size_ratio: The target size ratio, which is the ratio between the size of the compressed model and
                the original model (where size is measured in number of parameters).
                If not provided, `compression_rate` must be provided.
            compression_rate: This is a convenience parameter that allows you to set the target compression rate
                instead of `size_ratio`. It is equivalent to `size_ratio = 1.0 - compression_rate`.
                If both `size_ratio` and `compression_rate` are provided, `size_ratio` is used.
            full: Whether to count the number of parameters of the entire model or only the parametrized modules.
                See also `ParametrizedModel.get_num_params`.
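
        Example (illustrative; the checkpoint path is made up)::

            model = ACIPModel.from_pretrained("path/to/acip_checkpoint")
            model.prune_model_by_score(size_ratio=0.4)  # keep ~40% of the parameters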
        """
        if size_ratio is None and compression_rate is None:
            raise ValueError("Either `size_ratio` or `compression_rate` must be provided.")
        elif size_ratio is None and compression_rate is not None:
            size_ratio = 1.0 - compression_rate
        elif size_ratio is not None and compression_rate is not None:
            logger.warning("Both `size_ratio` and `compression_rate` are provided. Using `size_ratio`.")

        param_masks = self._get_param_masks(size_ratio=size_ratio, full=full)

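        # Binarize the target parameters according to the masks (1 = keep, 0 = prune),
        # then reset the target parameters of each affected module based on their
        # nonzero pattern.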
        for p_name, param in self.get_target_params().items():
            param.data[param_masks[p_name] > 0.0] = 1.0
            param.data[param_masks[p_name] == 0.0] = 0.0
        for m_name, module in self.parametrized_modules.items():
            if any(p_name.startswith(m_name) for p_name in param_masks.keys()):
                module.parametrization.reset_target_params(mode="nonzero")

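
# Registering for auto classes lets checkpoints saved via `save_pretrained` record
# an `auto_map`, so they can be reloaded with
# `AutoModel.from_pretrained(..., trust_remote_code=True)`.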
ACIPModelConfig.register_for_auto_class()
ACIPModel.register_for_auto_class("AutoModel")