"""API views for ES-backed records, users, image upload/OCR and analytics."""
import os
import re
import uuid
import base64
import json
import logging

from django.conf import settings
from django.http import JsonResponse
from django.shortcuts import redirect, render
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie, csrf_protect

# NOTE(review): the wildcard import presumably supplies the helpers that are
# called by bare name below (get_by_id, insert_data, search_all,
# get_user_by_id, ...). Replace it with explicit names once the public surface
# of es_connect is confirmed.
from .es_connect import *
from .es_connect import update_user_by_id as es_update_user_by_id, delete_user_by_id as es_delete_user_by_id
from .es_connect import (
    analytics_trend as es_analytics_trend,
    analytics_types as es_analytics_types,
    analytics_types_trend as es_analytics_types_trend,
    analytics_recent as es_analytics_recent,
)
from PIL import Image

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _session_is_admin(request):
    """Return True when the session's ``permission`` value is 0.

    A missing value defaults to 1 (non-admin); an unparsable value is treated
    as non-admin instead of raising.
    """
    try:
        return int(request.session.get("permission", 1)) == 0
    except (TypeError, ValueError):
        return False


def _user_is_admin(user):
    """Return True when a stored user record has ``permission`` equal to 0.

    ``user`` may be None (unknown user) or lack the field; both mean False.
    """
    try:
        return int((user or {}).get("permission")) == 0
    except (TypeError, ValueError):
        return False


def _load_json_object(request):
    """Decode the request body as a JSON object.

    Returns the decoded dict, or None when the body is not valid UTF-8 JSON or
    decodes to something other than an object (list, string, number, ...).
    """
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except (ValueError, RecursionError):
        return None
    return payload if isinstance(payload, dict) else None


def _text_field(payload, key):
    """Return ``payload[key]`` as a stripped string; missing/falsy becomes ""."""
    return str(payload.get(key) or "").strip()


def _int_or_default(raw, default):
    """Parse ``raw`` as an int, returning ``default`` when absent or invalid."""
    if raw is None:
        return default
    try:
        return int(raw)
    except (TypeError, ValueError):
        return default


def _remove_file_quietly(path):
    """Best-effort file removal: a missing file is fine, other errors are logged."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError:
        logger.warning("Could not remove file %s", path, exc_info=True)


def _resolve_uploaded_image(image_rel):
    """Resolve a client-supplied media-relative path to a file under MEDIA_ROOT/images.

    Returns the absolute path, or None when the value is not a string or
    resolves outside that directory (``..`` segments, absolute paths,
    symlinks), so callers never open or delete files elsewhere on disk.
    """
    if not isinstance(image_rel, str):
        return None
    try:
        images_root = os.path.realpath(os.path.join(settings.MEDIA_ROOT, "images"))
        candidate = os.path.realpath(os.path.join(settings.MEDIA_ROOT, image_rel))
        inside = os.path.commonpath([images_root, candidate]) == images_root
    except ValueError:
        # Embedded NUL bytes, or paths on different drives on Windows.
        return None
    if not inside or candidate == images_root:
        return None
    return candidate


def _convert_to_webp(image_rel, src_abs):
    """Re-encode ``src_abs`` as WEBP inside MEDIA_ROOT/images.

    Returns the media-relative path of the WEBP file, or None when the
    conversion fails (the caller then keeps the original image).
    """
    images_dir = os.path.join(settings.MEDIA_ROOT, "images")
    webp_name = os.path.splitext(os.path.basename(image_rel))[0] + ".webp"
    try:
        os.makedirs(images_dir, exist_ok=True)
        with Image.open(src_abs) as im:
            # Modes that can carry transparency keep an alpha channel.
            target_mode = "RGBA" if im.mode in ("RGBA", "LA", "P") else "RGB"
            converted = im.convert(target_mode)
            converted.save(os.path.join(images_dir, webp_name), format="WEBP", quality=80)
    except Exception:
        # Deliberate best effort: any decoding/encoding problem falls back to
        # the original file, but is logged rather than silently dropped.
        logger.warning("WEBP conversion failed for %s", src_abs, exc_info=True)
        return None
    return f"images/{webp_name}"


def _parse_model_response(text):
    """Extract a non-empty JSON value from the model's reply.

    Tries, in order: the raw text, the contents of a ```json fenced block, and
    the raw text with single quotes replaced by double quotes. Returns the
    first truthy decoded value, or None.
    """
    if not isinstance(text, str):
        return None
    candidates = [text]
    fenced = re.search(r"```json\n(.*?)```", text, re.DOTALL)
    if fenced:
        candidates.append(fenced.group(1))
    candidates.append(text.replace("'", '"'))
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if parsed:
            return parsed
    return None


def _user_update_response(ok):
    """Map the boolean result of a user update to the JSON response."""
    if ok:
        return JsonResponse({"status": "success"})
    return JsonResponse({"status": "error", "message": "用户更新失败"}, status=500)


# ---------------------------------------------------------------------------
# Record CRUD / search
# ---------------------------------------------------------------------------

# NOTE(review): this endpoint is CSRF-exempt and performs no login check;
# confirm that exposing index initialisation like this is intended.
@require_http_methods(["GET", "POST"])
@csrf_exempt
def init_index(request):
    """Initialise the ES index by calling ``create_index_with_mapping``."""
    logger.warning("init_index 被调用了!")
    try:
        create_index_with_mapping()
        return JsonResponse({"status": "success", "message": "索引初始化成功"})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


# NOTE(review): CSRF-exempt and unauthenticated; the decoded body is passed to
# ``insert_data`` unchanged.
@require_http_methods(["POST"])
@csrf_exempt
def add_data(request):
    """Insert the JSON request body into ES."""
    try:
        data = json.loads(request.body.decode('utf-8'))
        if insert_data(data):
            return JsonResponse({"status": "success", "message": "数据添加成功"})
        return JsonResponse({"status": "error", "message": "数据添加失败"}, status=500)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@require_http_methods(["GET"])
def search(request):
    """Search records using the ``q`` query parameter (required)."""
    try:
        query = request.GET.get('q', '')
        if not query:
            return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400)
        results = search_data(query)
        return JsonResponse({"status": "success", "data": results})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@require_http_methods(["GET"])
def fuzzy_search(request):
    """Search across any field using the ``keyword`` query parameter (required)."""
    try:
        keyword = request.GET.get('keyword', '')
        if not keyword:
            return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400)
        results = search_by_any_field(keyword)
        return JsonResponse({"status": "success", "data": results})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@require_http_methods(["GET"])
def get_all_data(request):
    """Return every record produced by ``search_all``."""
    try:
        results = search_all()
        return JsonResponse({"status": "success", "data": results})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@require_http_methods(["DELETE"])
@csrf_exempt
def delete_data(request, doc_id):
    """Delete a record. Requires login; allowed for admins or the record's writer.

    Admin status is taken from the stored user record (permission == 0), not
    from the session. Responds 401 / 404 / 403 / 500 on the respective failure.
    """
    request_user = request.session.get("user_id")
    if request_user is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    try:
        existing = get_by_id(doc_id)
        if not existing:
            return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)
        # A missing user record or permission field means "not admin" rather
        # than an AttributeError/TypeError surfacing as a 500.
        is_admin = _user_is_admin(get_user_by_id(request_user))
        is_owner = str(existing.get("writer_id", "")) == str(request_user)
        if not (is_admin or is_owner):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)
        if delete_by_id(doc_id):
            return JsonResponse({"status": "success", "message": "数据删除成功"})
        return JsonResponse({"status": "error", "message": "数据删除失败"}, status=500)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@require_http_methods(["PUT"])
@csrf_exempt
def update_data(request, doc_id):
    """Update a record. Requires login; allowed for admins or the record's writer.

    The JSON object body may carry ``writer_id``, ``image`` and ``data``; only
    those keys are forwarded to ``update_by_id``. ``data`` is stored as a JSON
    string: dicts are serialised, other values are re-serialised when they
    parse as JSON and stored as ``str(value)`` otherwise.
    """
    request_user = request.session.get("user_id")
    if request_user is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    payload = _load_json_object(request)
    if payload is None:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    try:
        existing = get_by_id(doc_id)
        if not existing:
            return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)
        is_admin = _user_is_admin(get_user_by_id(request_user))
        is_owner = str(existing.get("writer_id", "")) == str(request_user)
        if not (is_admin or is_owner):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)

        # NOTE(review): a non-admin owner may reassign ``writer_id`` here;
        # confirm that transferring ownership is intended.
        updated = {key: payload[key] for key in ("writer_id", "image") if key in payload}
        if "data" in payload:
            value = payload["data"]
            if isinstance(value, dict):
                updated["data"] = json.dumps(value, ensure_ascii=False)
            else:
                try:
                    updated["data"] = json.dumps(json.loads(str(value)), ensure_ascii=False)
                except (ValueError, RecursionError):
                    updated["data"] = str(value)

        if update_by_id(doc_id, updated):
            return JsonResponse({"status": "success", "message": "数据更新成功"})
        return JsonResponse({"status": "error", "message": "数据更新失败"}, status=500)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@require_http_methods(["GET"])
def get_data(request, doc_id):
    """Return a single record by id, or 404 when ``get_by_id`` finds nothing."""
    try:
        result = get_by_id(doc_id)
        if result:
            return JsonResponse({"status": "success", "data": result})
        return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


# ---------------------------------------------------------------------------
# User management
# ---------------------------------------------------------------------------

@require_http_methods(["POST"])
@csrf_protect
def add_user(request):
    """Create a user (admin session required).

    Body: JSON object with ``username`` (required), ``password`` (optional, at
    least 6 characters when given) and ``permission`` (defaults to 1 when
    missing or unparsable). Responds 409 when the username already exists.
    """
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    if not _session_is_admin(request):
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)
    payload = _load_json_object(request)
    if payload is None:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    username = _text_field(payload, "username")
    password = _text_field(payload, "password")
    permission = _int_or_default(payload.get("permission", 1), 1)

    if not username:
        return JsonResponse({"status": "error", "message": "用户名不能为空"}, status=400)
    # NOTE(review): an empty password passes this check; confirm whether
    # password-less accounts are intended.
    if password and len(password) < 6:
        return JsonResponse({"status": "error", "message": "密码长度至少为6位"}, status=400)
    if get_user_by_username(username):
        return JsonResponse({"status": "error", "message": "用户名已存在"}, status=409)

    # Next id = highest existing id + 1 (1 for the first user). Records with a
    # missing/None id count as 0 instead of crashing int().
    users = get_all_users()
    next_id = max((int(u.get("user_id") or 0) for u in users), default=0) + 1
    ok = write_user_data({
        "user_id": next_id,
        "username": username,
        "password": password,
        "permission": permission,
    })
    if not ok:
        return JsonResponse({"status": "error", "message": "用户添加失败"}, status=500)
    return JsonResponse({"status": "success", "message": "用户添加成功"})


@require_http_methods(["GET"])
def get_users(request):
    """List the users visible to the logged-in requester.

    Admin sessions see everyone; a requester with ``manage_key`` entries sees
    users whose ``key`` list shares at least one entry; anyone else sees only
    their own record. ``?search=`` further filters by username substring.
    """
    uid = request.session.get("user_id")
    if uid is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    try:
        requester = get_user_by_id(uid) or {}
        mgr_keys = set(requester.get("manage_key") or [])
        q = (request.GET.get("search") or "").strip()
        users = get_all_users()

        if _session_is_admin(request):
            filtered = users
        elif mgr_keys:
            filtered = [u for u in users if mgr_keys & set(u.get("key") or [])]
        else:
            filtered = [u for u in users if str(u.get("user_id")) == str(uid)]

        if q:
            filtered = [u for u in filtered if q in str(u.get("username", ""))]
        # NOTE(review): records are returned exactly as get_all_users() yields
        # them; verify that they do not include password fields.
        return JsonResponse({"status": "success", "data": filtered})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@require_http_methods(["POST"])
@csrf_protect
def update_user_by_id_view(request, user_id):
    """Update a user according to the requester's role.

    * Admin session: may change ``username``, ``permission`` and ``password``.
    * The user themself: may change only their password.
    * A requester whose ``manage_key`` overlaps the target's ``key``: may
      change only the target's password.
    Anyone else gets 403.
    """
    uid = request.session.get("user_id")
    if uid is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    payload = _load_json_object(request)
    if payload is None:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    new_username = _text_field(payload, "username")
    new_permission = payload.get("permission")
    new_password = _text_field(payload, "password")
    if new_password and len(new_password) < 6:
        return JsonResponse({"status": "error", "message": "密码长度至少为6位"}, status=400)

    if _session_is_admin(request):
        permission_value = None
        if new_permission is not None:
            # Reject an unparsable permission with 400 instead of letting
            # int() raise into a 500.
            try:
                permission_value = int(new_permission)
            except (TypeError, ValueError):
                return JsonResponse({"status": "error", "message": "权限值无效"}, status=400)
        if new_username:
            other = get_user_by_username(new_username)
            if other and int(other.get("user_id", -1)) != int(user_id):
                return JsonResponse({"status": "error", "message": "用户名已存在"}, status=409)
        ok = es_update_user_by_id(
            user_id,
            username=new_username if new_username else None,
            permission=permission_value,
            password=new_password if new_password else None,
        )
        return _user_update_response(ok)

    if str(uid) == str(user_id):
        if not new_password:
            return JsonResponse({"status": "error", "message": "仅允许修改密码"}, status=400)
        return _user_update_response(es_update_user_by_id(user_id, password=new_password))

    requester_mgr = set((get_user_by_id(uid) or {}).get("manage_key") or [])
    target_keys = set((get_user_by_id(user_id) or {}).get("key") or [])
    if requester_mgr and (target_keys & requester_mgr):
        if not new_password or new_username or new_permission is not None:
            return JsonResponse({"status": "error", "message": "导师仅允许修改密码"}, status=403)
        return _user_update_response(es_update_user_by_id(user_id, password=new_password))

    return JsonResponse({"status": "error", "message": "无权限"}, status=403)


@require_http_methods(["POST"])
@csrf_protect
def delete_user_by_id_view(request, user_id):
    """Delete a user (admin session required)."""
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    if not _session_is_admin(request):
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)
    if not es_delete_user_by_id(user_id):
        return JsonResponse({"status": "error", "message": "用户删除失败"}, status=500)
    return JsonResponse({"status": "success", "message": "用户删除成功"})


# ---------------------------------------------------------------------------
# Helpers: JSON conversion (kept compatible with the behaviour of a.py)
# ---------------------------------------------------------------------------

def json_to_string(obj):
    """Serialise ``obj`` to a JSON string, falling back to ``str(obj)``."""
    try:
        return json.dumps(obj, ensure_ascii=False)
    except (TypeError, ValueError, RecursionError):
        return str(obj)


def string_to_json(s):
    """Parse ``s`` as JSON, returning ``{}`` when it cannot be parsed."""
    try:
        return json.loads(s)
    except (TypeError, ValueError, RecursionError):
        return {}


# ---------------------------------------------------------------------------
# Ported from a.py: call the vision model for OCR / information extraction
# ---------------------------------------------------------------------------

def ocr_and_extract_info(image_path: str):
    """Send the image at ``image_path`` to the vision model and parse its reply.

    Reads ``AISTUDIO_API_KEY`` and ``OPENAI_BASE_URL`` from Django settings.

    Returns:
        The decoded JSON value from the model's reply, or None when no
        non-empty JSON could be extracted.

    Raises:
        RuntimeError: when either setting is missing or empty.
        OSError: when the image file cannot be read.
    """
    # Imported lazily — presumably so this module still loads when the SDK is
    # not installed.
    from openai import OpenAI

    with open(image_path, "rb") as f:
        base64_image = base64.b64encode(f.read()).decode("utf-8")

    # Credentials come from settings only; never hard-code keys in this file.
    api_key = getattr(settings, "AISTUDIO_API_KEY", "")
    base_url = getattr(settings, "OPENAI_BASE_URL", "")
    if not api_key or not base_url:
        raise RuntimeError("缺少模型服务配置,请设置 AISTUDIO_API_KEY 与 OPENAI_BASE_URL")

    client = OpenAI(api_key=api_key, base_url=base_url)
    known_types = get_type_list()

    # NOTE(review): the data URL always declares image/png regardless of the
    # uploaded file's real format; confirm the service tolerates that.
    chat_completion = client.chat.completions.create(
        messages=[
            {"role": "system", "content": "你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。"},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": f"请识别这张图片中的信息,将你认为重要的数据转换为不包含嵌套的json,不要显示其它信息以便于解析,直接输出json结果即可。使用“数据类型”字段表示这个东西的大致类型,除此之外你可以自行决定使用哪些json字段。“数据类型”的内容有严格规定,请查看{json.dumps(known_types, ensure_ascii=False)}中是否包含你所需要的类型,确定不包含后你才可以填入你觉得合适的大致分类。"},
                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
                ],
            },
        ],
        model="ernie-4.5-turbo-vl-32k",
    )
    return _parse_model_response(chat_completion.choices[0].message.content)


# ---------------------------------------------------------------------------
# Upload / confirm flow
# ---------------------------------------------------------------------------

@require_http_methods(["GET"])
@ensure_csrf_cookie
def upload_page(request):
    """Render the upload page; anonymous visitors are redirected to login."""
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        return redirect("/accounts/login/")
    context = {"user_id": request.GET.get("user_id") or session_user_id}
    return render(request, "elastic/upload.html", context)


# Upload and recognise (nothing is stored in ES yet)
@require_http_methods(["POST"])
def upload(request):
    """Save the uploaded image under MEDIA_ROOT/images and run OCR on it.

    Requires an authenticated session. The identity is never taken from
    request parameters: accepting a client-supplied ``user_id`` would let
    anyone impersonate any account.

    On success the response carries the extracted ``data`` plus the stored
    image's relative path and absolute URL; the file stays on disk for
    ``confirm``. On failure the stored file is removed again.
    """
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    file = request.FILES.get("file")
    if not file:
        return JsonResponse({"status": "error", "message": "未选择文件"}, status=400)

    images_dir = os.path.join(settings.MEDIA_ROOT, "images")
    os.makedirs(images_dir, exist_ok=True)
    # basename() is a defensive second guard against path separators in the
    # client-supplied file name.
    filename = f"{uuid.uuid4()}_{os.path.basename(str(file.name))}"
    abs_path = os.path.join(images_dir, filename)
    with open(abs_path, "wb") as dst:
        for chunk in file.chunks():
            dst.write(chunk)

    try:
        data = ocr_and_extract_info(abs_path)
        if not data:
            _remove_file_quietly(abs_path)
            return JsonResponse({"status": "error", "message": "无法识别图片内容"}, status=400)
        rel_path = f"images/{filename}"
        image_url = request.build_absolute_uri(settings.MEDIA_URL + rel_path)
        return JsonResponse({
            "status": "success",
            "message": "识别成功,请确认数据后点击录入",
            "data": data,
            "image": rel_path,
            "image_url": image_url,
        })
    except Exception as e:
        _remove_file_quietly(abs_path)
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


# Confirm and store
@require_http_methods(["POST"])
def confirm(request):
    """Store the user-confirmed data in ES, converting the image to WEBP.

    Requires an authenticated session (the writer is always the session user).
    Body: JSON object with ``data`` (non-empty object) and optional ``image``
    (media-relative path returned by ``upload``). An ``image`` that resolves
    outside MEDIA_ROOT/images is rejected with 400. When WEBP conversion
    fails the original image path is stored instead; the original file is
    deleted only after a successful conversion and a successful ES write.
    """
    user_id = request.session.get("user_id")
    if user_id is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    payload = _load_json_object(request)
    if payload is None:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    edited = payload.get("data") or {}
    image_rel = payload.get("image") or ""
    if not isinstance(edited, dict) or not edited:
        return JsonResponse({"status": "error", "message": "数据不能为空"}, status=400)

    src_abs = None
    if image_rel:
        src_abs = _resolve_uploaded_image(image_rel)
        if src_abs is None:
            return JsonResponse({"status": "error", "message": "图片路径无效"}, status=400)

    ensure_type_in_list(edited.get("数据类型"))

    final_image_rel = image_rel
    if src_abs is not None:
        final_image_rel = _convert_to_webp(image_rel, src_abs) or image_rel

    to_store = {
        "writer_id": str(user_id),
        "data": json_to_string(edited),
        "image": final_image_rel,
    }
    if not insert_data(to_store):
        return JsonResponse({"status": "error", "message": "写入ES失败"}, status=500)

    # The original upload is redundant once a differently named WEBP copy exists.
    if src_abs is not None and final_image_rel != image_rel:
        _remove_file_quietly(src_abs)

    return JsonResponse({"status": "success", "message": "数据录入成功", "data": edited})


@require_http_methods(["GET"])
@ensure_csrf_cookie
def manage_page(request):
    """Render the record-management page.

    Admin sessions see every record; other users only the records whose
    ``writer_id`` matches their session user id. Each record's ``data`` JSON
    is decoded and augmented with ``_id`` and ``_image``; records that cannot
    be decoded into an object, or that lack an ``id``, are skipped.
    """
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        return redirect("/accounts/login/")

    is_admin = _session_is_admin(request)
    raw_results = search_all()
    if not is_admin:
        uid = str(session_user_id)
        raw_results = [r for r in raw_results if str(r.get("writer_id", "")) == uid]

    results = []
    for r in raw_results:
        try:
            r_data = string_to_json(r.get("data", "{}"))
            r_data["_id"] = r["id"]
            r_data["_image"] = r.get("image", "")
        except (AttributeError, KeyError, TypeError):
            # Malformed record (non-object data or missing id): skip it.
            continue
        results.append(r_data)

    return render(
        request,
        "elastic/manage.html",
        {
            "results": results,
            "is_admin": is_admin,
            "user_id": session_user_id,
        },
    )


# ---------------------------------------------------------------------------
# Analytics
# ---------------------------------------------------------------------------

@require_http_methods(["GET"])
def analytics_trend_view(request):
    """Return trend data for ``from``/``to`` with ``interval`` (default "day")."""
    try:
        data = es_analytics_trend(
            gte=request.GET.get("from"),
            lte=request.GET.get("to"),
            interval=request.GET.get("interval", "day"),
        )
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@require_http_methods(["GET"])
def analytics_types_view(request):
    """Return type statistics for ``from``/``to``; ``size`` defaults to 10."""
    try:
        data = es_analytics_types(
            gte=request.GET.get("from"),
            lte=request.GET.get("to"),
            size=_int_or_default(request.GET.get("size"), 10),
        )
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@require_http_methods(["GET"])
def analytics_types_trend_view(request):
    """Return per-type trend data; ``interval`` defaults to "week", ``size`` to 8."""
    try:
        data = es_analytics_types_trend(
            gte=request.GET.get("from"),
            lte=request.GET.get("to"),
            interval=request.GET.get("interval", "week"),
            size=_int_or_default(request.GET.get("size"), 8),
        )
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


@require_http_methods(["GET"])
def analytics_recent_view(request):
    """Return recent records for ``from``/``to``; ``limit`` defaults to 10."""
    try:
        data = es_analytics_recent(
            limit=_int_or_default(request.GET.get("limit"), 10),
            gte=request.GET.get("from"),
            lte=request.GET.get("to"),
        )
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)


# ---------------------------------------------------------------------------
# Pages
# ---------------------------------------------------------------------------

@require_http_methods(["GET"])
@ensure_csrf_cookie
def user_manage(request):
    """Render the user-management page with role flags for the template.

    ``is_tutor`` means a non-admin whose user record has a non-empty
    ``manage_key``; ``is_student`` means a non-admin without one.
    """
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        return redirect("/accounts/login/")
    is_admin = _session_is_admin(request)
    me = get_user_by_id(session_user_id) or {}
    has_manage = bool(me.get("manage_key"))
    context = {
        "user_id": request.GET.get("user_id") or session_user_id,
        "is_admin": is_admin,
        "is_tutor": (not is_admin) and has_manage,
        "is_student": (not is_admin) and (not has_manage),
    }
    return render(request, "elastic/users.html", context)


@require_http_methods(["GET"])
@ensure_csrf_cookie
def registration_code_manage_page(request):
    """Render the registration-code page; non-admins are redirected home."""
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        return redirect("/accounts/login/")
    if not _session_is_admin(request):
        return redirect("/main/home/")
    context = {"user_id": request.GET.get("user_id") or session_user_id}
    return render(request, "elastic/registration_codes.html", context)
def _admin_guard(request):
    """Return an error response unless the session belongs to an administrator.

    Returns None when the request may proceed, a 401 response when nobody is
    logged in, and a 403 response when the session's ``permission`` is not 0
    (a missing value defaults to 1; an unparsable value counts as non-admin).
    """
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    try:
        is_admin = int(request.session.get("permission", 1)) == 0
    except (TypeError, ValueError):
        is_admin = False
    if not is_admin:
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)
    return None


@require_http_methods(["GET"])
def get_keys_list_view(request):
    """Return the key list from ``get_keys_list`` (admin only)."""
    denied = _admin_guard(request)
    if denied is not None:
        return denied
    return JsonResponse({"status": "success", "data": get_keys_list()})


@require_http_methods(["POST"])
@csrf_protect
def add_key_view(request):
    """Add a key (admin only).

    Body: JSON object with a non-empty ``key``. Responds 409 when
    ``ensure_key_in_list`` reports failure (key exists or write failed).
    """
    denied = _admin_guard(request)
    if denied is not None:
        return denied
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    # A valid JSON value that is not an object (e.g. [] or "x") has no .get().
    if not isinstance(payload, dict):
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    key_name = str(payload.get("key") or "").strip()
    if not key_name:
        return JsonResponse({"status": "error", "message": "key不能为空"}, status=400)
    if not ensure_key_in_list(key_name):
        return JsonResponse({"status": "error", "message": "key已存在或写入失败"}, status=409)
    return JsonResponse({"status": "success"})


@require_http_methods(["POST"])
@csrf_protect
def generate_registration_code_view(request):
    """Generate a registration code (admin only).

    Body: JSON object with optional ``keys`` and ``manage_keys`` (arrays) and
    ``expires_in_days`` (defaults to 30 when missing or unparsable). The
    session user id is recorded as the creator.
    """
    denied = _admin_guard(request)
    if denied is not None:
        return denied
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    if not isinstance(payload, dict):
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    keys = payload.get("keys") or []
    manage_keys = payload.get("manage_keys") or []
    # Reject non-arrays: list("abc") would silently turn a string into
    # single-character keys.
    if not isinstance(keys, list) or not isinstance(manage_keys, list):
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    try:
        days = int(payload.get("expires_in_days", 30))
    except (TypeError, ValueError):
        days = 30

    result = generate_registration_code(
        keys=list(keys),
        manage_keys=list(manage_keys),
        expires_in_days=days,
        created_by=request.session.get("user_id"),
    )
    if not result:
        return JsonResponse({"status": "error", "message": "生成失败"}, status=500)
    return JsonResponse({"status": "success", "data": result})


@require_http_methods(["GET"])
def list_registration_codes_view(request):
    """Return the output of ``list_registration_codes`` (admin only)."""
    denied = _admin_guard(request)
    if denied is not None:
        return denied
    return JsonResponse({"status": "success", "data": list_registration_codes()})


@require_http_methods(["POST"])
@csrf_protect
def revoke_registration_code_view(request):
    """Revoke a registration code (admin only).

    Body: JSON object with a non-empty ``code``.
    """
    denied = _admin_guard(request)
    if denied is not None:
        return denied
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    if not isinstance(payload, dict):
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    code = str(payload.get("code") or "").strip()
    if not code:
        return JsonResponse({"status": "error", "message": "缺少code"}, status=400)
    if not revoke_registration_code(code):
        return JsonResponse({"status": "error", "message": "作废失败"}, status=500)
    return JsonResponse({"status": "success"})