iamseyhmus7's picture
Update app/model.py
3d8fdbe verified
import os
# 🚨 HF cache dizinini /tmp altına al!
os.environ["HF_HOME"] = "/tmp/hf"
os.environ["HF_DATASETS_CACHE"] = "/tmp/hf/datasets"
os.environ["HF_METRICS_CACHE"] = "/tmp/hf/metrics"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/hf/transformers"
os.environ["HF_HUB_CACHE"] = "/tmp/hf/hub"
import asyncio
import logging
import re
import yaml
import torch
import numpy as np
from functools import lru_cache
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer, CrossEncoder
from pinecone import Pinecone
from pathlib import Path
from dotenv import load_dotenv
from typing import Dict
# === LOGGING ===
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# === CONFIG LOAD ===
CONFIG_PATH = Path(__file__).resolve().parent / "config.yaml"
def load_config() -> Dict:
try:
with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except Exception as e:
logger.error(f"Konfigürasyon dosyası yüklenemedi: {e}")
return {
"pinecone": {"top_k": 10, "rerank_top": 5, "batch_size": 32},
"model": {"max_new_tokens": 50, "temperature": 0.7},
"cache": {"maxsize": 100}
}
config = load_config()
# === ENV LOAD ===
env_path = Path(__file__).resolve().parent.parent / "RAG" / ".env"
load_dotenv(dotenv_path=env_path)
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENV = os.getenv("PINECONE_ENVIRONMENT")
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME")
if not all([PINECONE_API_KEY, PINECONE_ENV, PINECONE_INDEX_NAME]):
raise ValueError("Pinecone ortam değişkenleri eksik!")
# === PINECONE CONNECT ===
pinecone_client = Pinecone(api_key=PINECONE_API_KEY, environment=PINECONE_ENV)
try:
index = pinecone_client.Index(PINECONE_INDEX_NAME)
index_stats = index.describe_index_stats()
logger.info(f"Pinecone index stats: {index_stats}")
except Exception as e:
logger.error(f"Pinecone bağlantı hatası: {e}")
raise
# === MODEL LOAD ===
MODEL_PATH = "iamseyhmus7/GenerationTurkishGPT2_final"
try:
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForCausalLM.from_pretrained(MODEL_PATH)
tokenizer.pad_token = tokenizer.eos_token
model.config.pad_token_id = tokenizer.pad_token_id
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
logger.info(f"Model {MODEL_PATH} Hugging Face Hub'dan yüklendi, cihaz: {device}")
except Exception as e:
logger.error(f"Model yükleme hatası: {e}")
raise
# === EMBEDDING MODELS ===
embedder = SentenceTransformer("intfloat/multilingual-e5-large", device="cpu")
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", device="cpu")
logger.info("Embedding ve reranking modelleri yüklendi")
# === FASTAPI ===
app = FastAPI()
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
app.mount("/static", StaticFiles(directory=os.path.join(BASE_DIR, "static")), name="static")
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates"))
class QuestionRequest(BaseModel):
query: str
def clean_text_output(text: str) -> str:
"""
Tüm prompt, komut, yönerge, link ve gereksiz açıklamaları temizler.
Sadece net, kısa yanıtı bırakır.
"""
# Modelin başındaki yönerge/talimat cümleleri
text = re.sub(
r"^(Sadece doğru, kısa ve açık bilgi ver\.? Ekstra açıklama veya kaynak ekleme\.?)",
"", text, flags=re.IGNORECASE
)
# Büyük prompt ve yönergeleri sil (Metin:, output:, Cevap:)
text = re.sub(r"^.*?(Metin:|output:|Cevap:)", "", text, flags=re.IGNORECASE | re.DOTALL)
# Tek satırlık açıklama veya yönerge kalanlarını sil
text = re.sub(r"^(Aşağıdaki haber.*|Yalnızca olay özeti.*|Cevapta sadece.*|Metin:|output:|Cevap:)", "", text, flags=re.IGNORECASE | re.MULTILINE)
# 'Detaylı bilgi için', 'Daha fazla bilgi için', 'Wikipedia', 'Kaynak:', linkler vs.
text = re.sub(r"(Detaylı bilgi için.*|Daha fazla bilgi için.*|Wikipedia.*|Kaynak:.*|https?://\S+)", "", text, flags=re.IGNORECASE)
# Madde işaretleri ve baştaki sayı/karakterler
text = re.sub(r"^\- ", "", text, flags=re.MULTILINE)
text = re.sub(r"^\d+[\.\)]?\s+", "", text, flags=re.MULTILINE)
## Model promptlarının başında kalan talimat cümlelerini sil
text = re.sub(
r"^(Sadece doğru, kısa ve açık bilgi ver\.? Ekstra açıklama veya kaynak ekleme\.?)",
"", text, flags=re.IGNORECASE
)
# Tekrarlı boşluklar ve baş/son boşluk
text = re.sub(r"\s+", " ", text).strip()
return text
@lru_cache(maxsize=config["cache"]["maxsize"])
def get_embedding(text: str, max_length: int = 512) -> np.ndarray:
formatted = f"query: {text.strip()}"[:max_length]
return embedder.encode(formatted, normalize_embeddings=True)
@lru_cache(maxsize=32)
def pinecone_query_cached(query: str, top_k: int) -> tuple:
query_embedding = get_embedding(query)
result = index.query(vector=query_embedding.tolist(), top_k=top_k, include_metadata=True)
matches = result.get("matches", [])
output = []
for m in matches:
text = m.get("metadata", {}).get("text", "").strip()
url = m.get("metadata", {}).get("url", "")
if text:
output.append((text, url))
return tuple(output)
async def retrieve_sources_from_pinecone(query: str, top_k: int = None) -> Dict[str, any]:
top_k = top_k or config["pinecone"]["top_k"]
output = pinecone_query_cached(query, top_k)
if not output:
return {"sources": "", "results": [], "source_url": ""}
# Cross-encoder ile yeniden sıralama
sentence_pairs = [[query, text] for text, url in output]
scores = await asyncio.to_thread(cross_encoder.predict, sentence_pairs)
reranked = [(float(score), text, url) for score, (text, url) in zip(scores, output)]
reranked.sort(key=lambda x: x[0], reverse=True)
top_results = reranked[:1]
top_texts = [text for _, text, _ in top_results]
source_url = top_results[0][2] if top_results else ""
return {"sources": "\n".join(top_texts), "results": top_results, "source_url": source_url}
async def generate_model_response(question: str) -> str:
prompt = (
f"input: {question}\noutput:"
"Sadece doğru, kısa ve açık bilgi ver. Ekstra açıklama veya kaynak ekleme."
)
inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=256).to(device)
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=64,
do_sample=False,
num_beams=5,
no_repeat_ngram_size=3,
early_stopping=True,
pad_token_id=tokenizer.pad_token_id,
eos_token_id=tokenizer.eos_token_id
)
answer = tokenizer.decode(outputs[0], skip_special_tokens=True)
return answer
def extract_self_answer(output: str) -> str:
# Eğer "output:" etiketi varsa, sonrasını al
match = re.search(r"output:(.*)", output, flags=re.IGNORECASE | re.DOTALL)
if match:
return match.group(1).strip()
# Eğer "Cevap:" varsa, sonrasını al
if "Cevap:" in output:
return output.split("Cevap:")[-1].strip()
return output.strip()
async def selfrag_agent(question: str):
# 1. VDB cevabı ve kaynak url
result = await retrieve_sources_from_pinecone(question)
vdb_paragraph = result.get("sources", "").strip()
source_url = result.get("source_url", "")
# 2. Model cevabı
model_paragraph = await generate_model_response(question)
model_paragraph = extract_self_answer(model_paragraph)
# 3. Temizle (SADECE METİN DEĞERLERİNDE!)
vdb_paragraph = clean_text_output(vdb_paragraph)
model_paragraph = clean_text_output(model_paragraph)
# 4. Cross-encoder ile skorlama
candidates = []
candidate_urls = []
label_names = []
if vdb_paragraph:
candidates.append(vdb_paragraph)
candidate_urls.append(source_url)
label_names.append("VDB")
if model_paragraph:
candidates.append(model_paragraph)
candidate_urls.append(None)
label_names.append("MODEL")
if not candidates:
return {"answer": "Cevap bulunamadı.", "source_url": None}
sentence_pairs = [[question, cand] for cand in candidates]
scores = await asyncio.to_thread(cross_encoder.predict, sentence_pairs)
print(f"VDB Skor: {scores[0]:.4f}")
if len(scores) > 1:
print(f"Model Skor: {scores[1]:.4f}")
# === Seçim Kuralları ===
if len(scores) == 2:
vdb_score = scores[0]
model_score = scores[1]
# Eğer modelin skoru, VDB'nin 2 katından fazlaysa modeli döndür
if model_score > 1.5 * vdb_score:
best_idx = 1
else:
best_idx = 0
else:
# Sadece VDB veya model varsa, en yüksek skoru seç
best_idx = int(np.argmax(scores))
final_answer = candidates[best_idx]
final_source_url = candidate_urls[best_idx]
return {
"answer": final_answer,
"source_url": final_source_url
}
@app.get("/")
async def home(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/api/ask")
async def ask_question(request: QuestionRequest):
try:
question = request.query.strip()
if not question:
return JSONResponse(status_code=400, content={"error": "Sorgu boş olamaz."})
result = await selfrag_agent(question)
response_text = result["answer"]
if result["source_url"]:
response_text += f'<br><br>Daha fazla bilgi için: <a href="{result["source_url"]}" target="_blank">{result["source_url"]}</a>'
return JSONResponse(content={"answer": response_text})
except Exception as e:
logger.error(f"API hatası: {e}")
return JSONResponse(status_code=500, content={"error": f"Sunucu hatası: {str(e)}"})