|
import os |
|
|
|
|
|
os.environ["HF_HOME"] = "/tmp/hf" |
|
os.environ["HF_DATASETS_CACHE"] = "/tmp/hf/datasets" |
|
os.environ["HF_METRICS_CACHE"] = "/tmp/hf/metrics" |
|
os.environ["TRANSFORMERS_CACHE"] = "/tmp/hf/transformers" |
|
os.environ["HF_HUB_CACHE"] = "/tmp/hf/hub" |
|
|
|
|
|
|
|
import asyncio |
|
import logging |
|
import re |
|
import yaml |
|
import torch |
|
import numpy as np |
|
from functools import lru_cache |
|
from fastapi import FastAPI, Request |
|
from fastapi.responses import JSONResponse |
|
from fastapi.staticfiles import StaticFiles |
|
from fastapi.templating import Jinja2Templates |
|
from pydantic import BaseModel |
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
from sentence_transformers import SentenceTransformer, CrossEncoder |
|
from pinecone import Pinecone |
|
from pathlib import Path |
|
from dotenv import load_dotenv |
|
from typing import Dict |
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
CONFIG_PATH = Path(__file__).resolve().parent / "config.yaml" |
|
def load_config() -> Dict: |
|
try: |
|
with open(CONFIG_PATH, 'r', encoding='utf-8') as f: |
|
return yaml.safe_load(f) |
|
except Exception as e: |
|
logger.error(f"Konfigürasyon dosyası yüklenemedi: {e}") |
|
return { |
|
"pinecone": {"top_k": 10, "rerank_top": 5, "batch_size": 32}, |
|
"model": {"max_new_tokens": 50, "temperature": 0.7}, |
|
"cache": {"maxsize": 100} |
|
} |
|
config = load_config() |
|
|
|
|
|
env_path = Path(__file__).resolve().parent.parent / "RAG" / ".env" |
|
load_dotenv(dotenv_path=env_path) |
|
|
|
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY") |
|
PINECONE_ENV = os.getenv("PINECONE_ENVIRONMENT") |
|
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME") |
|
if not all([PINECONE_API_KEY, PINECONE_ENV, PINECONE_INDEX_NAME]): |
|
raise ValueError("Pinecone ortam değişkenleri eksik!") |
|
|
|
|
|
pinecone_client = Pinecone(api_key=PINECONE_API_KEY, environment=PINECONE_ENV) |
|
try: |
|
index = pinecone_client.Index(PINECONE_INDEX_NAME) |
|
index_stats = index.describe_index_stats() |
|
logger.info(f"Pinecone index stats: {index_stats}") |
|
except Exception as e: |
|
logger.error(f"Pinecone bağlantı hatası: {e}") |
|
raise |
|
|
|
|
|
MODEL_PATH = "iamseyhmus7/GenerationTurkishGPT2_final" |
|
try: |
|
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH) |
|
model = AutoModelForCausalLM.from_pretrained(MODEL_PATH) |
|
tokenizer.pad_token = tokenizer.eos_token |
|
model.config.pad_token_id = tokenizer.pad_token_id |
|
model.eval() |
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
model.to(device) |
|
logger.info(f"Model {MODEL_PATH} Hugging Face Hub'dan yüklendi, cihaz: {device}") |
|
except Exception as e: |
|
logger.error(f"Model yükleme hatası: {e}") |
|
raise |
|
|
|
|
|
embedder = SentenceTransformer("intfloat/multilingual-e5-large", device="cpu") |
|
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", device="cpu") |
|
logger.info("Embedding ve reranking modelleri yüklendi") |
|
|
|
|
|
app = FastAPI() |
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
|
app.mount("/static", StaticFiles(directory=os.path.join(BASE_DIR, "static")), name="static") |
|
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates")) |
|
|
|
class QuestionRequest(BaseModel): |
|
query: str |
|
|
|
def clean_text_output(text: str) -> str: |
|
""" |
|
Tüm prompt, komut, yönerge, link ve gereksiz açıklamaları temizler. |
|
Sadece net, kısa yanıtı bırakır. |
|
""" |
|
|
|
text = re.sub( |
|
r"^(Sadece doğru, kısa ve açık bilgi ver\.? Ekstra açıklama veya kaynak ekleme\.?)", |
|
"", text, flags=re.IGNORECASE |
|
) |
|
|
|
text = re.sub(r"^.*?(Metin:|output:|Cevap:)", "", text, flags=re.IGNORECASE | re.DOTALL) |
|
|
|
text = re.sub(r"^(Aşağıdaki haber.*|Yalnızca olay özeti.*|Cevapta sadece.*|Metin:|output:|Cevap:)", "", text, flags=re.IGNORECASE | re.MULTILINE) |
|
|
|
text = re.sub(r"(Detaylı bilgi için.*|Daha fazla bilgi için.*|Wikipedia.*|Kaynak:.*|https?://\S+)", "", text, flags=re.IGNORECASE) |
|
|
|
text = re.sub(r"^\- ", "", text, flags=re.MULTILINE) |
|
text = re.sub(r"^\d+[\.\)]?\s+", "", text, flags=re.MULTILINE) |
|
|
|
text = re.sub( |
|
r"^(Sadece doğru, kısa ve açık bilgi ver\.? Ekstra açıklama veya kaynak ekleme\.?)", |
|
"", text, flags=re.IGNORECASE |
|
) |
|
|
|
text = re.sub(r"\s+", " ", text).strip() |
|
return text |
|
|
|
@lru_cache(maxsize=config["cache"]["maxsize"]) |
|
def get_embedding(text: str, max_length: int = 512) -> np.ndarray: |
|
formatted = f"query: {text.strip()}"[:max_length] |
|
return embedder.encode(formatted, normalize_embeddings=True) |
|
|
|
@lru_cache(maxsize=32) |
|
def pinecone_query_cached(query: str, top_k: int) -> tuple: |
|
query_embedding = get_embedding(query) |
|
result = index.query(vector=query_embedding.tolist(), top_k=top_k, include_metadata=True) |
|
matches = result.get("matches", []) |
|
output = [] |
|
for m in matches: |
|
text = m.get("metadata", {}).get("text", "").strip() |
|
url = m.get("metadata", {}).get("url", "") |
|
if text: |
|
output.append((text, url)) |
|
return tuple(output) |
|
|
|
async def retrieve_sources_from_pinecone(query: str, top_k: int = None) -> Dict[str, any]: |
|
top_k = top_k or config["pinecone"]["top_k"] |
|
output = pinecone_query_cached(query, top_k) |
|
if not output: |
|
return {"sources": "", "results": [], "source_url": ""} |
|
|
|
sentence_pairs = [[query, text] for text, url in output] |
|
scores = await asyncio.to_thread(cross_encoder.predict, sentence_pairs) |
|
reranked = [(float(score), text, url) for score, (text, url) in zip(scores, output)] |
|
reranked.sort(key=lambda x: x[0], reverse=True) |
|
top_results = reranked[:1] |
|
top_texts = [text for _, text, _ in top_results] |
|
source_url = top_results[0][2] if top_results else "" |
|
return {"sources": "\n".join(top_texts), "results": top_results, "source_url": source_url} |
|
|
|
async def generate_model_response(question: str) -> str: |
|
prompt = ( |
|
f"input: {question}\noutput:" |
|
"Sadece doğru, kısa ve açık bilgi ver. Ekstra açıklama veya kaynak ekleme." |
|
) |
|
inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=256).to(device) |
|
with torch.no_grad(): |
|
outputs = model.generate( |
|
**inputs, |
|
max_new_tokens=64, |
|
do_sample=False, |
|
num_beams=5, |
|
no_repeat_ngram_size=3, |
|
early_stopping=True, |
|
pad_token_id=tokenizer.pad_token_id, |
|
eos_token_id=tokenizer.eos_token_id |
|
) |
|
answer = tokenizer.decode(outputs[0], skip_special_tokens=True) |
|
return answer |
|
|
|
def extract_self_answer(output: str) -> str: |
|
|
|
match = re.search(r"output:(.*)", output, flags=re.IGNORECASE | re.DOTALL) |
|
if match: |
|
return match.group(1).strip() |
|
|
|
if "Cevap:" in output: |
|
return output.split("Cevap:")[-1].strip() |
|
return output.strip() |
|
|
|
async def selfrag_agent(question: str): |
|
|
|
result = await retrieve_sources_from_pinecone(question) |
|
vdb_paragraph = result.get("sources", "").strip() |
|
source_url = result.get("source_url", "") |
|
|
|
|
|
model_paragraph = await generate_model_response(question) |
|
model_paragraph = extract_self_answer(model_paragraph) |
|
|
|
|
|
vdb_paragraph = clean_text_output(vdb_paragraph) |
|
model_paragraph = clean_text_output(model_paragraph) |
|
|
|
|
|
candidates = [] |
|
candidate_urls = [] |
|
label_names = [] |
|
if vdb_paragraph: |
|
candidates.append(vdb_paragraph) |
|
candidate_urls.append(source_url) |
|
label_names.append("VDB") |
|
if model_paragraph: |
|
candidates.append(model_paragraph) |
|
candidate_urls.append(None) |
|
label_names.append("MODEL") |
|
|
|
if not candidates: |
|
return {"answer": "Cevap bulunamadı.", "source_url": None} |
|
|
|
sentence_pairs = [[question, cand] for cand in candidates] |
|
scores = await asyncio.to_thread(cross_encoder.predict, sentence_pairs) |
|
print(f"VDB Skor: {scores[0]:.4f}") |
|
if len(scores) > 1: |
|
print(f"Model Skor: {scores[1]:.4f}") |
|
|
|
|
|
if len(scores) == 2: |
|
vdb_score = scores[0] |
|
model_score = scores[1] |
|
|
|
if model_score > 1.5 * vdb_score: |
|
best_idx = 1 |
|
else: |
|
best_idx = 0 |
|
else: |
|
|
|
best_idx = int(np.argmax(scores)) |
|
|
|
final_answer = candidates[best_idx] |
|
final_source_url = candidate_urls[best_idx] |
|
|
|
return { |
|
"answer": final_answer, |
|
"source_url": final_source_url |
|
} |
|
|
|
|
|
@app.get("/") |
|
async def home(request: Request): |
|
return templates.TemplateResponse("index.html", {"request": request}) |
|
|
|
@app.post("/api/ask") |
|
async def ask_question(request: QuestionRequest): |
|
try: |
|
question = request.query.strip() |
|
if not question: |
|
return JSONResponse(status_code=400, content={"error": "Sorgu boş olamaz."}) |
|
result = await selfrag_agent(question) |
|
response_text = result["answer"] |
|
if result["source_url"]: |
|
response_text += f'<br><br>Daha fazla bilgi için: <a href="{result["source_url"]}" target="_blank">{result["source_url"]}</a>' |
|
return JSONResponse(content={"answer": response_text}) |
|
except Exception as e: |
|
logger.error(f"API hatası: {e}") |
|
return JSONResponse(status_code=500, content={"error": f"Sunucu hatası: {str(e)}"}) |
|
|