import json
import os
import time

import requests
import streamlit as st


def model_predict(client, prompt):
    """Ask the chat model a Yes/No question about a repository.

    Args:
        client: Object exposing ``chat_completion(messages=..., max_tokens=...,
            stream=...)`` that yields chunks carrying
            ``choices[0].delta.content``.
        prompt: The user question forwarded to the model.

    Returns:
        The content of the first streamed chunk, or "" when the stream
        yields nothing.
    """
    messages = [
        {"role": "system", "content": "You are a chatbot evaluating github repositories, their python codes and corresponding readme files. Strictly answer the questions with Yes or No."},
        {"role": "user", "content": prompt},
    ]
    for message in client.chat_completion(
        messages=messages,
        max_tokens=500,
        stream=True,
    ):
        # Only the first streamed chunk is returned; the rest of the stream
        # is never consumed.
        return message.choices[0].delta.content
    return ""


def get_api_link(url):
    """Build the GitHub API zipball URL for a repository URL.

    Args:
        url: Repository URL containing "github.com".

    Returns:
        The zipball endpoint, or "" when the URL cannot be parsed.
    """
    username, repo_name = decompose_url(url)
    if username is None:
        return ""
    return f"https://api.github.com/repos/{username}/{repo_name}/zipball/"


def decompose_url(url):
    """Split a GitHub repository URL into owner and repository name.

    Args:
        url: Repository URL, e.g. "https://github.com/owner/repo.git".

    Returns:
        ``(username, repo_name)``, or ``(None, None)`` when the URL does not
        contain both parts.
    """
    try:
        # maxsplit=1 keeps repository paths that themselves contain
        # "github.com" intact.
        path = url.split("github.com", 1)[1]
        path = path.strip(".").strip("/")
        segments = path.split("/")
        username = segments[0]
        repo_name = segments[1]
    except (IndexError, AttributeError, TypeError):
        # IndexError: no "github.com" or fewer than two path segments.
        # AttributeError/TypeError: url is not a string.
        return None, None

    # Drop only a trailing ".git"; names such as "owner.github.io" merely
    # contain ".git" and must stay whole.
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]

    if not username or not repo_name:
        return None, None
    return username, repo_name


def fetch_repo_stars(verbose, repo_url, token):
    """Fetch the stargazer count of a GitHub repository.

    Args:
        verbose: Verbosity level forwarded to ``log``.
        repo_url: Repository URL.
        token: GitHub token sent in the Authorization header.

    Returns:
        The ``stargazers_count`` value on HTTP 200, otherwise None.
    """
    headers = {"Authorization": f"token {token}"}
    api_url = get_api_link(repo_url)
    if api_url == "":
        # Without this guard requests would raise on the empty URL.
        log(verbose, "ERROR", f"Failed to parse the URL, please evaluate manually ({repo_url}).")
        return None
    api_url = api_url.replace("/zipball/", "")

    # Sending GET request to GitHub API
    response = requests.get(api_url, headers=headers)

    if response.status_code == 200:
        return json.loads(response.content)["stargazers_count"]
    if response.status_code == 404:
        log(verbose, "ERROR", "Repository private.")
    else:
        log(verbose, "ERROR", f"GitHub API request failed with status code {response.status_code}.")
    return None


def fetch_repo(verbose, repo_url, repo_name, token):
    """Download a GitHub repository as a zip archive.

    Args:
        verbose: Verbosity level forwarded to ``log``.
        repo_url: Repository URL; must contain "github.com".
        repo_name: Path of the file the archive is written to.
        token: GitHub token sent in the Authorization header.

    Returns:
        None. On success the archive is stored at ``repo_name``.
    """
    # Remove any earlier download first, so a failed fetch never leaves a
    # stale archive at this path.
    if os.path.exists(repo_name):
        os.remove(repo_name)

    if "github.com" not in repo_url:
        log(verbose, "ERROR", f"URL not for github repo, please evaluate manually ({repo_url}).")
        return

    headers = {"Authorization": f"token {token}"}
    api_url = get_api_link(repo_url)

    if api_url == "":
        log(verbose, "ERROR", f"Failed to parse the URL, please evaluate manually ({repo_url}).")
        return

    # Sending GET request to GitHub API
    response = requests.get(api_url, headers=headers)

    if response.status_code == 200:
        with open(repo_name, 'wb') as file:
            file.write(response.content)
        log(verbose, "LOG", "Repository downloaded successfully")
    elif response.status_code == 404:
        log(verbose, "ERROR", "Repository private.")
    else:
        # E.g. 401 (bad token) or 403 (rate limit); previously silent.
        log(verbose, "ERROR", f"GitHub API request failed with status code {response.status_code}.")


def fetch_readme(zip):
    """Concatenate the top-level README files of a repository archive.

    Args:
        zip: Archive object providing ``namelist()`` and ``open(name)``.

    Returns:
        The UTF-8 decoded contents of every matching README, each followed by
        a blank line; "" when none is found.
    """
    readme_suffixes = ("README.MD", "README.md", "readme.md")
    readme = ""
    for member in zip.namelist():
        # Exactly two path components: "<archive root>/<file>".
        if member.endswith(readme_suffixes) and len(member.split("/")) == 2:
            with zip.open(member) as handle:
                readme += handle.read().decode("utf-8") + "\n\n"
    return readme


def fetch_license(zip):
    """Return the text of the first top-level license file in an archive.

    Args:
        zip: Archive object providing ``namelist()`` and ``open(name)``.

    Returns:
        The UTF-8 decoded license text, or None when no top-level entry
        contains "LICENSE" in its name.
    """
    license_files = [
        member
        for member in zip.namelist()
        if "LICENSE" in member and len(member.split("/")) == 2
    ]
    if not license_files:
        return None
    with zip.open(license_files[0]) as handle:
        return handle.read().decode("utf-8")


def fetch_openalex(verbose, paper_name, year):
    """Query the OpenAlex works endpoint for a paper.

    Args:
        verbose: Verbosity level forwarded to ``log``.
        paper_name: Search text for the ``default.search`` filter.
        year: Publication year for the ``publication_year`` filter.

    Returns:
        The decoded JSON response on HTTP 200, otherwise None.
    """
    from urllib.parse import quote

    # Percent-encode the title so characters such as "&", "#" or "+" stay
    # part of the search text instead of being read as URL syntax.
    search_term = quote(str(paper_name), safe="")
    api_url = f"https://api.openalex.org/works?filter=default.search:{search_term},publication_year:{year}"

    response = requests.get(api_url)

    if response.status_code == 200:
        return response.json()
    log(verbose, "WARNING", "Could not find OpenAlex information for paper.")
    return None


def log(verbose, log_type, log_text, hf=False):
    """Emit a message to stdout or Streamlit depending on verbosity.

    Args:
        verbose: 0 is silent; 1 and 2 use ``print``; 3 and 4 use ``st.write``.
            Levels 2 and 4 also show TITLE, TIP and LOG messages. Any other
            non-zero level falls back to ``print`` without tips.
        log_type: One of "TITLE", "TIP", "LOG", "WARNING" or "ERROR".
        log_text: Message text; leading newlines are emitted as one separate
            blank message.
        hf: Not used by this function.

    Raises:
        ValueError: If ``log_type`` is not a supported type and ``verbose``
            is non-zero.
    """
    if verbose == 0:
        return

    # Validate before emitting anything so a bad call produces no partial output.
    if log_type not in ("TITLE", "TIP", "LOG", "WARNING", "ERROR"):
        raise ValueError("Invalid log type. Use 'TIP', 'LOG', 'TITLE', 'WARNING' or 'ERROR'.")

    show_tips = (verbose == 2) or (verbose == 4)
    # Default to print so an unexpected verbosity level cannot leave the
    # output function undefined.
    show = print
    if (verbose == 3) or (verbose == 4):
        show = st.write

    # Align line-break
    if log_text.startswith("\n"):
        show("\n")
        log_text = log_text.lstrip('\n')

    if log_type in ("ERROR", "WARNING"):
        # Problems are shown at every non-silent verbosity level.
        show(f"**{log_text}**")
    elif show_tips:
        # Only show tips in verbose mode 2 and 4
        if log_type == "TITLE":
            show(f"\n#### {log_text}")
        elif log_type == "TIP":
            show(f"*{log_text}*")
        else:
            show(f"{log_text}")


def init_llm(verbose):
    log(verbose, "LOG", "Initializing LLM...")