新增“数据编辑”

This commit is contained in:
2025-11-10 09:32:08 +08:00
parent 61b1d93718
commit f3aec9a18d
11 changed files with 681 additions and 0 deletions

0
elastic/__init__.py Normal file
View File

3
elastic/admin.py Normal file
View File

@@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

6
elastic/apps.py Normal file
View File

@@ -0,0 +1,6 @@
from django.apps import AppConfig
class ElasticConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "elastic"

53
elastic/documents.py Normal file
View File

@@ -0,0 +1,53 @@
from django_elasticsearch_dsl import Document, fields, Index
from .models import AchievementData, User, ElasticNews
# 获奖数据索引 - 对应Flask项目中的wordsearch266666
ACHIEVEMENT_INDEX = Index('wordsearch266666')
ACHIEVEMENT_INDEX.settings(number_of_shards=1, number_of_replicas=0)
# 用户数据索引 - 对应Flask项目中的users
USER_INDEX = Index('users')
USER_INDEX.settings(number_of_shards=1, number_of_replicas=0)
# 新闻索引 - 保留原有功能
NEWS_INDEX = Index('elastic_news')
NEWS_INDEX.settings(number_of_shards=1, number_of_replicas=0)
@ACHIEVEMENT_INDEX.doc_type
class AchievementDocument(Document):
"""获奖数据文档映射"""
writer_id = fields.TextField(fields={'keyword': {'type': 'keyword'}})
data = fields.TextField(
analyzer='ik_max_word',
search_analyzer='ik_smart',
fields={'keyword': {'type': 'keyword'}}
)
image = fields.KeywordField()
class Django:
model = AchievementData
# fields列表应该只包含需要特殊处理的字段或者可以完全省略
# 因为我们已经显式定义了所有字段
@USER_INDEX.doc_type
class UserDocument(Document):
"""用户数据文档映射"""
user_id = fields.LongField()
username = fields.KeywordField()
password = fields.KeywordField()
permission = fields.IntegerField()
class Django:
model = User
# fields列表应该只包含需要特殊处理的字段或者可以完全省略
# 因为我们已经显式定义了所有字段
@NEWS_INDEX.doc_type
class NewsDocument(Document):
"""新闻文档映射 - 保留原有功能"""
id = fields.IntegerField(attr='id')
title = fields.TextField(fields={'keyword': {'type': 'keyword'}})
content = fields.TextField(fields={'keyword': {'type': 'keyword'}})
class Django:
model = ElasticNews

363
elastic/es_connect.py Normal file
View File

@@ -0,0 +1,363 @@
"""
Django版本的ES连接和操作模块
迁移自Flask项目的ESConnect.py
"""
from elasticsearch import Elasticsearch
from elasticsearch_dsl import connections
from .documents import AchievementDocument, UserDocument
import hashlib
import time
# 使用Django的ES连接配置
connections.create_connection(hosts=['localhost:9200'])
# 获取默认的ES客户端
es = connections.get_connection()
# 索引名称与Flask项目保持一致
DATA_INDEX_NAME = "wordsearch_sb"
USERS_INDEX_NAME = "users"
def create_index_with_mapping():
"""创建索引和映射配置"""
try:
# 创建获奖数据索引
AchievementDocument.init()
print(f"创建索引 {DATA_INDEX_NAME} 并设置映射")
# 创建用户索引
UserDocument.init()
print(f"创建索引 {USERS_INDEX_NAME} 并设置映射")
# 创建默认管理员用户
admin_user = {
"user_id": 0000000000,
"username": "admin",
"password": "admin",
"permission": 0
}
write_user_data(admin_user)
except Exception as e:
print(f"创建索引失败: {str(e)}")
def get_doc_id(data):
"""
根据数据内容生成唯一ID用于去重
参数:
data (dict): 包含文档数据的字典
返回:
str: 基于数据内容生成的MD5哈希值作为唯一ID
"""
data_str = data.get('data', '')
image_str = data.get('image', '')
unique_str = f"{data_str}{image_str}"
return hashlib.md5(unique_str.encode('utf-8')).hexdigest()
def insert_data(data):
"""
向Elasticsearch插入数据
参数:
data (dict): 要插入的数据
返回:
bool: 插入成功返回True失败返回False
"""
try:
# 使用Django-elasticsearch-dsl的方式插入数据
achievement = AchievementDocument(
writer_id=data.get('writer_id', ''),
data=data.get('data', ''),
image=data.get('image', '')
)
achievement.save()
print(f"文档写入成功,内容: {data}")
return True
except Exception as e:
print(f"文档写入失败: {str(e)}, 数据: {data}")
return False
def search_data(query):
"""
在Elasticsearch中搜索数据
参数:
query (str): 搜索关键词
返回:
list: 包含搜索结果的列表
"""
try:
# 使用Django-elasticsearch-dsl进行搜索
search = AchievementDocument.search()
search = search.query("multi_match", query=query, fields=['*'])
response = search.execute()
results = []
for hit in response:
results.append({
"_id": hit.meta.id,
"writer_id": hit.writer_id,
"data": hit.data,
"image": hit.image
})
return results
except Exception as e:
print(f"搜索失败: {str(e)}")
return []
def search_all():
"""获取所有文档"""
try:
search = AchievementDocument.search()
search = search.query("match_all")
response = search.execute()
results = []
for hit in response:
results.append({
"_id": hit.meta.id,
"writer_id": hit.writer_id,
"data": hit.data,
"image": hit.image
})
return results
except Exception as e:
print(f"获取所有文档失败: {str(e)}")
return []
def delete_by_id(doc_id):
"""
根据 doc_id 删除文档
参数:
doc_id (str): 要删除的文档ID
返回:
bool: 删除成功返回True失败返回False
"""
try:
# 使用Django-elasticsearch-dsl删除文档
achievement = AchievementDocument.get(id=doc_id)
achievement.delete()
print(f"文档 {doc_id} 删除成功")
return True
except Exception as e:
print(f"删除失败: {str(e)}")
return False
def update_by_id(doc_id, updated_data):
"""
根据文档ID更新数据
参数:
doc_id (str): 要更新的文档ID
updated_data (dict): 更新的数据内容
返回:
bool: 更新成功返回True失败返回False
"""
try:
# 获取文档
achievement = AchievementDocument.get(id=doc_id)
# 更新字段
if 'writer_id' in updated_data:
achievement.writer_id = updated_data['writer_id']
if 'data' in updated_data:
achievement.data = updated_data['data']
if 'image' in updated_data:
achievement.image = updated_data['image']
achievement.save()
print(f"文档 {doc_id} 更新成功")
return True
except Exception as e:
print(f"更新失败: {str(e)}")
return False
def get_by_id(doc_id):
"""
根据文档ID获取单个文档
参数:
doc_id (str): 要获取的文档ID
返回:
dict or None: 成功返回文档数据失败返回None
"""
try:
achievement = AchievementDocument.get(id=doc_id)
return {
"_id": achievement.meta.id,
"writer_id": achievement.writer_id,
"data": achievement.data,
"image": achievement.image
}
except Exception as e:
print(f"获取文档失败: {str(e)}")
return None
def search_by_any_field(keyword):
"""
在任意字段中搜索关键词(支持模糊搜索)
参数:
keyword (str): 搜索关键词
返回:
list: 包含搜索结果的列表
"""
try:
search = AchievementDocument.search()
# 使用multi_match查询在所有字段中搜索
search = search.query("multi_match",
query=keyword,
fields=['*'],
fuzziness="AUTO")
response = search.execute()
results = []
for hit in response:
results.append({
"_id": hit.meta.id,
"writer_id": hit.writer_id,
"data": hit.data,
"image": hit.image
})
return results
except Exception as e:
print(f"模糊搜索失败: {str(e)}")
return []
def write_user_data(user_data):
"""
写入用户数据到ES
参数:
user_data (dict): 用户数据
返回:
bool: 写入成功返回True失败返回False
"""
try:
user = UserDocument(
user_id=user_data.get('user_id'),
username=user_data.get('username'),
password=user_data.get('password'),
permission=user_data.get('permission', 1)
)
user.save()
print(f"用户数据写入成功: {user_data.get('username')}")
return True
except Exception as e:
print(f"用户数据写入失败: {str(e)}")
return False
def get_user_by_username(username):
"""
根据用户名获取用户数据
参数:
username (str): 用户名
返回:
dict or None: 用户数据或None
"""
try:
search = UserDocument.search()
search = search.query("term", username=username)
response = search.execute()
if response.hits:
hit = response.hits[0]
return {
"user_id": hit.user_id,
"username": hit.username,
"password": hit.password,
"permission": hit.permission
}
return None
except Exception as e:
print(f"获取用户数据失败: {str(e)}")
return None
def get_all_users():
"""获取所有用户"""
try:
search = UserDocument.search()
search = search.query("match_all")
response = search.execute()
users = []
for hit in response:
users.append({
"user_id": hit.user_id,
"username": hit.username,
"permission": hit.permission
})
return users
except Exception as e:
print(f"获取所有用户失败: {str(e)}")
return []
def delete_user_by_username(username):
"""
根据用户名删除用户
参数:
username (str): 用户名
返回:
bool: 删除成功返回True失败返回False
"""
try:
search = UserDocument.search()
search = search.query("term", username=username)
response = search.execute()
if response.hits:
user = response.hits[0]
user.delete()
print(f"用户 {username} 删除成功")
return True
return False
except Exception as e:
print(f"删除用户失败: {str(e)}")
return False
def update_user_permission(username, new_permission):
"""
更新用户权限
参数:
username (str): 用户名
new_permission (int): 新权限级别
返回:
bool: 更新成功返回True失败返回False
"""
try:
search = UserDocument.search()
search = search.query("term", username=username)
response = search.execute()
if response.hits:
user = response.hits[0]
user.permission = new_permission
user.save()
print(f"用户 {username} 权限更新为 {new_permission}")
return True
return False
except Exception as e:
print(f"更新用户权限失败: {str(e)}")
return False

View File

41
elastic/models.py Normal file
View File

@@ -0,0 +1,41 @@
from django.db import models
class AchievementData(models.Model):
"""获奖数据模型对应Flask项目中的wordsearch266666索引"""
writer_id = models.CharField(max_length=100, verbose_name="作者ID")
data = models.TextField(verbose_name="数据内容")
image = models.CharField(max_length=500, blank=True, null=True, verbose_name="图片路径")
created_at = models.DateTimeField(auto_now_add=True, verbose_name="创建时间")
updated_at = models.DateTimeField(auto_now=True, verbose_name="更新时间")
class Meta:
verbose_name = "获奖数据"
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.writer_id} - {self.data[:50]}"
class User(models.Model):
"""用户模型对应Flask项目中的users索引"""
user_id = models.BigIntegerField(unique=True, verbose_name="用户ID")
username = models.CharField(max_length=100, unique=True, verbose_name="用户名")
password = models.CharField(max_length=100, verbose_name="密码")
permission = models.IntegerField(default=1, verbose_name="权限级别")
created_at = models.DateTimeField(auto_now_add=True, verbose_name="创建时间")
class Meta:
verbose_name = "用户"
verbose_name_plural = verbose_name
def __str__(self):
return self.username
# 保留原有的ElasticNews模型用于兼容
class ElasticNews(models.Model):
title = models.CharField(max_length=100)
content = models.TextField()
class Meta:
verbose_name = "新闻"
verbose_name_plural = verbose_name

3
elastic/tests.py Normal file
View File

@@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

26
elastic/urls.py Normal file
View File

@@ -0,0 +1,26 @@
from django.urls import path
from . import views
app_name = 'elastic'
urlpatterns = [
# ES索引管理
path('init-index/', views.init_index, name='init_index'),
# 数据操作
path('data/', views.add_data, name='add_data'),
path('data/<str:doc_id>/', views.get_data, name='get_data'),
path('data/<str:doc_id>/update/', views.update_data, name='update_data'),
path('data/<str:doc_id>/delete/', views.delete_data, name='delete_data'),
# 搜索功能
path('search/', views.search, name='search'),
path('fuzzy-search/', views.fuzzy_search, name='fuzzy_search'),
path('all-data/', views.get_all_data, name='get_all_data'),
# 用户管理
path('users/', views.get_users, name='get_users'),
path('users/add/', views.add_user, name='add_user'),
path('users/<str:username>/delete/', views.delete_user, name='delete_user'),
path('users/<str:username>/update/', views.update_user, name='update_user'),
]

182
elastic/views.py Normal file
View File

@@ -0,0 +1,182 @@
"""
ES相关的API视图
"""
import json
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from .es_connect import (
create_index_with_mapping,
insert_data,
search_data,
search_by_any_field,
search_all,
delete_by_id,
update_by_id,
get_by_id,
write_user_data,
get_all_users,
delete_user_by_username,
update_user_permission
)
@require_http_methods(["GET", "POST"])
@csrf_exempt
def init_index(request):
"""初始化ES索引"""
try:
create_index_with_mapping()
return JsonResponse({"status": "success", "message": "索引初始化成功"})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["POST"])
@csrf_exempt
def add_data(request):
"""添加数据到ES"""
try:
data = json.loads(request.body.decode('utf-8'))
success = insert_data(data)
if success:
return JsonResponse({"status": "success", "message": "数据添加成功"})
else:
return JsonResponse({"status": "error", "message": "数据添加失败"}, status=500)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def search(request):
"""搜索数据"""
try:
query = request.GET.get('q', '')
if not query:
return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400)
results = search_data(query)
return JsonResponse({"status": "success", "data": results})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def fuzzy_search(request):
"""模糊搜索"""
try:
keyword = request.GET.get('keyword', '')
if not keyword:
return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400)
results = search_by_any_field(keyword)
return JsonResponse({"status": "success", "data": results})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def get_all_data(request):
"""获取所有数据"""
try:
results = search_all()
return JsonResponse({"status": "success", "data": results})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["DELETE"])
@csrf_exempt
def delete_data(request, doc_id):
"""删除数据"""
try:
success = delete_by_id(doc_id)
if success:
return JsonResponse({"status": "success", "message": "数据删除成功"})
else:
return JsonResponse({"status": "error", "message": "数据删除失败"}, status=500)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["PUT"])
@csrf_exempt
def update_data(request, doc_id):
"""更新数据"""
try:
data = json.loads(request.body.decode('utf-8'))
success = update_by_id(doc_id, data)
if success:
return JsonResponse({"status": "success", "message": "数据更新成功"})
else:
return JsonResponse({"status": "error", "message": "数据更新失败"}, status=500)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def get_data(request, doc_id):
"""获取单个数据"""
try:
result = get_by_id(doc_id)
if result:
return JsonResponse({"status": "success", "data": result})
else:
return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["POST"])
@csrf_exempt
def add_user(request):
"""添加用户"""
try:
data = json.loads(request.body.decode('utf-8'))
success = write_user_data(data)
if success:
return JsonResponse({"status": "success", "message": "用户添加成功"})
else:
return JsonResponse({"status": "error", "message": "用户添加失败"}, status=500)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def get_users(request):
"""获取所有用户"""
try:
users = get_all_users()
return JsonResponse({"status": "success", "data": users})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["DELETE"])
@csrf_exempt
def delete_user(request, username):
"""删除用户"""
try:
success = delete_user_by_username(username)
if success:
return JsonResponse({"status": "success", "message": "用户删除成功"})
else:
return JsonResponse({"status": "error", "message": "用户删除失败"}, status=500)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["PUT"])
@csrf_exempt
def update_user(request, username):
"""更新用户权限"""
try:
data = json.loads(request.body.decode('utf-8'))
new_permission = data.get('permission', 1)
success = update_user_permission(username, new_permission)
if success:
return JsonResponse({"status": "success", "message": "用户权限更新成功"})
else:
return JsonResponse({"status": "error", "message": "用户权限更新失败"}, status=500)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
Django==5.2.8
elasticsearch==8.17.1
django-elasticsearch-dsl==7.3.0
requests==2.32.3