""" Django版本的ES连接和操作模块 迁移自Flask项目的ESConnect.py """ from elasticsearch import Elasticsearch from elasticsearch_dsl import connections import os from .documents import AchievementDocument, UserDocument, GlobalDocument from .indexes import ACHIEVEMENT_INDEX_NAME, USER_INDEX_NAME, GLOBAL_INDEX_NAME import hashlib import time from datetime import datetime, timezone, timedelta import threading # 使用环境变量配置ES连接,默认为本机 _ES_URL = os.environ.get('ELASTICSEARCH_URL', 'http://localhost:9200') if not (_ES_URL.startswith('http://') or _ES_URL.startswith('https://')): _ES_URL = 'http://' + _ES_URL connections.create_connection(hosts=[_ES_URL]) # 获取默认的ES客户端 es = connections.get_connection() DATA_INDEX_NAME = ACHIEVEMENT_INDEX_NAME USERS_INDEX_NAME = USER_INDEX_NAME GLOBAL_TYPES_INDEX_NAME = GLOBAL_INDEX_NAME def create_index_with_mapping(): """创建索引和映射配置(仅当索引不存在时)""" # 获取 Elasticsearch 客户端(与 Document 使用的客户端一致) try: # --- 1. 处理获奖数据索引 --- if not es.indices.exists(index=DATA_INDEX_NAME): AchievementDocument.init() print(f"✅ 创建索引 {DATA_INDEX_NAME} 并设置映射") else: print(f"ℹ️ 索引 {DATA_INDEX_NAME} 已存在,跳过创建") # --- 2. 处理用户索引 --- if not es.indices.exists(index=USERS_INDEX_NAME): UserDocument.init() print(f"✅ 创建索引 {USERS_INDEX_NAME} 并设置映射") else: print(f"ℹ️ 索引 {USERS_INDEX_NAME} 已存在,跳过创建") # --- 3. 处理全局类型索引 --- if not es.indices.exists(index=GLOBAL_TYPES_INDEX_NAME): GlobalDocument.init() default_types = ['软著', '专利', '奖状'] doc = GlobalDocument(type_list=default_types) doc.meta.id = 'types' doc.save() print(f"✅ 创建索引 {GLOBAL_TYPES_INDEX_NAME} 并写入默认类型") else: try: GlobalDocument.get(id='types') except Exception: default_types = ['软著', '专利', '奖状'] doc = GlobalDocument(type_list=default_types) doc.meta.id = 'types' doc.save() print("ℹ️ 全局类型文档缺失,已补充默认类型") # --- 4. 创建默认管理员用户(可选:也可检查用户是否已存在)--- # 这里简单处理:每次初始化都写入(可能重复),建议加唯一性判断 admin_user = { "user_id": 0, "username": "admin", "password": "admin", # ⚠️ 生产环境务必加密! "permission": 0 } # 可选:检查 admin 是否已存在(根据 user_id 或 username) from elasticsearch_dsl import Search s = Search(using=es, index=USERS_INDEX_NAME).query("match", username="admin") if s.count() == 0: write_user_data(admin_user) print("✅ 默认管理员用户已创建") else: print("ℹ️ 默认管理员用户已存在,跳过创建") except Exception as e: print(f"❌ 创建索引失败: {str(e)}") # raise # 可选:在 AppConfig 中捕获,这里可以 re-raise 便于调试 def get_type_list(): try: doc = GlobalDocument.get(id='types') lst = [str(t).strip().strip(';') for t in (doc.type_list or [])] return lst except Exception: return ['软著', '专利', '奖状'] def ensure_type_in_list(type_name: str): if not type_name: return False norm = str(type_name).strip().strip(';') try: try: doc = GlobalDocument.get(id='types') cur = list(doc.type_list or []) except Exception: cur = ['软著', '专利', '奖状'] doc = GlobalDocument(type_list=cur) doc.meta.id = 'types' cur_sanitized = {str(t).strip().strip(';') for t in cur} if norm not in cur_sanitized: cur.append(norm) doc.type_list = cur doc.save() return True return False except Exception: return False def get_doc_id(data): """ 根据数据内容生成唯一ID(用于去重) 参数: data (dict): 包含文档数据的字典 返回: str: 基于数据内容生成的MD5哈希值作为唯一ID """ data_str = data.get('data', '') image_str = data.get('image', '') unique_str = f"{data_str}{image_str}" return hashlib.md5(unique_str.encode('utf-8')).hexdigest() def insert_data(data): """ 向Elasticsearch插入数据 参数: data (dict): 要插入的数据 返回: bool: 插入成功返回True,失败返回False """ try: achievement = AchievementDocument( writer_id=data.get('writer_id', ''), data=data.get('data', ''), image=data.get('image', ''), time=datetime.now(timezone.utc).isoformat() ) achievement.save() print(f"文档写入成功,内容: {data}") return True except Exception as e: print(f"文档写入失败: {str(e)}, 数据: {data}") return False def search_data(query): """ 在Elasticsearch中搜索数据 参数: query (str): 搜索关键词 返回: list: 包含搜索结果的列表 """ try: # 使用Django-elasticsearch-dsl进行搜索 search = AchievementDocument.search() search = search.query("multi_match", query=query, fields=['*']) response = search.execute() results = [] for hit in response: results.append({ "_id": hit.meta.id, "writer_id": hit.writer_id, "data": hit.data, "image": hit.image }) return results except Exception as e: print(f"搜索失败: {str(e)}") return [] def search_all(): """获取所有文档""" try: search = AchievementDocument.search() search = search.query("match_all") response = search.execute() results = [] for hit in response: results.append({ "_id": hit.meta.id, "writer_id": hit.writer_id, "data": hit.data, "image": hit.image }) return results except Exception as e: print(f"获取所有文档失败: {str(e)}") return [] def delete_by_id(doc_id): """ 根据 doc_id 删除文档 参数: doc_id (str): 要删除的文档ID 返回: bool: 删除成功返回True,失败返回False """ try: # 使用Django-elasticsearch-dsl删除文档 achievement = AchievementDocument.get(id=doc_id) achievement.delete() print(f"文档 {doc_id} 删除成功") return True except Exception as e: print(f"删除失败: {str(e)}") return False def update_by_id(doc_id, updated_data): """ 根据文档ID更新数据 参数: doc_id (str): 要更新的文档ID updated_data (dict): 更新的数据内容 返回: bool: 更新成功返回True,失败返回False """ try: # 获取文档 achievement = AchievementDocument.get(id=doc_id) print(doc_id) # 更新字段 if 'writer_id' in updated_data: achievement.writer_id = updated_data['writer_id'] if 'data' in updated_data: achievement.data = updated_data['data'] if 'image' in updated_data: achievement.image = updated_data['image'] achievement.save() print(f"文档 {doc_id} 更新成功") return True except Exception as e: print(f"更新失败: {str(e)}") return False def get_by_id(doc_id): """ 根据文档ID获取单个文档 参数: doc_id (str): 要获取的文档ID 返回: dict or None: 成功返回文档数据,失败返回None """ try: achievement = AchievementDocument.get(id=doc_id) return { "_id": achievement.meta.id, "writer_id": achievement.writer_id, "data": achievement.data, "image": achievement.image } except Exception as e: print(f"获取文档失败: {str(e)}") return None def search_by_any_field(keyword): """ 在任意字段中搜索关键词(支持模糊搜索) 参数: keyword (str): 搜索关键词 返回: list: 包含搜索结果的列表 """ try: search = AchievementDocument.search() # 使用multi_match查询,在所有字段中搜索 search = search.query("multi_match", query=keyword, fields=['*'], fuzziness="AUTO") response = search.execute() results = [] for hit in response: results.append({ "_id": hit.meta.id, "writer_id": hit.writer_id, "data": hit.data, "image": hit.image }) return results except Exception as e: print(f"模糊搜索失败: {str(e)}") return [] ANALYTICS_CACHE = {"data": None, "ts": 0} def _compute_hist(range_gte: str, interval: str, fmt: str): from elasticsearch_dsl import Search s = AchievementDocument.search() s = s.filter('range', time={'gte': range_gte, 'lte': 'now'}) s = s.extra(size=0) s.aggs.bucket('b', 'date_histogram', field='time', calendar_interval=interval, format=fmt, min_doc_count=0) resp = s.execute() buckets = getattr(resp.aggs, 'b').buckets return [{"label": b.key_as_string, "count": b.doc_count} for b in buckets] def _compute_type_counts(range_gte: str, types: list): counts = [] for t in types: s = AchievementDocument.search() s = s.filter('range', time={'gte': range_gte, 'lte': 'now'}) s = s.query('match_phrase', data=str(t)) total = s.count() counts.append({"type": str(t), "count": int(total)}) return counts def compute_analytics(): types = get_type_list() days = _compute_hist('now-10d/d', 'day', 'yyyy-MM-dd') weeks = _compute_hist('now-10w/w', 'week', 'yyyy-ww') months = _compute_hist('now-10M/M', 'month', 'yyyy-MM') pie_1m = _compute_type_counts('now-1M/M', types) pie_12m = _compute_type_counts('now-12M/M', types) return { "last_10_days": days[-10:], "last_10_weeks": weeks[-10:], "last_10_months": months[-10:], "type_pie_1m": pie_1m, "type_pie_12m": pie_12m, } def get_analytics_overview(force: bool = False): now_ts = time.time() if force or ANALYTICS_CACHE["data"] is None or (now_ts - ANALYTICS_CACHE["ts"]) > 3600: ANALYTICS_CACHE["data"] = compute_analytics() ANALYTICS_CACHE["ts"] = now_ts return ANALYTICS_CACHE["data"] def _seconds_until_hour(h: int): now = datetime.now() tgt = now.replace(hour=h, minute=0, second=0, microsecond=0) if tgt <= now: tgt = tgt + timedelta(days=1) return max(0, int((tgt - now).total_seconds())) def start_daily_analytics_scheduler(): def _run_and_reschedule(): try: get_analytics_overview(force=True) except Exception as e: print(f"分析任务失败: {e}") finally: threading.Timer(24 * 3600, _run_and_reschedule).start() delay = _seconds_until_hour(3) threading.Timer(delay, _run_and_reschedule).start() def write_user_data(user_data): """ 写入用户数据到 ES 参数: user_data (dict): 用户数据 返回: bool: 写入成功返回True,失败返回False """ try: user = UserDocument( user_id=user_data.get('user_id'), username=user_data.get('username'), password=user_data.get('password'), permission=user_data.get('permission', 1) ) user.save() print(f"用户数据写入成功: {user_data.get('username')}") return True except Exception as e: print(f"用户数据写入失败: {str(e)}") return False def get_user_by_id(user_id): try: search = UserDocument.search() search = search.query("term", user_id=user_id) response = search.execute() if response.hits: hit = response.hits[0] return { "user_id": hit.user_id, "username": hit.username, "password": hit.password, "permission": hit.permission } return None except Exception as e: print(f"获取用户数据失败: {str(e)}") return None def get_user_by_username(username): """ 根据用户名获取用户数据 参数: username (str): 用户名 返回: dict or None: 用户数据或None """ try: search = UserDocument.search() search = search.query("term", username=username) response = search.execute() if response.hits: hit = response.hits[0] return { "user_id": hit.user_id, "username": hit.username, "password": hit.password, "permission": hit.permission } return None except Exception as e: print(f"获取用户数据失败: {str(e)}") return None def get_all_users(): """获取所有用户""" try: search = UserDocument.search() search = search.query("match_all") response = search.execute() users = [] for hit in response: users.append({ "user_id": hit.user_id, "username": hit.username, "permission": hit.permission }) return users except Exception as e: print(f"获取所有用户失败: {str(e)}") return [] def get_user_by_id(user_id): try: search = UserDocument.search() search = search.query("term", user_id=int(user_id)) response = search.execute() if response.hits: hit = response.hits[0] return { "user_id": hit.user_id, "username": hit.username, "permission": hit.permission, } return None except Exception as e: print(f"获取用户数据失败: {str(e)}") return None def delete_user_by_username(username): """ 根据用户名删除用户 参数: username (str): 用户名 返回: bool: 删除成功返回True,失败返回False """ try: search = UserDocument.search() search = search.query("term", username=username) response = search.execute() if response.hits: user = response.hits[0] user.delete() print(f"用户 {username} 删除成功") return True return False except Exception as e: print(f"删除用户失败: {str(e)}") return False def update_user_permission(username, new_permission): """ 更新用户权限 参数: username (str): 用户名 new_permission (int): 新权限级别 返回: bool: 更新成功返回True,失败返回False """ try: search = UserDocument.search() search = search.query("term", username=username) response = search.execute() if response.hits: user = response.hits[0] user.permission = new_permission user.save() print(f"用户 {username} 权限更新为 {new_permission}") return True return False except Exception as e: print(f"更新用户权限失败: {str(e)}") return False