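"""mitmproxy addon that captures LLM API traffic.

Intercepts OpenAI-compatible completion requests, reassembles streamed (SSE)
responses into a single message, and saves both sides to a local database for
later export. Run it as a mitmproxy script; see the note at the end of this
file.
"""
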
import json
import logging
import uuid
from typing import Any, Dict, Optional

from mitmproxy import http

from database import LLMDatabase
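
# Expected interface of database.LLMDatabase, inferred from the call sites in
# this addon (the real module may provide more):
#   LLMDatabase(path)
#   .save_request(request_id, model, messages, request_body, conversation_id)
#   .save_response(request_id, response_body, reasoning_content, tokens_used)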

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class LLMProxyAddon:
    """Records matching LLM requests and responses to the database as they
    pass through the proxy."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.db = LLMDatabase(config['database']['path'])
        self.path_patterns = config['filter'].get('path_patterns', ['/v1/'])
        self.host_patterns = config['filter'].get('host_patterns', [])
        # Read from config for completeness; not yet consulted by the filter.
        self.save_all = config['filter'].get('save_all_requests', False)
        logger.info("LLMProxyAddon initialized")

    def is_llm_request(self, flow: http.HTTPFlow) -> bool:
        """Return True if the flow looks like an LLM API call worth recording."""
        path = flow.request.path
        host = flow.request.host

        # Auth traffic via Clerk on OpenRouter's domain is never an LLM call.
        if host.startswith("clerk.openrouter.ai"):
            return False

        for pattern in self.path_patterns:
            if pattern in path:
                logger.info(f"LLM path match: host={host}, path={path}")
                return True

        for pattern in self.host_patterns:
            if pattern in host:
                logger.info(f"LLM host match: host={host}, path={path}")
                return True

        return False

    def extract_conversation_id(self, request_body: Dict[str, Any]) -> Optional[str]:
        """Return an explicit conversation id if present, else derive a stable
        one from the first system/user messages so turns of the same chat
        group together."""
        if 'conversation_id' in request_body:
            return request_body['conversation_id']

        messages = request_body.get('messages', [])
        if not messages:
            return None

        first_msg = messages[0]
        if 'conversation_id' in first_msg:
            return first_msg['conversation_id']

        system_content = None
        first_user_content = None

        for msg in messages:
            role = msg.get('role')
            if role == 'system' and system_content is None:
                system_content = msg.get('content', '')
            if role == 'user' and first_user_content is None:
                first_user_content = msg.get('content', '')
            if system_content is not None and first_user_content is not None:
                break

        if first_user_content is None:
            return None

        # uuid5 is deterministic: the same system prompt plus the same first
        # user message always yields the same id. Content can be a list of
        # parts on some APIs, so the f-string coerces it to str before hashing.
        key = f"{system_content or ''}\n---\n{first_user_content}"
        return str(uuid.uuid5(uuid.NAMESPACE_URL, key))

    def extract_reasoning(self, response_body: Dict[str, Any]) -> Optional[str]:
        """Pull reasoning text out of a response, wherever the provider put it
        (per-choice message first, top-level fields as a fallback)."""
        for choice in response_body.get('choices', []):
            message = choice.get('message', {})
            if 'reasoning_content' in message:
                return message['reasoning_content']
            if 'reasoning' in message:
                return message['reasoning']

        # Only consult top-level fields when no choice carried reasoning;
        # the original flow let these overwrite a per-choice match.
        if 'reasoning_content' in response_body:
            return response_body['reasoning_content']
        if 'reasoning' in response_body:
            return response_body['reasoning']

        return None
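
    # Response shapes handled above, for reference (field names vary by
    # provider -- DeepSeek-style APIs use "reasoning_content", while some
    # OpenRouter models return "reasoning"):
    #   {"choices": [{"message": {"content": "...", "reasoning_content": "..."}}]}
    #   {"choices": [{"message": {"content": "...", "reasoning": "..."}}]}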

    def extract_tokens_used(self, response_body: Dict[str, Any]) -> Optional[int]:
        """Read total token usage, summing prompt+completion when no total is given."""
        usage = response_body.get('usage', {})
        if usage:
            total_tokens = usage.get('total_tokens')
            if total_tokens is not None:
                return total_tokens

            prompt_tokens = usage.get('prompt_tokens', 0)
            completion_tokens = usage.get('completion_tokens', 0)
            return prompt_tokens + completion_tokens

        return None
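
    # Typical usage block on OpenAI-compatible responses, for reference:
    #   "usage": {"prompt_tokens": 12, "completion_tokens": 34, "total_tokens": 46}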

    def parse_sse_response(self, raw_content: bytes) -> Optional[Dict[str, Any]]:
        """Reassemble a streamed (SSE) completion into the non-streaming shape
        {"choices": [{"message": {...}}]} so downstream extraction is uniform."""
        text = raw_content.decode('utf-8', errors='ignore')
        data_lines = []
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            if line.startswith(':'):
                # SSE comment / keep-alive line.
                continue
            if not line.startswith('data:'):
                continue
            payload = line[5:].strip()
            if payload == '[DONE]':
                break
            data_lines.append(payload)
        if not data_lines:
            return None

        content_parts = []
        reasoning_parts = []
        tool_calls_state: Dict[Any, Dict[str, Any]] = {}
        for payload in data_lines:
            try:
                obj = json.loads(payload)
            except json.JSONDecodeError:
                continue
            for choice in obj.get('choices', []):
                delta = choice.get('delta') or choice.get('message') or {}
                if 'reasoning_content' in delta:
                    reasoning_parts.append(delta.get('reasoning_content') or '')
                if 'content' in delta:
                    content_parts.append(delta.get('content') or '')
                for idx, tc in enumerate(delta.get('tool_calls') or []):
                    # Streamed tool-call chunks carry a stable "index", while
                    # "id" usually appears only on the first chunk -- so key
                    # the accumulator on index when available. Keying on
                    # tc.get('id') or the enumerate position would split one
                    # call across keys and merge distinct calls.
                    tc_key = tc.get('index')
                    if tc_key is None:
                        tc_key = tc.get('id') or idx
                    state = tool_calls_state.get(tc_key)
                    if state is None:
                        state = {
                            'id': tc.get('id'),
                            'type': tc.get('type'),
                            'function': {'name': None, 'arguments': ''},
                        }
                        tool_calls_state[tc_key] = state
                    if tc.get('id'):
                        state['id'] = tc['id']
                    if tc.get('type'):
                        state['type'] = tc['type']
                    fn = tc.get('function') or {}
                    if fn.get('name'):
                        state['function']['name'] = fn['name']
                    if fn.get('arguments'):
                        state['function']['arguments'] += fn['arguments']

        message: Dict[str, Any] = {}
        if content_parts:
            message['content'] = ''.join(content_parts)
        if reasoning_parts:
            message['reasoning_content'] = ''.join(reasoning_parts)
        if tool_calls_state:
            message['tool_calls'] = list(tool_calls_state.values())
        if not message:
            return None
        return {'choices': [{'message': message}]}
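
    # Illustrative SSE input this parser handles (one JSON chunk per "data:"
    # line, terminated by "data: [DONE]"):
    #
    #   data: {"choices":[{"delta":{"content":"Hel"}}]}
    #   data: {"choices":[{"delta":{"content":"lo"}}]}
    #   data: [DONE]
    #
    # which folds into {"choices": [{"message": {"content": "Hello"}}]}.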

    def is_valid_llm_request(self, request_body: Dict[str, Any]) -> bool:
        """A body counts as an LLM call if it carries a chat "messages" list,
        a legacy completions "prompt", or a Responses-style "input"."""
        return any(key in request_body for key in ('messages', 'prompt', 'input'))

    def request(self, flow: http.HTTPFlow) -> None:
        """mitmproxy hook: called for each intercepted request."""
        if not self.is_llm_request(flow):
            return

        try:
            logger.info(f"Processing potential LLM request: {flow.request.method} {flow.request.host}{flow.request.path}")
            if not flow.request.content:
                # Bodyless requests (e.g. preflight) have nothing to record.
                return
            request_body = json.loads(flow.request.content)

            if not self.is_valid_llm_request(request_body):
                return

            request_id = str(uuid.uuid4())
            model = request_body.get('model', 'unknown')
            messages = request_body.get('messages', [])
            conversation_id = self.extract_conversation_id(request_body)

            # Stash the id in flow.metadata (mitmproxy's dict for per-flow
            # addon state) so the response hook can pair the reply with this
            # request.
            flow.metadata['request_id'] = request_id

            self.db.save_request(
                request_id=request_id,
                model=model,
                messages=messages,
                request_body=request_body,
                conversation_id=conversation_id,
            )

            logger.info(f"\033[94mSaved request: {request_id}, model: {model}, messages: {len(messages)}\033[0m")

        except json.JSONDecodeError:
            logger.error(f"Failed to parse LLM request body for {flow.request.method} {flow.request.path}")
        except Exception as e:
            logger.error(f"Error processing request: {e}")

    def response(self, flow: http.HTTPFlow) -> None:
        """mitmproxy hook: called once the full response has been read."""
        request_id = flow.metadata.get('request_id')
        if request_id is None:
            return

        try:
            raw = flow.response.content
            if not raw:
                return
            content_type = flow.response.headers.get('content-type', '')
            response_body: Optional[Dict[str, Any]] = None
            # Streamed completions arrive as SSE; everything else is plain JSON.
            if 'text/event-stream' in content_type or raw.strip().startswith(b'data:'):
                response_body = self.parse_sse_response(raw)
            else:
                response_body = json.loads(raw)
            if not response_body:
                return

            reasoning_content = self.extract_reasoning(response_body)
            tokens_used = self.extract_tokens_used(response_body)

            self.db.save_response(
                request_id=request_id,
                response_body=response_body,
                reasoning_content=reasoning_content,
                tokens_used=tokens_used,
            )

            logger.info(f"\033[94mSaved response for request: {request_id}, tokens: {tokens_used}\033[0m")

        except json.JSONDecodeError:
            logger.debug(f"Failed to parse response body for {flow.request.path}")
        except Exception as e:
            logger.error(f"Error processing response: {e}")


def load_config(config_path: str = "config.json") -> Dict[str, Any]:
    """Load settings from a JSON file, falling back to built-in defaults."""
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.warning(f"Config file not found: {config_path}, using defaults")
        return {
            "proxy": {
                "listen_port": 8080,
                "listen_host": "127.0.0.1"
            },
            "database": {
                "path": "llm_data.db"
            },
            "filter": {
                "enabled": True,
                "path_patterns": ["/v1/", "/chat/completions", "/completions"],
                "host_patterns": ["deepseek.com", "openrouter.ai", "api.openai.com"],
                "save_all_requests": False
            },
            "export": {
                "output_dir": "exports",
                "include_reasoning": True,
                "include_metadata": False
            }
        }


config = load_config()
addons = [LLMProxyAddon(config)]
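
# To run (assuming this file is saved as, e.g., llm_proxy_addon.py):
#   mitmdump -s llm_proxy_addon.py --listen-host 127.0.0.1 --listen-port 8080
# mitmproxy's listen address comes from its own CLI flags; the "proxy" section
# of config.json is informational only, since this addon never reads it.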