from collections import Counter
import matplotlib.pyplot as plt
import pandas as pd
import praw  # Reddit's API
import re  # Regular expression module
import streamlit as st
import time
import numpy as np
from wordcloud import WordCloud
from transformers import (
    pipeline,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    TokenClassificationPipeline
)
from transformers.pipelines import AggregationStrategy


# Function to normalize text by replacing multiple spaces/newlines with a single space
def normalize_text(text):
    if not isinstance(text, str):
        return ""
    return re.sub(r'\s+', ' ', text).strip()


# ---------- Cached function for scraping Reddit data ---------- #
@st.cache_data(show_spinner=False)
def scrape_reddit_data(search_query, total_limit):
    # Retrieve API credentials from st.secrets
    reddit = praw.Reddit(
        client_id=st.secrets["reddit_client_id"],
        client_secret=st.secrets["reddit_client_secret"],
        user_agent=st.secrets["reddit_user_agent"]
    )
    subreddit = reddit.subreddit("all")
    posts_data = []
    # Iterate over submissions based on the search query and limit
    for i, submission in enumerate(subreddit.search(search_query, sort="relevance", limit=total_limit)):
        # No UI updates here as caching does not allow live progress updates
        if submission.title and submission.selftext:
            posts_data.append([
                submission.title,
                submission.url,
                submission.created_utc,
                submission.selftext,
            ])
        time.sleep(0.25)

    df = pd.DataFrame(posts_data, columns=["Title", "URL", "Date", "Detail"])
    for col in ["Title", "Detail"]:
        df[col] = df[col].apply(normalize_text)
    # Filter out rows with empty Title or Detail
    df = df[(df["Title"] != "") & (df["Detail"] != "")]
    df['Date'] = pd.to_datetime(df['Date'], unit='s')
    df = df.sort_values(by="Date", ascending=True).reset_index(drop=True)
    return df


# ------------------ Sentiment Analysis Functions ------------------------#
def split_text_by_token_limit(text, tokenizer, max_tokens):
    tokens = tokenizer.encode(text, add_special_tokens=False)
    chunks = []
    for i in range(0, len(tokens), max_tokens):
        chunk_tokens = tokens[i:i+max_tokens]
        chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
        chunks.append(chunk_text)
    return chunks


# def safe_sentiment(sentiment_pipeline, text, length, progress_bar):
#     try:
#         result = sentiment_pipeline(text)[0]
#     except Exception as e:
#         result = None
#     if "count" not in st.session_state:
#         st.session_state.count = 0
#     st.session_state.count += 1
#     progress = st.session_state.count / length
#     progress_bar.progress(progress)
#     return result


def safe_sentiment(sentiment_pipeline, text, length, progress_bar):
    try:
        result = sentiment_pipeline(text)[0]
    except Exception as e:
        result = None
    if "count" not in st.session_state:
        st.session_state.count = 0
    st.session_state.count += 1
    progress = st.session_state.count / length
    # Clamp the progress value between 0.0 and 1.0
    progress = min(max(progress, 0.0), 1.0)
    progress_bar.progress(progress)
    return result


def safe_sentiment_batch(sentiment_pipeline, texts):
    try:
        results = sentiment_pipeline(texts)
    except Exception as e:
        results = [None] * len(texts)
    return results
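
# --- Illustrative sketch (not part of the original script) ---------------- #
# The helpers above and analyze_detail below expect a Hugging Face `tokenizer`
# and `sentiment_pipeline` created elsewhere in the app. A minimal way to build
# them could look like the following; the exact model checkpoint is an
# assumption for illustration, not the app's confirmed choice.
@st.cache_resource(show_spinner=False)
def load_sentiment_pipeline(model_name="cardiffnlp/twitter-roberta-base-sentiment-latest"):
    # Cache the model and tokenizer so they are loaded once per session
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    sentiment_pipeline = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)
    return tokenizer, sentiment_pipeline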

def analyze_detail(text, tokenizer, sentiment_pipeline, max_tokens):
    text = preprocess_text(text)
    chunks = split_text_by_token_limit(text, tokenizer, max_tokens)
    if not chunks:
        return None
    # batch processing (for each chunk)
    results = safe_sentiment_batch(sentiment_pipeline, chunks)
    # arrange the result
    scores = {"POSITIVE": 0, "NEGATIVE": 0, "NEUTRAL": 0}
    for result in results:
        if result is not None:
            label = result['label'].upper()
            if label in scores:
                scores[label] += result['score']
    final_label = max(scores, key=lambda k: scores[k])
    final_score = scores[final_label]
    return {"label": final_label, "score": final_score}


def preprocess_text(text):
    # Replace URLs and user mentions
    text = re.sub(r'http\S+', 'http', text)
    text = re.sub(r'@\w+', '@user', text)
    return text


def generate_variants(keyword):
    # Split the keyword into individual words
    words = keyword.split()
    # Original keyword
    original = keyword
    # Convert the keyword to all uppercase letters
    all_upper = keyword.upper()
    # Convert the keyword to all lowercase letters
    all_lower = keyword.lower()
    # Concatenate words with each word capitalized (no spaces)
    no_space_title = ''.join(word.capitalize() for word in words)
    # Concatenate words in all uppercase (no spaces)
    no_space_upper = ''.join(word.upper() for word in words)
    # Concatenate words in all lowercase (no spaces)
    no_space_lower = ''.join(word.lower() for word in words)
    # Create a string with only the first letter of each word (e.g., MHW)
    initials = ''.join(word[0].upper() for word in words)
    # Return all variants as a list
    return [original, all_upper, all_lower, no_space_title, no_space_upper, no_space_lower, initials]


# Function to check if a cell contains any excluded keywords
def contains_excluded_keywords(cell, excluded_keywords):
    if isinstance(cell, np.ndarray):
        cell_str = ' '.join(map(str, cell))
        return any(keyword in cell_str for keyword in excluded_keywords)
    elif isinstance(cell, str):
        return any(keyword in cell for keyword in excluded_keywords)
    return False


# Function to extract terms from a cell
def extract_terms(cell):
    if isinstance(cell, np.ndarray):
        # Convert each element to a string and strip whitespace
        return [str(item).strip() for item in cell if str(item).strip()]
    elif isinstance(cell, str):
        # Split the string by commas and strip whitespace from each term
        return [term.strip() for term in cell.split(',') if term.strip()]
    else:
        return []


# def remove_excluded_from_list(keywords_list, excluded_keywords):
#     """
#     Remove items from the keywords_list if they contain any of the excluded keywords.
#     This function checks for partial matches in a case-insensitive manner.
#     """
#     if not isinstance(keywords_list, list):
#         return keywords_list  # If it's not a list, return as is
#     filtered_list = []
#     for item in keywords_list:
#         # Check if item contains any excluded keyword (case-insensitive)
#         if any(kw.lower() in item.lower() for kw in excluded_keywords):
#             # Skip this item if it matches an excluded keyword
#             continue
#         else:
#             filtered_list.append(item)
#     return filtered_list


def remove_excluded_from_text(text, excluded_keywords):
    """
    Remove occurrences of any excluded keyword from the text.
    Matching is case-insensitive. Extra whitespace is cleaned.
    """
    if not isinstance(text, str):
        return text
    filtered_text = text
    for kw in excluded_keywords:
        # Create a regex pattern for the keyword (case-insensitive)
        pattern = re.compile(re.escape(kw), re.IGNORECASE)
        # Replace any occurrence of the keyword with an empty string
        filtered_text = pattern.sub("", filtered_text)
    # Remove extra spaces and strip the result
    filtered_text = re.sub(r'\s+', ' ', filtered_text).strip()
    return filtered_text
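
# Illustrative examples (not from the original script) of how the keyword
# helpers above behave; the game title is only a sample input.
#
# >>> generate_variants("Monster Hunter Wilds")
# ['Monster Hunter Wilds', 'MONSTER HUNTER WILDS', 'monster hunter wilds',
#  'MonsterHunterWilds', 'MONSTERHUNTERWILDS', 'monsterhunterwilds', 'MHW']
# >>> remove_excluded_from_text("Monster Hunter Wilds is out now", ["monster hunter"])
# 'Wilds is out now'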
""" cleaned_items = [] if isinstance(result, list): for item in result: cleaned_item = remove_excluded_from_text(item, excluded_keywords) if cleaned_item: # Only add non-empty strings cleaned_items.append(cleaned_item) elif isinstance(result, str): cleaned_item = remove_excluded_from_text(result, excluded_keywords) if cleaned_item: cleaned_items.append(cleaned_item) return cleaned_items