Files
Achievement_Inputing/elastic/views.py
2025-11-13 19:31:15 +08:00

404 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
ES相关的API视图
"""
import os
import re
import uuid
import base64
import json
from django.conf import settings
from django.http import JsonResponse
from django.shortcuts import render
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import ensure_csrf_cookie
from django.views.decorators.csrf import csrf_exempt
from .es_connect import (
create_index_with_mapping,
insert_data,
search_data,
search_by_any_field,
search_all,
delete_by_id,
update_by_id,
get_by_id,
write_user_data,
get_all_users,
delete_user_by_username,
update_user_permission
)
from openai import OpenAI
@require_http_methods(["GET", "POST"])
@csrf_exempt
def init_index(request):
    """Create the Elasticsearch index together with its mapping.

    Responds with a JSON success envelope, or with HTTP 500 and the
    exception text when ``create_index_with_mapping`` raises.
    """
    try:
        create_index_with_mapping()
    except Exception as exc:
        body, code = {"status": "error", "message": str(exc)}, 500
    else:
        body, code = {"status": "success", "message": "索引初始化成功"}, 200
    return JsonResponse(body, status=code)
@require_http_methods(["POST"])
@csrf_exempt
def add_data(request):
    """Insert the JSON document carried in the request body into Elasticsearch.

    Responses:
        200: ``insert_data`` reported success.
        400: the body is not valid UTF-8 encoded JSON.
        500: ``insert_data`` returned a falsy value, or any step raised.
    """
    # NOTE(review): there is no session check here, although delete_data and
    # update_data require a logged-in user - confirm this endpoint is meant
    # to be open.
    try:
        data = json.loads(request.body.decode('utf-8'))
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; a malformed body is a client error, not a server error.
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    except Exception as e:
        # Anything else raised while reading the body keeps the 500 envelope.
        return JsonResponse({"status": "error", "message": str(e)}, status=500)

    try:
        success = insert_data(data)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)

    if success:
        return JsonResponse({"status": "success", "message": "数据添加成功"})
    return JsonResponse({"status": "error", "message": "数据添加失败"}, status=500)
@require_http_methods(["GET"])
def search(request):
    """Run ``search_data`` for the ``q`` query-string parameter.

    Returns HTTP 400 when ``q`` is missing or empty, HTTP 500 with the
    exception text when the lookup raises.
    """
    try:
        term = request.GET.get('q', '')
        if term:
            return JsonResponse({"status": "success", "data": search_data(term)})
        return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400)
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
@require_http_methods(["GET"])
def fuzzy_search(request):
    """Run ``search_by_any_field`` for the ``keyword`` query-string parameter.

    Returns HTTP 400 when ``keyword`` is missing or empty, HTTP 500 with the
    exception text when the lookup raises.
    """
    try:
        needle = request.GET.get('keyword', '')
        if not needle:
            response = JsonResponse(
                {"status": "error", "message": "搜索关键词不能为空"}, status=400
            )
        else:
            hits = search_by_any_field(needle)
            response = JsonResponse({"status": "success", "data": hits})
        return response
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
@require_http_methods(["GET"])
def get_all_data(request):
    """Return everything ``search_all`` yields, wrapped in the JSON envelope."""
    try:
        payload = {"status": "success", "data": search_all()}
        return JsonResponse(payload)
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
@require_http_methods(["DELETE"])
@csrf_exempt
def delete_data(request, doc_id):
    """Delete one document (login required; admin or the document's writer only).

    Responses: 401 not logged in, 404 unknown document, 403 neither admin nor
    writer, 500 when deletion fails or raises, otherwise a success envelope.
    """
    session = request.session
    if not session.get("user_id"):
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    try:
        doc = get_by_id(doc_id)
        if not doc:
            return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)

        owns_doc = str(doc.get("writer_id", "")) == str(session.get("user_id"))
        # A session permission of 0 is what this module treats as administrator.
        if session.get("permission", 1) != 0 and not owns_doc:
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)

        if delete_by_id(doc_id):
            return JsonResponse({"status": "success", "message": "数据删除成功"})
        return JsonResponse({"status": "error", "message": "数据删除失败"}, status=500)
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
@require_http_methods(["PUT"])
@csrf_exempt
def update_data(request, doc_id):
    """Update one document (login required; admin or the document's writer only).

    The JSON body may carry ``writer_id``, ``image`` and ``data``; only the
    keys that are present are forwarded to ``update_by_id``.

    Responses:
        401: not logged in.
        400: the body is not a JSON object.
        404: the document does not exist.
        403: caller is neither admin nor writer, or a non-admin tries to
             hand the document over to a different ``writer_id``.
        500: the update failed or raised.
    """
    if not request.session.get("user_id"):
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    try:
        payload = json.loads(request.body.decode('utf-8'))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, string, number) used to fail
        # further down with a TypeError and surface as a 500.
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    try:
        existing = get_by_id(doc_id)
        if not existing:
            return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)
        # A session permission of 0 is what this module treats as administrator.
        is_admin = (request.session.get("permission", 1) == 0)
        is_owner = str(existing.get("writer_id", "")) == str(request.session.get("user_id"))
        if not (is_admin or is_owner):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)

        updated = {}
        if "writer_id" in payload:
            new_writer = payload["writer_id"]
            if is_admin:
                updated["writer_id"] = new_writer
            elif str(new_writer) != str(existing.get("writer_id", "")):
                # Only administrators may reassign ownership; an owner echoing
                # back the unchanged writer_id is accepted as a no-op.
                return JsonResponse({"status": "error", "message": "无权限"}, status=403)
        if "image" in payload:
            updated["image"] = payload["image"]
        if "data" in payload:
            v = payload["data"]
            if isinstance(v, (dict, list)):
                # Containers are stored as JSON text, never as a Python repr.
                updated["data"] = json.dumps(v, ensure_ascii=False)
            else:
                updated["data"] = str(v)

        if update_by_id(doc_id, updated):
            return JsonResponse({"status": "success", "message": "数据更新成功"})
        return JsonResponse({"status": "error", "message": "数据更新失败"}, status=500)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def get_data(request, doc_id):
    """Return the document ``get_by_id`` finds for *doc_id*, or HTTP 404 when it is falsy."""
    try:
        doc = get_by_id(doc_id)
        if not doc:
            return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)
        return JsonResponse({"status": "success", "data": doc})
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
@require_http_methods(["POST"])
@csrf_exempt
def add_user(request):
    """Create a user from the JSON request body via ``write_user_data``.

    Responses:
        200: ``write_user_data`` reported success.
        400: the body is not valid UTF-8 encoded JSON.
        500: ``write_user_data`` returned a falsy value, or any step raised.
    """
    # NOTE(review): no session/permission check is performed and the body is
    # passed through unchanged - confirm whether this doubles as a public
    # registration endpoint before restricting it.
    try:
        data = json.loads(request.body.decode('utf-8'))
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; a malformed body is a client error, not a server error.
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)

    try:
        success = write_user_data(data)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)

    if success:
        return JsonResponse({"status": "success", "message": "用户添加成功"})
    return JsonResponse({"status": "error", "message": "用户添加失败"}, status=500)
@require_http_methods(["GET"])
def get_users(request):
    """Return the result of ``get_all_users`` wrapped in the JSON envelope."""
    # NOTE(review): no session/permission check is performed here, unlike
    # delete_data/update_data - confirm the user list is meant to be public.
    try:
        payload = {"status": "success", "data": get_all_users()}
        return JsonResponse(payload)
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
@require_http_methods(["DELETE"])
@csrf_exempt
def delete_user(request, username):
    """Delete the user called *username* (administrator session required).

    Responses:
        401: not logged in.
        403: logged in, but the session permission is not 0 (administrator).
        500: ``delete_user_by_username`` returned a falsy value or raised.
    """
    # This view is CSRF-exempt and previously performed no check at all, so
    # any client could delete accounts. Apply the same session convention the
    # document views use: ``user_id`` marks a login, permission 0 is admin.
    if not request.session.get("user_id"):
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    if request.session.get("permission", 1) != 0:
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)
    try:
        success = delete_user_by_username(username)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
    if success:
        return JsonResponse({"status": "success", "message": "用户删除成功"})
    return JsonResponse({"status": "error", "message": "用户删除失败"}, status=500)
@require_http_methods(["PUT"])
@csrf_exempt
def update_user(request, username):
    """Change the permission of *username* (administrator session required).

    The JSON body is an object whose ``permission`` key holds the new value;
    when the key is absent, 1 is used.

    Responses:
        401: not logged in.
        403: logged in, but the session permission is not 0 (administrator).
        400: the body is not a JSON object.
        500: ``update_user_permission`` returned a falsy value or raised.
    """
    # This view is CSRF-exempt and previously performed no check at all, so
    # any client could grant itself any permission. Apply the same session
    # convention the document views use: permission 0 is administrator.
    if not request.session.get("user_id"):
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    if request.session.get("permission", 1) != 0:
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)

    try:
        data = json.loads(request.body.decode('utf-8'))
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
    if not isinstance(data, dict):
        # A JSON list/string/number has no ``permission`` key to read.
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    try:
        new_permission = data.get('permission', 1)
        success = update_user_permission(username, new_permission)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
    if success:
        return JsonResponse({"status": "success", "message": "用户权限更新成功"})
    return JsonResponse({"status": "error", "message": "用户权限更新失败"}, status=500)
# JSON conversion helpers (kept compatible with the behaviour of a.py)
def json_to_string(obj):
    """Serialise *obj* to JSON text, keeping non-ASCII characters unescaped.

    Falls back to ``str(obj)`` when *obj* cannot be serialised.
    """
    try:
        text = json.dumps(obj, ensure_ascii=False)
    except Exception:
        text = str(obj)
    return text
def string_to_json(s):
    """Parse *s* as JSON; return an empty dict when parsing fails for any reason."""
    try:
        parsed = json.loads(s)
    except Exception:
        parsed = {}
    return parsed
# Ported from a.py: ask the multimodal model to read an image and extract its key data
def ocr_and_extract_info(image_path: str):
    """Send an image to the vision model and return the JSON data it extracts.

    Args:
        image_path: Filesystem path of the image to analyse.

    Returns:
        The first non-empty value decoded from the model's reply, or ``None``
        when the reply is empty or contains no parseable JSON.

    Raises:
        OSError: the image file cannot be read.
        RuntimeError: ``settings.AISTUDIO_API_KEY`` is unset or empty.
    """
    import mimetypes  # function-scope import; the module does not import it

    with open(image_path, "rb") as f:
        base64_image = base64.b64encode(f.read()).decode("utf-8")

    api_key = getattr(settings, "AISTUDIO_API_KEY", "")
    base_url = getattr(settings, "OPENAI_BASE_URL", "https://aistudio.baidu.com/llm/lmapi/v3")
    if not api_key:
        raise RuntimeError("缺少 AISTUDIO_API_KEY请在环境变量或 settings 中配置")

    # Label the data URL with the type suggested by the file extension rather
    # than always claiming PNG; PNG stays the fallback for unknown extensions.
    mime_type, _ = mimetypes.guess_type(image_path)
    if not mime_type or not mime_type.startswith("image/"):
        mime_type = "image/png"

    client = OpenAI(api_key=api_key, base_url=base_url)
    chat_completion = client.chat.completions.create(
        messages=[
            {"role": "system", "content": "你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。"},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "请识别这张图片中的信息将你认为重要的数据转换为不包含嵌套的json不要显示其它信息以便于解析直接输出json结果即可你可以自行决定使用哪些json字段"},
                    {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
                ],
            },
        ],
        model="ernie-4.5-turbo-vl-32k",
    )

    choices = chat_completion.choices
    if not choices:
        # No completion came back: report "nothing recognised" instead of
        # failing with an IndexError.
        return None
    return _parse_model_json(choices[0].message.content)


def _parse_model_json(text):
    """Return the first non-empty JSON value found in a model reply, else ``None``."""
    if not text or not isinstance(text, str):
        # The message content can be None; json.loads(None) would raise TypeError.
        return None

    # Candidates in order of preference: the raw reply, the body of a fenced
    # code block, and the reply with single quotes swapped for double quotes.
    candidates = [text]
    # Accept ```json, ```JSON or a bare ``` fence, and any whitespace
    # (including CRLF) after the opening fence.
    fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE)
    if fenced:
        candidates.append(fenced.group(1))
    candidates.append(text.replace("'", '"'))

    for candidate in candidates:
        try:
            result = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if result:
            return result
    return None
@require_http_methods(["GET"])
def upload_page(request):
    """Render the upload page, or redirect to the login page when no user is in the session.

    The ``user_id`` handed to the template is the ``user_id`` query-string
    value when one is given, otherwise the session's user id.
    """
    session_uid = request.session.get("user_id")
    if session_uid is None:
        from django.shortcuts import redirect
        return redirect("/accounts/login/")
    shown_uid = request.GET.get("user_id") or session_uid
    return render(request, "elastic/upload.html", {"user_id": shown_uid})
# Upload and recognise only (nothing is written to Elasticsearch here)
@require_http_methods(["POST"])
def upload(request):
    """Store an uploaded image under MEDIA_ROOT/images and run recognition on it.

    On success the response carries the extracted data plus the stored
    image's relative path and absolute URL. When recognition yields nothing
    or any step raises, the stored file is removed again so failed attempts
    do not pile up on disk.

    Responses:
        401: not logged in.
        400: no ``file`` part in the upload, or nothing could be recognised.
        500: saving the file or calling the model raised.
    """
    if not request.session.get("user_id"):
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    file = request.FILES.get("file")
    if not file:
        return JsonResponse({"status": "error", "message": "未选择文件"}, status=400)

    images_dir = os.path.join(settings.MEDIA_ROOT, "images")
    # The uuid prefix keeps uploads that share a client-side name apart.
    filename = f"{uuid.uuid4()}_{file.name}"
    abs_path = os.path.join(images_dir, filename)

    keep_file = False
    try:
        # Directory creation and the write are inside the try so that I/O
        # errors are reported through the JSON error envelope as well.
        os.makedirs(images_dir, exist_ok=True)
        with open(abs_path, "wb") as dst:
            for chunk in file.chunks():
                dst.write(chunk)

        data = ocr_and_extract_info(abs_path)
        if not data:
            return JsonResponse({"status": "error", "message": "无法识别图片内容"}, status=400)

        rel_path = f"images/{filename}"
        image_url = request.build_absolute_uri(settings.MEDIA_URL + rel_path)
        response = JsonResponse({
            "status": "success",
            "message": "识别成功,请确认数据后点击录入",
            "data": data,
            "image": rel_path,
            "image_url": image_url,
        })
        keep_file = True
        return response
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
    finally:
        if not keep_file:
            # Only a success response ever references the stored image, so on
            # every other path it is an orphan. Removal is best-effort: the
            # file may never have been created.
            try:
                os.remove(abs_path)
            except OSError:
                pass
# Confirm the reviewed data and persist it
@require_http_methods(["POST"])
def confirm(request):
    """Store user-confirmed recognition data in Elasticsearch.

    The JSON body is an object with ``data`` (a non-empty object holding the
    edited fields) and an optional ``image`` value. The stored document's
    ``writer_id`` always comes from the session, never from the body.

    Responses:
        401: not logged in.
        400: the body is not a JSON object, or ``data`` is empty / not an object.
        500: ``insert_data`` returned a falsy value or raised.
    """
    if not request.session.get("user_id"):
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except ValueError:
        # Covers json.JSONDecodeError and the UnicodeDecodeError raised for a
        # body that is not UTF-8; both are ValueError subclasses.
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    if not isinstance(payload, dict):
        # A JSON list/string/number has no .get(); reject it as a client error.
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    edited = payload.get("data") or {}
    image_rel = payload.get("image") or ""
    if not isinstance(edited, dict) or not edited:
        return JsonResponse({"status": "error", "message": "数据不能为空"}, status=400)

    to_store = {
        "writer_id": str(request.session.get("user_id")),
        "data": json_to_string(edited),
        "image": image_rel,
    }
    try:
        ok = insert_data(to_store)
    except Exception as e:
        # Same JSON error envelope the other views in this module return.
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
    if not ok:
        return JsonResponse({"status": "error", "message": "写入ES失败"}, status=500)
    return JsonResponse({"status": "success", "message": "数据录入成功", "data": edited})
@require_http_methods(["GET"])
@ensure_csrf_cookie
def manage_page(request):
    """Render the management page listing the documents the caller may see.

    Without a session user the request is redirected to the login page.
    Administrators (session permission 0) get every document returned by
    ``search_all``; other users only those whose ``writer_id`` matches their
    own user id.
    """
    session_uid = request.session.get("user_id")
    if session_uid is None:
        from django.shortcuts import redirect
        return redirect("/accounts/login/")

    docs = search_all()
    if request.session.get("permission", 1) != 0:
        own_id = str(session_uid)
        docs = [doc for doc in docs if str(doc.get("writer_id", "")) == own_id]

    # Normalise the keys so the template does not have to dot-access the
    # underscore-prefixed ``_id`` field.
    items = [
        {
            "id": doc.get("_id", ""),
            "writer_id": doc.get("writer_id", ""),
            "image": doc.get("image", ""),
            "data": doc.get("data", ""),
        }
        for doc in docs
    ]
    context = {"items": items, "user_id": request.GET.get("user_id") or session_uid}
    return render(request, "elastic/manage.html", context)