"""API views for the Elasticsearch (ES) backed data store.

Every view answers with a JSON envelope of the form
``{"status": "success" | "error", ...}``; the only exception is
``upload_page``, which renders an HTML template.
"""

import base64
import json
import logging
import mimetypes
import os
import re
import uuid

from django.conf import settings
from django.http import JsonResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from openai import OpenAI

from .es_connect import (
    create_index_with_mapping,
    insert_data,
    search_data,
    search_by_any_field,
    search_all,
    delete_by_id,
    update_by_id,
    get_by_id,
    write_user_data,
    get_all_users,
    delete_user_by_username,
    update_user_permission,
)

logger = logging.getLogger(__name__)

# Matches a Markdown code fence, with or without a "json" language tag, and
# captures its content. Tolerates CRLF and extra whitespace after the tag.
_FENCED_JSON_RE = re.compile(r"```(?:json)?\s*(.*?)```", re.DOTALL | re.IGNORECASE)


def _success(**fields):
    """Build a success envelope; *fields* follow the "status" key in order."""
    return JsonResponse({"status": "success", **fields})


def _error(message, status=500):
    """Build an error envelope carrying *message* with the given HTTP status."""
    return JsonResponse({"status": "error", "message": message}, status=status)


def _server_error(exc):
    """Log *exc* with its traceback and turn it into an HTTP 500 envelope."""
    logger.error("ES view failed: %s", exc, exc_info=exc)
    return _error(str(exc), status=500)


def _parse_json_body(request):
    """Decode the request body as UTF-8 JSON.

    Returns:
        ``(payload, None)`` on success, or ``(None, response)`` where
        *response* is an HTTP 400 envelope when the body is not valid
        UTF-8 or not valid JSON.
    """
    try:
        # Both UnicodeDecodeError and json.JSONDecodeError subclass ValueError.
        return json.loads(request.body.decode("utf-8")), None
    except ValueError:
        return None, _error("JSON无效", status=400)


def _discard_file(path):
    """Best-effort removal of a file that is no longer referenced."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError:
        logger.warning("Could not remove file %s", path)


@require_http_methods(["GET", "POST"])
@csrf_exempt
def init_index(request):
    """Initialise the ES index by calling ``create_index_with_mapping``."""
    try:
        create_index_with_mapping()
        return _success(message="索引初始化成功")
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["POST"])
@csrf_exempt
def add_data(request):
    """Insert the JSON request body into ES.

    Returns 400 for a malformed body and 500 when the insert fails or raises.
    """
    payload, bad_request = _parse_json_body(request)
    if bad_request is not None:
        return bad_request
    try:
        if insert_data(payload):
            return _success(message="数据添加成功")
        return _error("数据添加失败", status=500)
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["GET"])
def search(request):
    """Search ES with the ``q`` query parameter (400 when it is empty)."""
    query = request.GET.get('q', '')
    if not query:
        return _error("搜索关键词不能为空", status=400)
    try:
        return _success(data=search_data(query))
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["GET"])
def fuzzy_search(request):
    """Search across fields with the ``keyword`` query parameter.

    Delegates to ``search_by_any_field``; returns 400 when the keyword is
    empty.
    """
    keyword = request.GET.get('keyword', '')
    if not keyword:
        return _error("搜索关键词不能为空", status=400)
    try:
        return _success(data=search_by_any_field(keyword))
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["GET"])
def get_all_data(request):
    """Return everything ``search_all`` yields."""
    try:
        return _success(data=search_all())
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["DELETE"])
@csrf_exempt
def delete_data(request, doc_id):
    """Delete the document identified by *doc_id*."""
    try:
        if delete_by_id(doc_id):
            return _success(message="数据删除成功")
        return _error("数据删除失败", status=500)
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["PUT"])
@csrf_exempt
def update_data(request, doc_id):
    """Update the document *doc_id* with the JSON request body.

    Returns 400 for a malformed body and 500 when the update fails or raises.
    """
    payload, bad_request = _parse_json_body(request)
    if bad_request is not None:
        return bad_request
    try:
        if update_by_id(doc_id, payload):
            return _success(message="数据更新成功")
        return _error("数据更新失败", status=500)
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["GET"])
def get_data(request, doc_id):
    """Fetch a single document by *doc_id* (404 when nothing is returned)."""
    try:
        document = get_by_id(doc_id)
        if document:
            return _success(data=document)
        return _error("数据不存在", status=404)
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["POST"])
@csrf_exempt
def add_user(request):
    """Create a user from the JSON request body via ``write_user_data``.

    Returns 400 for a malformed body and 500 when the write fails or raises.
    """
    payload, bad_request = _parse_json_body(request)
    if bad_request is not None:
        return bad_request
    try:
        if write_user_data(payload):
            return _success(message="用户添加成功")
        return _error("用户添加失败", status=500)
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["GET"])
def get_users(request):
    """Return everything ``get_all_users`` yields."""
    try:
        return _success(data=get_all_users())
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["DELETE"])
@csrf_exempt
def delete_user(request, username):
    """Delete the user identified by *username*."""
    try:
        if delete_user_by_username(username):
            return _success(message="用户删除成功")
        return _error("用户删除失败", status=500)
    except Exception as exc:
        return _server_error(exc)


@require_http_methods(["PUT"])
@csrf_exempt
def update_user(request, username):
    """Update the permission of *username*.

    Reads ``permission`` from the JSON object in the request body, defaulting
    to ``1`` when the key is absent. Returns 400 when the body is not a JSON
    object.
    """
    payload, bad_request = _parse_json_body(request)
    if bad_request is not None:
        return bad_request
    if not isinstance(payload, dict):
        # A JSON list/scalar has no ``.get``; reject it as a client error.
        return _error("JSON无效", status=400)
    try:
        new_permission = payload.get('permission', 1)
        if update_user_permission(username, new_permission):
            return _success(message="用户权限更新成功")
        return _error("用户权限更新失败", status=500)
    except Exception as exc:
        return _server_error(exc)


# Helpers: JSON conversion (kept compatible with the behaviour of a.py).
def json_to_string(obj):
    """Serialise *obj* to a JSON string, keeping non-ASCII characters.

    Falls back to ``str(obj)`` when *obj* cannot be serialised.
    """
    try:
        return json.dumps(obj, ensure_ascii=False)
    except Exception:
        return str(obj)


def string_to_json(s):
    """Parse *s* as JSON, returning ``{}`` when it cannot be parsed."""
    try:
        return json.loads(s)
    except Exception:
        return {}


def _parse_model_json(text):
    """Extract a JSON value from a model reply.

    Tries, in order: the raw text, the content of the first Markdown code
    fence, and the raw text with single quotes swapped for double quotes.

    Returns:
        The first truthy value that parses, or ``None`` when no attempt
        yields one (including when *text* is empty or ``None``).
    """
    if not text:
        return None

    candidates = [text]
    fenced = _FENCED_JSON_RE.search(text)
    if fenced:
        candidates.append(fenced.group(1))
    # Last resort: models sometimes emit Python-style single-quoted dicts.
    candidates.append(text.replace("'", '"'))

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if parsed:
            return parsed
    return None


# Ported from a.py: call the vision model for OCR / information extraction.
def ocr_and_extract_info(image_path: str):
    """Send the image at *image_path* to the vision model and parse its reply.

    The API key is read from ``settings.AISTUDIO_API_KEY`` and the endpoint
    from ``settings.OPENAI_BASE_URL`` (with a built-in default).

    Returns:
        The parsed JSON value, or ``None`` when the reply contains no
        parseable, non-empty JSON.

    Raises:
        RuntimeError: if ``AISTUDIO_API_KEY`` is not configured.
        OSError: if the image file cannot be read.
    """
    with open(image_path, "rb") as image_file:
        base64_image = base64.b64encode(image_file.read()).decode("utf-8")

    api_key = getattr(settings, "AISTUDIO_API_KEY", "")
    base_url = getattr(settings, "OPENAI_BASE_URL", "https://aistudio.baidu.com/llm/lmapi/v3")
    if not api_key:
        raise RuntimeError("缺少 AISTUDIO_API_KEY,请在环境变量或 settings 中配置")

    # Advertise the real image type instead of always claiming PNG; fall back
    # to PNG when the extension is unknown.
    mime_type = mimetypes.guess_type(image_path)[0] or "image/png"

    client = OpenAI(api_key=api_key, base_url=base_url)
    chat_completion = client.chat.completions.create(
        messages=[
            {"role": "system", "content": "你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。"},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "请识别这张图片中的信息,将你认为重要的数据转换为不包含嵌套的json,不要显示其它信息以便于解析直接输出json结果即可你可以自行决定使用哪些json字段"},
                    {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
                ],
            },
        ],
        model="ernie-4.5-turbo-vl-32k",
    )
    return _parse_model_json(chat_completion.choices[0].message.content)


# Upload page
@require_http_methods(["GET"])
def upload_page(request):
    """Render the upload form template."""
    # NOTE(review): a session check that redirected anonymous visitors to
    # /accounts/login/ used to sit here but was commented out; the page is
    # served without it, while ``upload`` and ``confirm`` still require a
    # logged-in session.
    return render(request, "elastic/upload.html")


# Upload and recognise (nothing is stored in ES yet)
@require_http_methods(["POST"])
def upload(request):
    """Save the uploaded image and return the data recognised from it.

    Requires ``user_id`` in the session (401 otherwise) and a ``file`` part
    in the request (400 otherwise). The image is written under
    ``MEDIA_ROOT/images`` with a UUID-prefixed name. On success the response
    carries the recognised data, the relative image path and its absolute
    URL. When recognition fails the saved image is removed again, because
    its path is never handed back to the client.
    """
    if not request.session.get("user_id"):
        return _error("未登录", status=401)

    uploaded = request.FILES.get("file")
    if not uploaded:
        return _error("未选择文件", status=400)

    images_dir = os.path.join(settings.MEDIA_ROOT, "images")
    filename = f"{uuid.uuid4()}_{uploaded.name}"
    abs_path = os.path.join(images_dir, filename)

    try:
        os.makedirs(images_dir, exist_ok=True)
        with open(abs_path, "wb") as dst:
            for chunk in uploaded.chunks():
                dst.write(chunk)

        data = ocr_and_extract_info(abs_path)
        if not data:
            _discard_file(abs_path)
            return _error("无法识别图片内容", status=400)

        rel_path = f"images/{filename}"
        image_url = request.build_absolute_uri(settings.MEDIA_URL + rel_path)
        return _success(
            message="识别成功,请确认数据后点击录入",
            data=data,
            image=rel_path,
            image_url=image_url,
        )
    except Exception as exc:
        _discard_file(abs_path)
        return _server_error(exc)


# Confirm and store
@require_http_methods(["POST"])
def confirm(request):
    """Store the user-confirmed data in ES.

    Requires ``user_id`` in the session (401 otherwise). The JSON body must
    be an object with a non-empty ``data`` object and an optional ``image``
    path; anything else yields 400. The stored record holds the writer id,
    the data serialised to a JSON string, and the image path.
    """
    if not request.session.get("user_id"):
        return _error("未登录", status=401)

    payload, bad_request = _parse_json_body(request)
    if bad_request is not None:
        return bad_request
    if not isinstance(payload, dict):
        # A JSON list/scalar has no ``.get``; reject it as a client error.
        return _error("JSON无效", status=400)

    edited = payload.get("data") or {}
    image_rel = payload.get("image") or ""
    if not isinstance(edited, dict) or not edited:
        return _error("数据不能为空", status=400)

    to_store = {
        "writer_id": str(request.session.get("user_id")),
        "data": json_to_string(edited),
        "image": image_rel,
    }
    try:
        stored = insert_data(to_store)
    except Exception as exc:
        return _server_error(exc)
    if not stored:
        return _error("写入ES失败", status=500)
    return _success(message="数据录入成功", data=edited)