import requests
from bs4 import BeautifulSoup
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import pipeline
from collections import Counter
import time
import numpy as np
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import json
from typing import Dict, List, Tuple
import re
import warnings

warnings.filterwarnings('ignore')

# Load FinBERT
model_name = "yiyanghkust/finbert-tone"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
classifier = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)


class StockSentimentAnalyzer:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        # API setup for Indian stock data
        self.api_url = "https://indian-stock-exchange-api2.p.rapidapi.com/stock"
        self.api_headers = {
            "x-rapidapi-host": "indian-stock-exchange-api2.p.rapidapi.com",
            "x-rapidapi-key": "a12f59fc40msh153da8fdf3885b6p100406jsn57d1d84b0d06"
        }
        self.symbol = None

    def get_stock_data(self, symbol: str, period: str = "1mo") -> pd.DataFrame:
        """Fetch stock data from Yahoo Finance"""
        try:
            # Add .NS for NSE stocks if not present
            if not symbol.endswith('.NS') and not symbol.endswith('.BO'):
                symbol += '.NS'
            stock = yf.Ticker(symbol)
            data = stock.history(period=period)
            return data
        except Exception as e:
            print(f"Error fetching stock data for {symbol}: {e}")
            return pd.DataFrame()

    def get_news_from_api(self, company_name: str) -> List[Dict]:
        """Get news articles from the API"""
        querystring = {"name": company_name}
        try:
            response = requests.get(self.api_url, headers=self.api_headers, params=querystring)
            data = response.json()
            # Default to an empty list so the return value matches List[Dict]
            news_data = data.get("recentNews", [])
            return news_data
        except Exception as e:
            print(f"Error fetching news from API: {e}")
            return []
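    # Note on the FinBERT output consumed below (illustrative, not guaranteed): with the
    # "yiyanghkust/finbert-tone" checkpoint, classifier(text) typically returns a list
    # such as [{'label': 'Positive', 'score': 0.97}], with Positive/Negative/Neutral labels.
    # Exact label casing and scores depend on the installed transformers and model versions.
    # scrape_news_sentiment() below lowercases the label and maps it to a signed polarity
    # in [-1, 1]: positive -> +score, negative -> -score, anything else -> 0.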
    def scrape_news_sentiment(self, company_name: str, symbol: str) -> Dict:
        """Scrape news sentiment from multiple sources"""
        news_data = {
            'headlines': [],
            'sources': [],
            'sentiment_scores': [],
            'dates': [],
            'urls': []
        }

        # Get news from API
        api_news = self.get_news_from_api(company_name)
        urls = [item["url"] for item in api_news if isinstance(item, dict) and "url" in item]
        print(f"Found {len(urls)} news articles from API")

        # Process each URL
        for i, news_url in enumerate(urls):
            try:
                print(f"\n[{i+1}/{len(urls)}] Analyzing: {news_url[:60]}...")
                html = requests.get(news_url, timeout=10).text
                soup = BeautifulSoup(html, "html.parser")

                # Get title
                title = soup.title.string if soup.title else "No title"

                # Grab <p> tags and filter out short fragments
                paragraphs = soup.find_all("p")
                if not paragraphs:
                    print("→ No content found")
                    continue

                content = " ".join(p.get_text() for p in paragraphs if len(p.get_text()) > 40)
                content = content.strip()
                if len(content) < 100:
                    print("→ Content too short")
                    continue

                # Keep the text short; only the first 512 characters are passed to the
                # classifier to stay well inside the model's input limit
                content = content[:1000]
                result = classifier(content[:512])[0]
                label = result['label'].lower()
                score = result['score']

                # Convert FinBERT sentiment to polarity score (-1 to 1)
                polarity = 0
                if label == "positive":
                    polarity = score
                elif label == "negative":
                    polarity = -score

                news_data['headlines'].append(title)
                news_data['sources'].append('API')
                news_data['sentiment_scores'].append(polarity)
                news_data['dates'].append(datetime.now())
                news_data['urls'].append(news_url)

                print(f"→ Sentiment: {label.upper()} (confidence: {score:.1%})")
                time.sleep(1.2)  # polite delay
            except Exception as e:
                print(f"❌ Error: {str(e)}")
                continue

        # Economic Times
        try:
            et_url = f"https://economictimes.indiatimes.com/topic/{company_name.replace(' ', '-')}"
            response = self.session.get(et_url, timeout=10)
            soup = BeautifulSoup(response.content, 'html.parser')
            headlines = soup.find_all(['h2', 'h3', 'h4'], class_=re.compile('.*title.*|.*headline.*'))

            for headline in headlines[:5]:  # Limit to 5 headlines
                text = headline.get_text().strip()
                if text and len(text) > 10:
                    # Use FinBERT for sentiment analysis
                    result = classifier(text)[0]
                    label = result['label'].lower()
                    score = result['score']

                    # Convert to polarity
                    polarity = 0
                    if label == "positive":
                        polarity = score
                    elif label == "negative":
                        polarity = -score

                    news_data['headlines'].append(text)
                    news_data['sources'].append('Economic Times')
                    news_data['sentiment_scores'].append(polarity)
                    news_data['dates'].append(datetime.now())
                    news_data['urls'].append(et_url)
        except Exception as e:
            print(f"Error scraping Economic Times: {e}")

        return news_data

    def calculate_news_sentiment_score(self, news_data: Dict) -> Dict:
        """Calculate various sentiment scores from news data"""
        if not news_data['sentiment_scores']:
            return {
                'positive_score': 50,
                'negative_score': 50,
                'fear_score': 50,
                'confidence_score': 50,
                'overall_sentiment_score': 50
            }

        sentiments = news_data['sentiment_scores']
        headlines = news_data['headlines']

        # Count sentiments
        positive_count = sum(1 for s in sentiments if s > 0.1)
        negative_count = sum(1 for s in sentiments if s < -0.1)
        neutral_count = len(sentiments) - positive_count - negative_count

        total = len(sentiments)
        positive_score = (positive_count / total) * 100 if total > 0 else 50
        negative_score = (negative_count / total) * 100 if total > 0 else 50

        # Calculate average confidence
        confidence_values = [abs(s) for s in sentiments]
        avg_confidence = sum(confidence_values) / len(confidence_values) if confidence_values else 0
        confidence_score = avg_confidence * 100

        # Fear score based on keywords
        fear_keywords = ['fall', 'drop', 'crash', 'loss', 'decline', 'bear', 'sell', 'down', 'negative', 'risk']
        confidence_keywords = ['rise', 'gain', 'bull', 'buy', 'up', 'positive', 'growth', 'profit', 'strong']

        fear_mentions = sum(1 for headline in headlines for keyword in fear_keywords
                            if keyword.lower() in headline.lower())
        confidence_mentions = sum(1 for headline in headlines for keyword in confidence_keywords
                                  if keyword.lower() in headline.lower())

        fear_score = min(100, (fear_mentions / len(headlines)) * 200) if headlines else 50
        confidence_boost = min(100, (confidence_mentions / len(headlines)) * 200) if headlines else 50

        # Overall sentiment score
        overall_sentiment = 50 + ((positive_score - negative_score) * 0.3) + ((confidence_boost - fear_score) * 0.2)

        return {
            'positive_score': round(positive_score, 2),
            'negative_score': round(negative_score, 2),
            'fear_score': round(fear_score, 2),
            'confidence_score': round(confidence_score, 2),
            'overall_sentiment_score': round(min(100, max(0, overall_sentiment)), 2)
        }
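    # Worked example for calculate_news_sentiment_score() (illustrative numbers only):
    # with 10 scored headlines of which 6 have polarity > 0.1 and 2 have polarity < -0.1,
    # positive_score = 60 and negative_score = 20. If the keyword scan yields
    # confidence_boost = 60 and fear_score = 20, then
    #   overall_sentiment = 50 + (60 - 20) * 0.3 + (60 - 20) * 0.2 = 70
    # before the final clamp to the 0-100 range.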
    def calculate_volatility_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate innovative volatility score (0-100)"""
        if stock_data.empty:
            return 0

        # Calculate different volatility measures
        returns = stock_data['Close'].pct_change().dropna()

        # Standard deviation of returns (annualized)
        std_vol = returns.std() * np.sqrt(252) * 100

        # Average True Range volatility
        high_low = stock_data['High'] - stock_data['Low']
        high_close = np.abs(stock_data['High'] - stock_data['Close'].shift())
        low_close = np.abs(stock_data['Low'] - stock_data['Close'].shift())
        true_range = np.maximum(high_low, np.maximum(high_close, low_close))
        atr = true_range.rolling(14).mean().iloc[-1]
        atr_vol = (atr / stock_data['Close'].iloc[-1]) * 100

        # Price range volatility
        price_range = ((stock_data['High'].max() - stock_data['Low'].min()) / stock_data['Close'].iloc[-1]) * 100

        # Combine and normalize to 0-100 scale
        volatility_score = min(100, (std_vol * 0.4 + atr_vol * 0.4 + price_range * 0.2))
        return round(volatility_score, 2)

    def calculate_momentum_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate momentum score based on price trends (0-100)"""
        if stock_data.empty:
            return 50

        # RSI calculation
        delta = stock_data['Close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        current_rsi = rsi.iloc[-1] if not np.isnan(rsi.iloc[-1]) else 50

        # Price momentum (% change over different periods)
        mom_1d = ((stock_data['Close'].iloc[-1] - stock_data['Close'].iloc[-2]) / stock_data['Close'].iloc[-2]) * 100
        mom_5d = ((stock_data['Close'].iloc[-1] - stock_data['Close'].iloc[-6]) / stock_data['Close'].iloc[-6]) * 100 if len(stock_data) > 5 else 0
        mom_20d = ((stock_data['Close'].iloc[-1] - stock_data['Close'].iloc[-21]) / stock_data['Close'].iloc[-21]) * 100 if len(stock_data) > 20 else 0

        # Moving average trends
        ma_5 = stock_data['Close'].rolling(5).mean().iloc[-1]
        ma_20 = stock_data['Close'].rolling(20).mean().iloc[-1] if len(stock_data) > 20 else ma_5
        current_price = stock_data['Close'].iloc[-1]

        ma_score = 50
        if current_price > ma_5 > ma_20:
            ma_score = 75
        elif current_price > ma_5:
            ma_score = 65
        elif current_price < ma_5 < ma_20:
            ma_score = 25
        elif current_price < ma_5:
            ma_score = 35

        # Combine scores
        momentum_score = (current_rsi * 0.4 + ma_score * 0.3 +
                          min(max(mom_1d * 2 + 50, 0), 100) * 0.1 +
                          min(max(mom_5d + 50, 0), 100) * 0.1 +
                          min(max(mom_20d * 0.5 + 50, 0), 100) * 0.1)

        return round(momentum_score, 2)
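    # Illustrative weighting in calculate_momentum_score() above (assumed inputs):
    # with RSI = 60, a moving-average score of 65, and 1-day/5-day/20-day momentum
    # terms that each clamp to 55,
    #   momentum_score = 60 * 0.4 + 65 * 0.3 + 55 * 0.1 + 55 * 0.1 + 55 * 0.1 = 60.0
    # i.e. RSI carries 40% of the weight, the MA trend 30%, and the three raw
    # momentum terms 10% each.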
    def calculate_liquidity_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate liquidity score based on volume patterns (0-100)"""
        if stock_data.empty:
            return 0

        # Average volume
        avg_volume = stock_data['Volume'].mean()
        recent_volume = stock_data['Volume'].tail(5).mean()

        # Volume trend
        volume_trend = (recent_volume - avg_volume) / avg_volume * 100 if avg_volume > 0 else 0

        # Volume-price relationship
        price_changes = stock_data['Close'].pct_change()
        volume_changes = stock_data['Volume'].pct_change()
        correlation = price_changes.corr(volume_changes)
        correlation = 0 if np.isnan(correlation) else correlation

        # Normalize to 0-100 scale
        volume_score = min(100, max(0, 50 + volume_trend * 0.3 + correlation * 25))
        return round(volume_score, 2)

    def calculate_technical_strength_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate technical strength based on multiple indicators (0-100)"""
        if stock_data.empty:
            return 50

        scores = []

        # Support/Resistance levels
        highs = stock_data['High'].rolling(20).max()
        lows = stock_data['Low'].rolling(20).min()
        current_price = stock_data['Close'].iloc[-1]

        # Price position within range
        price_position = ((current_price - lows.iloc[-1]) / (highs.iloc[-1] - lows.iloc[-1])) * 100
        scores.append(min(100, max(0, price_position)))

        # Volume-weighted average price deviation
        vwap = (stock_data['Close'] * stock_data['Volume']).sum() / stock_data['Volume'].sum()
        vwap_score = 50 + ((current_price - vwap) / vwap) * 100
        scores.append(min(100, max(0, vwap_score)))

        # Bollinger Bands position
        ma_20 = stock_data['Close'].rolling(20).mean()
        std_20 = stock_data['Close'].rolling(20).std()
        upper_band = ma_20 + (std_20 * 2)
        lower_band = ma_20 - (std_20 * 2)

        if not upper_band.empty and not lower_band.empty:
            bb_position = ((current_price - lower_band.iloc[-1]) / (upper_band.iloc[-1] - lower_band.iloc[-1])) * 100
            scores.append(min(100, max(0, bb_position)))

        return round(np.mean(scores), 2)

    def calculate_market_correlation_score(self, symbol: str, stock_data: pd.DataFrame) -> float:
        """Calculate correlation with major indices (0-100)"""
        try:
            # Get Nifty 50 data for comparison
            nifty = yf.Ticker("^NSEI")
            nifty_data = nifty.history(period="1mo")

            if nifty_data.empty or stock_data.empty:
                return 50

            # Align dates
            common_dates = stock_data.index.intersection(nifty_data.index)
            if len(common_dates) < 5:
                return 50

            stock_returns = stock_data.loc[common_dates]['Close'].pct_change().dropna()
            nifty_returns = nifty_data.loc[common_dates]['Close'].pct_change().dropna()

            # Calculate correlation
            correlation = stock_returns.corr(nifty_returns)
            if np.isnan(correlation):
                return 50

            # Convert correlation to 0-100 score
            # High positive correlation = higher score (follows market)
            # Negative correlation = lower score (contrarian)
            correlation_score = (correlation + 1) * 50
            return round(correlation_score, 2)
        except Exception as e:
            print(f"Error calculating market correlation: {e}")
            return 50

    def calculate_growth_potential_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate growth potential based on trend analysis (0-100)"""
        if stock_data.empty:
            return 50

        # Calculate different timeframe growth rates
        current_price = stock_data['Close'].iloc[-1]
        growth_scores = []

        # Weekly growth
        if len(stock_data) >= 5:
            week_ago_price = stock_data['Close'].iloc[-5]
            weekly_growth = ((current_price - week_ago_price) / week_ago_price) * 100
            weekly_score = min(100, max(0, 50 + weekly_growth * 2))
            growth_scores.append(weekly_score)

        # Monthly growth
        if len(stock_data) >= 20:
            month_ago_price = stock_data['Close'].iloc[-20]
            monthly_growth = ((current_price - month_ago_price) / month_ago_price) * 100
            monthly_score = min(100, max(0, 50 + monthly_growth))
            growth_scores.append(monthly_score)

        # Volume growth trend
        recent_volume = stock_data['Volume'].tail(5).mean()
        earlier_volume = stock_data['Volume'].head(5).mean()
        if earlier_volume > 0:
            volume_growth = ((recent_volume - earlier_volume) / earlier_volume) * 100
            volume_score = min(100, max(0, 50 + volume_growth * 0.5))
            growth_scores.append(volume_score)

        return round(np.mean(growth_scores) if growth_scores else 50, 2)
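    # Illustrative mappings used by the correlation and growth methods above
    # (assumed inputs, shown only to make the scaling concrete):
    # - calculate_market_correlation_score(): a correlation of +0.6 with the Nifty 50
    #   maps to (0.6 + 1) * 50 = 80; a correlation of -0.2 maps to 40.
    # - calculate_growth_potential_score(): weekly growth of +3% gives 50 + 3*2 = 56,
    #   monthly growth of +8% gives 58, and +20% volume growth gives 60, so the
    #   returned mean is about 58.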
    def calculate_stability_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate stability score based on price steadiness (0-100)"""
        if stock_data.empty:
            return 50

        # Calculate coefficient of variation
        returns = stock_data['Close'].pct_change().dropna()
        mean_return = returns.mean()
        std_return = returns.std()

        if mean_return != 0:
            cv = abs(std_return / mean_return)
            # Lower CV = higher stability
            stability_score = max(0, 100 - cv * 100)
        else:
            stability_score = 50

        # Consider price gaps
        gaps = abs(stock_data['Open'] - stock_data['Close'].shift()).dropna()
        avg_gap = gaps.mean()
        avg_price = stock_data['Close'].mean()
        if avg_price > 0:
            gap_ratio = avg_gap / avg_price
            gap_penalty = min(50, gap_ratio * 1000)
            stability_score = max(0, stability_score - gap_penalty)

        return round(stability_score, 2)

    def calculate_risk_score(self, analysis: Dict) -> float:
        """Calculate risk score based on multiple factors"""
        risk_factors = [
            analysis['volatility_score'],
            analysis['fear_score'],
            100 - analysis['liquidity_score'],
            100 - analysis['technical_strength_score'],
            100 - analysis['stability_score']
        ]
        return round(np.mean(risk_factors), 2)

    def calculate_investment_attractiveness(self, analysis: Dict) -> float:
        """Calculate investment attractiveness score"""
        attractiveness_factors = [
            analysis['overall_sentiment_score'],
            analysis['growth_potential_score'],
            analysis['momentum_score'],
            100 - analysis['risk_score']
        ]
        return round(np.mean(attractiveness_factors), 2)
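    # Worked example for the two aggregate scores above (illustrative numbers):
    # calculate_risk_score() takes a plain average of five factors, e.g. volatility 40,
    # fear 30, (100 - liquidity 60) = 40, (100 - technical strength 70) = 30,
    # (100 - stability 50) = 50  ->  risk_score = 38.0.
    # calculate_investment_attractiveness() then averages sentiment 65, growth 58,
    # momentum 60 and (100 - risk 38) = 62  ->  61.25.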
    def get_comprehensive_analysis(self, symbol: str, company_name: str = None) -> Dict:
        """Get comprehensive sentiment analysis for a stock"""
        self.symbol = symbol

        # If company name not provided, try to extract it from the symbol
        if not company_name:
            try:
                # Add .NS for NSE stocks if not present
                if not symbol.endswith('.NS') and not symbol.endswith('.BO'):
                    symbol_with_suffix = symbol + '.NS'
                else:
                    symbol_with_suffix = symbol

                # Get company info from yfinance
                ticker = yf.Ticker(symbol_with_suffix)
                info = ticker.info

                # Extract company name with multiple fallbacks
                company_name = (
                    info.get('longName') or
                    info.get('shortName') or
                    info.get('name') or
                    symbol  # Final fallback to symbol
                )

                # Validate the extracted name
                if company_name:
                    # Remove special characters and check if meaningful
                    cleaned_name = ''.join(c for c in company_name if c.isalnum() or c in (' ', '-', '&'))
                    if (len(cleaned_name.strip()) < 2 or  # Too short
                            cleaned_name.strip() == symbol or  # Same as symbol
                            any(x in cleaned_name for x in ['-', ' - ']) or  # Contains dashes (likely placeholder)
                            cleaned_name.isnumeric()):  # Just numbers
                        company_name = symbol  # Fallback to symbol if name is invalid
                else:
                    company_name = symbol
            except Exception as e:
                print(f"⚠️ Could not fetch company name for {symbol}: {str(e)}")
                company_name = symbol

        # Ensure we have at least the symbol as name
        company_name = company_name or symbol
        print(company_name)

        print(f"\n{'='*80}")
        print(f"🔍 ANALYZING: {company_name.upper()} ({symbol})")
        print(f"{'='*80}")

        # Get stock data
        print("📊 Fetching stock data...")
        stock_data = self.get_stock_data(symbol)

        if stock_data.empty:
            print("❌ Could not fetch stock data. Please check the symbol.")
            return {}

        # Get news sentiment
        print("📰 Scraping news sentiment...")
        news_data = self.scrape_news_sentiment(company_name, symbol)

        # Calculate all scores
        print("🧮 Calculating sentiment scores...")

        # Basic stock info
        current_price = stock_data['Close'].iloc[-1]
        prev_close = stock_data['Close'].iloc[-2] if len(stock_data) > 1 else current_price
        price_change = current_price - prev_close
        price_change_pct = (price_change / prev_close) * 100 if prev_close != 0 else 0

        analysis = {
            'symbol': symbol,
            'company_name': company_name,
            'analysis_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'current_price': round(current_price, 2),
            'price_change': round(price_change, 2),
            'price_change_pct': round(price_change_pct, 2),
            'volume': int(stock_data['Volume'].iloc[-1]),
            'market_cap_approx': 'N/A',  # Would need additional API for exact market cap

            # Innovative Scores
            'volatility_score': self.calculate_volatility_score(stock_data),
            'momentum_score': self.calculate_momentum_score(stock_data),
            'liquidity_score': self.calculate_liquidity_score(stock_data),
            'technical_strength_score': self.calculate_technical_strength_score(stock_data),
            'market_correlation_score': self.calculate_market_correlation_score(symbol, stock_data),
            'growth_potential_score': self.calculate_growth_potential_score(stock_data),
            'stability_score': self.calculate_stability_score(stock_data),

            # News sentiment scores
            **self.calculate_news_sentiment_score(news_data),

            # Additional metrics
            'news_count': len(news_data['headlines']),
            'recent_headlines': news_data['headlines'][:5]  # Top 5 headlines
        }

        # Calculate risk score
        analysis['risk_score'] = self.calculate_risk_score(analysis)

        # Calculate risk level based on risk score
        if analysis['risk_score'] >= 75:
            analysis['risk_level'] = "VERY HIGH"
        elif analysis['risk_score'] >= 60:
            analysis['risk_level'] = "HIGH"
        elif analysis['risk_score'] >= 40:
            analysis['risk_level'] = "MODERATE"
        elif analysis['risk_score'] >= 25:
            analysis['risk_level'] = "LOW"
        else:
            analysis['risk_level'] = "VERY LOW"

        # Add risk factors based on analysis
        analysis['risk_factors'] = []
        if analysis['volatility_score'] > 70:
            analysis['risk_factors'].append("High market volatility")
        if analysis['fear_score'] > 60:
            analysis['risk_factors'].append("Elevated market fear")
        if analysis['negative_score'] > 60:
            analysis['risk_factors'].append("Negative sentiment trend")
        if analysis['market_correlation_score'] < 30:
            analysis['risk_factors'].append("Low market correlation")
        if analysis['stability_score'] < 40:
            analysis['risk_factors'].append("Low stability indicators")

        # Calculate investment attractiveness
        analysis['investment_attractiveness_score'] = self.calculate_investment_attractiveness(analysis)

        return analysis
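    # Shape of the dict returned by get_comprehensive_analysis() (values are hypothetical,
    # shown only to illustrate the keys consumed by the methods below):
    #   {'symbol': 'RELIANCE', 'company_name': 'Reliance Industries', ...,
    #    'overall_sentiment_score': 64.5, 'risk_score': 38.0, 'risk_level': 'LOW',
    #    'risk_factors': [], 'investment_attractiveness_score': 61.25, ...}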
"🔴 SELL - Negative sentiment with high risk" elif sentiment < 30 or risk > 75: return "🔴 STRONG SELL - Very negative sentiment or very high risk" else: return "🟡 HOLD - Mixed signals, proceed with caution" def display_analysis(self, analysis: Dict): """Display comprehensive analysis in a formatted way""" if not analysis: print("❌ No analysis data available") return print(f"\n{'='*80}") print(f"📈 COMPREHENSIVE STOCK ANALYSIS REPORT") print(f"{'='*80}") # Basic Info print(f"\n📊 BASIC INFORMATION:") print(f"Company: {analysis['company_name']}") print(f"Symbol: {analysis['symbol']}") print(f"Current Price: ₹{analysis['current_price']}") print(f"Price Change: ₹{analysis['price_change']} ({analysis['price_change_pct']:+.2f}%)") print(f"Volume: {analysis['volume']:,}") print(f"Analysis Date: {analysis['analysis_date']}") # Sentiment Scores print(f"\n🎯 SENTIMENT SCORES (0-100):") print(f"Overall Sentiment Score: {analysis['overall_sentiment_score']}/100") print(f"Positive Score: {analysis['positive_score']}/100") print(f"Negative Score: {analysis['negative_score']}/100") print(f"Fear Score: {analysis['fear_score']}/100") print(f"Confidence Score: {analysis['confidence_score']}/100") # Technical Scores print(f"\n⚙️ TECHNICAL SCORES (0-100):") print(f"Volatility Score: {analysis['volatility_score']}/100") print(f"Momentum Score: {analysis['momentum_score']}/100") print(f"Technical Strength: {analysis['technical_strength_score']}/100") print(f"Liquidity Score: {analysis['liquidity_score']}/100") print(f"Market Correlation: {analysis['market_correlation_score']}/100") # Advanced Scores print(f"\n🚀 ADVANCED SCORES (0-100):") print(f"Growth Potential: {analysis['growth_potential_score']}/100") print(f"Stability Score: {analysis['stability_score']}/100") print(f"Risk Score: {analysis['risk_score']}/100") print(f"Investment Attractiveness: {analysis['investment_attractiveness_score']}/100") # Recommendation recommendation = self.generate_recommendation(analysis) print(f"\n💡 RECOMMENDATION:") print(f"{recommendation}") # News Analysis print(f"\n📰 NEWS ANALYSIS:") print(f"Headlines Analyzed: {analysis['news_count']}") if analysis['recent_headlines']: print(f"\n📋 Recent Headlines:") for i, headline in enumerate(analysis['recent_headlines'], 1): print(f"{i}. 
{headline}") # Risk Assessment print(f"\n⚠️ RISK ASSESSMENT:") print(f"Risk Level: {analysis['risk_level']}") print(f"Key Risk Factors:") for risk_factor in analysis['risk_factors']: print(f"- {risk_factor}") # Save analysis to JSON output_file = f"analysis_{self.symbol}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(output_file, 'w') as f: json.dump(analysis, f, indent=4) print(f"\n💾 Analysis saved to {output_file}") def main(): """Main function to run the stock analysis""" analyzer = StockSentimentAnalyzer() print("🚀 Welcome to Stock Sentiment Analyzer!") print("Enter stock symbols (e.g., RELIANCE, TCS, HDFCBANK)") print("The system will automatically add .NS for NSE stocks") print("Type 'quit' to exit\n") while True: try: # Get user input user_input = input("Enter stock symbol: ").strip().upper() if user_input.lower() == 'quit': print("👋 Thank you for using Stock Sentiment Analyzer!") break if not user_input: print("❌ Please enter a valid stock symbol") continue # Get company name (optional) company_name = input("Enter company name (optional, press Enter to skip): ").strip() # Perform analysis analysis = analyzer.get_comprehensive_analysis(user_input, company_name if company_name else None) # Display results if analysis: analyzer.display_analysis(analysis) except Exception as e: print(f"❌ Error: {str(e)}") print("Please try again with a different stock symbol") if __name__ == "__main__": main()