import requests
from bs4 import BeautifulSoup
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import pipeline
from collections import Counter
import time
import numpy as np
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import json
from typing import Dict, List, Tuple
import re
import warnings

warnings.filterwarnings('ignore')

# Load FinBERT
model_name = "yiyanghkust/finbert-tone"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
classifier = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)
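
# Example output (illustrative; actual labels/scores vary by input): the pipeline
# returns a list of dicts such as [{'label': 'Positive', 'score': 0.97}], which the
# methods below lowercase and map to a signed polarity in [-1, 1].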

class StockSentimentAnalyzer:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # API setup for Indian stock data
        self.api_url = "https://indian-stock-exchange-api2.p.rapidapi.com/stock"
        self.api_headers = {
            "x-rapidapi-host": "indian-stock-exchange-api2.p.rapidapi.com",
            "x-rapidapi-key": "a12f59fc40msh153da8fdf3885b6p100406jsn57d1d84b0d06"
        }
        self.symbol = None

    def get_stock_data(self, symbol: str, period: str = "1mo") -> pd.DataFrame:
        """Fetch stock data from Yahoo Finance."""
        try:
            # Add the .NS (NSE) suffix if no exchange suffix is present
            if not symbol.endswith('.NS') and not symbol.endswith('.BO'):
                symbol += '.NS'

            stock = yf.Ticker(symbol)
            data = stock.history(period=period)
            return data
        except Exception as e:
            print(f"Error fetching stock data for {symbol}: {e}")
            return pd.DataFrame()

    def get_news_from_api(self, company_name: str) -> List[Dict]:
        """Get news articles from the RapidAPI Indian stock endpoint."""
        querystring = {"name": company_name}
        try:
            response = requests.get(self.api_url, headers=self.api_headers, params=querystring, timeout=10)
            data = response.json()
            # "recentNews" is expected to be a list of article dicts; default to an empty list
            news_data = data.get("recentNews", [])
            return news_data
        except Exception as e:
            print(f"Error fetching news from API: {e}")
            return []

    def scrape_news_sentiment(self, company_name: str, symbol: str) -> Dict:
        """Scrape news sentiment from multiple sources."""
        news_data = {
            'headlines': [],
            'sources': [],
            'sentiment_scores': [],
            'dates': [],
            'urls': []
        }

        # Get news from the API
        api_news = self.get_news_from_api(company_name)
        urls = [item["url"] for item in api_news if isinstance(item, dict) and "url" in item]
        print(f"Found {len(urls)} news articles from API")

        # Process each article URL
        for i, news_url in enumerate(urls):
            try:
                print(f"\n[{i+1}/{len(urls)}] Analyzing: {news_url[:60]}...")
                html = self.session.get(news_url, timeout=10).text
                soup = BeautifulSoup(html, "html.parser")

                # Get the title (soup.title.string can be None)
                title = soup.title.string if soup.title and soup.title.string else "No title"

                # Grab <p> tags and keep only substantial paragraphs
                paragraphs = soup.find_all("p")
                if not paragraphs:
                    print("→ No content found")
                    continue

                content = " ".join(p.get_text() for p in paragraphs if len(p.get_text()) > 40)
                content = content.strip()
                if len(content) < 100:
                    print("→ Content too short")
                    continue

                # Truncate by characters to stay well under FinBERT's 512-token limit
                content = content[:1000]
                result = classifier(content[:512])[0]
                label = result['label'].lower()
                score = result['score']

                # Convert FinBERT sentiment to a polarity score (-1 to 1)
                polarity = 0
                if label == "positive":
                    polarity = score
                elif label == "negative":
                    polarity = -score

                news_data['headlines'].append(title)
                news_data['sources'].append('API')
                news_data['sentiment_scores'].append(polarity)
                news_data['dates'].append(datetime.now())
                news_data['urls'].append(news_url)

                print(f"→ Sentiment: {label.upper()} (confidence: {score:.1%})")
                time.sleep(1.2)  # polite delay between requests
            except Exception as e:
                print(f"❌ Error: {str(e)}")
                continue

        # Economic Times topic page
        try:
            et_url = f"https://economictimes.indiatimes.com/topic/{company_name.replace(' ', '-')}"
            response = self.session.get(et_url, timeout=10)
            soup = BeautifulSoup(response.content, 'html.parser')
            headlines = soup.find_all(['h2', 'h3', 'h4'], class_=re.compile('.*title.*|.*headline.*'))

            for headline in headlines[:5]:  # Limit to 5 headlines
                text = headline.get_text().strip()
                if text and len(text) > 10:
                    # Use FinBERT for sentiment analysis
                    result = classifier(text)[0]
                    label = result['label'].lower()
                    score = result['score']

                    # Convert to polarity
                    polarity = 0
                    if label == "positive":
                        polarity = score
                    elif label == "negative":
                        polarity = -score

                    news_data['headlines'].append(text)
                    news_data['sources'].append('Economic Times')
                    news_data['sentiment_scores'].append(polarity)
                    news_data['dates'].append(datetime.now())
                    news_data['urls'].append(et_url)
        except Exception as e:
            print(f"Error scraping Economic Times: {e}")

        return news_data

    def calculate_news_sentiment_score(self, news_data: Dict) -> Dict:
        """Calculate various sentiment scores from news data."""
        if not news_data['sentiment_scores']:
            return {
                'positive_score': 50,
                'negative_score': 50,
                'fear_score': 50,
                'confidence_score': 50,
                'overall_sentiment_score': 50
            }

        sentiments = news_data['sentiment_scores']
        headlines = news_data['headlines']

        # Count sentiments
        positive_count = sum(1 for s in sentiments if s > 0.1)
        negative_count = sum(1 for s in sentiments if s < -0.1)
        neutral_count = len(sentiments) - positive_count - negative_count

        total = len(sentiments)
        positive_score = (positive_count / total) * 100 if total > 0 else 50
        negative_score = (negative_count / total) * 100 if total > 0 else 50

        # Average confidence (mean absolute polarity)
        confidence_values = [abs(s) for s in sentiments]
        avg_confidence = sum(confidence_values) / len(confidence_values) if confidence_values else 0
        confidence_score = avg_confidence * 100

        # Fear / confidence keyword mentions in headlines
        fear_keywords = ['fall', 'drop', 'crash', 'loss', 'decline', 'bear', 'sell', 'down', 'negative', 'risk']
        confidence_keywords = ['rise', 'gain', 'bull', 'buy', 'up', 'positive', 'growth', 'profit', 'strong']

        fear_mentions = sum(1 for headline in headlines
                            for keyword in fear_keywords
                            if keyword.lower() in headline.lower())
        confidence_mentions = sum(1 for headline in headlines
                                  for keyword in confidence_keywords
                                  if keyword.lower() in headline.lower())

        fear_score = min(100, (fear_mentions / len(headlines)) * 200) if headlines else 50
        confidence_boost = min(100, (confidence_mentions / len(headlines)) * 200) if headlines else 50

        # Overall sentiment score (centered at 50, shifted by headline polarity and keyword balance)
        overall_sentiment = 50 + ((positive_score - negative_score) * 0.3) + ((confidence_boost - fear_score) * 0.2)

        return {
            'positive_score': round(positive_score, 2),
            'negative_score': round(negative_score, 2),
            'fear_score': round(fear_score, 2),
            'confidence_score': round(confidence_score, 2),
            'overall_sentiment_score': round(min(100, max(0, overall_sentiment)), 2)
        }
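
    # Worked example (illustrative numbers, not from a real run): with 10 headlines
    # of which 6 score above 0.1 and 2 below -0.1, positive_score = 60 and
    # negative_score = 20; with 4 fear-keyword hits and 6 confidence-keyword hits,
    # fear_score = 80 and confidence_boost = 100, so
    # overall = 50 + (60 - 20) * 0.3 + (100 - 80) * 0.2 = 66.0.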

    def calculate_volatility_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate a composite volatility score (0-100)."""
        if stock_data.empty:
            return 0

        # Daily returns
        returns = stock_data['Close'].pct_change().dropna()

        # Standard deviation of returns (annualized, in percent)
        std_vol = returns.std() * np.sqrt(252) * 100

        # Average True Range (ATR) volatility
        high_low = stock_data['High'] - stock_data['Low']
        high_close = np.abs(stock_data['High'] - stock_data['Close'].shift())
        low_close = np.abs(stock_data['Low'] - stock_data['Close'].shift())
        true_range = np.maximum(high_low, np.maximum(high_close, low_close))
        atr = true_range.rolling(14).mean().iloc[-1]
        atr_vol = (atr / stock_data['Close'].iloc[-1]) * 100

        # Full-period price range relative to the latest close
        price_range = ((stock_data['High'].max() - stock_data['Low'].min()) / stock_data['Close'].iloc[-1]) * 100

        # Weighted combination, capped at 100
        volatility_score = min(100, (std_vol * 0.4 + atr_vol * 0.4 + price_range * 0.2))
        return round(volatility_score, 2)
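
    # Note: sqrt(252) annualizes daily-return volatility (252 trading days per year),
    # and the 14-bar rolling mean of the true range is a simple-moving-average ATR
    # (Wilder's original ATR uses a smoothed average), so values are indicative only.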

    def calculate_momentum_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate momentum score based on price trends (0-100)."""
        if stock_data.empty:
            return 50

        # RSI calculation (14-period, simple rolling means)
        delta = stock_data['Close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        current_rsi = rsi.iloc[-1] if not np.isnan(rsi.iloc[-1]) else 50

        # Price momentum (% change over different periods)
        mom_1d = ((stock_data['Close'].iloc[-1] - stock_data['Close'].iloc[-2]) / stock_data['Close'].iloc[-2]) * 100 if len(stock_data) > 1 else 0
        mom_5d = ((stock_data['Close'].iloc[-1] - stock_data['Close'].iloc[-6]) / stock_data['Close'].iloc[-6]) * 100 if len(stock_data) > 5 else 0
        mom_20d = ((stock_data['Close'].iloc[-1] - stock_data['Close'].iloc[-21]) / stock_data['Close'].iloc[-21]) * 100 if len(stock_data) > 20 else 0

        # Moving-average trend score
        ma_5 = stock_data['Close'].rolling(5).mean().iloc[-1]
        ma_20 = stock_data['Close'].rolling(20).mean().iloc[-1] if len(stock_data) > 20 else ma_5
        current_price = stock_data['Close'].iloc[-1]

        ma_score = 50
        if current_price > ma_5 > ma_20:
            ma_score = 75
        elif current_price > ma_5:
            ma_score = 65
        elif current_price < ma_5 < ma_20:
            ma_score = 25
        elif current_price < ma_5:
            ma_score = 35

        # Weighted combination of RSI, MA trend, and raw momentum
        momentum_score = (current_rsi * 0.4 + ma_score * 0.3 +
                          min(max(mom_1d * 2 + 50, 0), 100) * 0.1 +
                          min(max(mom_5d + 50, 0), 100) * 0.1 +
                          min(max(mom_20d * 0.5 + 50, 0), 100) * 0.1)
        return round(momentum_score, 2)
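
    # RSI above is the textbook 100 - 100 / (1 + RS) with RS = average gain / average
    # loss over 14 bars, computed with plain rolling means rather than Wilder's
    # smoothing, so readings may differ slightly from charting platforms.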

    def calculate_liquidity_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate liquidity score based on volume patterns (0-100)."""
        if stock_data.empty:
            return 0

        # Average volume vs. recent (last 5 bars) volume
        avg_volume = stock_data['Volume'].mean()
        recent_volume = stock_data['Volume'].tail(5).mean()

        # Volume trend (percent change of recent vs. overall average)
        volume_trend = (recent_volume - avg_volume) / avg_volume * 100 if avg_volume > 0 else 0

        # Volume-price relationship
        price_changes = stock_data['Close'].pct_change()
        volume_changes = stock_data['Volume'].pct_change()
        correlation = price_changes.corr(volume_changes)
        correlation = 0 if np.isnan(correlation) else correlation

        # Normalize to a 0-100 scale
        volume_score = min(100, max(0, 50 + volume_trend * 0.3 + correlation * 25))
        return round(volume_score, 2)

    def calculate_technical_strength_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate technical strength based on multiple indicators (0-100)."""
        if stock_data.empty:
            return 50

        scores = []

        # Support/resistance levels (20-bar rolling high/low)
        highs = stock_data['High'].rolling(20).max()
        lows = stock_data['Low'].rolling(20).min()
        current_price = stock_data['Close'].iloc[-1]

        # Price position within the recent range
        price_position = ((current_price - lows.iloc[-1]) / (highs.iloc[-1] - lows.iloc[-1])) * 100
        scores.append(min(100, max(0, price_position)))

        # Volume-weighted average price (VWAP) deviation
        vwap = (stock_data['Close'] * stock_data['Volume']).sum() / stock_data['Volume'].sum()
        vwap_score = 50 + ((current_price - vwap) / vwap) * 100
        scores.append(min(100, max(0, vwap_score)))

        # Bollinger Bands position (20-bar MA ± 2 standard deviations)
        ma_20 = stock_data['Close'].rolling(20).mean()
        std_20 = stock_data['Close'].rolling(20).std()
        upper_band = ma_20 + (std_20 * 2)
        lower_band = ma_20 - (std_20 * 2)

        if not upper_band.empty and not lower_band.empty:
            bb_position = ((current_price - lower_band.iloc[-1]) /
                           (upper_band.iloc[-1] - lower_band.iloc[-1])) * 100
            scores.append(min(100, max(0, bb_position)))

        return round(np.mean(scores), 2)

    def calculate_market_correlation_score(self, symbol: str, stock_data: pd.DataFrame) -> float:
        """Calculate correlation with the Nifty 50 index (0-100)."""
        try:
            # Get Nifty 50 data for comparison
            nifty = yf.Ticker("^NSEI")
            nifty_data = nifty.history(period="1mo")

            if nifty_data.empty or stock_data.empty:
                return 50

            # Align dates
            common_dates = stock_data.index.intersection(nifty_data.index)
            if len(common_dates) < 5:
                return 50

            stock_returns = stock_data.loc[common_dates]['Close'].pct_change().dropna()
            nifty_returns = nifty_data.loc[common_dates]['Close'].pct_change().dropna()

            # Calculate correlation of daily returns
            correlation = stock_returns.corr(nifty_returns)
            if np.isnan(correlation):
                return 50

            # Map correlation [-1, 1] to a 0-100 score:
            # high positive correlation (follows the market) scores high,
            # negative correlation (contrarian) scores low.
            correlation_score = (correlation + 1) * 50
            return round(correlation_score, 2)
        except Exception as e:
            print(f"Error calculating market correlation: {e}")
            return 50
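
    # Mapping example (illustrative): a daily-return correlation of 0.5 with the
    # Nifty 50 maps to (0.5 + 1) * 50 = 75; a correlation of -0.2 maps to 40.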

    def calculate_growth_potential_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate growth potential based on trend analysis (0-100)."""
        if stock_data.empty:
            return 50

        current_price = stock_data['Close'].iloc[-1]
        growth_scores = []

        # Weekly growth (last 5 bars)
        if len(stock_data) >= 5:
            week_ago_price = stock_data['Close'].iloc[-5]
            weekly_growth = ((current_price - week_ago_price) / week_ago_price) * 100
            weekly_score = min(100, max(0, 50 + weekly_growth * 2))
            growth_scores.append(weekly_score)

        # Monthly growth (last 20 bars)
        if len(stock_data) >= 20:
            month_ago_price = stock_data['Close'].iloc[-20]
            monthly_growth = ((current_price - month_ago_price) / month_ago_price) * 100
            monthly_score = min(100, max(0, 50 + monthly_growth))
            growth_scores.append(monthly_score)

        # Volume growth trend (recent 5 bars vs. first 5 bars)
        recent_volume = stock_data['Volume'].tail(5).mean()
        earlier_volume = stock_data['Volume'].head(5).mean()
        if earlier_volume > 0:
            volume_growth = ((recent_volume - earlier_volume) / earlier_volume) * 100
            volume_score = min(100, max(0, 50 + volume_growth * 0.5))
            growth_scores.append(volume_score)

        return round(np.mean(growth_scores) if growth_scores else 50, 2)

    def calculate_stability_score(self, stock_data: pd.DataFrame) -> float:
        """Calculate stability score based on price steadiness (0-100)."""
        if stock_data.empty:
            return 50

        # Coefficient of variation of daily returns (lower = steadier)
        returns = stock_data['Close'].pct_change().dropna()
        mean_return = returns.mean()
        std_return = returns.std()

        if mean_return != 0:
            cv = abs(std_return / mean_return)
            # Lower CV = higher stability
            stability_score = max(0, 100 - cv * 100)
        else:
            stability_score = 50

        # Penalize overnight price gaps
        gaps = abs(stock_data['Open'] - stock_data['Close'].shift()).dropna()
        avg_gap = gaps.mean()
        avg_price = stock_data['Close'].mean()

        if avg_price > 0:
            gap_ratio = avg_gap / avg_price
            gap_penalty = min(50, gap_ratio * 1000)
            stability_score = max(0, stability_score - gap_penalty)

        return round(stability_score, 2)

    def calculate_risk_score(self, analysis: Dict) -> float:
        """Calculate risk score based on multiple factors."""
        risk_factors = [
            analysis['volatility_score'],
            analysis['fear_score'],
            100 - analysis['liquidity_score'],
            100 - analysis['technical_strength_score'],
            100 - analysis['stability_score']
        ]
        return round(np.mean(risk_factors), 2)
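
    # Worked example (illustrative): volatility 70, fear 80, liquidity 40, technical
    # strength 55, stability 30 -> mean(70, 80, 60, 45, 70) = 65.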

    def calculate_investment_attractiveness(self, analysis: Dict) -> float:
        """Calculate investment attractiveness score."""
        attractiveness_factors = [
            analysis['overall_sentiment_score'],
            analysis['growth_potential_score'],
            analysis['momentum_score'],
            100 - analysis['risk_score']
        ]
        return round(np.mean(attractiveness_factors), 2)

    def get_comprehensive_analysis(self, symbol: str, company_name: str = None) -> Dict:
        """Get comprehensive sentiment analysis for a stock."""
        self.symbol = symbol

        # If the company name is not provided, try to look it up from the symbol
        if not company_name:
            try:
                # Add the .NS (NSE) suffix if no exchange suffix is present
                if not symbol.endswith('.NS') and not symbol.endswith('.BO'):
                    symbol_with_suffix = symbol + '.NS'
                else:
                    symbol_with_suffix = symbol

                # Get company info from yfinance
                ticker = yf.Ticker(symbol_with_suffix)
                info = ticker.info

                # Extract the company name with multiple fallbacks
                company_name = (
                    info.get('longName') or
                    info.get('shortName') or
                    info.get('name') or
                    symbol  # Final fallback to the symbol itself
                )

                # Validate the extracted name
                if company_name:
                    # Remove special characters and check whether the name is meaningful
                    cleaned_name = ''.join(c for c in company_name if c.isalnum() or c in (' ', '-', '&'))
                    if (len(cleaned_name.strip()) < 2 or          # Too short
                            cleaned_name.strip() == symbol or     # Same as the symbol
                            any(x in cleaned_name for x in ['-', ' - ']) or  # Contains dashes (likely placeholder)
                            cleaned_name.isnumeric()):            # Just numbers
                        company_name = symbol  # Fall back to the symbol if the name looks invalid
                else:
                    company_name = symbol
            except Exception as e:
                print(f"⚠️ Could not fetch company name for {symbol}: {str(e)}")
                company_name = symbol

        # Ensure we have at least the symbol as the name
        company_name = company_name or symbol
        print(company_name)

        print(f"\n{'='*80}")
        print(f"🔍 ANALYZING: {company_name.upper()} ({symbol})")
        print(f"{'='*80}")

        # Get stock data
        print("📊 Fetching stock data...")
        stock_data = self.get_stock_data(symbol)

        if stock_data.empty:
            print("❌ Could not fetch stock data. Please check the symbol.")
            return {}

        # Get news sentiment
        print("📰 Scraping news sentiment...")
        news_data = self.scrape_news_sentiment(company_name, symbol)

        # Calculate all scores
        print("🧮 Calculating sentiment scores...")

        # Basic stock info
        current_price = stock_data['Close'].iloc[-1]
        prev_close = stock_data['Close'].iloc[-2] if len(stock_data) > 1 else current_price
        price_change = current_price - prev_close
        price_change_pct = (price_change / prev_close) * 100 if prev_close != 0 else 0

        analysis = {
            'symbol': symbol,
            'company_name': company_name,
            'analysis_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'current_price': round(current_price, 2),
            'price_change': round(price_change, 2),
            'price_change_pct': round(price_change_pct, 2),
            'volume': int(stock_data['Volume'].iloc[-1]),
            'market_cap_approx': 'N/A',  # Would need an additional API for the exact market cap

            # Composite technical scores
            'volatility_score': self.calculate_volatility_score(stock_data),
            'momentum_score': self.calculate_momentum_score(stock_data),
            'liquidity_score': self.calculate_liquidity_score(stock_data),
            'technical_strength_score': self.calculate_technical_strength_score(stock_data),
            'market_correlation_score': self.calculate_market_correlation_score(symbol, stock_data),
            'growth_potential_score': self.calculate_growth_potential_score(stock_data),
            'stability_score': self.calculate_stability_score(stock_data),

            # News sentiment scores
            **self.calculate_news_sentiment_score(news_data),

            # Additional metrics
            'news_count': len(news_data['headlines']),
            'recent_headlines': news_data['headlines'][:5]  # Top 5 headlines
        }

        # Calculate risk score
        analysis['risk_score'] = self.calculate_risk_score(analysis)

        # Derive the risk level from the risk score
        if analysis['risk_score'] >= 75:
            analysis['risk_level'] = "VERY HIGH"
        elif analysis['risk_score'] >= 60:
            analysis['risk_level'] = "HIGH"
        elif analysis['risk_score'] >= 40:
            analysis['risk_level'] = "MODERATE"
        elif analysis['risk_score'] >= 25:
            analysis['risk_level'] = "LOW"
        else:
            analysis['risk_level'] = "VERY LOW"

        # Add risk factors based on the analysis
        analysis['risk_factors'] = []
        if analysis['volatility_score'] > 70:
            analysis['risk_factors'].append("High market volatility")
        if analysis['fear_score'] > 60:
            analysis['risk_factors'].append("Elevated market fear")
        if analysis['negative_score'] > 60:
            analysis['risk_factors'].append("Negative sentiment trend")
        if analysis['market_correlation_score'] < 30:
            analysis['risk_factors'].append("Low market correlation")
        if analysis['stability_score'] < 40:
            analysis['risk_factors'].append("Low stability indicators")

        # Calculate investment attractiveness
        analysis['investment_attractiveness_score'] = self.calculate_investment_attractiveness(analysis)

        return analysis

    def generate_recommendation(self, analysis: Dict) -> str:
        """Generate a trading recommendation based on the analysis."""
        if not analysis:
            return "Unable to generate recommendation - insufficient data"

        sentiment = analysis['overall_sentiment_score']
        risk = analysis['risk_score']
        momentum = analysis['momentum_score']
        volatility = analysis['volatility_score']
        attractiveness = analysis['investment_attractiveness_score']

        if sentiment > 70 and risk < 40 and momentum > 60 and attractiveness > 65:
            return "🟢 STRONG BUY - High sentiment, low risk, strong momentum"
        elif sentiment > 60 and risk < 50 and attractiveness > 55:
            return "🟢 BUY - Positive sentiment with manageable risk"
        elif 40 < sentiment < 60 and risk < 60:
            return "🟡 HOLD - Neutral sentiment, monitor closely"
        elif sentiment < 40 and risk > 60:
            return "🔴 SELL - Negative sentiment with high risk"
        elif sentiment < 30 or risk > 75:
            return "🔴 STRONG SELL - Very negative sentiment or very high risk"
        else:
            return "🟡 HOLD - Mixed signals, proceed with caution"

    def display_analysis(self, analysis: Dict):
        """Display the comprehensive analysis in a formatted way."""
        if not analysis:
            print("❌ No analysis data available")
            return

        print(f"\n{'='*80}")
        print(f"📈 COMPREHENSIVE STOCK ANALYSIS REPORT")
        print(f"{'='*80}")

        # Basic info
        print(f"\n📊 BASIC INFORMATION:")
        print(f"Company: {analysis['company_name']}")
        print(f"Symbol: {analysis['symbol']}")
        print(f"Current Price: ₹{analysis['current_price']}")
        print(f"Price Change: ₹{analysis['price_change']} ({analysis['price_change_pct']:+.2f}%)")
        print(f"Volume: {analysis['volume']:,}")
        print(f"Analysis Date: {analysis['analysis_date']}")

        # Sentiment scores
        print(f"\n🎯 SENTIMENT SCORES (0-100):")
        print(f"Overall Sentiment Score: {analysis['overall_sentiment_score']}/100")
        print(f"Positive Score: {analysis['positive_score']}/100")
        print(f"Negative Score: {analysis['negative_score']}/100")
        print(f"Fear Score: {analysis['fear_score']}/100")
        print(f"Confidence Score: {analysis['confidence_score']}/100")

        # Technical scores
        print(f"\n⚙️ TECHNICAL SCORES (0-100):")
        print(f"Volatility Score: {analysis['volatility_score']}/100")
        print(f"Momentum Score: {analysis['momentum_score']}/100")
        print(f"Technical Strength: {analysis['technical_strength_score']}/100")
        print(f"Liquidity Score: {analysis['liquidity_score']}/100")
        print(f"Market Correlation: {analysis['market_correlation_score']}/100")

        # Advanced scores
        print(f"\n🚀 ADVANCED SCORES (0-100):")
        print(f"Growth Potential: {analysis['growth_potential_score']}/100")
        print(f"Stability Score: {analysis['stability_score']}/100")
        print(f"Risk Score: {analysis['risk_score']}/100")
        print(f"Investment Attractiveness: {analysis['investment_attractiveness_score']}/100")

        # Recommendation
        recommendation = self.generate_recommendation(analysis)
        print(f"\n💡 RECOMMENDATION:")
        print(f"{recommendation}")

        # News analysis
        print(f"\n📰 NEWS ANALYSIS:")
        print(f"Headlines Analyzed: {analysis['news_count']}")
        if analysis['recent_headlines']:
            print(f"\n📋 Recent Headlines:")
            for i, headline in enumerate(analysis['recent_headlines'], 1):
                print(f"{i}. {headline}")

        # Risk assessment
        print(f"\n⚠️ RISK ASSESSMENT:")
        print(f"Risk Level: {analysis['risk_level']}")
        print(f"Key Risk Factors:")
        for risk_factor in analysis['risk_factors']:
            print(f"- {risk_factor}")

        # Save the analysis to JSON
        output_file = f"analysis_{self.symbol}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w') as f:
            json.dump(analysis, f, indent=4)
        print(f"\n💾 Analysis saved to {output_file}")


def main():
    """Main function to run the stock analysis interactively."""
    analyzer = StockSentimentAnalyzer()

    print("🚀 Welcome to Stock Sentiment Analyzer!")
    print("Enter stock symbols (e.g., RELIANCE, TCS, HDFCBANK)")
    print("The system will automatically add .NS for NSE stocks")
    print("Type 'quit' to exit\n")

    while True:
        try:
            # Get user input
            user_input = input("Enter stock symbol: ").strip().upper()

            if user_input.lower() == 'quit':
                print("👋 Thank you for using Stock Sentiment Analyzer!")
                break

            if not user_input:
                print("❌ Please enter a valid stock symbol")
                continue

            # Get company name (optional)
            company_name = input("Enter company name (optional, press Enter to skip): ").strip()

            # Perform analysis
            analysis = analyzer.get_comprehensive_analysis(user_input, company_name if company_name else None)

            # Display results
            if analysis:
                analyzer.display_analysis(analysis)
        except Exception as e:
            print(f"❌ Error: {str(e)}")
            print("Please try again with a different stock symbol")


if __name__ == "__main__":
    main()
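
# Programmatic usage (illustrative; mirrors what main() does interactively):
#   analyzer = StockSentimentAnalyzer()
#   analysis = analyzer.get_comprehensive_analysis("RELIANCE", "Reliance Industries")
#   analyzer.display_analysis(analysis)  # prints the report and writes analysis_<symbol>_<timestamp>.json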