# Spaces: Running on Zero
from transformers import AutoModelForCausalLM, AutoTokenizer | |
import torch | |
import torch.nn as nn | |
import os | |
import warnings | |
from typing import Optional, Union, List, Tuple | |
from transformers import ( | |
AutoTokenizer, | |
AutoModel, | |
AutoModelForCausalLM, | |
AutoConfig, | |
BitsAndBytesConfig, | |
PretrainedConfig, | |
PreTrainedModel, | |
LlamaConfig, | |
LlamaModel, | |
) | |
from transformers.modeling_outputs import CausalLMOutputWithPast | |
from transformers import PretrainedConfig | |
from .llava_arch import LlavaMetaModel, LlavaMetaForCausalLM | |
from .language_model.llava_llama import LlavaLlamaConfig | |
# TODO: we may move LlavaConfig to configuration_llava.py | |
# from model.configuration_llava import LlavaConfig | |
class LlavaLlamaModel(LlavaMetaModel, LlavaMetaForCausalLM, PreTrainedModel):
    """Vision-language model that feeds multimodal embeddings to ``self.llm``.

    The helpers used below (``init_vlm``, ``freezed_module_patch``,
    ``prepare_inputs_labels_for_multimodal``, ``repack_multimodal_data``) are
    not defined in this class; presumably they come from the mixin bases.
    """

    config_class = LlavaLlamaConfig
    # NOTE(review): every other place in this class spells the keyword
    # "inputs_embeds"; confirm whether "input_embeds" is intentional.
    main_input_name = "input_embeds"
    supports_gradient_checkpointing = True

    def __init__(self, config: LlavaLlamaConfig = None, *args, **kwargs) -> None:
        """Initialise the base model, then build the VLM parts via ``init_vlm``."""
        super().__init__(config)
        self.init_vlm(*args, config=config, **kwargs)

    @classmethod
    def from_pretrained(
        cls,
        pretrained_model_name_or_path: Optional[Union[str, os.PathLike]],
        *model_args,
        config: Optional[Union[PretrainedConfig, str, os.PathLike]] = None,
        cache_dir: Optional[Union[str, os.PathLike]] = None,
        ignore_mismatched_sizes: bool = False,
        force_download: bool = False,
        local_files_only: bool = False,
        token: Optional[Union[str, bool]] = None,
        revision: str = "main",
        use_safetensors: Optional[bool] = None,
        **kwargs,
    ):
        """Load a model, preferring ``cls.load_pretrained`` when it exists.

        All arguments are forwarded unchanged, either to ``cls.load_pretrained``
        (if the class provides it) or to the parent ``from_pretrained``.

        Returns:
            Whatever the selected loader returns.
        """
        forwarded = dict(
            config=config,
            cache_dir=cache_dir,
            ignore_mismatched_sizes=ignore_mismatched_sizes,
            force_download=force_download,
            local_files_only=local_files_only,
            token=token,
            revision=revision,
            use_safetensors=use_safetensors,
        )
        if hasattr(cls, "load_pretrained"):
            return cls.load_pretrained(
                pretrained_model_name_or_path, *model_args, **forwarded, **kwargs
            )
        # Zero-argument super() binds to ``cls`` inside a classmethod; the
        # one-argument form ``super(LlavaLlamaModel)`` is unbound and cannot
        # resolve ``from_pretrained``.
        return super().from_pretrained(
            pretrained_model_name_or_path, *model_args, **forwarded, **kwargs
        )

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        images: Optional[torch.FloatTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, CausalLMOutputWithPast]:
        """Run the wrapped language model on (optionally multimodal) inputs.

        When ``inputs_embeds`` is not supplied, the inputs and ``images`` are
        first passed through ``prepare_inputs_labels_for_multimodal``. In
        training mode the batch then goes through ``repack_multimodal_data``
        and ``input_ids`` / ``past_key_values`` are dropped; otherwise the
        tensors are forwarded as they are and the per-sample lengths are taken
        from the attention mask.

        Returns:
            The result of ``self.llm.forward``.
        """
        self.freezed_module_patch()
        if inputs_embeds is None:
            (
                input_ids,
                position_ids,
                attention_mask,
                past_key_values,
                inputs_embeds,
                labels,
            ) = self.prepare_inputs_labels_for_multimodal(
                input_ids, position_ids, attention_mask, past_key_values, labels, images
            )

        # Note (kentang-mit@): we have a unit test for this function.
        if self.training:
            (
                _,
                new_position_ids,
                new_attention_mask,
                _,
                new_inputs_embeds,
                new_labels,
                sorted_seqlens_in_batch,
            ) = self.repack_multimodal_data(
                input_ids,
                position_ids,
                attention_mask,
                past_key_values,
                inputs_embeds,
                labels,
            )
            new_input_ids = None
            past_key_values = None
        else:
            new_attention_mask = attention_mask
            new_position_ids = position_ids
            new_inputs_embeds = inputs_embeds
            new_labels = labels
            # NOTE(review): requires a non-None attention mask on this path.
            sorted_seqlens_in_batch = attention_mask.sum(-1).int()
            new_input_ids = input_ids

        outputs = self.llm.forward(
            input_ids=new_input_ids,
            attention_mask=new_attention_mask,
            position_ids=new_position_ids,
            past_key_values=past_key_values,
            inputs_embeds=new_inputs_embeds,
            labels=new_labels,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            seqlens_in_batch=sorted_seqlens_in_batch,
        )
        return outputs

    # Generation never needs gradients; without this the multimodal embedding
    # preparation below would run with autograd enabled.
    @torch.no_grad()
    def generate(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        images: Optional[torch.FloatTensor] = None,
        attention_mask: Optional[torch.LongTensor] = None,
        **generation_kwargs,
    ):
        """Generate with ``self.llm`` from text, optionally combined with images.

        With ``images`` the embeddings and attention mask come from
        ``prepare_inputs_labels_for_multimodal``; without them ``input_ids``
        are embedded through ``get_input_embeddings()``. The embeddings are
        cast to ``self.dtype`` before being handed to ``self.llm.generate``.

        Returns:
            The result of ``self.llm.generate``.
        """
        if images is not None:
            (
                _,
                _,
                attention_mask,
                _,
                inputs_embeds,
                _,
            ) = self.prepare_inputs_labels_for_multimodal(
                input_ids, None, attention_mask, None, None, images
            )
        else:
            inputs_embeds = self.get_input_embeddings()(input_ids)
        inputs_embeds = inputs_embeds.to(self.dtype)

        outputs = self.llm.generate(
            inputs_embeds=inputs_embeds,
            attention_mask=attention_mask,
            **generation_kwargs,
        )
        return outputs
def disable_torch_init():
    """
    Replace ``reset_parameters`` on ``torch.nn.Linear`` and
    ``torch.nn.LayerNorm`` with a no-op, skipping the redundant default
    initialization so that model creation is faster.
    """
    import torch

    def _skip_reset(self):
        return None

    for module_cls in (torch.nn.Linear, torch.nn.LayerNorm):
        module_cls.reset_parameters = _skip_reset
def load_pretrained_model(
    model_path,
    model_name,
    model_base=None,
    load_8bit=False,
    load_4bit=False,
    device_map="auto",
    device="cuda",
    **kwargs,
):
    """Build a ``LlavaLlamaModel`` for evaluation from the config at ``model_path``.

    Args:
        model_path: Passed to ``AutoConfig.from_pretrained`` and stored on the
            config as ``resume_path``.
        model_name: Not used by this function; kept for the call signature.
        model_base: Not used by this function; kept for the call signature.
        load_8bit: Request 8-bit loading (``load_in_8bit=True``).
        load_4bit: Request 4-bit NF4 loading through ``BitsAndBytesConfig``.
            Ignored when ``load_8bit`` is set.
        device_map: Device map forwarded to the model; replaced by
            ``{"": device}`` when ``device`` is not ``"cuda"``.
        device: Device the vision tower, projector and optional context
            provider are moved to (in float16).
        **kwargs: Extra keyword arguments forwarded to the model constructor.

    Returns:
        A tuple ``(tokenizer, model, image_processor, context_len)``.
    """
    kwargs = {"device_map": device_map, **kwargs}
    if device != "cuda":
        kwargs["device_map"] = {"": device}

    if load_8bit:
        kwargs["load_in_8bit"] = True
    elif load_4bit:
        kwargs["load_in_4bit"] = True
        kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
        )
    else:
        kwargs["torch_dtype"] = torch.float16
    # prepare_config_for_eval pops "torch_dtype" unconditionally, so the
    # quantized branches need a value too or they fail with KeyError.
    kwargs.setdefault("torch_dtype", torch.float16)

    config = AutoConfig.from_pretrained(model_path)
    config.resume_path = model_path
    prepare_config_for_eval(config, kwargs)

    model = LlavaLlamaModel(
        config=config,
        low_cpu_mem_usage=True,
        **kwargs,
    )
    tokenizer = model.tokenizer
    model.eval()

    # No extra image tokens are added to the tokenizer here; the embedding
    # table is only resized to match the tokenizer's current length.
    model.resize_token_embeddings(len(tokenizer))

    vision_tower = model.get_vision_tower()
    vision_tower.to(device=device, dtype=torch.float16)
    mm_projector = model.get_mm_projector()
    mm_projector.to(device=device, dtype=torch.float16)
    context_provider = model.get_context_provider()
    if context_provider is not None:
        context_provider.to(device=device, dtype=torch.float16)
    image_processor = vision_tower.image_processor

    # Read the limit from the same config object that is checked for it
    # (the LLM config), falling back to 2048 when it is absent.
    context_len = getattr(model.llm.config, "max_sequence_length", 2048)

    return tokenizer, model, image_processor, context_len
def parse_model_name_or_path(config: PretrainedConfig, model_name="llm", suffix="_cfg"):
    """Return the identifier stored on ``config`` for one sub-model.

    The attribute ``f"{model_name}{suffix}"`` is looked up on ``config``. A
    string value is returned as is; for a dict the first entry of its
    ``"architectures"`` list is returned.

    Raises:
        ValueError: If the attribute is missing or is neither a str nor a dict.
    """
    attr_name = f"{model_name}{suffix}"
    entry = getattr(config, attr_name, None)

    if isinstance(entry, dict):
        return entry["architectures"][0]
    if isinstance(entry, str):
        return entry
    raise ValueError(f"Invalid {attr_name} configuration!")
def prepare_config_for_eval(config: PretrainedConfig, kwargs: dict):
    """Adjust ``config`` and the loader ``kwargs`` in place before evaluation.

    Side effects:
        * ``config.vision_tower_cfg`` falls back to ``config.mm_vision_tower``
          when it is missing or ``None``.
        * ``"torch_dtype"`` is removed from ``kwargs`` and its string form is
          stored as ``config.model_dtype``.
        * ``kwargs["device_map"]`` is set to ``"cuda"`` when the vision tower
          name contains "siglip".

    Raises:
        ValueError: If neither vision tower attribute is available on ``config``.
        KeyError: If ``kwargs`` has no ``"torch_dtype"`` entry.
    """
    try:
        # compatible with deprecated config convention
        if getattr(config, "vision_tower_cfg", None) is None:
            config.vision_tower_cfg = config.mm_vision_tower
    except AttributeError as err:
        # Chain explicitly so the original AttributeError stays attached as the cause.
        raise ValueError(
            f"Invalid configuration! Cannot find vision_tower in config:\n{config}"
        ) from err

    config.model_dtype = str(kwargs.pop("torch_dtype"))

    # siglip does not support device_map = "auto"
    vision_tower_name = parse_model_name_or_path(config, "vision_tower")
    if "siglip" in vision_tower_name.lower():
        kwargs["device_map"] = "cuda"
# Register the custom config class under the "llava_llama" model type, then
# map that config class to LlavaLlamaModel in the AutoModel registry.
AutoConfig.register("llava_llama", LlavaLlamaConfig)
AutoModel.register(LlavaLlamaConfig, LlavaLlamaModel)