import requests
from bs4 import BeautifulSoup
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from collections import Counter
import time
import json
import numpy as np


def sentiment_analysis(querystring, headers):
    # Load FinBERT
    model_name = "yiyanghkust/finbert-tone"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    classifier = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)
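    # Note: the pipeline returns a list of dicts with 'label' and 'score' keys,
    # e.g. roughly [{'label': 'Positive', 'score': 0.98}] for upbeat financial text
    # (the label casing here is illustrative; the code lowercases it below).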

    def calculate_sentiment_scores(sentiment_data):
        # Convert any list values to their lengths; the 'details' list is handled separately
        processed = {
            k: len(v) if isinstance(v, list) else v
            for k, v in sentiment_data.items() if k != 'details'
        }
        total = sum(processed.values())
        # Average the per-article confidences collected in 'details'
        confidences = [d.get("confidence", 0) for d in sentiment_data.get("details", [])]
        return {
            "overall": max(processed, key=processed.get) if processed else "neutral",
            "positive_percent": processed.get("positive", 0) / total * 100 if total > 0 else 0,
            "negative_percent": processed.get("negative", 0) / total * 100 if total > 0 else 0,
            "sentiment_ratio": processed.get("positive", 0) / processed.get("negative", 1) if processed.get("negative", 1) != 0 else float('inf'),
            "average_confidence": sum(confidences) / len(confidences) if confidences else 0
        }
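
    # Illustrative example (hypothetical counts): with {"positive": 3, "negative": 1,
    # "neutral": 2, "details": [...]}, this returns overall "positive",
    # positive_percent 50.0, negative_percent ~16.7 and sentiment_ratio 3.0.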

    # API setup
    url = "https://indian-stock-exchange-api2.p.rapidapi.com/stock"

    # Step 1: Get stock data
    print("Fetching stock data...")
    response = requests.get(url, headers=headers, params=querystring)
    data = response.json()
    news_data = data.get("recentNews", [])
    print(f"Found {len(news_data)} news articles")

    # Step 2: Extract URLs
    urls = [item["url"] for item in news_data if isinstance(item, dict) and "url" in item]
    print(f"Processing {len(urls)} articles...")

    # Step 3: Analyze sentiment for each article
    summary = Counter()
    details = []
    for i, news_item in enumerate(news_data):
        news_url = news_item.get("url")
        headline = news_item.get("headline", "")
        intro = news_item.get("intro", "")
        content_for_sentiment = ""
        if news_url:
            try:
print(f"\n[{i+1}/{len(urls)}] Analyzing: {news_url[:60]}...") | |
                html = requests.get(news_url, timeout=10).text
                soup = BeautifulSoup(html, "html.parser")
                # Grab <p> tags and filter out short boilerplate paragraphs
                paragraphs = soup.find_all("p")
                if not paragraphs:
                    raise ValueError("No content found in paragraphs")
                content_for_sentiment = " ".join(p.get_text() for p in paragraphs if len(p.get_text()) > 40)
                content_for_sentiment = content_for_sentiment.strip()
                if len(content_for_sentiment) < 100:
                    print("⚠ Content too short from web scraping, falling back to headline/intro")
                    content_for_sentiment = headline + ". " + intro
            except Exception as e:
                print(f"⚠ Error scraping {news_url}: {str(e)}. Falling back to headline/intro for sentiment analysis.")
                content_for_sentiment = headline + ". " + intro
        else:
            print(f"\n[{i+1}/{len(news_data)}] No URL provided, using headline/intro for sentiment analysis.")
            content_for_sentiment = headline + ". " + intro
        if not content_for_sentiment.strip():
            print("⚠ No content available for sentiment analysis, skipping.")
            continue

        # Truncate long articles; truncation=True lets the tokenizer enforce the model's 512-token limit
        content_for_sentiment = content_for_sentiment[:1000]
        result = classifier(content_for_sentiment, truncation=True)[0]
        label = result['label'].lower()
        score = round(result['score'], 3)
        summary[label] += 1
        details.append({
            "url": news_url,
            "title": news_item.get("title", "No title"),  # Use title from news_item if available
            "sentiment": label,
            "confidence": score,
            "content_length": len(content_for_sentiment),
            "image_222x148": news_item.get("image_222x148"),
            "intro": intro,
            "headline": headline
        })
print(f"β Sentiment: {label.upper()} (confidence: {score:.1%})") | |
time.sleep(1.2) | |

    # Step 4: Generate comprehensive output
    sentiment_scores = calculate_sentiment_scores({
        "positive": summary["positive"],
        "negative": summary["negative"],
        "neutral": summary["neutral"],
        "details": details
    })
    output = {
        "metadata": {
            "total_articles": len(urls),
            "processed_articles": len(details),
            "processing_time": time.strftime("%Y-%m-%d %H:%M:%S")
        },
        "sentiment_metrics": {
"overall_score": sentiment_scores["overall"], # Removed round() for string label | |
"positive_score": round(sentiment_scores["positive_percent"], 2), | |
"negative_score": round(sentiment_scores["negative_percent"], 2), | |
"sentiment_ratio": round(sentiment_scores["sentiment_ratio"], 2), | |
"average_confidence": round(sentiment_scores["average_confidence"], 2) | |
}, | |
"article_details": details | |
} | |

    # Print formatted results
    print("\n=== SENTIMENT ANALYSIS RESULTS ===")
    print(f"Overall Sentiment: {output['sentiment_metrics']['overall_score']}")
    print(f"Positive/Negative Ratio: {output['sentiment_metrics']['sentiment_ratio']:.2f}")
    print(f"Average Confidence: {output['sentiment_metrics']['average_confidence']:.1%}")

    with open("sentiment_results.json", "w") as f:
        json.dump(output, f, indent=2)
    print("Results saved to sentiment_results.json")
    return output


def mainOne(querystring):
    """
    Main function that takes a querystring as a parameter and runs the sentiment analysis.

    Args:
        querystring: Dictionary containing the stock name (e.g. {'name': 'HDFC BANK'}).

    Returns:
        Dictionary containing the sentiment analysis results.
    """
    try:
        headers = {
            "x-rapidapi-host": "indian-stock-exchange-api2.p.rapidapi.com",
            "x-rapidapi-key": "a12f59fc40msh153da8fdf3885b6p100406jsn57d1d84b0d06"
        }
        # Run the sentiment analysis
        results = sentiment_analysis(querystring, headers)
        return results
    except Exception as e:
        print(f"Error in main function: {str(e)}")
        return {"error": str(e)}