import json
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional


class LLMDatabase:
    """SQLite-backed store for LLM conversations, requests and responses.

    Every public method opens its own connection via ``get_connection`` and
    closes it before returning, even when a statement raises.
    """

    def __init__(self, db_path: str = "llm_data.db"):
        """Remember ``db_path`` and create the tables/indexes if missing."""
        self.db_path = db_path
        self.init_database()

    def get_connection(self) -> sqlite3.Connection:
        """Open a new connection whose rows can be indexed by column name.

        The caller is responsible for closing the returned connection.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def init_database(self) -> None:
        """Create the ``conversations``, ``requests`` and ``responses``
        tables plus their lookup indexes when they do not exist yet."""
        # `closing` guarantees close(); the inner `conn` context commits on
        # success and rolls back if any statement raises.
        with closing(self.get_connection()) as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS conversations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    conversation_id TEXT UNIQUE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS requests (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    request_id TEXT UNIQUE NOT NULL,
                    conversation_id TEXT,
                    model TEXT,
                    messages TEXT,
                    request_body TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (conversation_id) REFERENCES conversations(conversation_id)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS responses (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    request_id TEXT NOT NULL,
                    response_body TEXT,
                    reasoning_content TEXT,
                    tokens_used INTEGER,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (request_id) REFERENCES requests(request_id)
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_conversation_id
                ON requests(conversation_id)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_request_id
                ON responses(request_id)
            """)

    def get_or_create_conversation(self, conversation_id: Optional[str] = None) -> str:
        """Ensure a conversation row exists and bump its ``updated_at``.

        Args:
            conversation_id: Existing identifier, or ``None`` to generate a
                fresh UUID4 string.

        Returns:
            The identifier that was passed in or generated.
        """
        if conversation_id is None:
            conversation_id = str(uuid.uuid4())

        with closing(self.get_connection()) as conn, conn:
            conn.execute("""
                INSERT OR IGNORE INTO conversations (conversation_id)
                VALUES (?)
            """, (conversation_id,))
            conn.execute("""
                UPDATE conversations
                SET updated_at = CURRENT_TIMESTAMP
                WHERE conversation_id = ?
            """, (conversation_id,))

        return conversation_id

    def save_request(self, request_id: str, model: str, messages: List[Dict[str, Any]],
                     request_body: Dict[str, Any], conversation_id: Optional[str] = None) -> None:
        """Store (or overwrite) a request, creating its conversation if needed.

        ``messages`` and ``request_body`` are serialized to JSON text.
        ``request_id`` is UNIQUE in the schema, so saving the same id again
        replaces the earlier row.
        """
        conversation_id = self.get_or_create_conversation(conversation_id)

        with closing(self.get_connection()) as conn, conn:
            conn.execute("""
                INSERT OR REPLACE INTO requests
                (request_id, conversation_id, model, messages, request_body)
                VALUES (?, ?, ?, ?, ?)
            """, (
                request_id,
                conversation_id,
                model,
                json.dumps(messages, ensure_ascii=False),
                json.dumps(request_body, ensure_ascii=False),
            ))

    def save_response(self, request_id: str, response_body: Dict[str, Any],
                      reasoning_content: Optional[str] = None,
                      tokens_used: Optional[int] = None) -> None:
        """Store the response for ``request_id``, replacing any earlier one.

        ``response_body`` is serialized to JSON text.
        """
        with closing(self.get_connection()) as conn, conn:
            # responses.request_id carries no UNIQUE constraint, so
            # "INSERT OR REPLACE" would never replace anything and repeated
            # saves would pile up duplicate rows. Delete first, inside the
            # same transaction, to get real replace semantics.
            conn.execute(
                "DELETE FROM responses WHERE request_id = ?",
                (request_id,),
            )
            conn.execute("""
                INSERT INTO responses
                (request_id, response_body, reasoning_content, tokens_used)
                VALUES (?, ?, ?, ?)
            """, (
                request_id,
                json.dumps(response_body, ensure_ascii=False),
                reasoning_content,
                tokens_used,
            ))

    @staticmethod
    def _same_message(stored: Any, incoming: Any) -> bool:
        """Return True when ``incoming`` repeats the already-collected ``stored``.

        Assistant messages collected by ``get_conversation_messages`` may
        carry an extra ``'reasoning'`` key that the follow-up request does
        not echo back, so that key is ignored on the stored side.
        """
        if stored == incoming:
            return True
        if isinstance(stored, dict) and 'reasoning' in stored:
            without_reasoning = {k: v for k, v in stored.items() if k != 'reasoning'}
            return without_reasoning == incoming
        return False

    def get_conversation_messages(self, conversation_id: str) -> List[Dict[str, Any]]:
        """Rebuild the flat message history of one conversation.

        Requests are walked oldest first. For each request, only the
        messages after the prefix already collected are appended, followed
        by one assistant message per entry in the response's ``choices``
        (with a ``'reasoning'`` key when reasoning content was stored).

        Returns:
            The merged list of message dicts; empty when the conversation
            has no requests.
        """
        with closing(self.get_connection()) as conn:
            # created_at only has one-second resolution; the row ids make
            # the order deterministic for requests saved in the same second.
            rows = conn.execute("""
                SELECT r.messages, resp.response_body, resp.reasoning_content
                FROM requests r
                LEFT JOIN responses resp ON r.request_id = resp.request_id
                WHERE r.conversation_id = ?
                ORDER BY r.created_at, r.id, resp.id
            """, (conversation_id,)).fetchall()

        messages: List[Dict[str, Any]] = []
        for row in rows:
            request_messages = json.loads(row['messages']) if row['messages'] else []
            response_body = json.loads(row['response_body']) if row['response_body'] else None
            reasoning_content = row['reasoning_content']

            # Length of the leading run of request messages that is already
            # present in the collected history.
            prefix_len = 0
            for stored, incoming in zip(messages, request_messages):
                if not self._same_message(stored, incoming):
                    break
                prefix_len += 1
            messages.extend(request_messages[prefix_len:])

            if response_body and 'choices' in response_body:
                for choice in response_body['choices']:
                    # `message` may be present but null; treat it as empty.
                    assistant_msg = {
                        'role': 'assistant',
                        'content': (choice.get('message') or {}).get('content', ''),
                    }
                    if reasoning_content:
                        assistant_msg['reasoning'] = reasoning_content
                    messages.append(assistant_msg)

        return messages

    def get_all_conversations(self) -> List[Dict[str, Any]]:
        """List every conversation, most recently updated first."""
        with closing(self.get_connection()) as conn:
            rows = conn.execute("""
                SELECT conversation_id, created_at, updated_at
                FROM conversations
                ORDER BY updated_at DESC
            """).fetchall()

        return [
            {
                'conversation_id': row['conversation_id'],
                'created_at': row['created_at'],
                'updated_at': row['updated_at'],
            }
            for row in rows
        ]

    def export_to_jsonl(self, output_path: str, include_reasoning: bool = True) -> int:
        """Write one ``{"messages": [...]}`` JSON line per non-empty conversation.

        Args:
            output_path: File to create or overwrite (UTF-8).
            include_reasoning: When False, the ``'reasoning'`` key is
                dropped from every exported message.

        Returns:
            The number of conversations written.
        """
        conversations = self.get_all_conversations()
        count = 0

        with open(output_path, 'w', encoding='utf-8') as f:
            for conv in conversations:
                messages = self.get_conversation_messages(conv['conversation_id'])
                if not messages:
                    continue

                if not include_reasoning:
                    messages = [
                        {k: v for k, v in msg.items() if k != 'reasoning'}
                        for msg in messages
                    ]

                f.write(json.dumps({'messages': messages}, ensure_ascii=False) + '\n')
                count += 1

        return count

    def get_stats(self) -> Dict[str, Any]:
        """Return row counts per table and the sum of ``tokens_used``.

        ``total_tokens`` is 0 when no response has a token count.
        """
        with closing(self.get_connection()) as conn:
            conversation_count = conn.execute(
                "SELECT COUNT(*) as count FROM conversations"
            ).fetchone()['count']
            request_count = conn.execute(
                "SELECT COUNT(*) as count FROM requests"
            ).fetchone()['count']
            response_count = conn.execute(
                "SELECT COUNT(*) as count FROM responses"
            ).fetchone()['count']
            # SUM over zero rows (or all-NULL values) yields NULL -> 0.
            total_tokens = conn.execute(
                "SELECT SUM(tokens_used) as total FROM responses"
            ).fetchone()['total'] or 0

        return {
            'conversations': conversation_count,
            'requests': request_count,
            'responses': response_count,
            'total_tokens': total_tokens,
        }