Files
Achievement_Inputing/elastic/es_connect.py

801 lines
25 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Django版本的ES连接和操作模块
迁移自Flask项目的ESConnect.py
"""
import hashlib
import json
import os
import re
import time
import uuid
from datetime import datetime, timezone, timedelta

from elasticsearch import Elasticsearch
from elasticsearch_dsl import connections

from .documents import AchievementDocument, UserDocument, GlobalDocument, RegistrationCodeDocument
from accounts.crypto import hash_password_random_salt
from .indexes import ACHIEVEMENT_INDEX_NAME, USER_INDEX_NAME, GLOBAL_INDEX_NAME
# The Elasticsearch URL comes from the ELASTICSEARCH_URL environment variable
# and falls back to a local node.
_ES_URL = os.environ.get('ELASTICSEARCH_URL', 'http://localhost:9200')
# A bare "host:port" value gets an explicit http:// scheme.
if not _ES_URL.startswith(('http://', 'https://')):
    _ES_URL = 'http://' + _ES_URL
connections.create_connection(hosts=[_ES_URL])
# The default client registered above (also used by the Document classes).
es = connections.get_connection()

# Short aliases for the index names defined in .indexes.
DATA_INDEX_NAME = ACHIEVEMENT_INDEX_NAME
USERS_INDEX_NAME = USER_INDEX_NAME
GLOBAL_TYPES_INDEX_NAME = GLOBAL_INDEX_NAME
def create_index_with_mapping():
    """Create the indices and their mappings when they do not exist yet.

    Steps:
      1. Achievement index: created via ``AchievementDocument.init()``.
      2. User index: created via ``UserDocument.init()``.
      3. Global index: created via ``GlobalDocument.init()`` and seeded with a
         ``types`` document holding the default types. If the index already
         exists but the ``types`` document is missing, it is re-seeded.
      4. A default ``admin`` user is created unless a search for username
         "admin" already returns a hit.

    Errors are printed instead of raised, so a failure does not abort the
    caller. Once a step fails, the remaining steps are skipped.
    """
    default_types = ['软著', '专利', '奖状']

    def _seed_default_types():
        # Store the default type list as the document with id 'types'.
        doc = GlobalDocument(type_list=list(default_types))
        doc.meta.id = 'types'
        doc.save()

    try:
        from elasticsearch import NotFoundError
        from elasticsearch_dsl import Search

        # --- 1. Achievement index ---
        if not es.indices.exists(index=DATA_INDEX_NAME):
            AchievementDocument.init()
            print(f"✅ 创建索引 {DATA_INDEX_NAME} 并设置映射")
        else:
            print(f" 索引 {DATA_INDEX_NAME} 已存在,跳过创建")

        # --- 2. User index ---
        if not es.indices.exists(index=USERS_INDEX_NAME):
            UserDocument.init()
            print(f"✅ 创建索引 {USERS_INDEX_NAME} 并设置映射")
        else:
            print(f" 索引 {USERS_INDEX_NAME} 已存在,跳过创建")

        # --- 3. Global types index ---
        if not es.indices.exists(index=GLOBAL_TYPES_INDEX_NAME):
            GlobalDocument.init()
            _seed_default_types()
            print(f"✅ 创建索引 {GLOBAL_TYPES_INDEX_NAME} 并写入默认类型")
        else:
            try:
                GlobalDocument.get(id='types')
            except NotFoundError:
                # Re-seed only when the document is genuinely missing. A
                # transient error must not overwrite a customised type list.
                _seed_default_types()
                print(" 全局类型文档缺失,已补充默认类型")

        # --- 4. Default admin user (skipped when one already exists) ---
        existing_admins = Search(using=es, index=USERS_INDEX_NAME).query("match", username="admin")
        if existing_admins.count() == 0:
            # Hash only when the user is actually going to be created.
            # SECURITY(review): the default password is "admin"; make sure
            # deployments change it after first login.
            salt_b64, hash_b64 = hash_password_random_salt("admin")
            write_user_data({
                "user_id": 0,
                "username": "admin",
                "password_hash": hash_b64,
                "password_salt": salt_b64,
                "permission": 0,
            })
            print("✅ 默认管理员用户已创建")
        else:
            print(" 默认管理员用户已存在,跳过创建")
    except Exception as e:
        print(f"❌ 创建索引失败: {str(e)}")
        # Deliberately not re-raised; callers (e.g. AppConfig.ready) keep running.
def get_type_list():
    """Return the configured achievement type names.

    Every stored entry is converted to ``str`` and stripped of surrounding
    whitespace and semicolons. If the ``types`` document cannot be read for
    any reason, the built-in default list is returned instead.
    """
    try:
        stored = GlobalDocument.get(id='types').type_list or []
        return [str(entry).strip().strip(';') for entry in stored]
    except Exception:
        return ['软著', '专利', '奖状']
def ensure_type_in_list(type_name: str):
    """Add *type_name* to the global type list if it is not already present.

    The name is normalised (surrounding whitespace and semicolons stripped)
    before it is compared and stored. If the ``types`` document does not exist
    yet, it is created from the default types plus the new name.

    Args:
        type_name: type to register.

    Returns:
        bool: True if the list was extended and saved. False if the name is
        empty (before or after normalisation), already present, or an error
        occurred.
    """
    if not type_name:
        return False
    norm = str(type_name).strip().strip(';')
    if not norm:
        # Names such as " ; " normalise to "" and must not be stored.
        return False
    try:
        from elasticsearch import NotFoundError

        try:
            doc = GlobalDocument.get(id='types')
            cur = list(doc.type_list or [])
        except NotFoundError:
            # Fall back to the defaults only when the document is really
            # missing; other failures must not overwrite the stored list.
            cur = ['软著', '专利', '奖状']
            doc = GlobalDocument(type_list=cur)
            doc.meta.id = 'types'
        if norm in {str(t).strip().strip(';') for t in cur}:
            return False
        cur.append(norm)
        doc.type_list = cur
        doc.save()
        return True
    except Exception:
        return False
def get_keys_list():
    """Return the global list of keys, normalised.

    Each entry is converted to ``str`` and stripped of whitespace and
    semicolons. If the ``keys`` document does not exist yet, an empty one is
    created. Any other error returns ``[]`` without writing anything; before
    this fix, such errors saved an empty list over the existing keys.
    """
    try:
        from elasticsearch import NotFoundError

        try:
            doc = GlobalDocument.get(id='keys')
            cur = list(doc.keys_list or [])
        except NotFoundError:
            cur = []
            doc = GlobalDocument(keys_list=cur)
            doc.meta.id = 'keys'
            doc.save()
        return [str(t).strip().strip(';') for t in cur]
    except Exception:
        return []
def ensure_key_in_list(key_name: str):
    """Add *key_name* to the global key list if it is not already present.

    The name is normalised (surrounding whitespace and semicolons stripped)
    before it is compared and stored. If the ``keys`` document does not exist
    yet, it is created containing just the new key.

    Args:
        key_name: key to register.

    Returns:
        bool: True if the list was extended and saved. False if the name is
        empty (before or after normalisation), already present, or an error
        occurred.
    """
    if not key_name:
        return False
    norm = str(key_name).strip().strip(';')
    if not norm:
        return False
    try:
        from elasticsearch import NotFoundError

        try:
            doc = GlobalDocument.get(id='keys')
            cur = list(doc.keys_list or [])
        except NotFoundError:
            # Start from an empty list only when the document is really
            # missing; other failures must not replace the stored keys.
            cur = []
            doc = GlobalDocument(keys_list=cur)
            doc.meta.id = 'keys'
        if norm in {str(t).strip().strip(';') for t in cur}:
            return False
        cur.append(norm)
        doc.keys_list = cur
        doc.save()
        return True
    except Exception:
        return False
def generate_registration_code(keys=None, manage_keys=None, expires_in_days: int = 30, created_by: int = None):
    """Create and store a new registration code.

    Every entry of *keys* and *manage_keys* is first registered in the global
    key list via ``ensure_key_in_list``. The code is a random UUID4 hex string
    followed by the last six digits of the current Unix timestamp. The code
    also serves as the document id.

    Args:
        keys: iterable of key names stored in the ``keys`` field.
        manage_keys: iterable of key names stored in the ``manage_keys`` field.
        expires_in_days: validity period in days. A falsy value means 30, and
            anything below 1 is raised to 1.
        created_by: creator id stored with the code (may be None).

    Returns:
        dict | None: ``code``, ``keys``, ``manage_keys``, ``created_at``,
        ``expires_at`` (ISO-8601, UTC) and ``created_by``; None on failure
        (the error is printed).
    """
    try:
        keys = list(keys or [])
        manage_keys = list(manage_keys or [])
        for key_name in keys + manage_keys:
            ensure_key_in_list(key_name)
        code = uuid.uuid4().hex + str(int(time.time()))[-6:]
        now = datetime.now(timezone.utc)
        expires = now + timedelta(days=max(1, int(expires_in_days or 30)))
        created_at_iso = now.isoformat()
        expires_at_iso = expires.isoformat()
        doc = RegistrationCodeDocument(
            code=code,
            keys=keys,
            manage_keys=manage_keys,
            created_at=created_at_iso,
            expires_at=expires_at_iso,
            created_by=created_by,
        )
        doc.meta.id = code
        doc.save()
        return {
            "code": code,
            "keys": keys,
            "manage_keys": manage_keys,
            "created_at": created_at_iso,
            "expires_at": expires_at_iso,
            # Added for parity with get_registration_code / list_registration_codes.
            "created_by": created_by,
        }
    except Exception as e:
        print(f"生成注册码失败: {str(e)}")
        return None
def get_registration_code(code: str):
    """Fetch a registration code by its document id.

    Returns:
        dict | None: ``code``, ``keys``, ``manage_keys``, ``created_at``,
        ``expires_at`` and ``created_by``; None if the document cannot be
        fetched.
    """
    try:
        code_id = str(code)
        record = RegistrationCodeDocument.get(id=code_id)
        info = {"code": getattr(record, 'code', code_id)}
        for list_field in ('keys', 'manage_keys'):
            info[list_field] = list(getattr(record, list_field, []) or [])
        for scalar_field in ('created_at', 'expires_at', 'created_by'):
            info[scalar_field] = getattr(record, scalar_field, None)
        return info
    except Exception:
        return None
def list_registration_codes(limit: int = 1000):
    """Return registration codes, newest first.

    Args:
        limit: maximum number of codes to return (at least 1). Without an
            explicit size, Elasticsearch returns only 10 hits, which silently
            truncated this list.

    Returns:
        list[dict]: per code: ``code``, ``keys``, ``manage_keys``,
        ``created_at``, ``expires_at``, ``created_by`` and ``active``.
        ``active`` is True while ``expires_at`` lies in the future. Returns an
        empty list on error (the error is printed).
    """
    def _as_utc(value):
        """Convert a datetime/date/ISO-8601 string to an aware UTC datetime, or None."""
        if value is None:
            return None
        try:
            if isinstance(value, datetime):
                parsed = value
            else:
                # 'Z' is not accepted by fromisoformat before Python 3.11.
                parsed = datetime.fromisoformat(str(value).replace('Z', '+00:00'))
        except (TypeError, ValueError):
            return None
        if parsed.tzinfo is None:
            # Codes are written with UTC offsets; treat naive values as UTC so
            # the comparison with an aware "now" cannot raise TypeError.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    try:
        body = {
            "size": max(1, int(limit)),
            "sort": [{"created_at": {"order": "desc"}}],
            "query": {"exists": {"field": "code"}},
        }
        resp = RegistrationCodeDocument.search().update_from_dict(body).execute()
        now = datetime.now(timezone.utc)
        out = []
        for hit in resp:
            if not getattr(hit, 'code', None):
                continue
            exp_dt = _as_utc(getattr(hit, 'expires_at', None))
            out.append({
                "code": getattr(hit, 'code', ''),
                "keys": list(getattr(hit, 'keys', []) or []),
                "manage_keys": list(getattr(hit, 'manage_keys', []) or []),
                "created_at": getattr(hit, 'created_at', None),
                "expires_at": getattr(hit, 'expires_at', None),
                "created_by": getattr(hit, 'created_by', None),
                "active": bool(exp_dt and exp_dt > now),
            })
        return out
    except Exception as e:
        print(f"获取注册码列表失败: {str(e)}")
        return []
def revoke_registration_code(code: str):
    """Expire a registration code immediately.

    Sets ``expires_at`` to the current UTC time (ISO-8601) and saves the
    document.

    Returns:
        bool: True on success; False if the code cannot be loaded or saved.
    """
    try:
        record = RegistrationCodeDocument.get(id=str(code))
        record.expires_at = datetime.now(timezone.utc).isoformat()
        record.save()
    except Exception:
        return False
    return True
def get_doc_id(data):
    """Derive a deterministic id from a record's content.

    The id is the hex MD5 digest of the record's ``data`` value immediately
    followed by its ``image`` value. Both are formatted as text and default
    to ''. MD5 serves only as a content fingerprint for de-duplication here,
    not as a security measure.

    Args:
        data (dict): record with optional ``data`` and ``image`` entries.

    Returns:
        str: 32-character hexadecimal digest.
    """
    fingerprint = "{}{}".format(data.get('data', ''), data.get('image', ''))
    digest = hashlib.md5(fingerprint.encode('utf-8'))
    return digest.hexdigest()
def insert_data(data):
    """Index a new achievement document.

    ``writer_id``, ``data`` and ``image`` are copied from *data* (each
    defaulting to ''), and ``time`` is set to the current UTC time in
    ISO-8601 format. No explicit document id is set.

    Args:
        data (dict): the record to store.

    Returns:
        bool: True on success; False (after printing the error) on failure.
    """
    try:
        payload = {field: data.get(field, '') for field in ('writer_id', 'data', 'image')}
        payload['time'] = datetime.now(timezone.utc).isoformat()
        AchievementDocument(**payload).save()
        print(f"文档写入成功,内容: {data}")
        return True
    except Exception as e:
        print(f"文档写入失败: {str(e)}, 数据: {data}")
        return False
def search_data(query):
    """Full-text search across every achievement field.

    Runs a ``multi_match`` query over all fields (``fields=['*']``). No size
    is set, so Elasticsearch's default page size applies.

    Args:
        query (str): search text.

    Returns:
        list: one dict per hit with ``_id``, ``writer_id``, ``data`` and
        ``image``; empty list on error (the error is printed).
    """
    try:
        response = (
            AchievementDocument.search()
            .query("multi_match", query=query, fields=['*'])
            .execute()
        )
        return [
            {
                "_id": match.meta.id,
                "writer_id": match.writer_id,
                "data": match.data,
                "image": match.image,
            }
            for match in response
        ]
    except Exception as e:
        print(f"搜索失败: {str(e)}")
        return []
def search_all():
    """Return every achievement document.

    Uses ``Search.scan()`` (the scroll API), so the result is not cut off at
    Elasticsearch's default page size of 10 hits. Hit order is not
    guaranteed.

    Returns:
        list: one dict per document with ``_id``, ``writer_id``, ``data`` and
        ``image``; empty list on error (the error is printed).
    """
    try:
        search = AchievementDocument.search().query("match_all")
        return [
            {
                "_id": hit.meta.id,
                "writer_id": hit.writer_id,
                "data": hit.data,
                "image": hit.image,
            }
            for hit in search.scan()
        ]
    except Exception as e:
        print(f"获取所有文档失败: {str(e)}")
        return []
def delete_by_id(doc_id):
    """Delete the achievement document with id *doc_id*.

    Args:
        doc_id (str): id of the document to delete.

    Returns:
        bool: True on success; False (after printing the error) if the
        document cannot be fetched or deleted.
    """
    try:
        AchievementDocument.get(id=doc_id).delete()
        print(f"文档 {doc_id} 删除成功")
        return True
    except Exception as e:
        print(f"删除失败: {str(e)}")
        return False
def update_by_id(doc_id, updated_data):
    """Update selected fields of an achievement document.

    Only ``writer_id``, ``data`` and ``image`` are applied, and only when
    present in *updated_data*. Other keys are ignored.

    Args:
        doc_id (str): id of the document to update.
        updated_data (dict): new field values.

    Returns:
        bool: True on success; False (after printing the error) on failure.
    """
    try:
        achievement = AchievementDocument.get(id=doc_id)
        # (A leftover debug print of doc_id used to sit here.)
        for field in ('writer_id', 'data', 'image'):
            if field in updated_data:
                setattr(achievement, field, updated_data[field])
        achievement.save()
        print(f"文档 {doc_id} 更新成功")
        return True
    except Exception as e:
        print(f"更新失败: {str(e)}")
        return False
def get_by_id(doc_id):
    """Fetch a single achievement document by id.

    Args:
        doc_id (str): id of the document.

    Returns:
        dict or None: ``_id``, ``writer_id``, ``data`` and ``image``; None if
        the document cannot be fetched (the error is printed).
    """
    try:
        record = AchievementDocument.get(id=doc_id)
        result = {"_id": record.meta.id}
        result["writer_id"] = record.writer_id
        result["data"] = record.data
        result["image"] = record.image
        return result
    except Exception as e:
        print(f"获取文档失败: {str(e)}")
        return None
def search_by_any_field(keyword):
    """Fuzzy search for *keyword* across every achievement field.

    Runs a ``multi_match`` query over ``fields=['*']`` with
    ``fuzziness="AUTO"``.

    Args:
        keyword (str): search text.

    Returns:
        list: one dict per hit with ``_id``, ``writer_id``, ``data`` and
        ``image``; empty list on error (the error is printed).
    """
    try:
        fuzzy_search = AchievementDocument.search().query(
            "multi_match",
            query=keyword,
            fields=['*'],
            fuzziness="AUTO",
        )
        return [
            {
                "_id": match.meta.id,
                "writer_id": match.writer_id,
                "data": match.data,
                "image": match.image,
            }
            for match in fuzzy_search.execute()
        ]
    except Exception as e:
        print(f"模糊搜索失败: {str(e)}")
        return []
def _type_filters_from_list(limit: int = None):
try:
types = get_type_list()
except Exception:
types = ['软著', '专利', '奖状']
if isinstance(limit, int) and limit > 0:
types = types[:limit]
filters = {}
for t in types:
key = str(t)
# 精确匹配键与值之间的关系,避免其它字段中的同名值造成误匹配
pattern = f'*"数据类型": "{key}"*'
filters[key] = {"wildcard": {"data.keyword": {"value": pattern}}}
return filters
def analytics_trend(gte: str = None, lte: str = None, interval: str = "day"):
    """Count achievements per calendar interval of the ``time`` field.

    Args:
        gte: optional lower bound for ``time`` (sent as-is in a range query).
        lte: optional upper bound for ``time``.
        interval: ``calendar_interval`` of the date histogram (e.g. "day").

    Returns:
        list[dict]: one dict per bucket with ``key_as_string``, ``key`` and
        ``doc_count`` (``min_doc_count`` is 0); empty list on error (the
        error is printed).
    """
    try:
        search = AchievementDocument.search()
        histogram = {"field": "time", "calendar_interval": interval, "min_doc_count": 0}
        body = {"size": 0, "aggs": {"trend": {"date_histogram": histogram}}}
        time_range = {bound: value for bound, value in (("gte", gte), ("lte", lte)) if value}
        if time_range:
            body["query"] = {"range": {"time": time_range}}
        response = search.update_from_dict(body).execute()
        if hasattr(response, 'aggregations'):
            buckets = response.aggregations.trend.buckets
        else:
            buckets = []
        return [
            {"key_as_string": bucket.key_as_string, "key": bucket.key, "doc_count": bucket.doc_count}
            for bucket in buckets
        ]
    except Exception as e:
        print(f"分析趋势失败: {str(e)}")
        return []
def analytics_types(gte: str = None, lte: str = None, size: int = 10):
    """Count achievements per type using a filters aggregation.

    Args:
        gte: optional lower bound for ``time``.
        lte: optional upper bound for ``time``.
        size: maximum number of types (passed as ``limit`` to
            ``_type_filters_from_list``).

    Returns:
        list[dict]: ``{"key": type, "doc_count": int}`` per type. A bucket
        whose count cannot be read becomes ``doc_count`` 0. Returns an empty
        list on error (the error is printed).
    """
    try:
        type_filters = _type_filters_from_list(limit=size)
        body = {"size": 0, "aggs": {"by_type": {"filters": {"filters": type_filters}}}}
        time_range = {bound: value for bound, value in (("gte", gte), ("lte", lte)) if value}
        if time_range:
            body["query"] = {"range": {"time": time_range}}
        resp = es.search(index=DATA_INDEX_NAME, body=body)
        type_buckets = resp.get("aggregations", {}).get("by_type", {}).get("buckets", {})
        counts = []
        for type_name, bucket in type_buckets.items():
            try:
                counts.append({"key": type_name, "doc_count": int(bucket.get("doc_count", 0))})
            except Exception:
                counts.append({"key": str(type_name), "doc_count": 0})
        return counts
    except Exception as e:
        print(f"分析类型占比失败: {str(e)}")
        return []
def analytics_types_trend(gte: str = None, lte: str = None, interval: str = "week", size: int = 8):
    """Per-interval achievement counts, broken down by type.

    A date histogram on ``time`` has a nested filters aggregation with one
    bucket per type.

    Args:
        gte: optional lower bound for ``time``.
        lte: optional upper bound for ``time``.
        interval: ``calendar_interval`` of the histogram (e.g. "week").
        size: maximum number of types.

    Returns:
        list[dict]: per interval ``key_as_string``, ``key``, ``doc_count`` and
        ``types`` (a list of ``{"key", "doc_count"}``); empty list on error
        (the error is printed).
    """
    try:
        type_filters = _type_filters_from_list(limit=size)
        histogram = {"field": "time", "calendar_interval": interval, "min_doc_count": 0}
        body = {
            "size": 0,
            "aggs": {
                "by_interval": {
                    "date_histogram": histogram,
                    "aggs": {"by_type": {"filters": {"filters": type_filters}}},
                }
            },
        }
        time_range = {bound: value for bound, value in (("gte", gte), ("lte", lte)) if value}
        if time_range:
            body["query"] = {"range": {"time": time_range}}
        resp = es.search(index=DATA_INDEX_NAME, body=body)
        interval_buckets = resp.get("aggregations", {}).get("by_interval", {}).get("buckets", [])
        series = []
        for bucket in interval_buckets:
            per_type = bucket.get("by_type", {}).get("buckets", {})
            breakdown = [
                {"key": type_name, "doc_count": int(type_bucket.get("doc_count", 0))}
                for type_name, type_bucket in per_type.items()
            ]
            series.append({
                "key_as_string": bucket.get("key_as_string"),
                "key": bucket.get("key"),
                "doc_count": bucket.get("doc_count", 0),
                "types": breakdown,
            })
        return series
    except Exception as e:
        print(f"分析类型变化失败: {str(e)}")
        return []
def analytics_recent(limit: int = 10, gte: str = None, lte: str = None):
    """Return the most recent achievements with writer username and type.

    Args:
        limit: number of hits to return, clamped to the range 1..100.
        gte: optional lower bound for ``time``.
        lte: optional upper bound for ``time``.

    Returns:
        list[dict]: newest first. Each item has ``_id``, ``writer_id``,
        ``username`` ('' if unknown), ``type`` (the "数据类型" value from the
        ``data`` text, '' if absent) and ``time``. Returns an empty list on
        error (the error is printed).
    """
    try:
        # Previously ``re`` was not imported, so this fallback always failed
        # with a swallowed NameError.
        type_pattern = re.compile(r'"数据类型"\s*:\s*"([^"]+)"')

        def _extract_type(s):
            """Return the "数据类型" value from JSON text *s*, falling back to a regex."""
            if not s:
                return ""
            try:
                obj = json.loads(s)
            except Exception:
                obj = None
            if isinstance(obj, dict):
                v = obj.get("数据类型")
                if isinstance(v, str) and v:
                    return v
            try:
                m = type_pattern.search(s)
            except TypeError:
                return ""
            return m.group(1) if m else ""

        def _lookup_username(writer_id):
            """Resolve *writer_id* to a username, also trying it as an int; None if unknown."""
            try:
                name = (get_user_by_id(writer_id) or {}).get("username")
            except Exception:
                name = None
            if not name:
                try:
                    name = (get_user_by_id(int(writer_id)) or {}).get("username")
                except Exception:
                    name = None
            return name

        body = {
            "size": max(1, min(limit, 100)),
            "sort": [{"time": {"order": "desc"}}],
        }
        time_range = {bound: value for bound, value in (("gte", gte), ("lte", lte)) if value}
        if time_range:
            body["query"] = {"range": {"time": time_range}}
        resp = AchievementDocument.search().update_from_dict(body).execute()

        # Memoise lookups so each distinct writer costs one round trip per call
        # instead of up to two per hit.
        username_cache = {}
        results = []
        for hit in resp:
            w = getattr(hit, 'writer_id', '')
            try:
                uname = username_cache[w]
            except KeyError:
                uname = username_cache[w] = _lookup_username(w)
            except TypeError:
                # Unhashable writer_id: look it up without caching.
                uname = _lookup_username(w)
            results.append({
                "_id": hit.meta.id,
                "writer_id": w,
                "username": uname or "",
                "type": _extract_type(getattr(hit, 'data', '')) or "",
                "time": getattr(hit, 'time', None),
            })
        return results
    except Exception as e:
        print(f"获取最近活动失败: {str(e)}")
        return []
def write_user_data(user_data):
    """Save a user document to Elasticsearch.

    ``permission`` is coerced to int (1 when missing or not convertible). If
    a non-blank ``password`` is given (after ``str`` and ``strip``), it is
    hashed with a fresh random salt. Otherwise the supplied ``password_hash``
    and ``password_salt`` are stored as-is. ``key`` and ``manage_key`` are
    stored as lists.

    Args:
        user_data (dict): user fields.

    Returns:
        bool: True on success; False (after printing the error) on failure.
    """
    try:
        try:
            permission = int(user_data.get('permission', 1))
        except Exception:
            permission = 1
        plain_password = str(user_data.get('password') or '').strip()
        if plain_password:
            salt_b64, hash_b64 = hash_password_random_salt(plain_password)
        else:
            hash_b64 = user_data.get('password_hash')
            salt_b64 = user_data.get('password_salt')
        UserDocument(
            user_id=user_data.get('user_id'),
            username=user_data.get('username'),
            password_hash=hash_b64,
            password_salt=salt_b64,
            permission=permission,
            email=user_data.get('email'),
            key=list(user_data.get('key') or []),
            manage_key=list(user_data.get('manage_key') or []),
        ).save()
        print(f"用户数据写入成功: {user_data.get('username')}")
        return True
    except Exception as e:
        print(f"用户数据写入失败: {str(e)}")
        return False
# NOTE(review): this module defines ``get_user_by_id`` again further down
# (that version casts ``user_id`` to int and also returns email/key/
# manage_key). The later module-level ``def`` rebinds the name, so this
# earlier version is never the one callers get; consider removing it.
def get_user_by_id(user_id):
    """Look up a user by ``user_id`` with a ``term`` query.

    Args:
        user_id: value matched exactly against the ``user_id`` field.

    Returns:
        dict or None: ``user_id``, ``username`` and ``permission`` of the
        first hit, or None when nothing matches or the query fails (the
        error is printed).
    """
    try:
        search = UserDocument.search()
        search = search.query("term", user_id=user_id)
        response = search.execute()
        if response.hits:
            hit = response.hits[0]
            return {
                "user_id": hit.user_id,
                "username": hit.username,
                "permission": hit.permission
            }
        return None
    except Exception as e:
        print(f"获取用户数据失败: {str(e)}")
        return None
def get_user_by_username(username):
    """Fetch a user by username with an exact ``term`` query.

    Args:
        username (str): the username to look up.

    Returns:
        dict or None: ``user_id``, ``username``, ``password_hash``,
        ``password_salt`` and ``permission`` (as int) of the first hit; None
        when nothing matches or the lookup fails (the error is printed).
    """
    try:
        response = UserDocument.search().query("term", username=username).execute()
        if not response.hits:
            return None
        user = response.hits[0]
        return {
            "user_id": user.user_id,
            "username": user.username,
            "password_hash": getattr(user, 'password_hash', None),
            "password_salt": getattr(user, 'password_salt', None),
            "permission": int(user.permission),
        }
    except Exception as e:
        print(f"获取用户数据失败: {str(e)}")
        return None
def get_all_users():
    """Return every user (without password fields).

    Uses ``Search.scan()`` (the scroll API), so the list is not cut off at
    Elasticsearch's default page size of 10 hits. Order is not guaranteed.

    Returns:
        list[dict]: ``user_id``, ``username``, ``permission`` (int),
        ``email``, ``key`` and ``manage_key`` per user; empty list on error
        (the error is printed).
    """
    try:
        search = UserDocument.search().query("match_all")
        return [
            {
                "user_id": hit.user_id,
                "username": hit.username,
                "permission": int(hit.permission),
                "email": getattr(hit, 'email', None),
                "key": list(getattr(hit, 'key', []) or []),
                "manage_key": list(getattr(hit, 'manage_key', []) or []),
            }
            for hit in search.scan()
        ]
    except Exception as e:
        print(f"获取所有用户失败: {str(e)}")
        return []
def get_user_by_id(user_id):
    """Fetch a user by id.

    *user_id* is converted with ``int()`` before the ``term`` query, so
    numeric strings work. A non-numeric value makes the lookup fail and
    return None.

    Returns:
        dict or None: ``user_id``, ``username``, ``permission`` (int),
        ``email``, ``key`` and ``manage_key`` of the first hit; None when
        nothing matches or the lookup fails (the error is printed).
    """
    try:
        response = UserDocument.search().query("term", user_id=int(user_id)).execute()
        if not response.hits:
            return None
        user = response.hits[0]
        profile = {
            "user_id": user.user_id,
            "username": user.username,
            "permission": int(user.permission),
            "email": getattr(user, 'email', None),
        }
        for list_field in ('key', 'manage_key'):
            profile[list_field] = list(getattr(user, list_field, []) or [])
        return profile
    except Exception as e:
        print(f"获取用户数据失败: {str(e)}")
        return None
def delete_user_by_id(user_id):
    """Delete the user whose ``user_id`` equals ``int(user_id)``.

    The matching user's document is fetched by its document id and then
    deleted.

    Returns:
        bool: True if a matching user was deleted; False if none matched or
        an error occurred (the error is printed).
    """
    try:
        response = UserDocument.search().query("term", user_id=int(user_id)).execute()
        if not response.hits:
            return False
        target_id = response.hits[0].meta.id
        UserDocument.get(id=target_id).delete()
        return True
    except Exception as e:
        print(f"删除用户失败: {str(e)}")
        return False
def update_user_by_id(user_id, username=None, permission=None, password=None):
    """Update a user's username, permission and/or password.

    Arguments left as None are not changed. ``permission`` is stored as int.
    A new password is converted with ``str`` and hashed with a fresh random
    salt.

    Returns:
        bool: True if a matching user was updated and saved; False if none
        matched or an error occurred (the error is printed).
    """
    try:
        response = UserDocument.search().query("term", user_id=int(user_id)).execute()
        if not response.hits:
            return False
        user = UserDocument.get(id=response.hits[0].meta.id)
        if username is not None:
            user.username = username
        if permission is not None:
            user.permission = int(permission)
        if password is not None:
            new_salt, new_hash = hash_password_random_salt(str(password))
            user.password_hash = new_hash
            user.password_salt = new_salt
        user.save()
        return True
    except Exception as e:
        print(f"更新用户失败: {str(e)}")
        return False