VulnBuster / detect_secrets_mcp.py
zjkarina's picture
Fix dependency conflicts between smolagents and semgrep
4f21d95
#!/usr/bin/env python3
"""
MCP server for detect-secrets - a tool for detecting secrets in code
"""
import gradio as gr
import subprocess
import json
import os
import tempfile
from typing import Dict, List, Optional
from pathlib import Path
def _parse_adhoc_verdicts(stdout: str) -> List:
    """
    Parses the text printed by `detect-secrets scan --string`.
    Every line containing a colon is read as `<plugin>: <verdict>`. A verdict
    counts as a hit when its first word is "true" (case-insensitive); anything
    after it, such as a parenthesised annotation, does not affect the verdict.
    Args:
        stdout (str): Raw standard output of one ad-hoc string scan
    Returns:
        List: (plugin, flagged, entropy) tuples in output order. entropy is a
        float when the parenthesised annotation is numeric, otherwise None.
    """
    verdicts = []
    for output_line in stdout.splitlines():
        if ':' not in output_line:
            continue
        plugin, verdict = output_line.split(':', 1)
        plugin = plugin.strip()
        verdict = verdict.strip()
        # Compare only the first word: verdicts may look like "True  (4.089)",
        # which an exact comparison against "true" would miss.
        words = verdict.split(None, 1)
        flagged = bool(words) and words[0].lower() == 'true'
        entropy = None
        if '(' in verdict:
            annotation = verdict.split('(', 1)[1].split(')', 1)[0]
            try:
                entropy = float(annotation)
            except ValueError:
                # Non-numeric annotation (not an entropy value) - ignore it
                entropy = None
        verdicts.append((plugin, flagged, entropy))
    return verdicts


def detect_secrets_scan(
    code_input: str,
    scan_type: str = "code",
    base64_limit: float = 3.0,
    hex_limit: float = 2.0,
    exclude_lines: str = "",
    exclude_files: str = "",
    exclude_secrets: str = "",
    word_list: str = "",
    output_format: str = "json"
) -> Dict:
    """
    Scans code for secrets using detect-secrets.
    In 'code' mode every non-blank line of code_input is scanned separately
    with `detect-secrets scan --string` and the per-plugin verdicts are
    collected into a baseline-like dictionary. In any other mode code_input is
    treated as a file/directory path and passed to `detect-secrets scan`.
    Args:
        code_input (str): Code to scan or path to file/directory
        scan_type (str): Scan type - 'code' for direct code or 'path' for file/directory
        base64_limit (float): Entropy limit for base64 strings (0.0-8.0)
        hex_limit (float): Entropy limit for hex strings (0.0-8.0)
        exclude_lines (str): Regex pattern for lines to exclude
        exclude_files (str): Regex pattern for files to exclude
        exclude_secrets (str): Regex pattern for secrets to exclude
        word_list (str): Path to word list file
        output_format (str): Output format - 'json' or 'txt' (only used in path mode)
    Returns:
        Dict: Scan results. Always contains "success"; failures carry "error".
        Exceptions are caught and reported through the returned dictionary.
    """
    try:
        print(f"Debug: Input code length: {len(code_input)}")
        print(f"Debug: First 100 chars: {code_input[:100]}")

        # Build the detect-secrets command shared by both scan modes
        cmd = ["detect-secrets", "scan"]

        # Add entropy limits
        cmd.extend(["--base64-limit", str(base64_limit)])
        cmd.extend(["--hex-limit", str(hex_limit)])

        # Add exclude patterns
        if exclude_lines:
            cmd.extend(["--exclude-lines", exclude_lines])
        if exclude_files:
            cmd.extend(["--exclude-files", exclude_files])
        if exclude_secrets:
            cmd.extend(["--exclude-secrets", exclude_secrets])
        if word_list:
            cmd.extend(["--word-list", word_list])

        # Options that make detection more aggressive
        cmd.extend(["--force-use-all-plugins"])  # force every plugin to run
        cmd.extend(["--no-verify"])  # skip secret verification
        cmd.extend(["--disable-filter", "detect_secrets.filters.gibberish.should_exclude_secret"])  # turn off the gibberish filter
        cmd.extend(["--disable-filter", "detect_secrets.filters.heuristic.is_likely_id_string"])  # turn off the ID-string filter
        cmd.extend(["--disable-filter", "detect_secrets.filters.heuristic.is_sequential_string"])  # turn off the sequential-string filter

        if scan_type == "code":
            # Normalize line endings so every line is scanned exactly once
            lines = code_input.replace('\r\n', '\n').replace('\r', '\n').split('\n')
            print(f"Debug: Number of lines: {len(lines)}")

            all_results = {}
            all_plugins = set()

            for idx, line in enumerate(lines):
                if not line.strip():
                    continue  # skip blank lines

                print(f"Debug: Scanning line {idx + 1}: {line}")

                # Attach the line with "=" so that a line starting with "-"
                # is still taken as the value of --string instead of being
                # parsed as another command-line option.
                cmd_line = cmd + [f"--string={line}"]
                print(f"Debug: Command: {' '.join(cmd_line)}")

                completed = subprocess.run(cmd_line, capture_output=True, text=True)
                print(f"Debug: Line {idx + 1} stdout: {completed.stdout}")

                for plugin, flagged, entropy in _parse_adhoc_verdicts(completed.stdout):
                    # Every plugin that reported a verdict counts as used
                    all_plugins.add(plugin)
                    if not flagged:
                        continue

                    # NOTE(review): "hashed_secret" is a placeholder that
                    # embeds the raw line; it is not a real hash.
                    secret_info = {
                        "type": plugin,
                        "line_number": idx + 1,
                        "line": line,
                        "hashed_secret": f"hash_{line}",
                        "is_secret": True,
                        "is_verified": False
                    }
                    if entropy is not None:
                        secret_info["entropy"] = entropy
                    all_results.setdefault(plugin, []).append(secret_info)

            # Assemble a result shaped like detect-secrets' JSON report
            final_output = {
                "version": "1.5.0",
                "plugins_used": [{"name": plugin} for plugin in sorted(all_plugins)],
                "filters_used": [],
                "results": all_results,
                "generated_at": ""
            }
            print(f"Debug: Final results: {json.dumps(final_output, indent=2)}")

            # stderr/return_code are fixed values here: the per-line
            # subprocess results are not aggregated.
            return {
                "success": True,
                "results": final_output,
                "stderr": "",
                "return_code": 0
            }

        # File/directory scan: hand the path to detect-secrets directly
        if not os.path.exists(code_input):
            return {
                "error": f"Path not found: {code_input}",
                "success": False
            }

        cmd.append(code_input)
        completed = subprocess.run(cmd, capture_output=True, text=True)
        stdout, stderr = completed.stdout, completed.stderr
        return_code = completed.returncode

        # Process result
        if output_format == "json":
            try:
                output_data = json.loads(stdout) if stdout else {}
                return {
                    "success": True,
                    "results": output_data,
                    "stderr": stderr,
                    "return_code": return_code
                }
            except json.JSONDecodeError as e:
                print(f"Debug: JSON parse error: {e}")
                print(f"Debug: Raw stdout: {stdout}")
                return {
                    "success": False,
                    "error": "JSON parsing error",
                    "stdout": stdout,
                    "stderr": stderr,
                    "return_code": return_code
                }

        return {
            "success": True,
            "output": stdout,
            "stderr": stderr,
            "return_code": return_code
        }

    except Exception as e:
        # Top-level boundary: report any failure (e.g. detect-secrets not
        # installed) to the caller instead of raising.
        print(f"Debug: Exception: {str(e)}")
        return {
            "success": False,
            "error": f"Error executing detect-secrets: {str(e)}"
        }
def detect_secrets_baseline(
    target_path: str,
    baseline_file: str,
    base64_limit: float = 4.5,
    hex_limit: float = 3.0
) -> Dict:
    """
    Creates or updates a baseline file for detect-secrets.
    When baseline_file already exists it is passed to the scan via
    `--baseline`; otherwise the scan output is written to baseline_file.
    Args:
        target_path (str): Path to code for analysis
        baseline_file (str): Path to baseline file
        base64_limit (float): Entropy limit for base64 strings
        hex_limit (float): Entropy limit for hex strings
    Returns:
        Dict: Result of baseline creation/update. "success" is False (with an
        "error" message) when the target is missing, the scan exits non-zero,
        no baseline content is produced, or an exception occurs.
    """
    try:
        if not os.path.exists(target_path):
            return {
                "error": f"Path not found: {target_path}",
                "success": False
            }

        # Must be captured before the scan: once the file has been written it
        # always exists, which would make every run report "updated".
        baseline_existed = os.path.exists(baseline_file)

        # Build command
        cmd = ["detect-secrets", "scan"]

        # Add entropy limits
        cmd.extend(["--base64-limit", str(base64_limit)])
        cmd.extend(["--hex-limit", str(hex_limit)])

        # Reuse the existing baseline so the scan can update it
        if baseline_existed:
            cmd.extend(["--baseline", baseline_file])

        # Add scan target
        cmd.append(target_path)

        # Execute command
        result = subprocess.run(cmd, capture_output=True, text=True)

        if result.returncode != 0:
            # Leave any existing baseline untouched when the scan failed
            return {
                "success": False,
                "error": f"detect-secrets exited with code {result.returncode}",
                "return_code": result.returncode,
                "stderr": result.stderr
            }

        if result.stdout.strip():
            # Save the report to the baseline file
            with open(baseline_file, 'w') as f:
                f.write(result.stdout)
        elif not baseline_existed:
            # Nothing was printed and there is no file to fall back on
            return {
                "success": False,
                "error": "detect-secrets produced no baseline output",
                "return_code": result.returncode,
                "stderr": result.stderr
            }
        # Otherwise stdout is empty for an existing baseline: writing it out
        # would truncate the file, so the baseline is left as the scan left it.

        action = "updated" if baseline_existed else "created"
        return {
            "success": True,
            "action": action,
            "message": f"Baseline file {action}: {baseline_file}",
            "return_code": result.returncode,
            "stderr": result.stderr
        }

    except Exception as e:
        # Top-level boundary: report the failure instead of raising
        return {
            "success": False,
            "error": f"Error working with baseline: {str(e)}"
        }
def detect_secrets_audit(
    baseline_file: str,
    show_stats: bool = False,
    show_report: bool = False,
    only_real: bool = False,
    only_false: bool = False
) -> Dict:
    """
    Audits a detect-secrets baseline file.
    Runs `detect-secrets audit` with the selected flags and returns whatever
    the command printed.
    Args:
        baseline_file (str): Path to baseline file
        show_stats (bool): Show statistics
        show_report (bool): Show report
        only_real (bool): Only show real secrets
        only_false (bool): Only show false positives
    Returns:
        Dict: Audit results with "output", "stderr" and "return_code" when the
        command was executed; "success" False with "error" when the baseline
        is missing or an exception occurs.
    """
    try:
        if not os.path.exists(baseline_file):
            return {
                "error": f"Baseline file not found: {baseline_file}",
                "success": False
            }

        # Build command: each enabled option maps to one CLI flag
        cmd = ["detect-secrets", "audit"]
        flag_switches = (
            (show_stats, "--stats"),
            (show_report, "--report"),
            (only_real, "--only-real"),
            (only_false, "--only-false"),
        )
        cmd.extend(flag for enabled, flag in flag_switches if enabled)
        cmd.append(baseline_file)

        # Execute command. stdin is detached so that an audit that prompts for
        # input cannot block waiting on the server's own standard input.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            stdin=subprocess.DEVNULL
        )

        return {
            "success": True,
            "output": result.stdout,
            "stderr": result.stderr,
            "return_code": result.returncode
        }

    except Exception as e:
        # Top-level boundary: report the failure instead of raising
        return {
            "success": False,
            "error": f"Error auditing baseline: {str(e)}"
        }
# Create Gradio interface
# One tab per tool function (scan, baseline, audit) plus a static examples tab.
with gr.Blocks(title="Detect Secrets MCP") as demo:
    gr.Markdown("# 🔍 Detect Secrets Scanner")
    gr.Markdown("Secret detection tool with MCP support")

    # Tab: scan pasted code or a path with detect_secrets_scan
    with gr.Tab("Basic Scanning"):
        with gr.Row():
            with gr.Column():
                scan_type = gr.Radio(
                    choices=["code", "path"],
                    value="code",
                    label="Scan Type"
                )
                code_input = gr.Textbox(
                    lines=10,
                    placeholder="Enter code or path to scan...",
                    label="Code or Path"
                )
                base64_limit = gr.Slider(
                    minimum=0.0,
                    maximum=8.0,
                    value=4.5,
                    step=0.1,
                    label="Base64 Entropy Limit"
                )
                hex_limit = gr.Slider(
                    minimum=0.0,
                    maximum=8.0,
                    value=3.0,
                    step=0.1,
                    label="Hex Entropy Limit"
                )
                exclude_lines = gr.Textbox(
                    label="Exclude Lines Pattern (regex)"
                )
                exclude_files = gr.Textbox(
                    label="Exclude Files Pattern (regex)"
                )
                exclude_secrets = gr.Textbox(
                    label="Exclude Secrets Pattern (regex)"
                )
                word_list = gr.Textbox(
                    label="Word List File Path"
                )
                output_format = gr.Dropdown(
                    choices=["json", "txt"],
                    value="json",
                    label="Output Format"
                )
                scan_btn = gr.Button("🔍 Scan", variant="primary")

            with gr.Column():
                scan_output = gr.JSON(label="Scan Results")

        # Input order must match the parameter order of detect_secrets_scan
        scan_btn.click(
            fn=detect_secrets_scan,
            inputs=[
                code_input, scan_type, base64_limit, hex_limit,
                exclude_lines, exclude_files, exclude_secrets,
                word_list, output_format
            ],
            outputs=scan_output
        )

    # Tab: create or refresh a baseline with detect_secrets_baseline
    with gr.Tab("Baseline Management"):
        with gr.Row():
            with gr.Column():
                baseline_path = gr.Textbox(
                    label="Project Path",
                    placeholder="/path/to/your/project"
                )
                baseline_file = gr.Textbox(
                    label="Baseline File Path",
                    placeholder="/path/to/.secrets.baseline"
                )
                baseline_base64_limit = gr.Slider(
                    minimum=0.0,
                    maximum=8.0,
                    value=4.5,
                    step=0.1,
                    label="Base64 Entropy Limit"
                )
                baseline_hex_limit = gr.Slider(
                    minimum=0.0,
                    maximum=8.0,
                    value=3.0,
                    step=0.1,
                    label="Hex Entropy Limit"
                )
                baseline_btn = gr.Button("📋 Create/Update Baseline", variant="secondary")

            with gr.Column():
                baseline_output = gr.JSON(label="Baseline Results")

        # Input order must match the parameter order of detect_secrets_baseline
        baseline_btn.click(
            fn=detect_secrets_baseline,
            inputs=[
                baseline_path, baseline_file,
                baseline_base64_limit, baseline_hex_limit
            ],
            outputs=baseline_output
        )

    # Tab: inspect an existing baseline with detect_secrets_audit
    with gr.Tab("Baseline Audit"):
        with gr.Row():
            with gr.Column():
                audit_baseline = gr.Textbox(
                    label="Baseline File Path",
                    placeholder="/path/to/.secrets.baseline"
                )
                show_stats = gr.Checkbox(
                    label="Show Statistics",
                    value=False
                )
                show_report = gr.Checkbox(
                    label="Show Report",
                    value=False
                )
                only_real = gr.Checkbox(
                    label="Only Real Secrets",
                    value=False
                )
                only_false = gr.Checkbox(
                    label="Only False Positives",
                    value=False
                )
                audit_btn = gr.Button("🔍 Audit Baseline", variant="secondary")

            with gr.Column():
                audit_output = gr.JSON(label="Audit Results")

        # Input order must match the parameter order of detect_secrets_audit
        audit_btn.click(
            fn=detect_secrets_audit,
            inputs=[
                audit_baseline, show_stats,
                show_report, only_real, only_false
            ],
            outputs=audit_output
        )

    # Tab: static documentation with sample secrets.
    # The backslash in the private-key sample is escaped so the rendered
    # example shows a literal "\n" instead of being split across two lines.
    with gr.Tab("Examples"):
        gr.Markdown("""
        ## 🚨 Examples of secrets that can be detected:
        ### 1. API Keys
        ```python
        API_KEY = "sk_live_51H1h2K3L4M5N6O7P8Q9R0S1T2U3V4W5X6Y7Z8"
        ```
        ### 2. Passwords
        ```python
        password = "SuperSecret123!" # High entropy string
        ```
        ### 3. Private Keys
        ```python
        private_key = "-----BEGIN RSA PRIVATE KEY-----\\nMIIEpAIBAAKCAQEA..."
        ```
        ### 4. OAuth Tokens
        ```python
        oauth_token = "ya29.a0AfB_byC..."
        ```
        """)
if __name__ == "__main__":
    # Bind address and port come from the environment, with fallbacks that
    # listen on all interfaces at port 7862.
    launch_options = {
        "mcp_server": True,
        "server_name": os.environ.get("GRADIO_SERVER_NAME", "0.0.0.0"),
        "server_port": int(os.environ.get("GRADIO_SERVER_PORT", "7862")),
        "share": False,
    }
    demo.launch(**launch_options)