|
from fastapi import FastAPI, Query,HTTPException |
|
from fastapi.responses import JSONResponse, FileResponse, StreamingResponse |
|
from google.cloud import texttospeech |
|
from google.oauth2.service_account import Credentials |
|
from langchain.schema import HumanMessage |
|
from langchain_groq import ChatGroq |
|
import json |
|
from dotenv import load_dotenv |
|
import os |
|
from utils import ( |
|
extract_titles_and_summaries, |
|
perform_sentiment_analysis, |
|
extract_topics_with_hf, |
|
compare_articles |
|
) |
|
|
|
load_dotenv() |
|
GROQ_API_KEY = os.getenv('GROQ_API_KEY') |
|
PRIVATE_KEY = os.getenv('PRIVATE_KEY').replace("\\n", "\n") |
|
CLIENT_EMAIL = os.getenv('CLIENT_EMAIL') |
|
|
|
app = FastAPI(title="Company Sentiment API", description="Get company news summaries with sentiment analysis") |
|
|
|
llm=ChatGroq(api_key=GROQ_API_KEY, model="llama-3.1-8b-instant") |
|
|
|
JSON_FILE_PATH = "final_summary.json" |
|
AUDIO_FILE_PATH = "hindi_summary.mp3" |
|
|
|
def get_tts_client(): |
|
credentials = Credentials.from_service_account_info({ |
|
"type": "service_account", |
|
"private_key": PRIVATE_KEY, |
|
"client_email": CLIENT_EMAIL, |
|
"token_uri": "https://oauth2.googleapis.com/token" |
|
}) |
|
return texttospeech.TextToSpeechClient(credentials=credentials) |
|
|
|
def generate_summary(company_name): |
|
news_articles = extract_titles_and_summaries(company_name) |
|
news_articles, sentiment_counts = perform_sentiment_analysis(news_articles) |
|
news_articles = extract_topics_with_hf(news_articles) |
|
final_summary = compare_articles(news_articles, sentiment_counts) |
|
hindi_text = "" |
|
if PRIVATE_KEY and CLIENT_EMAIL: |
|
hindi_prompt = f"Just Translate this text into Hindi: {final_summary['Final Sentiment Analysis']}" |
|
hindi_response = llm.invoke([HumanMessage(content=hindi_prompt)]).content |
|
hindi_text = hindi_response.strip() if hindi_response else "Translation not available." |
|
if hindi_text: |
|
print(f"Generated Hindi Text: {hindi_text}") |
|
else: |
|
print("Hindi Text not generated") |
|
|
|
try: |
|
client = get_tts_client() |
|
input_text = texttospeech.SynthesisInput(text=hindi_text) |
|
voice = texttospeech.VoiceSelectionParams( |
|
language_code="hi-IN", |
|
name="hi-IN-Chirp3-HD-Kore" |
|
) |
|
audio_config = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3) |
|
response = client.synthesize_speech(input=input_text, voice=voice, audio_config=audio_config) |
|
with open(AUDIO_FILE_PATH, "wb") as out: |
|
out.write(response.audio_content) |
|
print(f"Audio content written to file: {AUDIO_FILE_PATH}") |
|
|
|
except Exception as e: |
|
print(f"Error generating audio: {e}") |
|
if not os.path.exists(AUDIO_FILE_PATH): |
|
print(f"Audio file could not be found at {AUDIO_FILE_PATH}.") |
|
|
|
final_summary["Audio"] = AUDIO_FILE_PATH |
|
|
|
with open(JSON_FILE_PATH,"w",encoding="utf-8") as f: |
|
json.dump(final_summary,f,ensure_ascii=False, indent=4) |
|
|
|
return { |
|
'Company': final_summary["Company"], |
|
'Articles': [ |
|
{ |
|
'Title': article.get('Title', 'No Title'), |
|
'Summary': article.get('Summary', 'No Summary'), |
|
'Sentiment': article.get('Sentiment', 'Unknown'), |
|
'Score': article.get('Score', 0.0), |
|
'Topics': article.get('Topics', []) |
|
} |
|
for article in final_summary["Articles"] |
|
], |
|
'Comparative Sentiment Score': { |
|
'Sentiment Distribution': sentiment_counts, |
|
'Coverage Differences': final_summary["Comparative Sentiment Score"].get("Coverage Differences", []), |
|
'Topic Overlap': { |
|
'Common Topics': final_summary["Comparative Sentiment Score"].get("Topic Overlap", {}).get("Common Topics", []), |
|
'Unique Topics': final_summary["Comparative Sentiment Score"].get("Topic Overlap", {}).get("Unique Topics", {}) |
|
} |
|
}, |
|
'Final Sentiment Analysis': final_summary["Final Sentiment Analysis"], |
|
'Audio': AUDIO_FILE_PATH |
|
} |
|
|
|
@app.get("/") |
|
def home(): |
|
return {"message": "Welcome to the Company Sentiment API"} |
|
|
|
@app.post("/generateSummary") |
|
def get_summary(company_name: str = Query(..., description="Enter company name")): |
|
structured_summary = generate_summary(company_name) |
|
return structured_summary |
|
|
|
@app.get("/downloadJson") |
|
def download_json(): |
|
return FileResponse(JSON_FILE_PATH, media_type="application/json", filename="final_summary.json") |
|
|
|
@app.get("/downloadHindiAudio") |
|
def download_audio(): |
|
return FileResponse(AUDIO_FILE_PATH, media_type="audio/mp3", filename="hindi_summary.mp3") |
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
uvicorn.run(app, host="0.0.0.0", port=8000) |
|
|