""" Version: 5th_pruned_optimized_transcription_app.py (alias HF_modded_nb-whisper_T4) Description: webapp, transkribering (norsk), NbAiLab/nb-whisper-large, oppsummering, pdf-download. """ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import time import os import re import warnings from pydub import AudioSegment import pandas as pd import numpy as np import torch import torchaudio import torchaudio.transforms as transforms from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq from ...generation.configuration_utils import GenerationConfig import spacy import networkx as nx from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from transformers import AutoTokenizer, AutoModelForSeq2SeqLM import gradio as gr from fpdf import FPDF from PIL import Image # from huggingface_hub import model_info #############################################################################################################################################3 # Suppress warnings warnings.filterwarnings("ignore") """ def generate( self, input_features: Optional[torch.Tensor] = None, # <====================== ACTIVE generation_config: Optional[GenerationConfig] = None, # <====================== could be ACTIVE(ed.)* logits_processor: Optional[LogitsProcessorList] = None, stopping_criteria: Optional[StoppingCriteriaList] = None, prefix_allowed_tokens_fn: Optional[Callable[[int, torch.Tensor], List[int]]] = None, synced_gpus: bool = False, return_timestamps: Optional[bool] = None, task: Optional[str] = None, language: Optional[Union[str, List[str]]] = None, # <====================== ACTIVE is_multilingual: Optional[bool] = None, prompt_ids: Optional[torch.Tensor] = None, prompt_condition_type: Optional[str] = None, # first-segment, all-segments condition_on_prev_tokens: Optional[bool] = None, temperature: Optional[Union[float, Tuple[float, ...]]] = None, compression_ratio_threshold: Optional[float] = None, logprob_threshold: Optional[float] = None, no_speech_threshold: Optional[float] = None, num_segment_frames: Optional[int] = None, attention_mask: Optional[torch.Tensor] = None, # <====================== NOT ACTIVE by DEFAULT time_precision: float = 0.02, return_token_timestamps: Optional[bool] = None, return_segments: bool = False, return_dict_in_generate: Optional[bool] = None, **kwargs, # <====================== ACTIVE ): """ """ *generation_config (`~generation.GenerationConfig`, *optional*): The generation configuration to be used as base parametrization for the generation call. `**kwargs` passed to generate matching the attributes of `generation_config` will override them. If `generation_config` is not provided, the default will be used, which had the following loading priority: 1) from the `generation_config.json` model file, if it exists; 2) from the model configuration. Please note that unspecified parameters will inherit [`~generation.GenerationConfig`]'s default values, whose documentation should be checked to parameterize generation. from v4.39 the forced decoder ids are always None in favour of decoder input ids generation_config.forced_decoder_ids = None """ """ Example: - *Longform transcription*: To transcribe or translate audios longer than 30 seconds, process the audio files without truncation and pass all mel features at once to generate. ```python >>> import torch >>> from transformers import AutoProcessor, WhisperForConditionalGeneration >>> from datasets import load_dataset, Audio >>> processor = AutoProcessor.from_pretrained("openai/whisper-tiny.en") >>> model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny.en") >>> model.cuda() # doctest: +IGNORE_RESULT >>> # load audios > 30 seconds >>> ds = load_dataset("distil-whisper/meanwhile", "default")["test"] >>> # resample to 16kHz >>> ds = ds.cast_column("audio", Audio(sampling_rate=16000)) >>> # take first 8 audios and retrieve array >>> audio = ds[:8]["audio"] >>> audio = [x["array"] for x in audio] >>> # make sure to NOT truncate the input audio, to return the `attention_mask` and to pad to the longest audio >>> inputs = processor(audio, return_tensors="pt", truncation=False, padding="longest", return_attention_mask=True, sampling_rate=16_000) >>> inputs = inputs.to("cuda", torch.float32) >>> # transcribe audio to ids >>> generated_ids = model.generate(**inputs) >>> transcription = processor.batch_decode(generated_ids, skip_special_tokens=True) >>> transcription[0] " Folks, if you watch the show, you know, I spent a lot of time (..)" """ # Convert m4a audio to wav format def convert_to_wav(audio_file): audio = AudioSegment.from_file(audio_file, format="m4a") wav_file = "temp.wav" audio.export(wav_file, format="wav") return wav_file #############################################################################################################################################3 # # # # #--------------------------------------------------------------------------------------------------------------------------------------------- processor = AutoProcessor.from_pretrained("NbAiLab/nb-whisper-large-verbatim") model = AutoModelForSpeechSeq2Seq.from_pretrained("NbAiLab/nb-whisper-large-verbatim") model.cuda() # device = 0 if torch.cuda.is_available() else "cpu" # 0. deprecate old inputs if "inputs" in kwargs: input_features = kwargs.pop("inputs") warnings.warn( "The input name `inputs` is deprecated. Please make sure to use `input_features` instead.", FutureWarning, ) """ # 1. prepare generation config generation_config, kwargs = self._prepare_generation_config(generation_config, **kwargs) # 2. set global generate variables #input_stride = self.model.encoder.conv1.stride[0] * self.model.encoder.conv2.stride[0] #num_segment_frames = input_stride * self.config.max_source_positions #batch_size, total_input_frames = self._retrieve_total_input_frames( input_features=input_features, kwargs=kwargs #input_stride=input_stride, ) """ generate_kwargs = { "num_beams": 5, "language": "no", "task": "transcribe", "forced_decoder_ids": None # ALT. generation_config.forced_decoder_ids = None } def transcribe_audio(audio_file, chunk_length_s=30): #--------------------------------------------------------------------------------------------------------------------------------------------- # # # # #############################################################################################################################################3 if audio_file.endswith(".m4a"): audio_file = convert_to_wav(audio_file) start_time = time.time() # Load waveform using torchaudio waveform, sample_rate = torchaudio.load(audio_file) # Convert to mono if the audio has more than one channel if waveform.shape[0] > 1: waveform = torch.mean(waveform, dim=0, keepdim=True) if sample_rate != 16000: resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000) waveform = resampler(waveform) sample_rate = 16000 # Calculate the number of chunks chunk_size = chunk_length_s * sample_rate num_chunks = waveform.shape[1] // chunk_size + int(waveform.shape[1] % chunk_size != 0) # Initialize empty list@store transcribed text from ea.chunk full_text = [] for i in range(num_chunks): start = i * chunk_size end = min((i + 1) * chunk_size, waveform.shape[1]) chunk_waveform = waveform[:, start:end] # Check chunk waveform is properly shaped if chunk_waveform.shape[0] > 1: chunk_waveform = torch.mean(chunk_waveform, dim=0, keepdim=True) #############################################################################################################################################3 # # # # #--------------------------------------------------------------------------------------------------------------------------------------------- # make sure to NOT truncate the input audio, to return the `attention_mask` and to pad to the longest audio inputs = processor(chunk_waveform.squeeze(0).numpy(), sampling_rate=sample_rate, return_tensors="pt", truncation=False, padding="longest", return_attention_mask=True) inputs = inputs.to("cuda", torch.float32) input_features = inputs.input_features # transcribe audio to ids generated_ids = model.generate(inputs=input_features,**generate_kwargs) # transcription chunk_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0] #--------------------------------------------------------------------------------------------------------------------------------------------- # # # # #############################################################################################################################################3 full_text.append(chunk_text) # Combine the transcribed text from all chunks text = " ".join(full_text) output_time = time.time() - start_time # Audio duration (in seconds) audio_duration = waveform.shape[1] / sample_rate # Real-time Factor (RTF) rtf = output_time / audio_duration # Format of the result result = ( f"Time taken: {output_time:.2f} seconds\n" f"Audio duration: {audio_duration / 60:.2f} minutes ({audio_duration:.2f} seconds)\n" f"Real-time Factor (RTF): {rtf:.2f}\n" f"Number of words: {len(text.split())}\n\n" "Real-time Factor (RTF) is a measure used to evaluate the speed of speech recognition systems. " "It is the ratio of transcription time to the duration of the audio.\n\n" "An RTF of less than 1 means the transcription process is faster than real-time (expected)." ) #############################################################################################################################################3 # # # # #--------------------------------------------------------------------------------------------------------------------------------------------- return text, result #--------------------------------------------------------------------------------------------------------------------------------------------- # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # Clean and preprocess/@summarization def clean_text(text): text = re.sub(r'https?:\/\/.*[\r\n]*', '', text) text = re.sub(r'[^\w\s]', '', text) text = re.sub(r'\s+', ' ', text).strip() return text nlp = spacy.blank("nb") # 'nb' ==> codename = Norwegian Bokmål spacy_stop_words = spacy.lang.nb.stop_words.STOP_WORDS def preprocess_text(text): # Process the text with SpaCy doc = nlp(text) # SpaCy's stop top wrds direct stop_words = spacy_stop_words # Filter out stop words words = [token.text for token in doc if token.text.lower() not in stop_words] return ' '.join(words) # Summarize w/T5 model def summarize_text(text): preprocessed_text = preprocess_text(text) inputs = summarization_tokenizer(preprocessed_text, max_length=1024, return_tensors="pt", truncation=True) inputs = inputs.to(device) summary_ids = summarization_model.generate(inputs.input_ids, num_beams=5, max_length=150, early_stopping=True) return summarization_tokenizer.decode(summary_ids[0], skip_special_tokens=True) requires updating the pre-trained model weights to match # Builds similarity matrix def build_similarity_matrix(sentences, stop_words): similarity_matrix = nx.Graph() for i, tokens_a in enumerate(sentences): for j, tokens_b in enumerate(sentences): if i != j: common_words = set(tokens_a) & set(tokens_b) similarity_matrix.add_edge(i, j, weight=len(common_words)) return similarity_matrix # "Graph-based summarization" =====> def graph_based_summary(text, num_paragraphs=3): doc = nlp(text) sentences = [sent.text for sent in doc.sents] if len(sentences) < num_paragraphs: return sentences sentence_tokens = [nlp(sent) for sent in sentences] stop_words = spacy_stop_words filtered_tokens = [[token.text for token in tokens if token.text.lower() not in stop_words] for tokens in sentence_tokens] similarity_matrix = build_similarity_matrix(filtered_tokens, stop_words) scores = nx.pagerank(similarity_matrix) ranked_sentences = sorted(((scores[i], sent) for i, sent in enumerate(sentences)), reverse=True) return ' '.join([sent for _, sent in ranked_sentences[:num_paragraphs]]) # LexRank def lex_rank_summary(text, num_paragraphs=3, threshold=0.1): doc = nlp(text) sentences = [sent.text for sent in doc.sents] if len(sentences) < num_paragraphs: return sentences stop_words = spacy_stop_words vectorizer = TfidfVectorizer(stop_words=list(stop_words)) X = vectorizer.fit_transform(sentences) similarity_matrix = cosine_similarity(X, X) # Apply threshold@similarity matrix similarity_matrix[similarity_matrix < threshold] = 0 nx_graph = nx.from_numpy_array(similarity_matrix) scores = nx.pagerank(nx_graph) ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True) return ' '.join([ranked_sentences[i][1] for i in range(num_paragraphs)]) # TextRank def text_rank_summary(text, num_paragraphs=3): doc = nlp(text) sentences = [sent.text for sent in doc.sents] if len(sentences) < num_paragraphs: return sentences stop_words = spacy_stop_words vectorizer = TfidfVectorizer(stop_words=list(stop_words)) X = vectorizer.fit_transform(sentences) similarity_matrix = cosine_similarity(X, X) nx_graph = nx.from_numpy_array(similarity_matrix) scores = nx.pagerank(nx_graph) ranked_sentences = sorted(((scores[i], s) for i, s in enumerate(sentences)), reverse=True) return ' '.join([ranked_sentences[i][1] for i in range(num_paragraphs)]) # Save text+summary/PDF def save_to_pdf(text, summary): pdf = FPDF() pdf.add_page() pdf.set_font("Arial", size=12) if text: pdf.multi_cell(0, 10, "Text:\n" + text) pdf.ln(10) # Paragraph space if summary: pdf.multi_cell(0, 10, "Summary:\n" + summary) pdf_output_path = "transcription.pdf" pdf.output(pdf_output_path) return pdf_output_path iface = gr.Blocks() PLACEHOLDER = """

Switch Work | Verktæysett no.1

En webapp for transkribering av lydfiler til norsk skrift. Språkmodell: NbAiLab/nb-whisper-large, Ekstra: oppsummering, pdf-download

""" with iface: #gr.HTML('