"""Elasticsearch connection and operations module (Django version).

Migrated from the Flask project's ESConnect.py.
"""
import hashlib
import json
import os
import re
import time
import uuid
from datetime import datetime, timezone, timedelta

from elasticsearch import Elasticsearch
from elasticsearch import NotFoundError
from elasticsearch_dsl import Search
from elasticsearch_dsl import connections

from accounts.crypto import hash_password_random_salt
from .documents import AchievementDocument, UserDocument, GlobalDocument, RegistrationCodeDocument
from .indexes import ACHIEVEMENT_INDEX_NAME, USER_INDEX_NAME, GLOBAL_INDEX_NAME

# The ES endpoint comes from the environment and defaults to the local host.
_ES_URL = os.environ.get('ELASTICSEARCH_URL', 'http://localhost:9200')
if not _ES_URL.startswith(('http://', 'https://')):
    _ES_URL = 'http://' + _ES_URL
connections.create_connection(hosts=[_ES_URL])

# Default ES client (the same connection the Document classes use).
es = connections.get_connection()

DATA_INDEX_NAME = ACHIEVEMENT_INDEX_NAME
USERS_INDEX_NAME = USER_INDEX_NAME
GLOBAL_TYPES_INDEX_NAME = GLOBAL_INDEX_NAME

# Achievement types written when the global "types" document does not exist.
_DEFAULT_TYPES = ('软著', '专利', '奖状')

# Fallback extractor for the "数据类型" value when `data` is not valid JSON.
_DATA_TYPE_RE = re.compile(r'"数据类型"\s*:\s*"([^"]+)"')


def _clean_name(value):
    """Return *value* as a string without surrounding whitespace or ';'."""
    return str(value).strip().strip(';')


def _save_default_types():
    """Create (or overwrite) the global 'types' document with the defaults."""
    doc = GlobalDocument(type_list=list(_DEFAULT_TYPES))
    doc.meta.id = 'types'
    doc.save()


def create_index_with_mapping():
    """Create the indices, their mappings and the seed data if missing.

    Each index is only initialised when it does not exist yet. The global
    'types' document and the default ``admin`` user are created when absent.
    Any failure is printed and swallowed; nothing is raised to the caller.
    """
    try:
        # --- 1. Achievement data index ---
        if not es.indices.exists(index=DATA_INDEX_NAME):
            AchievementDocument.init()
            print(f"✅ 创建索引 {DATA_INDEX_NAME} 并设置映射")
        else:
            print(f"ℹ️ 索引 {DATA_INDEX_NAME} 已存在,跳过创建")

        # --- 2. User index ---
        if not es.indices.exists(index=USERS_INDEX_NAME):
            UserDocument.init()
            print(f"✅ 创建索引 {USERS_INDEX_NAME} 并设置映射")
        else:
            print(f"ℹ️ 索引 {USERS_INDEX_NAME} 已存在,跳过创建")

        # --- 3. Global types index ---
        if not es.indices.exists(index=GLOBAL_TYPES_INDEX_NAME):
            GlobalDocument.init()
            _save_default_types()
            print(f"✅ 创建索引 {GLOBAL_TYPES_INDEX_NAME} 并写入默认类型")
        else:
            try:
                GlobalDocument.get(id='types')
            except NotFoundError:
                # Only a genuinely missing document is re-seeded; any other
                # error must not overwrite a list that may already exist.
                _save_default_types()
                print("ℹ️ 全局类型文档缺失,已补充默认类型")

        # --- 4. Default administrator (created only when absent) ---
        admin_query = Search(using=es, index=USERS_INDEX_NAME).query("match", username="admin")
        if admin_query.count() == 0:
            # Hash only when the user is actually going to be written.
            salt_b64, hash_b64 = hash_password_random_salt("admin")
            admin_user = {
                "user_id": 0,
                "username": "admin",
                "password_hash": hash_b64,
                "password_salt": salt_b64,
                "permission": 0,
            }
            # write_user_data prints its own failure message.
            if write_user_data(admin_user):
                print("✅ 默认管理员用户已创建")
        else:
            print("ℹ️ 默认管理员用户已存在,跳过创建")
    except Exception as e:
        print(f"❌ 创建索引失败: {str(e)}")
        # Optionally re-raise here so an AppConfig caller can handle it.


def get_type_list():
    """Return the sanitized achievement type names.

    Returns:
        list[str]: Names from the global 'types' document, or the built-in
        defaults when the document cannot be read.
    """
    try:
        doc = GlobalDocument.get(id='types')
        return [_clean_name(t) for t in (doc.type_list or [])]
    except Exception:
        return list(_DEFAULT_TYPES)


def _ensure_in_global_list(doc_id, field, value, defaults):
    """Append *value* to list *field* of global document *doc_id* if absent.

    Returns True only when the document was modified and saved.
    """
    if not value:
        return False
    norm = _clean_name(value)
    if not norm:
        # Input such as ";" or " " normalizes to nothing; never store it.
        return False
    try:
        try:
            doc = GlobalDocument.get(id=doc_id)
            current = list(getattr(doc, field) or [])
        except NotFoundError:
            # Start a fresh document only when it really does not exist.
            current = list(defaults)
            doc = GlobalDocument(**{field: current})
            doc.meta.id = doc_id
        if norm in {_clean_name(item) for item in current}:
            return False
        current.append(norm)
        setattr(doc, field, current)
        doc.save()
        return True
    except Exception:
        return False


def ensure_type_in_list(type_name: str):
    """Add *type_name* to the global type list when it is not there yet.

    Args:
        type_name: Type name; surrounding whitespace and ';' are ignored.

    Returns:
        bool: True if the list was extended, False if the name was empty,
        already present, or the update failed.
    """
    return _ensure_in_global_list('types', 'type_list', type_name, _DEFAULT_TYPES)


def get_keys_list():
    """Return the sanitized key names, creating an empty 'keys' document if missing.

    Returns:
        list[str]: Stored key names, or an empty list on any failure.
    """
    try:
        try:
            doc = GlobalDocument.get(id='keys')
            current = list(doc.keys_list or [])
        except NotFoundError:
            current = []
            doc = GlobalDocument(keys_list=current)
            doc.meta.id = 'keys'
            doc.save()
        return [_clean_name(k) for k in current]
    except Exception:
        return []


def ensure_key_in_list(key_name: str):
    """Add *key_name* to the global key list when it is not there yet.

    Args:
        key_name: Key name; surrounding whitespace and ';' are ignored.

    Returns:
        bool: True if the list was extended, False if the name was empty,
        already present, or the update failed.
    """
    return _ensure_in_global_list('keys', 'keys_list', key_name, ())


def generate_registration_code(keys=None, manage_keys=None, expires_in_days: int = 30, created_by: int = None):
    """Create and store a new registration code.

    Every name in *keys* and *manage_keys* is first registered in the global
    key list. The code is a UUID4 hex string followed by the last six digits
    of the current Unix time, and is also used as the document id.

    Args:
        keys: Keys attached to the code.
        manage_keys: Manage-keys attached to the code.
        expires_in_days: Validity in days; falsy values mean 30 and the
            minimum is 1.
        created_by: Identifier of the creating user, stored with the code.

    Returns:
        dict | None: The code with its keys and ISO timestamps, or None when
        the code could not be stored.
    """
    try:
        keys = list(keys or [])
        manage_keys = list(manage_keys or [])
        for name in keys + manage_keys:
            ensure_key_in_list(name)

        code = uuid.uuid4().hex + str(int(time.time()))[-6:]
        now = datetime.now(timezone.utc)
        expires = now + timedelta(days=max(1, int(expires_in_days or 30)))

        doc = RegistrationCodeDocument(
            code=code,
            keys=keys,
            manage_keys=manage_keys,
            created_at=now.isoformat(),
            expires_at=expires.isoformat(),
            created_by=created_by,
        )
        doc.meta.id = code
        doc.save()
        return {
            "code": code,
            "keys": keys,
            "manage_keys": manage_keys,
            "created_at": now.isoformat(),
            "expires_at": expires.isoformat(),
        }
    except Exception as e:
        print(f"生成注册码失败: {str(e)}")
        return None


def _registration_code_to_dict(doc, fallback_code=''):
    """Convert a registration-code document or hit into a plain dict."""
    return {
        "code": getattr(doc, 'code', fallback_code),
        "keys": list(getattr(doc, 'keys', []) or []),
        "manage_keys": list(getattr(doc, 'manage_keys', []) or []),
        "created_at": getattr(doc, 'created_at', None),
        "expires_at": getattr(doc, 'expires_at', None),
        "created_by": getattr(doc, 'created_by', None),
    }


def get_registration_code(code: str):
    """Fetch a registration code by its value.

    Returns:
        dict | None: The stored fields, or None when the lookup fails.
    """
    try:
        doc = RegistrationCodeDocument.get(id=str(code))
        return _registration_code_to_dict(doc, fallback_code=str(code))
    except Exception:
        return None


def _as_aware_utc(value):
    """Coerce a datetime or ISO-8601 string to an aware datetime, else None.

    Naive values are assumed to be UTC, which is what this module writes.
    """
    if isinstance(value, datetime):
        parsed = value
    else:
        text = str(value)
        if text.endswith(('Z', 'z')):
            # fromisoformat() rejects the 'Z' suffix on older Python versions.
            text = text[:-1] + '+00:00'
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def list_registration_codes():
    """List stored registration codes, newest first, with an ``active`` flag.

    A code is active while its ``expires_at`` lies in the future; an
    unparsable expiry counts as inactive.

    NOTE(review): no explicit ``size`` is set, so the number of hits is
    whatever the search backend returns by default.

    Returns:
        list[dict]: One dict per code, or an empty list on failure.
    """
    try:
        search = RegistrationCodeDocument.search().update_from_dict({
            "sort": [{"created_at": {"order": "desc"}}],
            "query": {"exists": {"field": "code"}},
        })
        now = datetime.now(timezone.utc)
        out = []
        for hit in search.execute():
            if not getattr(hit, 'code', None):
                continue
            expires_at = _as_aware_utc(getattr(hit, 'expires_at', None))
            entry = _registration_code_to_dict(hit)
            entry["active"] = bool(expires_at and expires_at > now)
            out.append(entry)
        return out
    except Exception:
        return []


def revoke_registration_code(code: str):
    """Expire a registration code immediately.

    Returns:
        bool: True when the code was found and updated, False otherwise.
    """
    try:
        doc = RegistrationCodeDocument.get(id=str(code))
        doc.expires_at = datetime.now(timezone.utc).isoformat()
        doc.save()
        return True
    except Exception:
        return False


def get_doc_id(data):
    """Derive a content-based id (usable for de-duplication).

    Args:
        data (dict): Document data; the 'data' and 'image' entries are used.

    Returns:
        str: MD5 hex digest of the concatenated 'data' and 'image' values.
    """
    unique_str = f"{data.get('data', '')}{data.get('image', '')}"
    return hashlib.md5(unique_str.encode('utf-8')).hexdigest()


def insert_data(data):
    """Store an achievement document stamped with the current UTC time.

    Args:
        data (dict): May contain 'writer_id', 'data' and 'image'.

    Returns:
        bool: True on success, False on failure.
    """
    try:
        achievement = AchievementDocument(
            writer_id=data.get('writer_id', ''),
            data=data.get('data', ''),
            image=data.get('image', ''),
            time=datetime.now(timezone.utc).isoformat(),
        )
        achievement.save()
        print(f"文档写入成功,内容: {data}")
        return True
    except Exception as e:
        print(f"文档写入失败: {str(e)}, 数据: {data}")
        return False


def _achievement_to_dict(doc):
    """Convert an achievement document or hit into the public result dict."""
    return {
        "_id": doc.meta.id,
        "writer_id": doc.writer_id,
        "data": doc.data,
        "image": doc.image,
    }


def search_data(query):
    """Search achievements for a keyword across all fields.

    Args:
        query (str): Search keyword.

    Returns:
        list[dict]: Matching documents, or an empty list on failure.
    """
    try:
        search = AchievementDocument.search().query("multi_match", query=query, fields=['*'])
        return [_achievement_to_dict(hit) for hit in search.execute()]
    except Exception as e:
        print(f"搜索失败: {str(e)}")
        return []


def search_all():
    """Return every achievement document.

    Uses ``scan()`` so the result is not cut off at the default page size
    of a plain search.

    Returns:
        list[dict]: All documents, or an empty list on failure.
    """
    try:
        search = AchievementDocument.search().query("match_all")
        return [_achievement_to_dict(hit) for hit in search.scan()]
    except Exception as e:
        print(f"获取所有文档失败: {str(e)}")
        return []


def delete_by_id(doc_id):
    """Delete the achievement document with the given id.

    Args:
        doc_id (str): Id of the document to delete.

    Returns:
        bool: True on success, False on failure.
    """
    try:
        AchievementDocument.get(id=doc_id).delete()
        print(f"文档 {doc_id} 删除成功")
        return True
    except Exception as e:
        print(f"删除失败: {str(e)}")
        return False


def update_by_id(doc_id, updated_data):
    """Update selected fields of an achievement document.

    Args:
        doc_id (str): Id of the document to update.
        updated_data (dict): New values; only 'writer_id', 'data' and
            'image' are applied, other entries are ignored.

    Returns:
        bool: True on success, False on failure.
    """
    try:
        achievement = AchievementDocument.get(id=doc_id)
        for field in ('writer_id', 'data', 'image'):
            if field in updated_data:
                setattr(achievement, field, updated_data[field])
        achievement.save()
        print(f"文档 {doc_id} 更新成功")
        return True
    except Exception as e:
        print(f"更新失败: {str(e)}")
        return False


def get_by_id(doc_id):
    """Fetch a single achievement document.

    Args:
        doc_id (str): Id of the document to fetch.

    Returns:
        dict | None: The document fields, or None on failure.
    """
    try:
        return _achievement_to_dict(AchievementDocument.get(id=doc_id))
    except Exception as e:
        print(f"获取文档失败: {str(e)}")
        return None


def search_by_any_field(keyword):
    """Fuzzy-search achievements for a keyword across all fields.

    Args:
        keyword (str): Search keyword.

    Returns:
        list[dict]: Matching documents, or an empty list on failure.
    """
    try:
        search = AchievementDocument.search().query(
            "multi_match", query=keyword, fields=['*'], fuzziness="AUTO"
        )
        return [_achievement_to_dict(hit) for hit in search.execute()]
    except Exception as e:
        print(f"模糊搜索失败: {str(e)}")
        return []


def _type_filters_from_list(limit: int = None):
    """Build one named wildcard filter per known achievement type.

    Args:
        limit: When a positive int, only the first *limit* types are used.

    Returns:
        dict: Type name mapped to a wildcard query on ``data.keyword``.
    """
    types = get_type_list()
    if isinstance(limit, int) and limit > 0:
        types = types[:limit]
    filters = {}
    for t in types:
        key = str(t)
        # Match the key/value pair itself so that the same value appearing
        # in another field of the payload does not count.
        pattern = f'*"数据类型": "{key}"*'
        filters[key] = {"wildcard": {"data.keyword": {"value": pattern}}}
    return filters


def _with_time_range(body, gte, lte):
    """Add a range query on ``time`` to *body* when a bound is given."""
    bounds = {}
    if gte:
        bounds["gte"] = gte
    if lte:
        bounds["lte"] = lte
    if bounds:
        body["query"] = {"range": {"time": bounds}}
    return body


def analytics_trend(gte: str = None, lte: str = None, interval: str = "day"):
    """Count achievements per calendar interval.

    Args:
        gte: Optional lower bound for ``time``.
        lte: Optional upper bound for ``time``.
        interval: Calendar interval of the date histogram.

    Returns:
        list[dict]: Buckets with 'key_as_string', 'key' and 'doc_count',
        or an empty list on failure.
    """
    try:
        body = _with_time_range({
            "size": 0,
            "aggs": {
                "trend": {
                    "date_histogram": {
                        "field": "time",
                        "calendar_interval": interval,
                        "min_doc_count": 0,
                    }
                }
            },
        }, gte, lte)
        resp = AchievementDocument.search().update_from_dict(body).execute()
        buckets = resp.aggregations.trend.buckets if hasattr(resp, 'aggregations') else []
        return [
            {"key_as_string": b.key_as_string, "key": b.key, "doc_count": b.doc_count}
            for b in buckets
        ]
    except Exception as e:
        print(f"分析趋势失败: {str(e)}")
        return []


def analytics_types(gte: str = None, lte: str = None, size: int = 10):
    """Count achievements per type.

    Args:
        gte: Optional lower bound for ``time``.
        lte: Optional upper bound for ``time``.
        size: Maximum number of types (taken from the global type list).

    Returns:
        list[dict]: Entries with 'key' and 'doc_count', or an empty list on
        failure.
    """
    try:
        body = _with_time_range({
            "size": 0,
            "aggs": {
                "by_type": {
                    "filters": {"filters": _type_filters_from_list(limit=size)}
                }
            },
        }, gte, lte)
        resp = es.search(index=DATA_INDEX_NAME, body=body)
        buckets = resp.get("aggregations", {}).get("by_type", {}).get("buckets", {})
        out = []
        for name, bucket in buckets.items():
            try:
                out.append({"key": name, "doc_count": int(bucket.get("doc_count", 0))})
            except Exception:
                out.append({"key": str(name), "doc_count": 0})
        return out
    except Exception as e:
        print(f"分析类型占比失败: {str(e)}")
        return []


def analytics_types_trend(gte: str = None, lte: str = None, interval: str = "week", size: int = 8):
    """Count achievements per type within each calendar interval.

    Args:
        gte: Optional lower bound for ``time``.
        lte: Optional upper bound for ``time``.
        interval: Calendar interval of the date histogram.
        size: Maximum number of types (taken from the global type list).

    Returns:
        list[dict]: One entry per interval with 'key_as_string', 'key',
        'doc_count' and a 'types' list, or an empty list on failure.
    """
    try:
        body = _with_time_range({
            "size": 0,
            "aggs": {
                "by_interval": {
                    "date_histogram": {
                        "field": "time",
                        "calendar_interval": interval,
                        "min_doc_count": 0,
                    },
                    "aggs": {
                        "by_type": {
                            "filters": {"filters": _type_filters_from_list(limit=size)}
                        }
                    },
                }
            },
        }, gte, lte)
        resp = es.search(index=DATA_INDEX_NAME, body=body)
        intervals = resp.get("aggregations", {}).get("by_interval", {}).get("buckets", [])
        out = []
        for bucket in intervals:
            type_buckets = bucket.get("by_type", {}).get("buckets", {})
            out.append({
                "key_as_string": bucket.get("key_as_string"),
                "key": bucket.get("key"),
                "doc_count": bucket.get("doc_count", 0),
                "types": [
                    {"key": name, "doc_count": int(tb.get("doc_count", 0))}
                    for name, tb in type_buckets.items()
                ],
            })
        return out
    except Exception as e:
        print(f"分析类型变化失败: {str(e)}")
        return []


def _extract_data_type(raw):
    """Return the "数据类型" value from a raw ``data`` payload, or ''.

    The payload is first parsed as JSON; when that fails a regular
    expression is used as a fallback.
    """
    if not raw:
        return ""
    try:
        obj = json.loads(raw)
        if isinstance(obj, dict):
            value = obj.get("数据类型")
            if isinstance(value, str) and value:
                return value
    except (TypeError, ValueError):
        pass
    try:
        match = _DATA_TYPE_RE.search(raw)
    except TypeError:
        return ""
    return match.group(1) if match else ""


def analytics_recent(limit: int = 10, gte: str = None, lte: str = None):
    """Return the most recent achievements with author name and type.

    Args:
        limit: Number of entries, clamped to the range 1..100.
        gte: Optional lower bound for ``time``.
        lte: Optional upper bound for ``time``.

    Returns:
        list[dict]: Entries with '_id', 'writer_id', 'username', 'type' and
        'time', newest first, or an empty list on failure.
    """
    try:
        body = _with_time_range({
            "size": max(1, min(limit, 100)),
            "sort": [{"time": {"order": "desc"}}],
        }, gte, lte)
        resp = AchievementDocument.search().update_from_dict(body).execute()

        usernames = {}  # one user lookup per distinct writer
        results = []
        for hit in resp:
            writer = getattr(hit, 'writer_id', '')
            cache_key = str(writer)
            if cache_key not in usernames:
                # get_user_by_id converts the id to int and returns None
                # when the id is invalid or no such user exists.
                user = get_user_by_id(writer) or {}
                usernames[cache_key] = user.get("username") or ""
            results.append({
                "_id": hit.meta.id,
                "writer_id": writer,
                "username": usernames[cache_key],
                "type": _extract_data_type(getattr(hit, 'data', '')),
                "time": getattr(hit, 'time', None),
            })
        return results
    except Exception as e:
        print(f"获取最近活动失败: {str(e)}")
        return []


def write_user_data(user_data):
    """Store a user document.

    A non-empty plain 'password' is hashed with a fresh random salt and
    takes precedence over any supplied 'password_hash' / 'password_salt'.

    Args:
        user_data (dict): User fields; 'permission' is coerced to int and
            falls back to 1 when missing or invalid.

    Returns:
        bool: True on success, False on failure.
    """
    try:
        # Enforce an integer permission.
        try:
            permission = int(user_data.get('permission', 1))
        except Exception:
            permission = 1

        password = str(user_data.get('password') or '').strip()
        if password:
            password_salt, password_hash = hash_password_random_salt(password)
        else:
            password_hash = user_data.get('password_hash')
            password_salt = user_data.get('password_salt')

        user = UserDocument(
            user_id=user_data.get('user_id'),
            username=user_data.get('username'),
            password_hash=password_hash,
            password_salt=password_salt,
            permission=permission,
            email=user_data.get('email'),
            key=list(user_data.get('key') or []),
            manage_key=list(user_data.get('manage_key') or []),
        )
        user.save()
        print(f"用户数据写入成功: {user_data.get('username')}")
        return True
    except Exception as e:
        print(f"用户数据写入失败: {str(e)}")
        return False


def _user_to_dict(hit):
    """Convert a user hit into the public dict (no credential fields)."""
    return {
        "user_id": hit.user_id,
        "username": hit.username,
        "permission": int(hit.permission),
        "email": getattr(hit, 'email', None),
        "key": list(getattr(hit, 'key', []) or []),
        "manage_key": list(getattr(hit, 'manage_key', []) or []),
    }


def _first_user_hit(user_id):
    """Return the first user hit whose ``user_id`` equals int(*user_id*), or None."""
    response = UserDocument.search().query("term", user_id=int(user_id)).execute()
    return response.hits[0] if response.hits else None


def get_user_by_username(username):
    """Fetch a user, including credential fields, by exact username.

    Args:
        username (str): Username to look up.

    Returns:
        dict | None: 'user_id', 'username', 'password_hash', 'password_salt'
        and 'permission', or None when not found or on failure.
    """
    try:
        response = UserDocument.search().query("term", username=username).execute()
        if not response.hits:
            return None
        hit = response.hits[0]
        return {
            "user_id": hit.user_id,
            "username": hit.username,
            "password_hash": getattr(hit, 'password_hash', None),
            "password_salt": getattr(hit, 'password_salt', None),
            "permission": int(hit.permission),
        }
    except Exception as e:
        print(f"获取用户数据失败: {str(e)}")
        return None


def get_all_users():
    """Return every user (without credential fields).

    Uses ``scan()`` so the result is not cut off at the default page size
    of a plain search.

    Returns:
        list[dict]: All users, or an empty list on failure.
    """
    try:
        search = UserDocument.search().query("match_all")
        return [_user_to_dict(hit) for hit in search.scan()]
    except Exception as e:
        print(f"获取所有用户失败: {str(e)}")
        return []


def get_user_by_id(user_id):
    """Fetch a user (without credential fields) by numeric id.

    Args:
        user_id: User id; converted with ``int()`` before the lookup.

    Returns:
        dict | None: The user fields, or None when the id is invalid, the
        user does not exist, or the lookup fails.
    """
    try:
        hit = _first_user_hit(user_id)
        return _user_to_dict(hit) if hit is not None else None
    except Exception as e:
        print(f"获取用户数据失败: {str(e)}")
        return None


def delete_user_by_id(user_id):
    """Delete the user with the given numeric id.

    Returns:
        bool: True when a user was found and deleted, False otherwise.
    """
    try:
        hit = _first_user_hit(user_id)
        if hit is None:
            return False
        UserDocument.get(id=hit.meta.id).delete()
        return True
    except Exception as e:
        print(f"删除用户失败: {str(e)}")
        return False


def update_user_by_id(user_id, username=None, permission=None, password=None):
    """Update the given fields of a user; ``None`` leaves a field unchanged.

    Args:
        user_id: Numeric id of the user to update.
        username: New username.
        permission: New permission; coerced to int.
        password: New plain password; stored as a freshly salted hash.

    Returns:
        bool: True when a user was found and saved, False otherwise.
    """
    try:
        hit = _first_user_hit(user_id)
        if hit is None:
            return False
        doc = UserDocument.get(id=hit.meta.id)
        if username is not None:
            doc.username = username
        if permission is not None:
            doc.permission = int(permission)
        if password is not None:
            salt_b64, hash_b64 = hash_password_random_salt(str(password))
            doc.password_hash = hash_b64
            doc.password_salt = salt_b64
        doc.save()
        return True
    except Exception as e:
        print(f"更新用户失败: {str(e)}")
        return False