import os
import re
import numpy as np
import tensorflow
from tensorflow.keras.callbacks import Callback, ReduceLROnPlateau
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Input, LSTM, Dense, Embedding, Dropout, Flatten
from tensorflow.keras.regularizers import l2
from tensorflow.keras.models import Model, load_model, model_from_json
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import logging
import heapq
import pickle
import time
import json
import pdb

tensorflow.keras.mixed_precision.set_global_policy('mixed_float16')


class BeamSearchHelper:
    def __init__(self, model, tokenizer, max_seq_length, encoder_filename, decoder_filename,
                 top_k=5, temperature=1.0, top_p=0.9, beam_width=3, scaling_factor=10, min_word=3):
        self.model = model
        self.tokenizer = tokenizer
        self.max_seq_length = max_seq_length
        self.top_k = top_k
        self.encoder_filename = encoder_filename
        self.decoder_filename = decoder_filename
        self.temperature = temperature
        self.scaling_factor = scaling_factor
        self.top_p = top_p
        self.beam_width = beam_width
        self.min_word = min_word
        self.logger = self.setup_logger()

    def setup_logger(self):
        logger = logging.getLogger("ChatbotBeamSearch")
        logger.setLevel(logging.DEBUG)

        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_formatter = logging.Formatter('%(levelname)s: %(message)s')
        console_handler.setFormatter(console_formatter)
        logger.addHandler(console_handler)

        file_handler = logging.FileHandler("chatbotBeam.log")
        file_handler.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(file_formatter)
        logger.addHandler(file_handler)

        return logger

    def beam_search(self, input_text):
        # Load encoder and decoder models
        encoder_model = load_model(self.encoder_filename)
        decoder_model = load_model(self.decoder_filename)

        # Preprocess input
        input_seqs = self.tokenizer.texts_to_sequences([input_text])
        input_seqs = pad_sequences(input_seqs, maxlen=self.max_seq_length, padding='post')

        # Encode input sequence
        encoder_states = encoder_model.predict(input_seqs)
        state_h, state_c = encoder_states

        # Ensure batch size of 1
        state_h = state_h[0:1, :]
        state_c = state_c[0:1, :]

        # Initialize decoder with <start> token
        start_token_index = self.tokenizer.word_index.get('<start>', 1)
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = start_token_index

        # Initialize beam search candidates
        sequences = [(target_seq, state_h, state_c, 0.0, [])]  # (seq, h, c, score, decoded_words)

        for _ in range(self.max_seq_length):
            all_candidates = []
            for seq, state_h, state_c, score, decoded_words in sequences:
                # Predict the next token
                output_tokens, state_h, state_c = decoder_model.predict([seq, state_h, state_c])

                logits = output_tokens[0, -1, :] * self.scaling_factor
                logits = logits / self.temperature
                exp_logits = np.exp(logits - np.max(logits))  # Prevent overflow
                probabilities = exp_logits / np.sum(exp_logits)

                # Get the top beam_width candidate indices
                top_indices = np.argsort(probabilities)[-self.beam_width:]

                for idx in top_indices:
                    prob = probabilities[idx]
                    candidate_score = (score - np.log(prob + 1e-8)) / (len(decoded_words) + 1)  # Normalize by length

                    # Append predicted token
                    new_decoded_words = decoded_words + [idx]
                    new_seq = np.copy(seq)
                    new_seq[0, 0] = idx  # Set new token in sequence

                    # Enforce min_word before stopping at <end>
                    if idx == self.tokenizer.word_index.get('<end>', -1):
                        if len(new_decoded_words) < self.min_word:
                            continue  # Ignore <end> if min_word isn't reached
                        else:
                            return " ".join(self.tokenizer.index_word[i] for i in new_decoded_words
                                            if i in self.tokenizer.index_word)

                    # Add to candidate list
                    all_candidates.append((new_seq, state_h, state_c, candidate_score, new_decoded_words))

            # Select best beam_width sequences (lower score is better)
            if not all_candidates:  # If no valid candidates, exit early
                break
            sequences = sorted(all_candidates, key=lambda x: x[3])[:self.beam_width]

        # Convert token indices back to words
        best_sequence = sequences[0][4]  # Get best decoded words
        return " ".join(self.tokenizer.index_word[idx] for idx in best_sequence
                        if idx in self.tokenizer.index_word)
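
# A minimal usage sketch for BeamSearchHelper (illustrative only; the tokenizer
# and the saved encoder/decoder files are assumed to exist already, and `model`
# may be None since beam_search loads the sub-models from disk itself):
#
#   helper = BeamSearchHelper(model=None, tokenizer=tokenizer, max_seq_length=64,
#                             encoder_filename="encoder.keras",
#                             decoder_filename="decoder.keras",
#                             beam_width=3, temperature=1.0)
#   reply = helper.beam_search("how are you")  # returns a decoded string
#
# Scores accumulate as negative log-probabilities normalized by length, so
# lower scores are better and short sequences are not unfairly favored.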


class BeamState:
    def __init__(self, sequence, score, state, logger):
        self.sequence = sequence
        self.score = score
        self.state = state
        self.logger = logger

    def __lt__(self, other):
        return self.score < other.score

    def log(self, message):
        self.logger.debug(message)


class MonitorEarlyStopping(Callback):
    def __init__(self, monitor='val_loss', patience=3, mode='min', restore_best_weights=True, verbose=1):
        super(MonitorEarlyStopping, self).__init__()
        self.monitor = monitor
        self.patience = patience
        self.mode = mode
        self.restore_best_weights = restore_best_weights
        self.verbose = verbose
        self.best_weights = None
        self.best_epoch = None
        self.wait = 0
        self.best_value = float('inf') if mode == 'min' else -float('inf')
        self.stopped_epoch_list = []  # List to track stopped epochs

    def on_epoch_end(self, epoch, logs=None):
        current_value = logs.get(self.monitor)
        if current_value is None:
            if self.verbose > 0:
                print(f"Warning: Metric '{self.monitor}' is not available in logs.")
            return

        # Check for improvement based on mode
        if (self.mode == 'min' and current_value < self.best_value) or \
                (self.mode == 'max' and current_value > self.best_value):
            self.best_value = current_value
            self.best_weights = self.model.get_weights()
            self.best_epoch = epoch
            self.wait = 0
            if self.verbose > 0:
                print(f"Epoch {epoch + 1}: {self.monitor} improved to {self.best_value:.4f}")
        else:
            self.wait += 1
            if self.verbose > 0:
                print(f"Epoch {epoch + 1}: {self.monitor} did not improve. Patience: {self.wait}/{self.patience}")
            self.stopped_epoch_list.append(epoch + 1)

            # Stop training if patience is exceeded
            if self.wait >= self.patience:
                if self.verbose > 0:
                    print(f"Stopping early at epoch {epoch + 1}. "
                          f"Best {self.monitor}: {self.best_value:.4f} at epoch {self.best_epoch + 1}")
                self.model.stop_training = True
                if self.restore_best_weights:
                    if self.verbose > 0:
                        print(f"Restoring best model weights from epoch {self.best_epoch + 1}.")
                    self.model.set_weights(self.best_weights)
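
# A minimal usage sketch for MonitorEarlyStopping (illustrative only; `model`,
# `x_train`, and `y_train` are placeholders, not objects defined in this file):
#
#   early_stop = MonitorEarlyStopping(monitor='val_loss', patience=3, mode='min')
#   model.fit(x_train, y_train, validation_split=0.2, epochs=30,
#             callbacks=[early_stop])
#
# Unlike Keras' built-in EarlyStopping, this callback also records every
# non-improving epoch in `stopped_epoch_list`, which train_model() below uses
# to flag troublesome speakers.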


class ChatbotTrainer:
    def __init__(self):
        # Corpus Setup
        self.corpus = None
        self.all_vocab_size = 0

        # Model Setup
        self.model = None
        self.name = "Alex"
        self.model_filename = f"{self.name}_model.keras"
        self.encoder_filename = "encoder.keras"
        self.decoder_filename = "decoder.keras"
        self.tokenizer_save_path = "chatBotTokenizer.pkl"
        self.tokenizer = None
        self.reverse_tokenizer = None
        self.embedding_dim = 64
        self.max_seq_length = 64
        self.learning_rate = 0.0013
        self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=1.0)
        self.batch_size = 16
        self.epochs = 30
        self.early_patience = self.epochs // 2
        self.lstm_units = 128
        self.dropout = 0.1
        self.recurrent_dropout = 0.1
        self.test_size = 0.2
        self.max_vocabulary = 69000

        # Models instantiated here but filled later
        self.encoder_model = None
        self.encoder_inputs = None
        self.decoder_inputs = None
        self.decoder_outputs = None
        self.decoder_model = None
        self.max_vocab_size = None
        self.config = None

        # Training Setup
        self.vocabularyList = []
        self.troubleList = []
        self.running_trouble = []

        # Prediction Setup (everything here takes priority)
        self.min_word = 10  # Only for generate_response
        self.temperature = 1
        self.scaling_factor = 1
        self.logger = self.setup_logger()  # Initialize the logger here
        self.beam_width = 9
        self.top_p = 0.7
        self.top_k = 3

        # Log Metrics...
        self.logger.info(f"""Metrics:
            Embedding/MaxSeqLength: ({self.embedding_dim}, {self.max_seq_length})
            Batch Size: {self.batch_size}
            LSTM Units: {self.lstm_units}
            Epochs: {self.epochs}
            Dropout: ({self.dropout}, {self.recurrent_dropout})
            Test Split: {self.test_size}\n""")

        # Tokenizer setup & propagation
        if os.path.exists(self.tokenizer_save_path):
            with open(self.tokenizer_save_path, 'rb') as tokenizer_load_file:
                self.tokenizer = pickle.load(tokenizer_load_file)
            self.reverse_tokenizer = {index: word for word, index in self.tokenizer.word_index.items()}
            self.all_vocab_size = self.tokenizer.num_words
            for word, _index in self.tokenizer.word_index.items():
                if word not in self.vocabularyList:
                    self.vocabularyList.append(word)
            self.logger.info("Tokenizer loaded successfully.")
            # print(f"Number of words in loaded tokenizer: {len(self.tokenizer.word_index)}")
            # print(f"Number of words in the Vocab List: {len(self.vocabularyList)}")
        else:
            self.logger.warning("Tokenizer not found, making now...")
            self.tokenizer = Tokenizer(num_words=None, filters='!"#$%&()*+,-/.:;=?@[\\]^_`{|}~\t\n')

            # Save '<start>', '<end>', and '<OOV>' to the word index
            self.tokenizer.num_words = 0
            self.vocabularyList = ['<start>', '<end>']
            for token in self.vocabularyList:
                if token not in self.tokenizer.word_index:
                    self.tokenizer.word_index[token] = self.tokenizer.num_words
                    self.tokenizer.index_word[self.tokenizer.num_words] = token
                    self.all_vocab_size += 1
                    self.tokenizer.num_words += 1

            # Set Tokenizer Values:
            self.tokenizer.num_words = len(self.tokenizer.word_index)
            self.tokenizer.oov_token = "<OOV>"
            self.logger.info(f"New Tokenizer indices: {self.tokenizer.word_index}")
            # Debug Lines
            # for token in ['<start>', '<end>', '<OOV>']:
            #     print(f"Index of {token}: {self.tokenizer.word_index.get(token)}")

        # Debug Line
        # print(list(self.tokenizer.word_index.keys()))

        if os.path.exists(self.model_filename) and os.path.exists(self.encoder_filename) \
                and os.path.exists(self.decoder_filename):
            self.model, self.encoder_model, self.decoder_model = self.load_model_file()
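
    # Constructing the trainer wires the pieces above together: it loads
    # chatBotTokenizer.pkl when present (otherwise seeds a fresh tokenizer with
    # the special tokens) and reloads the saved models when all three files
    # exist. A minimal session sketch (the corpus name is an assumption; any
    # ConvoKit corpus path works):
    #
    #   from convokit import download
    #   trainer = ChatbotTrainer()
    #   trainer.load_corpus(download("movie-corpus"))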
") self.tokenizer = Tokenizer(num_words=None, filters='!"#$%&()*+,-/.:;=?@[\\]^_`{|}~\t\n') # Save '', '', and '' to word index self.tokenizer.num_words = 0 self.vocabularyList = ['', ''] for token in self.vocabularyList: if token not in self.tokenizer.word_index: self.tokenizer.word_index[token] = self.tokenizer.num_words self.tokenizer.index_word[self.tokenizer.num_words] = token self.all_vocab_size += 1 self.tokenizer.num_words += 1 # Set Tokenizer Values: self.tokenizer.num_words = len(self.tokenizer.word_index) self.tokenizer.oov_token = "" self.logger.info(f"New Tokenizer Index's: {self.tokenizer.word_index}") # Debug Lines # for token in ['', '', '']: # print(f"Index of {token}: {self.tokenizer.word_index.get(token)}") # Debug Line # print(list(self.tokenizer.word_index.keys())) if os.path.exists(self.model_filename) and os.path.exists(self.encoder_filename) and os.path.exists(self.decoder_filename): self.model, self.encoder_model, self.decoder_model =self.load_model_file() def save_full_weights(self, encoder_path="encoder.weights.h5", decoder_path="decoder.weights.h5"): if self.encoder_model is not None and self.decoder_model is not None: if os.path.exists(encoder_path): os.remove(encoder_path) if os.path.exists(decoder_path): os.remove(decoder_path) self.encoder_model.save_weights(encoder_path) self.decoder_model.save_weights(decoder_path) self.logger.info(f"Encoder weights saved at {encoder_path}.") self.logger.info(f"Decoder weights saved at {decoder_path}.") else: self.logger.warning( "Encoder or Decoder model does not exist. Ensure models are initialized before saving weights.") def load_corpus(self, corpus_path): import convokit self.logger.info("Loading and preprocessing corpus...") self.corpus = convokit.Corpus(filename=corpus_path) self.logger.info("Corpus loaded and preprocessed successfully.") def load_full_weights(self, encoder_path="encoder.weights.h5", decoder_path="decoder.weights.h5"): if self.encoder_model is not None and self.decoder_model is not None: self.encoder_model.load_weights(encoder_path) self.decoder_model.load_weights(decoder_path) self.logger.info(f"Encoder weights loaded from {encoder_path}.") self.logger.info(f"Decoder weights loaded from {decoder_path}.") else: self.logger.warning( "Encoder or Decoder model does not exist. Ensure models are initialized before loading weights.") def plot_and_save_training_metrics(self, history, speaker): # Plot training metrics such as loss and accuracy plt.figure(figsize=(10, 6)) # Plot training loss plt.subplot(1, 2, 1) plt.plot(history.history['loss'], label='Training Loss') plt.plot(history.history['val_loss'], label='Validation Loss') plt.title('Training and Validation Loss') plt.xlabel('Epoch') plt.ylabel('Loss') plt.legend() # Plot training accuracy plt.subplot(1, 2, 2) plt.plot(history.history['accuracy'], label='Training Accuracy') plt.plot(history.history['val_accuracy'], label='Validation Accuracy') plt.title('Training and Validation Accuracy') plt.xlabel('Epoch') plt.ylabel('Accuracy') plt.legend() # Save the plot as an image file # plot_filename = f"{speaker}_training_metrics.png" # plt.tight_layout() # plt.savefig(plot_filename) # Save the plot as an image # plt.close() # Close the plot to free up memory return "Did Not Save in Jupyter Notebook. 
    def plot_and_save_training_metrics(self, history, speaker):
        # Plot training metrics such as loss and accuracy
        plt.figure(figsize=(10, 6))

        # Plot training loss
        plt.subplot(1, 2, 1)
        plt.plot(history.history['loss'], label='Training Loss')
        plt.plot(history.history['val_loss'], label='Validation Loss')
        plt.title('Training and Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()

        # Plot training accuracy
        plt.subplot(1, 2, 2)
        plt.plot(history.history['accuracy'], label='Training Accuracy')
        plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
        plt.title('Training and Validation Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()

        # Save the plot as an image file
        # plot_filename = f"{speaker}_training_metrics.png"
        # plt.tight_layout()
        # plt.savefig(plot_filename)  # Save the plot as an image
        # plt.close()  # Close the plot to free up memory
        return "Did Not Save in Jupyter Notebook. See plot_and_save_training_metrics"

    def setup_logger(self):
        logger = logging.getLogger("ChatbotTrainer")
        logger.setLevel(logging.DEBUG)

        # Create console handler and set level to INFO for progress reports
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_formatter = logging.Formatter('%(levelname)s: %(message)s')
        console_handler.setFormatter(console_formatter)
        logger.addHandler(console_handler)

        # Create a file handler and set level to DEBUG for progress reports and ERROR for error notifications
        file_handler = logging.FileHandler("chatbot.log")
        file_handler.setLevel(logging.DEBUG)  # Set level to DEBUG to capture progress reports
        file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(file_formatter)
        logger.addHandler(file_handler)

        return logger

    # Reformats the embedding weights to a new max_vocabulary size.
    # Useful if max_vocabulary (used in build_model) is grown incrementally
    # (or set large to begin with, in which case this is N/A).
    def redo_embeddings(self):
        # Get current embedding weights
        old_embedding_weights = self.model.get_layer("embedding").get_weights()[0]

        # Define new max vocabulary size
        new_vocab_size = self.max_vocabulary  # Set this to the updated size
        embedding_dim = old_embedding_weights.shape[1]

        # Expand the embedding matrix
        new_embedding_weights = np.random.normal(size=(new_vocab_size, embedding_dim))  # Initialize new words randomly
        new_embedding_weights[:old_embedding_weights.shape[0], :] = old_embedding_weights  # Keep old weights

        # Replace the embedding layer weights
        self.model.get_layer("embedding").set_weights([new_embedding_weights])

    def save_tokenizer(self, texts=None):
        if self.tokenizer:
            if texts:
                for token in texts:
                    if token not in self.tokenizer.word_index and self.tokenizer.num_words < self.max_vocabulary:
                        self.tokenizer.word_index[token] = self.tokenizer.num_words
                        self.all_vocab_size += 1
                        self.tokenizer.num_words += 1
                        # Debug Line
                        # print(f"Word: {token}\nIndex: {self.tokenizer.num_words}")
                self.max_vocab_size = self.tokenizer.num_words
                self.tokenizer.fit_on_texts(texts)
            with open(self.tokenizer_save_path, 'wb') as tokenizer_save_file:
                pickle.dump(self.tokenizer, tokenizer_save_file)
            self.tokenizer.num_words = len(self.tokenizer.word_index)
        else:
            self.logger.warning("No tokenizer to save.")

    def save_embedding_weights(self, filepath="embedding_weights.npy"):
        if self.model is not None:
            embedding_layer = self.model.get_layer('embedding')

            # Extract the weights (stored as a list; take the first element)
            embedding_weights = embedding_layer.get_weights()[0]

            # Save weights to a file
            if os.path.exists(filepath):
                os.remove(filepath)
            np.save(filepath, embedding_weights)
            self.logger.info(f"Embedding weights saved successfully at {filepath}.")
        else:
            self.logger.warning("No model exists to extract embedding weights.")

    def load_embedding_weights(self, filepath="embedding_weights.npy"):
        if self.model is not None:
            embedding_layer = self.model.get_layer('embedding')

            # Load weights from the file
            embedding_weights = np.load(filepath)

            # Ensure the weights shape matches the layer's expected shape
            if embedding_layer.input_dim == embedding_weights.shape[0] and \
                    embedding_layer.output_dim == embedding_weights.shape[1]:
                embedding_layer.set_weights([embedding_weights])
                self.logger.info(f"Embedding weights loaded successfully from {filepath}.")
            else:
                self.logger.error("Mismatch in embedding weights shape. Ensure the model and weights are compatible.")
        else:
            self.logger.warning("No model exists to load embedding weights into.")
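
    # Sketch of the vocabulary-growth workflow these helpers support (assumes
    # self.model exists and has a layer named 'embedding'):
    #
    #   trainer.max_vocabulary = 70000    # raise the cap
    #   trainer.redo_embeddings()         # expand the matrix, keeping old rows
    #   trainer.save_embedding_weights()  # persist to embedding_weights.npy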
    def clean_text(self, text):
        txt = text.lower().strip()

        # Contraction mapping (expanded)
        contractions = {
            "i'm": "i am", "he's": "he is", "she's": "she is", "that's": "that is",
            "what's": "what is", "where's": "where is", "who's": "who is", "how's": "how is",
            "it's": "it is", "let's": "let us", "they're": "they are", "we're": "we are",
            "you're": "you are", "i've": "i have", "you've": "you have", "we've": "we have",
            "they've": "they have", "i'd": "i would", "you'd": "you would", "he'd": "he would",
            "she'd": "she would", "we'd": "we would", "they'd": "they would", "i'll": "i will",
            "you'll": "you will", "he'll": "he will", "she'll": "she will", "we'll": "we will",
            "they'll": "they will", "don't": "do not", "doesn't": "does not", "didn't": "did not",
            "won't": "will not", "wouldn't": "would not", "can't": "cannot", "couldn't": "could not",
            "shouldn't": "should not", "mightn't": "might not", "mustn't": "must not",
            "isn't": "is not", "aren't": "are not", "wasn't": "was not", "weren't": "were not",
            "haven't": "have not", "hasn't": "has not", "hadn't": "had not"
        }

        # Expand contractions
        for contraction, expansion in contractions.items():
            txt = re.sub(r"\b" + re.escape(contraction) + r"\b", expansion, txt)

        # Remove unwanted characters but keep apostrophes
        txt = re.sub(r"[^a-zA-Z0-9' ]", " ", txt)  # Keep words, numbers, and apostrophes
        txt = re.sub(r"\s+", " ", txt).strip()  # Remove extra spaces

        # Preserve words in vocabulary list
        for word in txt.split():
            if word not in self.vocabularyList:
                self.vocabularyList.append(word)
        self.save_tokenizer(self.vocabularyList)

        return txt
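
    # Illustrative clean_text behavior (expected output, not a stored test):
    #
    #   trainer.clean_text("I'm here, aren't you?")  ->  "i am here are not you"
    #
    # Note the side effect: every new word is appended to vocabularyList and
    # the tokenizer is re-saved, so cleaning also grows the vocabulary.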
    # Training
    def preprocess_texts(self, input_texts, target_texts):
        input_texts = [self.clean_text(text) for text in input_texts.split(" ")]
        target_texts = [self.clean_text(text) for text in target_texts.split(" ")]

        # Wrap each entry in <start>/<end> markers, skipping empty strings
        input_texts = [f"<start> {texts} <end>" for texts in input_texts if texts and texts != ""]
        target_texts = [f"<start> {texts} <end>" for texts in target_texts if texts and texts != ""]

        input_sequences = self.tokenizer.texts_to_sequences(input_texts)
        target_sequences = self.tokenizer.texts_to_sequences(target_texts)
        input_sequences = pad_sequences(input_sequences, maxlen=self.max_seq_length, padding='post')
        target_sequences = pad_sequences(target_sequences, maxlen=self.max_seq_length, padding='post')

        # Ensure target_sequences has as many samples as input_sequences
        if target_sequences.shape[0] != input_sequences.shape[0]:
            print(f"Padding mismatch! Input: {input_sequences.shape}, Target: {target_sequences.shape}")
            # Resize the targets to the input shape (truncates or repeats rows as needed)
            target_sequences = np.resize(target_sequences, input_sequences.shape)

        return input_sequences, target_sequences

    # Prediction
    def preprocess_input(self, texts):
        preprocessed_input = ["<start>"]
        texts = self.clean_text(texts)
        preprocessed_text = texts.lower().split(" ")
        preprocessed_input.extend(preprocessed_text)
        preprocessed_input.append("<end>")

        # Convert words to token IDs
        preprocessed_input = self.tokenizer.texts_to_sequences([preprocessed_input])
        preprocessed_input = [item for sublist in preprocessed_input for item in sublist]  # Flatten
        preprocessed_input = np.array(preprocessed_input).reshape(1, -1)  # (1, length)
        preprocessed_input = pad_sequences(preprocessed_input, maxlen=self.max_seq_length, padding='post')
        # print("Final Input Shape:", preprocessed_input.shape)  # Debugging
        return preprocessed_input

    def build_model(self):
        if not self.model:
            # Encoder
            self.encoder_inputs = Input(shape=(self.max_seq_length,))
            encoder_embedding = Embedding(
                input_dim=self.max_vocabulary,
                output_dim=self.embedding_dim,
                mask_zero=True,
                embeddings_regularizer=l2(0.01)
            )(self.encoder_inputs)
            encoder_lstm = LSTM(
                self.lstm_units,
                return_state=True,
                return_sequences=False,
                dropout=self.dropout,
                recurrent_dropout=self.recurrent_dropout
            )
            _, state_h, state_c = encoder_lstm(encoder_embedding)
            encoder_states = [state_h, state_c]
            self.encoder_model = Model(self.encoder_inputs, encoder_states)

            # Decoder
            self.decoder_inputs = Input(shape=(None,), name='decoder_input')
            decoder_embedding = Embedding(
                input_dim=self.max_vocabulary,
                output_dim=self.embedding_dim,
                mask_zero=True
            )(self.decoder_inputs)
            decoder_lstm = LSTM(
                self.lstm_units,
                return_sequences=True,
                return_state=True,
                dropout=self.dropout,
                recurrent_dropout=self.recurrent_dropout,
                kernel_regularizer=l2(0.001)
            )
            decoder_state_input_h = Input(shape=(self.lstm_units,))
            decoder_state_input_c = Input(shape=(self.lstm_units,))
            decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
            decoder_lstm_output, state_h, state_c = decoder_lstm(decoder_embedding,
                                                                 initial_state=decoder_states_inputs)
            decoder_states = [state_h, state_c]
            decoder_dense = Dense(self.max_vocabulary, activation='softmax',
                                  kernel_regularizer=l2(0.001), bias_regularizer=l2(0.001))
            self.decoder_outputs = decoder_dense(decoder_lstm_output)
            self.decoder_model = Model([self.decoder_inputs] + decoder_states_inputs,
                                       [self.decoder_outputs] + decoder_states)

            # Combine encoder and decoder into the full training model
            decoder_lstm_output, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)
            self.decoder_outputs = decoder_dense(decoder_lstm_output)
            self.model = Model([self.encoder_inputs, self.decoder_inputs], self.decoder_outputs)
            self.model.compile(
                optimizer=self.optimizer,
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy']
            )
        return self.model, self.encoder_model, self.decoder_model
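
    # Shape summary for build_model (batch size b, following the fields set in
    # __init__): the encoder maps (b, max_seq_length) token ids to two state
    # tensors of shape (b, lstm_units); the decoder consumes token ids plus
    # those states and emits (b, timesteps, max_vocabulary) softmax rows.
    # A quick sanity check one could run after building:
    #
    #   model, enc, dec = trainer.build_model()
    #   model.summary()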
    def load_model_config(self, config_filename="model_config.json"):
        if os.path.exists(config_filename):
            with open(config_filename, "r", encoding="utf-8") as f:
                data = json.load(f)
            self.logger.info(f"Loading model config from {config_filename}")

            # Rebuild model from config
            self.model = model_from_json(data["model_config"])

            # Rebuild optimizer
            self.optimizer = Adam.from_config(data["optimizer"])

            # Compile model with restored optimizer
            self.model.compile(
                optimizer=self.optimizer,
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy']
            )
            self.logger.info("Model compiled successfully after loading config.")
            return self.model
        return None

    def train_model(self, input_texts, target_texts, conversation_id, speaker):
        # Reset running_trouble at the start of a new training run
        self.running_trouble = []

        # Make sure everything to do with the model loads properly, or gets built if it doesn't exist
        loaded_model = self.load_model_config(config_filename="model_config.json")
        if os.path.exists(self.model_filename) and os.path.exists(self.encoder_filename) \
                and os.path.exists(self.decoder_filename):
            self.model, self.encoder_model, self.decoder_model = self.load_model_file()
            self.logger.info("Loaded full model from saved files.")
        elif not os.path.exists(self.model_filename) and not os.path.exists(self.encoder_filename) \
                and not os.path.exists(self.decoder_filename) and loaded_model:
            self.model = loaded_model
        elif not self.model and not self.encoder_model and not self.decoder_model:
            self.logger.info("Building new model...")
            self.model, self.encoder_model, self.decoder_model = self.build_model()

        # Once everything loads properly we start training:
        self.logger.info(f"Training Model for ConversationID: {conversation_id}")
        if self.corpus is None or self.tokenizer is None:
            raise ValueError("Corpus or tokenizer is not initialized.")

        # Preprocess the texts into sequences
        input_sequences, target_sequences = self.preprocess_texts(input_texts, target_texts)

        # Debug Lines
        # for token in ['<start>', '<end>', '<OOV>']:
        #     print(f"Index of {token}: {self.tokenizer.word_index.get(token)}")

        # Stats
        self.logger.info(f"Num Words: {self.tokenizer.num_words}")
        self.logger.info(f"Vocabulary Size: {len(self.tokenizer.word_index)}")
        self.logger.info(f"Length of Vocabulary List: {len(self.vocabularyList)}")

        # Prepare training data
        encoder_input_data = input_sequences
        decoder_input_data = target_sequences[:, :-1]
        decoder_target_data = target_sequences[:, 1:]
        self.logger.info(f"Encoder Input Data Shape: {encoder_input_data.shape}")
        self.logger.info(f"Decoder Input Data Shape: {decoder_input_data.shape}")
        self.logger.info(f"Decoder Target Data Shape: {decoder_target_data.shape}")

        # Instantiate the callbacks
        early_stopping = MonitorEarlyStopping(
            monitor='val_loss',
            patience=self.early_patience,
            mode='min',
            restore_best_weights=True,
            verbose=1
        )
        lr_patience = self.early_patience // 3
        lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=lr_patience, verbose=1)

        # Train the model
        history = self.model.fit(
            [encoder_input_data, decoder_input_data],
            np.expand_dims(decoder_target_data, -1),
            batch_size=self.batch_size,
            epochs=self.epochs,
            validation_split=self.test_size,
            callbacks=[early_stopping, lr_scheduler]
        )

        # Log any early stopping events
        if len(early_stopping.stopped_epoch_list) > 0:
            self.troubleList.append(speaker)

        # Reset stopped epoch list & save to running trouble
        self.running_trouble = [item for item in early_stopping.stopped_epoch_list]
        early_stopping.stopped_epoch_list = []

        # Evaluate the model on the training data
        test_loss, test_accuracy = self.model.evaluate(
            [encoder_input_data, decoder_input_data],
            np.expand_dims(decoder_target_data, -1),
            batch_size=self.batch_size
        )

        # Save training metrics as a plot
        plot_filename = self.plot_and_save_training_metrics(history, speaker)
        self.logger.info(f"Training metrics plot saved as {plot_filename}")
        self.logger.info(f"Test loss for Conversation {speaker}: {test_loss}")
        self.logger.info(f"Test accuracy for Conversation {speaker}: {test_accuracy}")
        self.logger.info(f"Model trained and saved successfully for speaker: {speaker}")

        # Compile the model before saving
        self.model.compile(
            optimizer=self.optimizer,
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

        # Save the model after training
        self.save_tokenizer(self.vocabularyList)
        self.save_model(self.model, self.encoder_model, self.decoder_model)
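
    # A minimal training call (illustrative; the strings are placeholders,
    # normally pulled from corpus utterance pairs):
    #
    #   trainer.train_model("how are you", "i am fine", conversation_id="c1",
    #                       speaker="speaker_a")
    #
    # Speakers whose runs trip the early-stopping patience end up in
    # trainer.troubleList, with the offending epochs in running_trouble.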
{test_accuracy}") self.logger.info(f"Model trained and saved successfully for speaker: {speaker}") # Compile the model before saving self.model.compile( optimizer=self.optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'] ) # Save the model after training self.save_tokenizer(self.vocabularyList) self.save_model(self.model, self.encoder_model, self.decoder_model) def save_model(self, model, encoder_model, decoder_model): self.logger.info("Saving Model...") if model: self.encoder_model.save(self.encoder_filename) self.logger.info("Encoder saved.") time.sleep(1) self.decoder_model.save(self.decoder_filename) self.logger.info("Decoder saved.") time.sleep(1) self.model.save(self.model_filename) self.logger.info("Model saved.") time.sleep(1) self.save_full_weights() self.save_embedding_weights() else: self.logger.warning("No model to save.") def load_model_file(self): self.logger.info("Loading Model and Tokenizer...") # Load model without the optimizer first model = load_model(self.model_filename, compile=False) # Manually recompile with a fresh Adam optimizer self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=1.0) model.compile(optimizer=self.optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy']) print("Model Loaded... \nNow loading encoder/decoder models... ") encoder_model = load_model(self.encoder_filename) decoder_model = load_model(self.decoder_filename) print("Decoder and Encoder Loaded... ") self.load_full_weights() self.load_embedding_weights() return model, encoder_model, decoder_model def beam_search(self, input_text): # Preprocess input to match generate_response format input_seq = self.preprocess_input(input_text) # Perform beam search using the BeamSearchHelper class beam_search_helper = BeamSearchHelper( model=self.model, tokenizer=self.tokenizer, max_seq_length=self.max_seq_length, encoder_filename=self.encoder_filename, decoder_filename=self.decoder_filename, top_k=self.top_k, temperature=self.temperature, top_p=self.top_p, beam_width=self.beam_width, scaling_factor=self.scaling_factor ) # Perform beam search output_seq = beam_search_helper.beam_search(input_seq) # Convert token indices back to words output_words = [self.tokenizer.index_word[idx] for idx in output_seq if idx in self.tokenizer.index_word] return " ".join(output_words) def generate_response(self, input_seq): try: # Clean and tokenize input text input_seqs = self.preprocess_input(input_seq) # Encode the input sequence using the encoder model encoder_states = self.encoder_model.predict(input_seqs) state_h, state_c = encoder_states state_h = state_h[0:1, :] # Ensure batch size 1 state_c = state_c[0:1, :] # Initialize the decoder input with the token start_token_index = self.tokenizer.word_index.get('', 1) target_seq = np.zeros((1, 1)) target_seq[0, 0] = start_token_index # Debugging before passing to the decoder # print(f"Initial Target Seq Shape: {target_seq.shape}, state_h Shape: {state_h.shape}, state_c Shape: {state_c.shape}") # Decode the sequence decoded_sentence = [] for _ in range(self.max_seq_length): output_tokens, state_h, state_c = self.decoder_model.predict([target_seq, state_h, state_c]) # Scale logits immediately after getting output_tokens logits = output_tokens[0, -1, :] * self.scaling_factor logits = logits / self.temperature logits = np.clip(logits, -50, 50) # Compute softmax exp_logits = np.exp(logits - np.max(logits)) # Prevent overflow probabilities = exp_logits / np.sum(exp_logits) probabilities = exp_logits / (np.sum(exp_logits) + 1e-8) 
    def generate_response(self, input_seq):
        try:
            # Clean and tokenize input text
            input_seqs = self.preprocess_input(input_seq)

            # Encode the input sequence using the encoder model
            encoder_states = self.encoder_model.predict(input_seqs)
            state_h, state_c = encoder_states
            state_h = state_h[0:1, :]  # Ensure batch size 1
            state_c = state_c[0:1, :]

            # Initialize the decoder input with the <start> token
            start_token_index = self.tokenizer.word_index.get('<start>', 1)
            target_seq = np.zeros((1, 1))
            target_seq[0, 0] = start_token_index

            # Debugging before passing to the decoder
            # print(f"Initial Target Seq Shape: {target_seq.shape}, "
            #       f"state_h Shape: {state_h.shape}, state_c Shape: {state_c.shape}")

            # Decode the sequence
            decoded_sentence = []
            for _ in range(self.max_seq_length):
                output_tokens, state_h, state_c = self.decoder_model.predict([target_seq, state_h, state_c])

                # Scale logits immediately after getting output_tokens
                logits = output_tokens[0, -1, :] * self.scaling_factor
                logits = logits / self.temperature
                logits = np.clip(logits, -50, 50)

                # Compute softmax (epsilon in the denominator guards against division by zero)
                exp_logits = np.exp(logits - np.max(logits))  # Prevent overflow
                probabilities = exp_logits / (np.sum(exp_logits) + 1e-8)

                predicted_token_index = np.random.choice(len(probabilities), p=probabilities)
                predicted_word = self.reverse_tokenizer.get(predicted_token_index, '<OOV>')

                print(f"Logits: {logits[:10]}")  # Debugging (first 10 values)
                print(f"Softmax Probabilities: {probabilities[:10]}")  # Debugging

                if predicted_word == "<end>" and len(decoded_sentence) < self.min_word:
                    continue
                elif predicted_word == "<end>":
                    break

                if predicted_word not in ["<start>", "<end>", "<OOV>"]:
                    decoded_sentence.append(predicted_word)

                # Update target sequence for the next iteration
                target_seq[0, 0] = predicted_token_index

            return " ".join(decoded_sentence).strip()

        except Exception as e:
            self.logger.error(f"Error in generate_response: {str(e)}")
            return "Error"
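

# End-to-end usage sketch (illustrative; the file paths and corpus name are
# assumptions, and this module defines no __main__ entry point of its own):
#
#   if __name__ == "__main__":
#       trainer = ChatbotTrainer()
#       trainer.load_corpus("movie-corpus")  # any ConvoKit corpus directory
#       trainer.train_model("how are you", "i am fine", "c1", "speaker_a")
#       print(trainer.generate_response("how are you"))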