|
from importlib import import_module |
|
from typing import Tuple |
|
|
|
import torch |
|
import transformers |
|
from torch import nn |
|
from torch.nn import functional as F |
|
|
|
__all__ = ["patch"] |
|
|
|
|
|
def _get_unpad_data(attention_mask: torch.Tensor, *args, **kwargs) -> Tuple[torch.Tensor, torch.Tensor, int]:
    """Compute the unpadding metadata for a batch from its attention mask.

    If a ``seqlens_in_batch`` attribute has been attached to this function,
    those lengths are used as-is; otherwise the lengths are obtained by
    summing ``attention_mask`` along dimension 1.

    Args:
        attention_mask: Mask tensor whose non-zero entries are kept. It is
            summed over ``dim=1``, so it needs at least two dimensions
            (presumably ``(batch, seq_len)`` -- confirm against callers).
        *args: Accepted and ignored.
        **kwargs: Accepted and ignored.

    Returns:
        A tuple ``(indices, cu_seqlens, max_seqlen_in_batch)`` where
        ``indices`` are the positions of the non-zero entries of the
        flattened mask, ``cu_seqlens`` is the int32 cumulative sum of the
        sequence lengths with a single leading zero, and
        ``max_seqlen_in_batch`` is the largest length as a Python scalar.
    """
    # Externally supplied lengths (stored as a function attribute) take
    # precedence over lengths derived from the mask.
    try:
        lengths = _get_unpad_data.seqlens_in_batch
    except AttributeError:
        lengths = attention_mask.sum(dim=1)

    flat_mask = attention_mask.flatten()
    indices = flat_mask.nonzero(as_tuple=False).flatten()

    longest = lengths.max().item()

    # Prepend one zero so the result has len(lengths) + 1 entries.
    running_total = torch.cumsum(lengths, dim=0, dtype=torch.int32)
    cu_seqlens = F.pad(running_total, (1, 0))

    return indices, cu_seqlens, longest
|
|
|
|
|
def set_seqlens_in_batch(seqlens_in_batch: torch.Tensor) -> None:
    """Attach per-sequence lengths to ``_get_unpad_data`` for later calls.

    The tensor is stored as the ``seqlens_in_batch`` attribute of the
    ``_get_unpad_data`` function object, which reads that attribute in
    preference to computing lengths from the attention mask.

    Args:
        seqlens_in_batch: Sequence lengths to be used by ``_get_unpad_data``.
    """
    setattr(_get_unpad_data, "seqlens_in_batch", seqlens_in_batch)
|
|
|
|
|
def _release_tuple(version: str) -> Tuple[int, ...]:
    """Return the leading numeric release components of *version* as ints.

    Parsing stops at the first component that is not purely ASCII digits, so
    ``"4.43.0.dev0"`` and ``"4.43.0rc1"`` both yield ``(4, 43, 0)``.
    """
    release = []
    for piece in version.split("."):
        digits = ""
        for char in piece:
            if not "0" <= char <= "9":
                break
            digits += char
        if not digits:
            break
        release.append(int(digits))
        if len(digits) != len(piece):
            # A suffix such as "rc1" ends the numeric release segment.
            break
    return tuple(release)


def patch(model: nn.Module) -> None:
    """Replace the ``_get_unpad_data`` used by ``transformers`` with the packing-aware one.

    For ``transformers`` releases older than 4.43 the function is replaced in
    the module named by ``model.__module__``; for 4.43 and newer it is
    replaced in ``transformers.modeling_flash_attention_utils``.

    Args:
        model: Model whose defining module is patched on older
            ``transformers`` releases.

    Raises:
        ValueError: On older releases, if the model's module has no
            ``_get_unpad_data`` attribute to replace.
    """
    # Compare numerically: comparing version strings is lexicographic and
    # misorders releases (e.g. "4.5.0" < "4.43.0" is False, "4.100.0" <
    # "4.43.0" is True). The threshold is 4.43.0, so (major, minor) suffices.
    if _release_tuple(transformers.__version__)[:2] < (4, 43):
        module = import_module(model.__module__)
        if not hasattr(module, "_get_unpad_data"):
            raise ValueError(f"Module {module} does not have function '_get_unpad_data' for packing")
        module._get_unpad_data = _get_unpad_data
    else:
        transformers.modeling_flash_attention_utils._get_unpad_data = _get_unpad_data
|
|