"""
Django版本的ES连接和操作模块 (Django version of the Elasticsearch connection and operations module)

迁移自Flask项目的ESConnect.py (migrated from the Flask project's ESConnect.py)
"""
|
||
import hashlib
import os
import threading
import time
from datetime import datetime, timedelta, timezone

from elasticsearch import Elasticsearch, NotFoundError
from elasticsearch_dsl import connections

from .documents import AchievementDocument, GlobalDocument, UserDocument
from .indexes import ACHIEVEMENT_INDEX_NAME, GLOBAL_INDEX_NAME, USER_INDEX_NAME
|
||
|
||
# Elasticsearch endpoint comes from ELASTICSEARCH_URL; an unset, empty or
# whitespace-only value falls back to a local node (previously an empty value
# produced the unusable URL 'http://').
_ES_URL = (os.environ.get('ELASTICSEARCH_URL') or '').strip() or 'http://localhost:9200'
# Accept bare "host:port" values by assuming plain HTTP.
if not _ES_URL.startswith(('http://', 'https://')):
    _ES_URL = 'http://' + _ES_URL

# Register the default elasticsearch-dsl connection.
connections.create_connection(hosts=[_ES_URL])

# Low-level client for that default connection (presumably the same one the
# Document classes use, since none of them is given an explicit `using`).
es = connections.get_connection()

# Module-level aliases for the index names defined in .indexes.
DATA_INDEX_NAME = ACHIEVEMENT_INDEX_NAME
USERS_INDEX_NAME = USER_INDEX_NAME
GLOBAL_TYPES_INDEX_NAME = GLOBAL_INDEX_NAME
|
||
|
||
def _ensure_document_index(document_cls, index_name):
    """Create ``index_name`` from ``document_cls``'s mapping unless it already exists."""
    if es.indices.exists(index=index_name):
        print(f"ℹ️ 索引 {index_name} 已存在,跳过创建")
        return
    document_cls.init()
    print(f"✅ 创建索引 {index_name} 并设置映射")


def _save_default_types():
    """Write the built-in type list to the singleton ``types`` document."""
    doc = GlobalDocument(type_list=['软著', '专利', '奖状'])
    doc.meta.id = 'types'
    doc.save()


def _ensure_global_types():
    """Create the global-types index and/or its ``types`` document when missing."""
    if not es.indices.exists(index=GLOBAL_TYPES_INDEX_NAME):
        GlobalDocument.init()
        _save_default_types()
        print(f"✅ 创建索引 {GLOBAL_TYPES_INDEX_NAME} 并写入默认类型")
        return
    try:
        GlobalDocument.get(id='types')
    except NotFoundError:
        # Restore the defaults only when the document is genuinely absent.
        # Catching every exception here would let a transient failure (timeout,
        # refused connection) overwrite an existing, customised type list.
        _save_default_types()
        print("ℹ️ 全局类型文档缺失,已补充默认类型")


def _ensure_default_admin():
    """Create the default ``admin`` account unless a user matching 'admin' exists."""
    from elasticsearch_dsl import Search

    admin_user = {
        "user_id": 0,
        "username": "admin",
        # NOTE(review): write_user_data stores this value as-is (no hashing is
        # visible in this module); change/hash it before production use.
        "password": "admin",
        "permission": 0,
    }
    s = Search(using=es, index=USERS_INDEX_NAME).query("match", username="admin")
    if s.count() > 0:
        print("ℹ️ 默认管理员用户已存在,跳过创建")
    elif write_user_data(admin_user):
        print("✅ 默认管理员用户已创建")
    # On failure write_user_data prints its own error message.


def create_index_with_mapping():
    """Create the achievement, user and global-type indices when they are missing,
    then seed the default type list and the default admin user.

    Existing indices, an existing ``types`` document and an existing admin user
    are left untouched. Any error is printed rather than raised.
    """
    try:
        _ensure_document_index(AchievementDocument, DATA_INDEX_NAME)
        _ensure_document_index(UserDocument, USERS_INDEX_NAME)
        _ensure_global_types()
        _ensure_default_admin()
    except Exception as e:
        print(f"❌ 创建索引失败: {str(e)}")
        # raise  # re-raise here if startup should fail loudly instead
|
||
|
||
def get_type_list():
    """Return the configured achievement types.

    Each stored entry is converted to ``str`` and stripped of surrounding
    whitespace and ';'. If the ``types`` document cannot be read for any reason,
    the built-in defaults are returned instead.
    """
    fallback = ['软著', '专利', '奖状']
    try:
        stored = GlobalDocument.get(id='types').type_list or []
        return [str(item).strip().strip(';') for item in stored]
    except Exception:
        return fallback
|
||
|
||
def ensure_type_in_list(type_name: str):
    """Append ``type_name`` to the global type list if it is not already present.

    The name is compared and stored after stripping whitespace and ';'.

    Returns:
        bool: True only when the list was changed and saved; False when the name
        is empty (before or after normalisation), already present, or the update
        failed (the error is printed).
    """
    if not type_name:
        return False
    norm = str(type_name).strip().strip(';')
    if not norm:
        # e.g. "  ;" -- nothing meaningful to add.
        return False
    try:
        try:
            doc = GlobalDocument.get(id='types')
            current = list(doc.type_list or [])
        except NotFoundError:
            # The singleton document does not exist yet: start from the defaults.
            # Other errors propagate to the outer handler so a transient failure
            # cannot replace an existing list with defaults + this one name.
            current = ['软著', '专利', '奖状']
            doc = GlobalDocument(type_list=current)
            doc.meta.id = 'types'
        if norm in {str(t).strip().strip(';') for t in current}:
            return False
        current.append(norm)
        doc.type_list = current
        doc.save()
        return True
    except Exception as e:
        print(f"更新类型列表失败: {str(e)}")
        return False
|
||
|
||
def get_doc_id(data):
    """Derive a deterministic document ID from a record's content.

    The ``data`` and ``image`` values (a missing key counts as '') are formatted,
    concatenated and hashed with MD5, so identical content maps to the same ID.

    Args:
        data (dict): the document payload.

    Returns:
        str: 32-character lowercase hexadecimal MD5 digest.

    NOTE(review): plain concatenation means data='ab', image='c' and
    data='a', image='bc' yield the same ID. Changing the scheme would change
    any IDs already derived with it, so it is kept as-is.
    """
    text_part = data.get('data', '')
    image_part = data.get('image', '')
    digest = hashlib.md5("{}{}".format(text_part, image_part).encode('utf-8'))
    return digest.hexdigest()
|
||
|
||
def insert_data(data):
    """Index a new achievement document.

    ``writer_id``, ``data`` and ``image`` are copied from the input (missing keys
    become ''), and ``time`` is set to the current UTC time in ISO-8601 form.
    No explicit ``_id`` is assigned (get_doc_id is not used here).

    Args:
        data (dict): the source values.

    Returns:
        bool: True if the document was saved, False otherwise (the error is printed).
    """
    try:
        fields = {name: data.get(name, '') for name in ('writer_id', 'data', 'image')}
        fields['time'] = datetime.now(timezone.utc).isoformat()
        AchievementDocument(**fields).save()
        print(f"文档写入成功,内容: {data}")
        return True
    except Exception as e:
        print(f"文档写入失败: {str(e)}, 数据: {data}")
        return False
|
||
|
||
def search_data(query):
    """Full-text search for ``query`` across all fields of the achievement index.

    No ``size`` is set, so Elasticsearch's default page size applies.

    Args:
        query (str): the search text.

    Returns:
        list[dict]: one ``{"_id", "writer_id", "data", "image"}`` dict per hit,
        or an empty list if the search fails (the error is printed).
    """
    try:
        response = (
            AchievementDocument.search()
            .query("multi_match", query=query, fields=['*'])
            .execute()
        )
        return [
            {
                "_id": hit.meta.id,
                "writer_id": hit.writer_id,
                "data": hit.data,
                "image": hit.image,
            }
            for hit in response
        ]
    except Exception as e:
        print(f"搜索失败: {str(e)}")
        return []
|
||
|
||
def search_all():
    """Return every document in the achievement index.

    ``Search.scan()`` is used so that the result is not truncated to
    Elasticsearch's default page size of 10 hits. Scan results come back in
    no particular order.

    Returns:
        list[dict]: one ``{"_id", "writer_id", "data", "image"}`` dict per
        document, or an empty list on error (the error is printed).
    """
    try:
        search = AchievementDocument.search().query("match_all")
        return [
            {
                "_id": hit.meta.id,
                "writer_id": hit.writer_id,
                "data": hit.data,
                "image": hit.image,
            }
            for hit in search.scan()
        ]
    except Exception as e:
        print(f"获取所有文档失败: {str(e)}")
        return []
|
||
|
||
def delete_by_id(doc_id):
    """Delete the achievement document whose Elasticsearch ``_id`` is ``doc_id``.

    Args:
        doc_id (str): the document ID.

    Returns:
        bool: True on success; False if the document could not be fetched or
        deleted (the error is printed).
    """
    try:
        target = AchievementDocument.get(id=doc_id)
        target.delete()
        print(f"文档 {doc_id} 删除成功")
    except Exception as e:
        print(f"删除失败: {str(e)}")
        return False
    return True
|
||
|
||
def update_by_id(doc_id, updated_data):
    """Update selected fields of an achievement document and save it.

    Only ``writer_id``, ``data`` and ``image`` are taken from ``updated_data``;
    any other keys are ignored. A key that is present is applied even if its
    value is falsy.

    Args:
        doc_id (str): Elasticsearch ``_id`` of the document.
        updated_data (dict): the new field values.

    Returns:
        bool: True on success, False if the fetch or save failed (the error is printed).
    """
    try:
        achievement = AchievementDocument.get(id=doc_id)
        # (The stray debug ``print(doc_id)`` that used to be here was removed.)
        for field in ('writer_id', 'data', 'image'):
            if field in updated_data:
                setattr(achievement, field, updated_data[field])
        achievement.save()
        print(f"文档 {doc_id} 更新成功")
        return True
    except Exception as e:
        print(f"更新失败: {str(e)}")
        return False
|
||
|
||
def get_by_id(doc_id):
    """Fetch a single achievement document by its Elasticsearch ``_id``.

    Args:
        doc_id (str): the document ID.

    Returns:
        dict | None: ``{"_id", "writer_id", "data", "image"}`` on success, or
        None when the lookup fails for any reason (the error is printed).
    """
    try:
        doc = AchievementDocument.get(id=doc_id)
        result = dict(
            _id=doc.meta.id,
            writer_id=doc.writer_id,
            data=doc.data,
            image=doc.image,
        )
    except Exception as e:
        print(f"获取文档失败: {str(e)}")
        return None
    return result
|
||
|
||
def search_by_any_field(keyword):
    """Fuzzy full-text search for ``keyword`` across every achievement field.

    ``fuzziness="AUTO"`` lets Elasticsearch tolerate small typos. No ``size``
    is set, so the default page size applies.

    Args:
        keyword (str): the search text.

    Returns:
        list[dict]: one ``{"_id", "writer_id", "data", "image"}`` dict per hit,
        or an empty list if the search fails (the error is printed).
    """
    match_options = {"query": keyword, "fields": ['*'], "fuzziness": "AUTO"}
    try:
        response = AchievementDocument.search().query("multi_match", **match_options).execute()
        matches = []
        for hit in response:
            matches.append(dict(_id=hit.meta.id, writer_id=hit.writer_id,
                                data=hit.data, image=hit.image))
        return matches
    except Exception as e:
        print(f"模糊搜索失败: {str(e)}")
        return []
|
||
|
||
# Per-process cache used by get_analytics_overview(): "data" holds the last
# computed analytics payload (None until first computed) and "ts" the
# time.time() value at which it was stored.
ANALYTICS_CACHE = dict(data=None, ts=0)
|
||
|
||
def _compute_hist(range_gte: str, interval: str, fmt: str):
    """Count achievements per calendar ``interval`` from ``range_gte`` until now.

    Args:
        range_gte: Elasticsearch date-math lower bound, e.g. ``'now-10d/d'``.
        interval: ``calendar_interval`` value (``'day'``, ``'week'``, ``'month'``).
        fmt: date format used for each bucket's ``key_as_string``.

    Returns:
        list[dict]: ``{"label": ..., "count": ...}`` per bucket in the order
        Elasticsearch returns them (ascending by date for date_histogram).
        Empty periods are included with a count of 0.
    """
    s = AchievementDocument.search()
    s = s.filter('range', time={'gte': range_gte, 'lte': 'now'})
    s = s.extra(size=0)  # only the aggregation is needed, not the hits
    s.aggs.bucket(
        'b',
        'date_histogram',
        field='time',
        calendar_interval=interval,
        format=fmt,
        min_doc_count=0,
        # min_doc_count=0 alone only fills gaps *between* the first and last
        # matching documents; extended_bounds also emits the empty periods at
        # the start and end of the window (e.g. "today" before any upload).
        extended_bounds={'min': range_gte, 'max': 'now'},
    )
    resp = s.execute()
    return [{"label": b.key_as_string, "count": b.doc_count} for b in resp.aggs.b.buckets]
|
||
|
||
def _compute_type_counts(range_gte: str, types: list):
    """Count achievements since ``range_gte`` whose ``data`` contains each type name.

    One count query (a ``match_phrase`` on ``data`` within the time range) is
    issued per type, so a document mentioning several types counts toward each.

    Args:
        range_gte: Elasticsearch date-math lower bound for ``time``.
        types: type names; each is converted with ``str()``.

    Returns:
        list[dict]: ``{"type": name, "count": n}`` in the order of ``types``.
    """
    def _count_for(type_name):
        search = (
            AchievementDocument.search()
            .filter('range', time={'gte': range_gte, 'lte': 'now'})
            .query('match_phrase', data=type_name)
        )
        return int(search.count())

    return [{"type": str(t), "count": _count_for(str(t))} for t in types]
|
||
|
||
def compute_analytics():
    """Compute the dashboard statistics from the achievement index.

    Returns:
        dict: ``last_10_days`` / ``last_10_weeks`` / ``last_10_months`` hold the
        newest 10 ``{"label", "count"}`` buckets of each histogram;
        ``type_pie_1m`` / ``type_pie_12m`` hold ``{"type", "count"}`` entries for
        every configured type, counted from the start of the month 1 (resp. 12)
        months ago until now.
    """
    types = get_type_list()
    days = _compute_hist('now-10d/d', 'day', 'yyyy-MM-dd')
    # 'ww' is the week of the *week-based* year, so it must be paired with 'YYYY'
    # (week-based year). 'yyyy-ww' uses the calendar year instead and mislabels
    # late-December days that fall in week 01 as week 01 of the ending year.
    weeks = _compute_hist('now-10w/w', 'week', 'YYYY-ww')
    months = _compute_hist('now-10M/M', 'month', 'yyyy-MM')
    pie_1m = _compute_type_counts('now-1M/M', types)
    pie_12m = _compute_type_counts('now-12M/M', types)
    # Each window also covers the current, partial period and can therefore
    # produce more than 10 buckets; keep only the newest 10.
    return {
        "last_10_days": days[-10:],
        "last_10_weeks": weeks[-10:],
        "last_10_months": months[-10:],
        "type_pie_1m": pie_1m,
        "type_pie_12m": pie_12m,
    }
|
||
|
||
def get_analytics_overview(force: bool = False):
    """Return the analytics payload, recomputing it at most once per hour.

    Args:
        force: recompute even if the cached value is younger than an hour.

    Returns:
        dict: the cached result of compute_analytics(). ANALYTICS_CACHE["ts"]
        records when it was computed.
    """
    cache = ANALYTICS_CACHE
    current = time.time()
    needs_refresh = force or cache["data"] is None or current - cache["ts"] > 3600
    if needs_refresh:
        cache["data"] = compute_analytics()
        cache["ts"] = current
    return cache["data"]
|
||
|
||
def _seconds_until_hour(h: int):
    """Return whole seconds from now until the next ``h``:00:00 in naive local time.

    If today's ``h``:00:00 is now or already past, tomorrow's is used. The
    value is truncated to an int. ``h`` outside 0..23 raises ValueError.
    """
    current = datetime.now()
    candidate = datetime(current.year, current.month, current.day, h)
    if candidate <= current:
        candidate += timedelta(days=1)
    delta_seconds = int((candidate - current).total_seconds())
    return delta_seconds if delta_seconds > 0 else 0
|
||
|
||
def start_daily_analytics_scheduler():
    """Refresh the analytics cache every day at 03:00 (naive local time).

    The first refresh is scheduled for the next 03:00. Each run then schedules
    the following one. Errors from a run are printed and do not stop the schedule.

    The timers are daemon threads. A non-daemon ``threading.Timer`` that is
    waiting up to 24 h keeps the Python process from exiting until it fires.
    """
    def _schedule(delay):
        timer = threading.Timer(delay, _run_and_reschedule)
        timer.daemon = True
        timer.start()

    def _run_and_reschedule():
        try:
            get_analytics_overview(force=True)
        except Exception as e:
            print(f"分析任务失败: {e}")
        finally:
            # Re-anchor on the wall clock instead of waiting a fixed 24 h after
            # the run finished, so the schedule does not drift by the run time.
            delay = _seconds_until_hour(3)
            if delay < 3600:
                # The delay is truncated to whole seconds, so the timer can fire
                # just before 03:00; never schedule today's slot a second time.
                delay += 24 * 3600
            _schedule(delay)

    _schedule(_seconds_until_hour(3))
|
||
|
||
def write_user_data(user_data):
    """Index a new user document.

    Args:
        user_data (dict): ``user_id``, ``username``, ``password`` and an optional
            ``permission``. ``permission`` is coerced to int. It falls back to 1
            (the same default used when the key is absent) if it cannot be converted.

    Returns:
        bool: True when the document was saved, False otherwise (the error is printed).
    """
    try:
        try:
            perm_val = int(user_data.get('permission', 1))
        except (TypeError, ValueError, OverflowError):
            # e.g. None, '', 'abc' or float('inf'). Only conversion failures are
            # defaulted; anything else is a real error for the outer handler.
            perm_val = 1
        user = UserDocument(
            user_id=user_data.get('user_id'),
            username=user_data.get('username'),
            # NOTE(review): the password is stored exactly as passed in; no
            # hashing happens here.
            password=user_data.get('password'),
            permission=perm_val,
        )
        user.save()
        print(f"用户数据写入成功: {user_data.get('username')}")
        return True
    except Exception as e:
        print(f"用户数据写入失败: {str(e)}")
        return False
|
||
|
||
# NOTE: get_user_by_id is defined once, further down in this module. A duplicate
# definition here was dead code: the later definition with the same name replaced
# it at import time. Unlike the live version, the duplicate also returned the
# stored password.
|
||
|
||
def get_user_by_username(username):
    """Look up a user by exact username (``term`` query).

    Args:
        username (str): the username to match.

    Returns:
        dict | None: ``{"user_id", "username", "password", "permission"}`` for
        the first hit, with ``permission`` as int. Returns None when nothing
        matches or an error occurs (the error is printed).
    """
    try:
        hits = UserDocument.search().query("term", username=username).execute().hits
        if not hits:
            return None
        first = hits[0]
        return {
            "user_id": first.user_id,
            "username": first.username,
            "password": first.password,
            "permission": int(first.permission),
        }
    except Exception as e:
        print(f"获取用户数据失败: {str(e)}")
        return None
|
||
|
||
def get_all_users():
    """Return every user, without passwords.

    ``Search.scan()`` is used so that the list is not truncated to
    Elasticsearch's default page size of 10 hits. Scan results come back in
    no particular order.

    Returns:
        list[dict]: ``{"user_id", "username", "permission"}`` per user, with
        ``permission`` as int, or an empty list on error (the error is printed).
    """
    try:
        search = UserDocument.search().query("match_all")
        return [
            {
                "user_id": hit.user_id,
                "username": hit.username,
                "permission": int(hit.permission),
            }
            for hit in search.scan()
        ]
    except Exception as e:
        print(f"获取所有用户失败: {str(e)}")
        return []
|
||
|
||
def get_user_by_id(user_id):
    """Look up a user by numeric ``user_id``; the password is not included.

    ``user_id`` is coerced with ``int()``. A non-numeric value, a missing user
    or any search error yields None (errors are printed).

    Returns:
        dict | None: ``{"user_id", "username", "permission"}`` for the first hit.
    """
    try:
        hits = UserDocument.search().query("term", user_id=int(user_id)).execute().hits
        if not hits:
            return None
        match = hits[0]
        return dict(
            user_id=match.user_id,
            username=match.username,
            permission=int(match.permission),
        )
    except Exception as e:
        print(f"获取用户数据失败: {str(e)}")
        return None
|
||
|
||
def delete_user_by_id(user_id):
    """Delete the first user whose ``user_id`` (coerced with ``int()``) matches.

    Returns:
        bool: True if a matching user was found and deleted; False if none
        matched or an error occurred (the error is printed).
    """
    try:
        found = UserDocument.search().query("term", user_id=int(user_id)).execute().hits
        if not found:
            return False
        UserDocument.get(id=found[0].meta.id).delete()
        return True
    except Exception as e:
        print(f"删除用户失败: {str(e)}")
        return False
|
||
|
||
def update_user_by_id(user_id, username=None, permission=None, password=None):
    """Update selected fields of the first user whose ``user_id`` matches.

    Only arguments that are not None are applied, in the order username,
    permission, password. ``permission`` is coerced to int. ``password`` is
    stored as given.

    Returns:
        bool: True when a user was found and saved; otherwise False (errors are printed).
    """
    try:
        found = UserDocument.search().query("term", user_id=int(user_id)).execute().hits
        if not found:
            return False
        doc = UserDocument.get(id=found[0].meta.id)
        changes = {'username': username, 'permission': permission, 'password': password}
        for field, value in changes.items():
            if value is None:
                continue
            setattr(doc, field, int(value) if field == 'permission' else value)
        doc.save()
        return True
    except Exception as e:
        print(f"更新用户失败: {str(e)}")
        return False
|