First version

2026-01-11 04:17:53 +08:00
commit c160320892
11 changed files with 2383 additions and 0 deletions

.gitignore (vendored): new file, +27 lines

@@ -0,0 +1,27 @@
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
*.db
*.sqlite
*.sqlite3
exports/
*.log
.mitmproxy/

README.md: new file, +220 lines

@@ -0,0 +1,220 @@
# LLM Proxy - OpenAI API Proxy and Training Data Collector
A transparent HTTP proxy server that intercepts and stores LLM API requests and exports them as JSONL training data.
## Features
- **Transparent proxying**: intercepts every LLM API request whose path contains `/v1/`
- **Zero configuration**: no API key is configured in the proxy; the client's own key is used directly
- **Multi-provider support**: works with OpenAI, Anthropic, GLM, OpenRouter, and any other OpenAI-compatible API
- **Smart parsing**: automatically recognizes and parses LLM requests and ignores everything else
- **Reasoning capture**: automatically saves the model's reasoning content when present
- **Multi-turn conversations**: preserves the full conversation context
- **JSONL export**: exports standard training data with one command
- **SQLite storage**: a lightweight database with no extra setup
## Installation
### 1. Clone the project
```bash
git clone https://github.com/mitmproxy/mitmproxy.git
cd mitmproxy
```
### 2. Install dependencies
```bash
pip install -r requirements.txt
```
## Usage
### Start the proxy server
```bash
python start_proxy.py
```
Listens on `127.0.0.1:8080` by default.
### Configure the system proxy
#### Windows
1. Open Settings → Network & Internet → Proxy
2. Turn on "Use a proxy server"
3. Address: `127.0.0.1`
4. Port: `8080`
#### macOS
```bash
networksetup -setwebproxy Wi-Fi 127.0.0.1 8080
networksetup -setsecurewebproxy Wi-Fi 127.0.0.1 8080
```
#### Linux
Set the HTTP/HTTPS proxy to `127.0.0.1:8080` in your browser or system settings.
### Using a client
#### Trae
1. Start the proxy server
2. Configure the system proxy (see above)
3. Use Trae as usual, with any API provider and key
4. Every request is intercepted and saved automatically
#### CherryStudio
**Option 1: add a custom provider**
1. Open CherryStudio Settings → Model Services
2. Add a custom provider
3. API address: `http://127.0.0.1:8080/v1`
4. API key: any value (the proxy ignores it)
5. Add the models you use
**Option 2: use the system proxy**
1. Start the proxy server
2. Configure the system proxy (see above)
3. Use CherryStudio as usual (a hand-rolled test request is sketched below)
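To verify interception without a GUI client, you can also send a request through the proxy by hand. A minimal sketch using the `requests` library; the endpoint, key, and model name are placeholders for whatever provider you actually use:
```python
import requests

PROXY = "http://127.0.0.1:8080"

resp = requests.post(
    "https://api.openai.com/v1/chat/completions",  # any OpenAI-compatible endpoint
    headers={"Authorization": "Bearer YOUR_API_KEY"},  # forwarded to the provider unchanged
    json={
        "model": "gpt-4o-mini",  # placeholder model name
        "messages": [{"role": "user", "content": "Hello!"}],
    },
    proxies={"http": PROXY, "https": PROXY},
    verify=False,  # or point verify= at the mitmproxy CA certificate instead
)
print(resp.json()["choices"][0]["message"]["content"])
```
If the request succeeds, it should show up in `python export.py --stats` immediately afterwards.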
### Export training data
```bash
# Export all conversations (reasoning included)
python export.py
# Export to a specific file
python export.py --output my_data.jsonl
# Exclude reasoning content
python export.py --no-reasoning
# Include metadata
python export.py --with-metadata
# Show database statistics
python export.py --stats
```
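The same export is also available programmatically through the `LLMDatabase` class in `database.py`; a short sketch (the output file name here is arbitrary):
```python
from database import LLMDatabase

db = LLMDatabase("llm_data.db")
# Note: export_to_jsonl opens the path directly, so the exports/ directory
# must already exist (export.py creates it before calling this).
count = db.export_to_jsonl("exports/manual_export.jsonl", include_reasoning=True)
print(f"exported {count} conversations")
print(db.get_stats())  # counts of conversations, requests, responses, tokens
```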
## Configuration
Edit `config.json` to customize the setup:
```json
{
  "proxy": {
    "listen_port": 8080,
    "listen_host": "127.0.0.1"
  },
  "database": {
    "path": "llm_data.db"
  },
  "filter": {
    "enabled": true,
    "path_patterns": ["/v1/", "/chat/completions", "/completions"],
    "host_patterns": ["deepseek.com", "openrouter.ai", "api.openai.com"],
    "save_all_requests": false
  },
  "export": {
    "output_dir": "exports",
    "include_reasoning": true,
    "include_metadata": false
  }
}
```
## JSONL format
Each line of the exported file is one conversation:
```jsonl
{"messages": [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Hello!"}, {"role": "assistant", "content": "Hi there!", "reasoning": "The user greeted me, so I should respond politely."}]}
{"messages": [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is 2+2?"}, {"role": "assistant", "content": "2+2 equals 4.", "reasoning": "This is a simple arithmetic problem. 2+2 = 4."}]}
```
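Before training on an export, a quick sanity check can catch truncated conversations. A minimal sketch, assuming the layout shown above (the file name is a placeholder):
```python
import json

with open("exports/training_data.jsonl", encoding="utf-8") as f:
    for lineno, line in enumerate(f, 1):
        record = json.loads(line)
        roles = [m["role"] for m in record["messages"]]
        # Every usable sample should end with an assistant turn.
        if not roles or roles[-1] != "assistant":
            print(f"line {lineno}: incomplete sample, roles = {roles}")
```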
## Database schema
### conversations table
- `id`: primary key
- `conversation_id`: conversation ID
- `created_at`: creation time
- `updated_at`: last update time
### requests table
- `id`: primary key
- `request_id`: request ID
- `conversation_id`: conversation ID (foreign key)
- `model`: model name
- `messages`: message list (JSON)
- `request_body`: full request body (JSON)
- `created_at`: creation time
### responses table
- `id`: primary key
- `request_id`: request ID (foreign key)
- `response_body`: full response body (JSON)
- `reasoning_content`: reasoning content
- `tokens_used`: number of tokens used
- `created_at`: creation time
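The tables can also be inspected directly with Python's built-in `sqlite3` module; a sketch that joins each request to its response:
```python
import sqlite3

conn = sqlite3.connect("llm_data.db")
conn.row_factory = sqlite3.Row  # access columns by name
rows = conn.execute(
    """
    SELECT r.model, r.created_at, resp.tokens_used
    FROM requests r
    LEFT JOIN responses resp ON r.request_id = resp.request_id
    ORDER BY r.created_at DESC
    LIMIT 10
    """
).fetchall()
for row in rows:
    print(row["model"], row["created_at"], row["tokens_used"])
conn.close()
```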
## How it works
1. **Intercept**: the proxy intercepts every HTTP request whose path contains `/v1/`
2. **Parse**: the request body is parsed to decide whether it is really an LLM API request
3. **Save the request**: request details are written to the SQLite database
4. **Forward transparently**: the original Authorization header is preserved and the request is forwarded to the upstream server
5. **Save the response**: when the response arrives, its content and any reasoning are saved (see the sketch below)
6. **Export**: the collected data can be exported to JSONL for training at any time
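Steps 1-5 map directly onto mitmproxy's addon hooks; a stripped-down sketch of the shape `proxy_addon.py` uses:
```python
from mitmproxy import http

class CaptureAddon:
    def request(self, flow: http.HTTPFlow) -> None:
        # Steps 1-3: filter by path, parse the body, save the request.
        if "/v1/" not in flow.request.path:
            return
        # ... parse flow.request.content and persist it ...
        # Step 4 needs no code: mitmproxy forwards the request unchanged,
        # Authorization header included.

    def response(self, flow: http.HTTPFlow) -> None:
        # Step 5: persist the upstream response (plain JSON or reassembled SSE).
        ...

addons = [CaptureAddon()]
```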
## Notes
### HTTPS certificates
If the client connects to the API over HTTPS (e.g. `https://api.openai.com`), you must either:
1. Install the mitmproxy certificate into the system trust store, or
2. Use HTTP in the client configuration (`http://api.openai.com`)
### Installing the certificate
mitmproxy generates its certificate the first time the proxy runs:
- Windows: `%USERPROFILE%\.mitmproxy\mitmproxy-ca-cert.pem`
- macOS/Linux: `~/.mitmproxy/mitmproxy-ca-cert.pem`
Install this certificate into the system trust store and HTTPS interception will work.
### Privacy and security
- The proxy does not store API keys
- All data stays in a local SQLite database
- Treat exported training data as sensitive and store it safely
## Troubleshooting
### Requests are not intercepted
1. Check that the system proxy is configured correctly
2. Check that the proxy server is running
3. Check that the request path contains `/v1/`
### HTTPS requests fail
1. Install the mitmproxy certificate into the system trust store, or
2. Switch the client configuration from HTTPS to HTTP
### Database errors
1. Check the permissions on the database file
2. Delete `llm_data.db` to reinitialize it
## License
MIT License
## Contributing
Issues and pull requests are welcome!

config.json: new file, +20 lines

@@ -0,0 +1,20 @@
{
  "proxy": {
    "listen_port": 8080,
    "listen_host": "127.0.0.1"
  },
  "database": {
    "path": "llm_data.db"
  },
  "filter": {
    "enabled": true,
    "path_patterns": ["/v1/", "/chat/completions", "/completions"],
    "host_patterns": ["deepseek.com", "openrouter.ai", "api.openai.com"],
    "save_all_requests": false
  },
  "export": {
    "output_dir": "exports",
    "include_reasoning": true,
    "include_metadata": false
  }
}

database.py: new file, +243 lines

@@ -0,0 +1,243 @@
import sqlite3
import json
import uuid
from typing import Optional, Dict, Any, List
class LLMDatabase:
def __init__(self, db_path: str = "llm_data.db"):
self.db_path = db_path
self.init_database()
def get_connection(self):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def init_database(self):
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conversation_id TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS requests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
request_id TEXT UNIQUE NOT NULL,
conversation_id TEXT,
model TEXT,
messages TEXT,
request_body TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (conversation_id) REFERENCES conversations(conversation_id)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS responses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
request_id TEXT NOT NULL,
response_body TEXT,
reasoning_content TEXT,
tokens_used INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (request_id) REFERENCES requests(request_id)
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_conversation_id ON requests(conversation_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_request_id ON responses(request_id)
""")
conn.commit()
conn.close()
def get_or_create_conversation(self, conversation_id: Optional[str] = None) -> str:
if conversation_id is None:
conversation_id = str(uuid.uuid4())
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT OR IGNORE INTO conversations (conversation_id)
VALUES (?)
""", (conversation_id,))
cursor.execute("""
UPDATE conversations SET updated_at = CURRENT_TIMESTAMP
WHERE conversation_id = ?
""", (conversation_id,))
conn.commit()
conn.close()
return conversation_id
def save_request(self, request_id: str, model: str, messages: List[Dict[str, Any]],
request_body: Dict[str, Any], conversation_id: Optional[str] = None) -> None:
conversation_id = self.get_or_create_conversation(conversation_id)
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO requests
(request_id, conversation_id, model, messages, request_body)
VALUES (?, ?, ?, ?, ?)
""", (
request_id,
conversation_id,
model,
json.dumps(messages, ensure_ascii=False),
json.dumps(request_body, ensure_ascii=False)
))
conn.commit()
conn.close()
def save_response(self, request_id: str, response_body: Dict[str, Any],
reasoning_content: Optional[str] = None, tokens_used: Optional[int] = None) -> None:
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO responses
(request_id, response_body, reasoning_content, tokens_used)
VALUES (?, ?, ?, ?)
""", (
request_id,
json.dumps(response_body, ensure_ascii=False),
reasoning_content,
tokens_used
))
conn.commit()
conn.close()
def get_conversation_messages(self, conversation_id: str) -> List[Dict[str, Any]]:
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT r.messages, resp.response_body, resp.reasoning_content
FROM requests r
LEFT JOIN responses resp ON r.request_id = resp.request_id
WHERE r.conversation_id = ?
ORDER BY r.created_at
""", (conversation_id,))
rows = cursor.fetchall()
conn.close()
messages = []
for row in rows:
request_messages = json.loads(row['messages'])
response_body = json.loads(row['response_body']) if row['response_body'] else None
reasoning_content = row['reasoning_content']
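            # Clients resend the growing message history on every turn; merge
            # by finding the longest common prefix with what we already have
            # and appending only the genuinely new messages.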
if not messages:
for msg in request_messages:
messages.append(msg)
else:
max_prefix = min(len(messages), len(request_messages))
prefix_len = 0
while prefix_len < max_prefix and messages[prefix_len] == request_messages[prefix_len]:
prefix_len += 1
for msg in request_messages[prefix_len:]:
messages.append(msg)
if response_body and 'choices' in response_body:
for choice in response_body['choices']:
assistant_msg = {
'role': 'assistant',
'content': choice.get('message', {}).get('content', '')
}
if reasoning_content:
assistant_msg['reasoning'] = reasoning_content
messages.append(assistant_msg)
return messages
def get_all_conversations(self) -> List[Dict[str, Any]]:
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT conversation_id, created_at, updated_at
FROM conversations
ORDER BY updated_at DESC
""")
rows = cursor.fetchall()
conn.close()
return [
{
'conversation_id': row['conversation_id'],
'created_at': row['created_at'],
'updated_at': row['updated_at']
}
for row in rows
]
def export_to_jsonl(self, output_path: str, include_reasoning: bool = True) -> int:
conversations = self.get_all_conversations()
count = 0
with open(output_path, 'w', encoding='utf-8') as f:
for conv in conversations:
messages = self.get_conversation_messages(conv['conversation_id'])
if not messages:
continue
if not include_reasoning:
messages = [
{k: v for k, v in msg.items() if k != 'reasoning'}
for msg in messages
]
jsonl_line = json.dumps({'messages': messages}, ensure_ascii=False)
f.write(jsonl_line + '\n')
count += 1
return count
def get_stats(self) -> Dict[str, Any]:
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) as count FROM conversations")
conversation_count = cursor.fetchone()['count']
cursor.execute("SELECT COUNT(*) as count FROM requests")
request_count = cursor.fetchone()['count']
cursor.execute("SELECT COUNT(*) as count FROM responses")
response_count = cursor.fetchone()['count']
cursor.execute("SELECT SUM(tokens_used) as total FROM responses")
total_tokens = cursor.fetchone()['total'] or 0
conn.close()
return {
'conversations': conversation_count,
'requests': request_count,
'responses': response_count,
'total_tokens': total_tokens
}

export.bat: new file, +5 lines

@@ -0,0 +1,5 @@
@echo off
echo Exporting LLM training data...
echo.
python export.py %*
pause

export.py: new file, +93 lines

@@ -0,0 +1,93 @@
import json
import argparse
from pathlib import Path
from datetime import datetime
from database import LLMDatabase
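# Importing proxy_addon below runs its module-level setup (it instantiates the
# mitmproxy addon), which creates/initializes the database file as a side effect.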
from proxy_addon import load_config
def export_training_data(output_path: str, db_path: str = "llm_data.db",
include_reasoning: bool = True) -> int:
db = LLMDatabase(db_path)
count = db.export_to_jsonl(output_path, include_reasoning)
return count
def export_with_metadata(output_path: str, db_path: str = "llm_data.db") -> int:
db = LLMDatabase(db_path)
conversations = db.get_all_conversations()
count = 0
with open(output_path, 'w', encoding='utf-8') as f:
for conv in conversations:
messages = db.get_conversation_messages(conv['conversation_id'])
if not messages:
continue
data = {
'messages': messages,
'metadata': {
'conversation_id': conv['conversation_id'],
'created_at': conv['created_at'],
'updated_at': conv['updated_at']
}
}
jsonl_line = json.dumps(data, ensure_ascii=False)
f.write(jsonl_line + '\n')
count += 1
return count
def main():
config = load_config()
export_config = config.get('export', {})
db_config = config.get('database', {})
parser = argparse.ArgumentParser(description='Export LLM training data to JSONL format')
    parser.add_argument('--output', '-o', type=str,
                        default=f"{export_config.get('output_dir', 'exports')}/training_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl",
                        help='Output file path')
parser.add_argument('--db', type=str, default=db_config.get('path', 'llm_data.db'),
help='Database file path')
parser.add_argument('--no-reasoning', action='store_true',
help='Exclude reasoning content from export')
parser.add_argument('--with-metadata', action='store_true',
help='Include metadata in export')
parser.add_argument('--stats', action='store_true',
help='Show database statistics')
args = parser.parse_args()
if args.stats:
db = LLMDatabase(args.db)
stats = db.get_stats()
print("\nDatabase Statistics:")
print(f" Conversations: {stats['conversations']}")
print(f" Requests: {stats['requests']}")
print(f" Responses: {stats['responses']}")
print(f" Total Tokens: {stats['total_tokens']}")
return
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
    include_reasoning = export_config.get('include_reasoning', True) and not args.no_reasoning
if args.with_metadata:
count = export_with_metadata(str(output_path), args.db)
print(f"\nExported {count} conversations with metadata to: {output_path}")
else:
count = export_training_data(str(output_path), args.db, include_reasoning)
print(f"\nExported {count} conversations to: {output_path}")
if include_reasoning:
print(" (Reasoning content included)")
else:
print(" (Reasoning content excluded)")
if __name__ == '__main__':
main()

log.txt: new file, +1370 lines (diff suppressed because it is too large)

proxy_addon.py: new file, +288 lines

@@ -0,0 +1,288 @@
import json
import uuid
import logging
from typing import Optional, Dict, Any, List
from mitmproxy import http
from database import LLMDatabase
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LLMProxyAddon:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.db = LLMDatabase(config['database']['path'])
self.path_patterns = config['filter'].get('path_patterns', ['/v1/'])
self.host_patterns = config['filter'].get('host_patterns', [])
self.save_all = config['filter'].get('save_all_requests', False)
logger.info("LLMProxyAddon initialized")
def is_llm_request(self, flow: http.HTTPFlow) -> bool:
path = flow.request.path
host = flow.request.host
if host.startswith("clerk.openrouter.ai"):
return False
for pattern in self.path_patterns:
if pattern in path:
logger.info(f"LLM path match: host={host}, path={path}")
return True
for pattern in self.host_patterns:
if pattern in host:
logger.info(f"LLM host match: host={host}, path={path}")
return True
return False
def extract_conversation_id(self, request_body: Dict[str, Any]) -> Optional[str]:
if 'conversation_id' in request_body:
return request_body['conversation_id']
messages = request_body.get('messages', [])
if messages and len(messages) > 0:
first_msg = messages[0]
if 'conversation_id' in first_msg:
return first_msg['conversation_id']
if not messages:
return None
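        # No explicit conversation_id anywhere in the request: derive a stable
        # one from the system prompt plus the first user message, so repeated
        # turns of the same thread land in the same conversation (uuid5 is
        # deterministic for the same input).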
system_content = None
first_user_content = None
for msg in messages:
role = msg.get('role')
if role == 'system' and system_content is None:
system_content = msg.get('content', '')
if role == 'user' and first_user_content is None:
first_user_content = msg.get('content', '')
if system_content is not None and first_user_content is not None:
break
if first_user_content is None:
return None
key = (system_content or '') + '\n---\n' + first_user_content
conv_id = uuid.uuid5(uuid.NAMESPACE_URL, key)
return str(conv_id)
def extract_reasoning(self, response_body: Dict[str, Any]) -> Optional[str]:
reasoning = None
if 'choices' in response_body:
for choice in response_body['choices']:
message = choice.get('message', {})
if 'reasoning_content' in message:
reasoning = message['reasoning_content']
break
if 'reasoning' in message:
reasoning = message['reasoning']
break
if 'reasoning_content' in response_body:
reasoning = response_body['reasoning_content']
if 'reasoning' in response_body:
reasoning = response_body['reasoning']
return reasoning
def extract_tokens_used(self, response_body: Dict[str, Any]) -> Optional[int]:
usage = response_body.get('usage', {})
if usage:
total_tokens = usage.get('total_tokens')
if total_tokens is not None:
return total_tokens
prompt_tokens = usage.get('prompt_tokens', 0)
completion_tokens = usage.get('completion_tokens', 0)
return prompt_tokens + completion_tokens
return None
def parse_sse_response(self, raw_content: bytes) -> Optional[Dict[str, Any]]:
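        # Reassemble a streamed response: each SSE "data:" line carries a JSON
        # chunk; the content / reasoning / tool-call deltas are concatenated
        # back into a single, non-streamed message shape.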
text = raw_content.decode('utf-8', errors='ignore')
lines = text.splitlines()
data_lines = []
for line in lines:
line = line.strip()
if not line:
continue
if line.startswith(':'):
continue
if not line.startswith('data:'):
continue
payload = line[5:].strip()
if payload == '[DONE]':
break
data_lines.append(payload)
if not data_lines:
return None
content_parts = []
reasoning_parts = []
tool_calls_state: Dict[str, Dict[str, Any]] = {}
for payload in data_lines:
try:
obj = json.loads(payload)
except json.JSONDecodeError:
continue
choices = obj.get('choices', [])
for choice in choices:
delta = choice.get('delta') or choice.get('message') or {}
if 'reasoning_content' in delta:
reasoning_parts.append(delta.get('reasoning_content') or '')
if 'content' in delta:
content_parts.append(delta.get('content') or '')
if 'tool_calls' in delta:
for idx, tc in enumerate(delta.get('tool_calls') or []):
tc_id = tc.get('id') or str(idx)
state = tool_calls_state.get(tc_id)
if state is None:
state = {
'id': tc.get('id'),
'type': tc.get('type'),
'function': {
'name': None,
'arguments': ''
}
}
tool_calls_state[tc_id] = state
fn = tc.get('function') or {}
if fn.get('name'):
state['function']['name'] = fn['name']
if fn.get('arguments'):
state['function']['arguments'] = state['function']['arguments'] + fn['arguments']
message: Dict[str, Any] = {}
if content_parts:
message['content'] = ''.join(content_parts)
if reasoning_parts:
message['reasoning_content'] = ''.join(reasoning_parts)
if tool_calls_state:
message['tool_calls'] = list(tool_calls_state.values())
if not message:
return None
return {
'choices': [
{
'message': message
}
]
}
def is_valid_llm_request(self, request_body: Dict[str, Any]) -> bool:
if 'messages' in request_body:
return True
if 'prompt' in request_body:
return True
if 'input' in request_body:
return True
return False
def request(self, flow: http.HTTPFlow) -> None:
if not self.is_llm_request(flow):
return
try:
logger.info(f"Processing potential LLM request: {flow.request.method} {flow.request.host}{flow.request.path}")
request_body = json.loads(flow.request.content)
if not self.is_valid_llm_request(request_body):
return
request_id = str(uuid.uuid4())
model = request_body.get('model', 'unknown')
messages = request_body.get('messages', [])
conversation_id = self.extract_conversation_id(request_body)
flow.request_id = request_id
self.db.save_request(
request_id=request_id,
model=model,
messages=messages,
request_body=request_body,
conversation_id=conversation_id
)
msg = f"\033[94mSaved request: {request_id}, model: {model}, messages: {len(messages)}\033[0m"
logger.info(msg)
except json.JSONDecodeError:
err = f"Failed to parse LLM request body for {flow.request.method} {flow.request.path}"
logger.error(err)
except Exception as e:
err = f"Error processing request: {e}"
logger.error(err)
def response(self, flow: http.HTTPFlow) -> None:
if not hasattr(flow, 'request_id'):
return
try:
raw = flow.response.content
content_type = flow.response.headers.get('content-type', '')
response_body: Optional[Dict[str, Any]] = None
if 'text/event-stream' in content_type or raw.strip().startswith(b'data:'):
response_body = self.parse_sse_response(raw)
else:
response_body = json.loads(raw)
if not response_body:
return
reasoning_content = self.extract_reasoning(response_body)
tokens_used = self.extract_tokens_used(response_body)
self.db.save_response(
request_id=flow.request_id,
response_body=response_body,
reasoning_content=reasoning_content,
tokens_used=tokens_used
)
msg = f"\033[94mSaved response for request: {flow.request_id}, tokens: {tokens_used}\033[0m"
logger.info(msg)
except json.JSONDecodeError:
err = f"Failed to parse response body for {flow.request.path}"
logger.debug(err)
except Exception as e:
err = f"Error processing response: {e}"
logger.error(err)
def load_config(config_path: str = "config.json") -> Dict[str, Any]:
try:
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
logger.warning(f"Config file not found: {config_path}, using defaults")
return {
"proxy": {
"listen_port": 8080,
"listen_host": "127.0.0.1"
},
"database": {
"path": "llm_data.db"
},
"filter": {
"enabled": True,
"path_patterns": ["/v1/", "/chat/completions", "/completions"],
"host_patterns": ["deepseek.com", "openrouter.ai", "api.openai.com"],
"save_all_requests": False
},
"export": {
"output_dir": "exports",
"include_reasoning": True,
"include_metadata": False
}
}
config = load_config()
addons = [LLMProxyAddon(config)]

requirements.txt: new file, +1 line

@@ -0,0 +1 @@
mitmproxy>=10.0.0

start.bat: new file, +5 lines

@@ -0,0 +1,5 @@
@echo off
echo Starting LLM Proxy Server...
echo.
python start_proxy.py
pause

start_proxy.py: new file, +111 lines

@@ -0,0 +1,111 @@
import sys
import argparse
import platform

from mitmproxy.tools.main import mitmdump
from proxy_addon import load_config

# ctypes/winreg exist only on Windows; import them lazily so the proxy can
# still start on macOS/Linux (system-proxy management is a no-op there).
if platform.system().lower() == "windows":
    import ctypes
    import winreg
class SystemProxyManager:
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.original_enable = None
self.original_server = None
def _apply_windows_internet_options(self):
option_refresh = 37
option_settings_changed = 39
internet_set_option = ctypes.windll.Wininet.InternetSetOptionW
internet_set_option(0, option_settings_changed, 0, 0)
internet_set_option(0, option_refresh, 0, 0)
def enable(self):
if platform.system().lower() != "windows":
return
key_path = r"Software\Microsoft\Windows\CurrentVersion\Internet Settings"
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_READ | winreg.KEY_WRITE)
try:
self.original_enable, _ = winreg.QueryValueEx(key, "ProxyEnable")
except FileNotFoundError:
self.original_enable = 0
try:
self.original_server, _ = winreg.QueryValueEx(key, "ProxyServer")
except FileNotFoundError:
self.original_server = ""
winreg.SetValueEx(key, "ProxyEnable", 0, winreg.REG_DWORD, 1)
winreg.SetValueEx(key, "ProxyServer", 0, winreg.REG_SZ, f"{self.host}:{self.port}")
winreg.CloseKey(key)
self._apply_windows_internet_options()
def disable(self):
if platform.system().lower() != "windows":
return
if self.original_enable is None or self.original_server is None:
return
key_path = r"Software\Microsoft\Windows\CurrentVersion\Internet Settings"
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_READ | winreg.KEY_WRITE)
winreg.SetValueEx(key, "ProxyEnable", 0, winreg.REG_DWORD, self.original_enable)
winreg.SetValueEx(key, "ProxyServer", 0, winreg.REG_SZ, self.original_server)
winreg.CloseKey(key)
self._apply_windows_internet_options()
def start_proxy(config_path: str = "config.json", port=None, host=None, enable_system_proxy: bool = True):
    # port/host default to None so that the values in config.json are used
    # unless explicitly overridden on the command line.
config = load_config(config_path)
proxy_config = config.get('proxy', {})
listen_port = port or proxy_config.get('listen_port', 8080)
listen_host = host or proxy_config.get('listen_host', '127.0.0.1')
print(f"\n{'='*60}")
print(f"LLM Proxy Server")
print(f"{'='*60}")
print(f"Listening on: {listen_host}:{listen_port}")
print(f"Config file: {config_path}")
print(f"Database: {config.get('database', {}).get('path', 'llm_data.db')}")
if enable_system_proxy and platform.system().lower() == "windows":
print("System proxy: enabled for current session")
print(f"{'='*60}\n")
manager = None
if enable_system_proxy:
manager = SystemProxyManager(listen_host, listen_port)
manager.enable()
sys.argv = [
'mitmdump',
'-q',
'-s', 'proxy_addon.py',
'--listen-host', listen_host,
'--listen-port', str(listen_port),
'--set', 'block_global=false',
'--set', 'flow_detail=0'
]
try:
mitmdump()
finally:
if manager is not None:
manager.disable()
def cli_main():
parser = argparse.ArgumentParser(description='Start LLM Proxy Server')
parser.add_argument('--config', '-c', type=str, default='config.json',
help='Path to config file')
parser.add_argument('--port', '-p', type=int, default=None,
help='Listen port (overrides config)')
parser.add_argument('--host', '-H', type=str, default=None,
help='Listen host (overrides config)')
parser.add_argument('--no-system-proxy', action='store_true',
help='Do not modify system proxy settings')
args = parser.parse_args()
start_proxy(args.config, args.port, args.host, not args.no_system_proxy)
if __name__ == '__main__':
cli_main()