Files
Achievement_Inputing/elastic/es_connect.py
2025-11-13 21:58:52 +08:00

380 lines
11 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Django版本的ES连接和操作模块
迁移自Flask项目的ESConnect.py
"""
from elasticsearch import Elasticsearch
from elasticsearch_dsl import connections
from .documents import AchievementDocument, UserDocument
from .indexes import ACHIEVEMENT_INDEX_NAME, USER_INDEX_NAME
import hashlib
import time
# 使用Django的ES连接配置
connections.create_connection(hosts=['localhost:9200'])
# 获取默认的ES客户端
es = connections.get_connection()
DATA_INDEX_NAME = ACHIEVEMENT_INDEX_NAME
USERS_INDEX_NAME = USER_INDEX_NAME
def create_index_with_mapping():
"""创建索引和映射配置(仅当索引不存在时)"""
# 获取 Elasticsearch 客户端(与 Document 使用的客户端一致)
try:
# --- 1. 处理获奖数据索引 ---
if not es.indices.exists(index=DATA_INDEX_NAME):
AchievementDocument.init()
print(f"✅ 创建索引 {DATA_INDEX_NAME} 并设置映射")
else:
print(f" 索引 {DATA_INDEX_NAME} 已存在,跳过创建")
# --- 2. 处理用户索引 ---
if not es.indices.exists(index=USERS_INDEX_NAME):
UserDocument.init()
print(f"✅ 创建索引 {USERS_INDEX_NAME} 并设置映射")
else:
print(f" 索引 {USERS_INDEX_NAME} 已存在,跳过创建")
# --- 3. 创建默认管理员用户(可选:也可检查用户是否已存在)---
# 这里简单处理:每次初始化都写入(可能重复),建议加唯一性判断
admin_user = {
"user_id": 0,
"username": "admin",
"password": "admin", # ⚠️ 生产环境务必加密!
"permission": 0
}
# 可选:检查 admin 是否已存在(根据 user_id 或 username
from elasticsearch_dsl import Search
s = Search(using=es, index=USERS_INDEX_NAME).query("match", username="admin")
if s.count() == 0:
write_user_data(admin_user)
print("✅ 默认管理员用户已创建")
else:
print(" 默认管理员用户已存在,跳过创建")
except Exception as e:
print(f"❌ 创建索引失败: {str(e)}")
# raise # 可选:在 AppConfig 中捕获,这里可以 re-raise 便于调试
def get_doc_id(data):
"""
根据数据内容生成唯一ID用于去重
参数:
data (dict): 包含文档数据的字典
返回:
str: 基于数据内容生成的MD5哈希值作为唯一ID
"""
data_str = data.get('data', '')
image_str = data.get('image', '')
unique_str = f"{data_str}{image_str}"
return hashlib.md5(unique_str.encode('utf-8')).hexdigest()
def insert_data(data):
"""
向Elasticsearch插入数据
参数:
data (dict): 要插入的数据
返回:
bool: 插入成功返回True失败返回False
"""
try:
# 使用Django-elasticsearch-dsl的方式插入数据
achievement = AchievementDocument(
writer_id=data.get('writer_id', ''),
data=data.get('data', ''),
image=data.get('image', '')
)
achievement.save()
print(f"文档写入成功,内容: {data}")
return True
except Exception as e:
print(f"文档写入失败: {str(e)}, 数据: {data}")
return False
def search_data(query):
"""
在Elasticsearch中搜索数据
参数:
query (str): 搜索关键词
返回:
list: 包含搜索结果的列表
"""
try:
# 使用Django-elasticsearch-dsl进行搜索
search = AchievementDocument.search()
search = search.query("multi_match", query=query, fields=['*'])
response = search.execute()
results = []
for hit in response:
results.append({
"_id": hit.meta.id,
"writer_id": hit.writer_id,
"data": hit.data,
"image": hit.image
})
return results
except Exception as e:
print(f"搜索失败: {str(e)}")
return []
def search_all():
"""获取所有文档"""
try:
search = AchievementDocument.search()
search = search.query("match_all")
response = search.execute()
results = []
for hit in response:
results.append({
"_id": hit.meta.id,
"writer_id": hit.writer_id,
"data": hit.data,
"image": hit.image
})
return results
except Exception as e:
print(f"获取所有文档失败: {str(e)}")
return []
def delete_by_id(doc_id):
"""
根据 doc_id 删除文档
参数:
doc_id (str): 要删除的文档ID
返回:
bool: 删除成功返回True失败返回False
"""
try:
# 使用Django-elasticsearch-dsl删除文档
achievement = AchievementDocument.get(id=doc_id)
achievement.delete()
print(f"文档 {doc_id} 删除成功")
return True
except Exception as e:
print(f"删除失败: {str(e)}")
return False
def update_by_id(doc_id, updated_data):
"""
根据文档ID更新数据
参数:
doc_id (str): 要更新的文档ID
updated_data (dict): 更新的数据内容
返回:
bool: 更新成功返回True失败返回False
"""
try:
# 获取文档
achievement = AchievementDocument.get(id=doc_id)
# 更新字段
if 'writer_id' in updated_data:
achievement.writer_id = updated_data['writer_id']
if 'data' in updated_data:
achievement.data = updated_data['data']
if 'image' in updated_data:
achievement.image = updated_data['image']
achievement.save()
print(f"文档 {doc_id} 更新成功")
return True
except Exception as e:
print(f"更新失败: {str(e)}")
return False
def get_by_id(doc_id):
"""
根据文档ID获取单个文档
参数:
doc_id (str): 要获取的文档ID
返回:
dict or None: 成功返回文档数据失败返回None
"""
try:
achievement = AchievementDocument.get(id=doc_id)
return {
"_id": achievement.meta.id,
"writer_id": achievement.writer_id,
"data": achievement.data,
"image": achievement.image
}
except Exception as e:
print(f"获取文档失败: {str(e)}")
return None
def search_by_any_field(keyword):
"""
在任意字段中搜索关键词(支持模糊搜索)
参数:
keyword (str): 搜索关键词
返回:
list: 包含搜索结果的列表
"""
try:
search = AchievementDocument.search()
# 使用multi_match查询在所有字段中搜索
search = search.query("multi_match",
query=keyword,
fields=['*'],
fuzziness="AUTO")
response = search.execute()
results = []
for hit in response:
results.append({
"_id": hit.meta.id,
"writer_id": hit.writer_id,
"data": hit.data,
"image": hit.image
})
return results
except Exception as e:
print(f"模糊搜索失败: {str(e)}")
return []
def write_user_data(user_data):
"""
写入用户数据到 ES
参数:
user_data (dict): 用户数据
返回:
bool: 写入成功返回True失败返回False
"""
try:
user = UserDocument(
user_id=user_data.get('user_id'),
username=user_data.get('username'),
password=user_data.get('password'),
permission=user_data.get('permission', 1)
)
user.save()
print(f"用户数据写入成功: {user_data.get('username')}")
return True
except Exception as e:
print(f"用户数据写入失败: {str(e)}")
return False
def get_user_by_username(username):
"""
根据用户名获取用户数据
参数:
username (str): 用户名
返回:
dict or None: 用户数据或None
"""
try:
search = UserDocument.search()
search = search.query("term", username=username)
response = search.execute()
if response.hits:
hit = response.hits[0]
return {
"user_id": hit.user_id,
"username": hit.username,
"password": hit.password,
"permission": hit.permission
}
return None
except Exception as e:
print(f"获取用户数据失败: {str(e)}")
return None
def get_all_users():
"""获取所有用户"""
try:
search = UserDocument.search()
search = search.query("match_all")
response = search.execute()
users = []
for hit in response:
users.append({
"user_id": hit.user_id,
"username": hit.username,
"permission": hit.permission
})
return users
except Exception as e:
print(f"获取所有用户失败: {str(e)}")
return []
def delete_user_by_username(username):
"""
根据用户名删除用户
参数:
username (str): 用户名
返回:
bool: 删除成功返回True失败返回False
"""
try:
search = UserDocument.search()
search = search.query("term", username=username)
response = search.execute()
if response.hits:
user = response.hits[0]
user.delete()
print(f"用户 {username} 删除成功")
return True
return False
except Exception as e:
print(f"删除用户失败: {str(e)}")
return False
def update_user_permission(username, new_permission):
"""
更新用户权限
参数:
username (str): 用户名
new_permission (int): 新权限级别
返回:
bool: 更新成功返回True失败返回False
"""
try:
search = UserDocument.search()
search = search.query("term", username=username)
response = search.execute()
if response.hits:
user = response.hits[0]
user.permission = new_permission
user.save()
print(f"用户 {username} 权限更新为 {new_permission}")
return True
return False
except Exception as e:
print(f"更新用户权限失败: {str(e)}")
return False