"""API views backed by Elasticsearch: records, users, OCR upload and analytics."""

import base64
import concurrent.futures
import json
import logging
import os
import re
import tempfile
import uuid

from django.conf import settings
from django.http import JsonResponse
from django.shortcuts import redirect, render
from django.views.decorators.csrf import csrf_exempt, csrf_protect, ensure_csrf_cookie
from django.views.decorators.http import require_http_methods
from PIL import Image

from .es_connect import *  # noqa: F401,F403  (provides the ES helper functions used below)
from .es_connect import (
    delete_user_by_id as es_delete_user_by_id,
    update_user_by_id as es_update_user_by_id,
)
from .es_connect import (
    analytics_recent as es_analytics_recent,
    analytics_trend as es_analytics_trend,
    analytics_types as es_analytics_types,
    analytics_types_trend as es_analytics_types_trend,
)

# PyMuPDF is optional; it is published under both the ``fitz`` and ``pymupdf`` names.
try:
    import fitz  # PyMuPDF
    HAS_PDF_SUPPORT = True
    PDF_ERROR = ""
except ImportError as e:
    try:
        import pymupdf
        fitz = pymupdf
        HAS_PDF_SUPPORT = True
        PDF_ERROR = ""
    except ImportError:
        HAS_PDF_SUPPORT = False
        PDF_ERROR = str(e)

logger = logging.getLogger(__name__)

_LOGIN_URL = "/accounts/login/"


# ---------------------------------------------------------------------------
# Small shared helpers
# ---------------------------------------------------------------------------

def _error(message, status):
    """Build the module's standard ``{"status": "error", ...}`` JSON response."""
    return JsonResponse({"status": "error", "message": message}, status=status)


def _session_is_admin(request) -> bool:
    """Return True when the session ``permission`` value equals 0.

    A missing value defaults to 1 (non-admin); a malformed value is treated as
    non-admin instead of raising.
    """
    try:
        return int(request.session.get("permission", 1)) == 0
    except (TypeError, ValueError):
        return False


def _require_admin(request):
    """Return a 401/403 error response unless the session is a logged-in admin.

    Returns ``None`` when the request may proceed.
    """
    if request.session.get("user_id") is None:
        return _error("未登录", 401)
    if not _session_is_admin(request):
        return _error("无权限", 403)
    return None


def _json_object_body(request):
    """Decode the request body as a JSON object.

    Returns the decoded ``dict``, or ``None`` when the body is not valid UTF-8
    JSON or does not decode to an object.
    """
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return None
    return payload if isinstance(payload, dict) else None


def _int_query_param(request, name, default):
    """Read query parameter *name* as an int, falling back to *default*."""
    raw = request.GET.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except (TypeError, ValueError):
        return default


def _can_modify_document(request, document) -> bool:
    """Return True when the session user wrote *document* or is stored as admin.

    The admin flag is read from the stored user record (``permission == 0``);
    a missing user record or malformed permission counts as non-admin.
    """
    uid = request.session.get("user_id")
    user = get_user_by_id(uid) or {}
    try:
        is_admin = int(user.get("permission")) == 0
    except (TypeError, ValueError):
        is_admin = False
    is_owner = str(document.get("writer_id", "")) == str(uid)
    return is_admin or is_owner


def _resolve_media_path(rel):
    """Map a MEDIA_ROOT-relative path to an absolute one, or ``None`` if it escapes.

    Rejects absolute paths and ``..`` traversal so client-supplied references
    cannot point at files outside ``settings.MEDIA_ROOT``.
    """
    root = os.path.abspath(str(settings.MEDIA_ROOT))
    candidate = os.path.abspath(os.path.join(root, str(rel)))
    try:
        inside = os.path.commonpath([root, candidate]) == root
    except ValueError:  # e.g. paths on different drives
        inside = False
    if not inside or candidate == root:
        return None
    return candidate


def _filter_results_for_user(request, results):
    """Restrict *results* to the records the session user may see.

    Anonymous sessions get an empty list and admins get everything. Other
    users get their own submissions plus any record whose ``data`` text
    contains one of their ``manage_key`` values.
    """
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        return []
    me = get_user_by_id(session_user_id) or {}
    if _session_is_admin(request):
        return results

    uid = str(session_user_id)
    manage_keys = [str(mk) for mk in (me.get("manage_key", []) or []) if mk]
    filtered = []
    for r in results:
        # 1. The user's own submissions.
        if str(r.get("writer_id", "")) == uid:
            filtered.append(r)
            continue
        # 2. Submissions the user manages (substring match on the data text).
        if manage_keys:
            r_data = str(r.get("data", ""))
            if any(mk in r_data for mk in manage_keys):
                filtered.append(r)
    return filtered


def _image_ref_to_url(request, image_ref: str) -> str:
    """Turn a stored ``minio:<object>`` or ``local:<path>`` reference into a URL.

    Returns an empty string for empty or unrecognised references, and when a
    MinIO presigned URL cannot be produced.
    """
    s = str(image_ref or '').strip()
    if not s:
        return ''
    if s.startswith('minio:'):
        object_name = s[len('minio:'):].lstrip('/')
        if not object_name:
            return ''
        try:
            from minio_storage.minio_connect import presigned_get_url
            return presigned_get_url(object_name, expires_seconds=8 * 60 * 60)
        except Exception:
            # Best effort: a storage failure must not break the listing views.
            logger.warning("presigned URL generation failed for %s", object_name, exc_info=True)
            return ''
    if s.startswith('local:'):
        rel_path = s[len('local:'):].lstrip('/')
        if not rel_path:
            return ''
        return request.build_absolute_uri(settings.MEDIA_URL + rel_path)
    return ''


def _parse_image_refs(image_ref):
    """Normalise an image field into a list of non-blank reference strings.

    Accepts a list/tuple, a JSON-encoded list or string, or a plain string.
    """
    if not image_ref:
        return []
    if isinstance(image_ref, (list, tuple)):
        return [str(x) for x in image_ref if str(x).strip()]
    if isinstance(image_ref, str):
        s = image_ref.strip()
        if not s:
            return []
        parsed = None
        if s[:1] in ('[', '"'):
            try:
                parsed = json.loads(s)
            except Exception:
                parsed = None
        if isinstance(parsed, list):
            return [str(x) for x in parsed if str(x).strip()]
        if isinstance(parsed, str):
            s = parsed.strip()
        return [s] if s else []
    return []


def _attach_image_urls(request, items):
    """Return copies of *items* with ``image_urls`` (list) and ``image_url`` (first) added."""
    out = []
    for it in list(items or []):
        try:
            d = dict(it or {})
        except Exception:
            continue
        refs = _parse_image_refs(d.get('image', ''))
        urls = [_image_ref_to_url(request, r) for r in refs if str(r).strip()]
        urls = [u for u in urls if u]
        d['image_urls'] = urls
        d['image_url'] = urls[0] if urls else _image_ref_to_url(request, d.get('image', ''))
        out.append(d)
    return out


def _attach_writer_names(items):
    """Return copies of *items* with ``writer_name`` resolved from ``writer_id``.

    Falls back to the id itself (or an empty string) when no user matches.
    """
    users = get_all_users()
    name_by_id = {str(u.get("user_id")): u.get("username") for u in (users or [])}
    out = []
    for it in list(items or []):
        try:
            d = dict(it or {})
        except Exception:
            continue
        wid = d.get("writer_id")
        d["writer_name"] = name_by_id.get(str(wid), str(wid) if wid is not None else "")
        out.append(d)
    return out


# ---------------------------------------------------------------------------
# Record CRUD / search views
# ---------------------------------------------------------------------------

# NOTE(security): this endpoint is CSRF-exempt and performs no login check.
@require_http_methods(["GET", "POST"])
@csrf_exempt
def init_index(request):
    """Create the ES index with its mapping."""
    logger.warning("init_index was called")
    try:
        create_index_with_mapping()
        return JsonResponse({"status": "success", "message": "索引初始化成功"})
    except Exception as e:
        return _error(str(e), 500)


# NOTE(security): this endpoint is CSRF-exempt and performs no login check.
@require_http_methods(["POST"])
@csrf_exempt
def add_data(request):
    """Insert the JSON request body into ES."""
    try:
        data = json.loads(request.body.decode('utf-8'))
    except ValueError:
        # Malformed input is a client error, not a server failure.
        return _error("JSON无效", 400)
    try:
        if insert_data(data):
            return JsonResponse({"status": "success", "message": "数据添加成功"})
        return _error("数据添加失败", 500)
    except Exception as e:
        return _error(str(e), 500)


@require_http_methods(["GET"])
def search(request):
    """Search records by the ``q`` query parameter, filtered for the session user."""
    try:
        query = request.GET.get('q', '')
        if not query:
            return _error("搜索关键词不能为空", 400)
        results = _filter_results_for_user(request, search_data(query))
        data = _attach_writer_names(_attach_image_urls(request, results))
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return _error(str(e), 500)


@require_http_methods(["GET"])
def fuzzy_search(request):
    """Fuzzy-search records by the ``keyword`` query parameter."""
    try:
        keyword = request.GET.get('keyword', '')
        if not keyword:
            return _error("搜索关键词不能为空", 400)
        results = _filter_results_for_user(request, search_by_any_field(keyword))
        data = _attach_writer_names(_attach_image_urls(request, results))
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return _error(str(e), 500)


@require_http_methods(["GET"])
def get_all_data(request):
    """Return every record visible to the logged-in user."""
    try:
        if request.session.get("user_id") is None:
            return _error("未登录", 401)
        results = _filter_results_for_user(request, search_all())
        data = _attach_writer_names(_attach_image_urls(request, results))
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return _error(str(e), 500)


@require_http_methods(["DELETE"])
@csrf_exempt
def delete_data(request, doc_id):
    """Delete a record (login required; admin or the record's writer only)."""
    if request.session.get("user_id") is None:
        return _error("未登录", 401)
    try:
        existing = get_by_id(doc_id)
        if not existing:
            return _error("数据不存在", 404)
        if not _can_modify_document(request, existing):
            return _error("无权限", 403)
        if delete_by_id(doc_id):
            return JsonResponse({"status": "success", "message": "数据删除成功"})
        return _error("数据删除失败", 500)
    except Exception as e:
        return _error(str(e), 500)


@require_http_methods(["PUT"])
@csrf_exempt
def update_data(request, doc_id):
    """Update a record (login required; admin or the record's writer only).

    Only ``writer_id``, ``image`` and ``data`` from the JSON body are applied.
    """
    if request.session.get("user_id") is None:
        return _error("未登录", 401)
    payload = _json_object_body(request)
    if payload is None:
        return _error("JSON无效", 400)
    try:
        existing = get_by_id(doc_id)
        if not existing:
            return _error("数据不存在", 404)
        if not _can_modify_document(request, existing):
            return _error("无权限", 403)

        updated = {}
        if "writer_id" in payload:
            updated["writer_id"] = payload["writer_id"]
        if "image" in payload:
            img_val = payload["image"]
            # Multiple images are stored as a JSON-encoded list.
            updated["image"] = json_to_string(img_val) if isinstance(img_val, list) else img_val
        if "data" in payload:
            v = payload["data"]
            if isinstance(v, dict):
                updated["data"] = json.dumps(v, ensure_ascii=False)
            else:
                # Re-serialise JSON text into canonical form; keep anything else as text.
                try:
                    updated["data"] = json.dumps(json.loads(str(v)), ensure_ascii=False)
                except Exception:
                    updated["data"] = str(v)

        if update_by_id(doc_id, updated):
            return JsonResponse({"status": "success", "message": "数据更新成功"})
        return _error("数据更新失败", 500)
    except Exception as e:
        return _error(str(e), 500)


# NOTE(security): unlike the list views, this endpoint applies no per-user filter.
@require_http_methods(["GET"])
def get_data(request, doc_id):
    """Return a single record with ``image_url`` / ``image_urls`` attached."""
    try:
        result = get_by_id(doc_id)
        if not result:
            return _error("数据不存在", 404)
        # _attach_image_urls also resolves multi-image (JSON list) references.
        enriched = _attach_image_urls(request, [result])
        wrapped = enriched[0] if enriched else dict(result)
        return JsonResponse({"status": "success", "data": wrapped})
    except Exception as e:
        return _error(str(e), 500)


# ---------------------------------------------------------------------------
# User management views
# ---------------------------------------------------------------------------

@require_http_methods(["POST"])
@csrf_protect
def add_user(request):
    """Create a user (admin only). The new id is ``max(existing ids) + 1``."""
    denied = _require_admin(request)
    if denied is not None:
        return denied
    payload = _json_object_body(request)
    if payload is None:
        return _error("JSON无效", 400)

    username = str(payload.get("username") or "").strip()
    password = str(payload.get("password") or "").strip()
    try:
        permission = int(payload.get("permission", 1))
    except Exception:
        permission = 1
    if not username:
        return _error("用户名不能为空", 400)
    if password and len(password) < 6:
        return _error("密码长度至少为6位", 400)
    if get_user_by_username(username):
        return _error("用户名已存在", 409)

    existing_ids = []
    for u in get_all_users() or []:
        try:
            existing_ids.append(int(u.get("user_id", 0)))
        except (TypeError, ValueError):
            continue  # ignore records whose id is not numeric
    next_id = max(existing_ids, default=0) + 1

    ok = write_user_data({
        "user_id": next_id,
        "username": username,
        "password": password,
        "permission": permission,
    })
    if not ok:
        return _error("用户添加失败", 500)
    return JsonResponse({"status": "success", "message": "用户添加成功"})


@require_http_methods(["GET"])
def get_users(request):
    """List users visible to the requester.

    Admins see everyone. Users with ``manage_key`` see users whose ``key``
    overlaps it. Everyone else sees only themselves. ``search`` filters by
    username substring.
    """
    uid = request.session.get("user_id")
    if uid is None:
        return _error("未登录", 401)
    try:
        requester = get_user_by_id(uid) or {}
        mgr_keys = set(requester.get("manage_key") or [])
        q = (request.GET.get("search") or "").strip()
        users = get_all_users()

        if _session_is_admin(request):
            filtered = users
        elif mgr_keys:
            filtered = [u for u in users if set(u.get("key") or []) & mgr_keys]
        else:
            filtered = [u for u in users if str(u.get("user_id")) == str(uid)]
        if q:
            filtered = [u for u in filtered if q in str(u.get("username", ""))]
        # NOTE(review): records are returned as-is; confirm get_all_users() omits password fields.
        return JsonResponse({"status": "success", "data": filtered})
    except Exception as e:
        return _error(str(e), 500)


@require_http_methods(["POST"])
@csrf_protect
def update_user_by_id_view(request, user_id):
    """Update a user.

    Admins may change username, permission and password. A user may change
    only their own password. A user whose ``manage_key`` overlaps the target's
    ``key`` may change only the target's password.
    """
    uid = request.session.get("user_id")
    if uid is None:
        return _error("未登录", 401)
    payload = _json_object_body(request)
    if payload is None:
        return _error("JSON无效", 400)

    new_username = str(payload.get("username") or "").strip()
    new_permission = payload.get("permission")
    new_password = str(payload.get("password") or "").strip()
    if new_password and len(new_password) < 6:
        return _error("密码长度至少为6位", 400)

    requester = get_user_by_id(uid) or {}
    target = get_user_by_id(user_id) or {}
    requester_mgr = set(requester.get("manage_key") or [])
    target_keys = set(target.get("key") or [])

    def _result(ok):
        if ok:
            return JsonResponse({"status": "success"})
        return _error("用户更新失败", 500)

    if _session_is_admin(request):
        permission_value = None
        if new_permission is not None:
            try:
                permission_value = int(new_permission)
            except (TypeError, ValueError):
                return _error("permission无效", 400)
        if new_username:
            other = get_user_by_username(new_username)
            if other and str(other.get("user_id", -1)) != str(user_id):
                return _error("用户名已存在", 409)
        return _result(es_update_user_by_id(
            user_id,
            username=new_username if new_username else None,
            permission=permission_value,
            password=new_password if new_password else None,
        ))

    if str(uid) == str(user_id):
        if not new_password:
            return _error("仅允许修改密码", 400)
        return _result(es_update_user_by_id(user_id, password=new_password))

    if requester_mgr and (target_keys & requester_mgr):
        if not new_password or new_username or new_permission is not None:
            return _error("导师仅允许修改密码", 403)
        return _result(es_update_user_by_id(user_id, password=new_password))

    return _error("无权限", 403)


@require_http_methods(["POST"])
@csrf_protect
def delete_user_by_id_view(request, user_id):
    """Delete a user (admin only)."""
    denied = _require_admin(request)
    if denied is not None:
        return denied
    if not es_delete_user_by_id(user_id):
        return _error("用户删除失败", 500)
    return JsonResponse({"status": "success", "message": "用户删除成功"})


# ---------------------------------------------------------------------------
# JSON helpers
# ---------------------------------------------------------------------------

def json_to_string(obj):
    """Serialise *obj* to JSON text.

    A string that already is valid JSON is returned unchanged; any other
    string is JSON-encoded. Unserialisable objects fall back to ``str(obj)``.
    """
    if isinstance(obj, str):
        try:
            json.loads(obj)
            return obj
        except Exception:
            return json.dumps(obj, ensure_ascii=False)
    try:
        return json.dumps(obj, ensure_ascii=False)
    except Exception:
        return str(obj)


def string_to_json(s):
    """Parse JSON text, returning ``{}`` when parsing fails."""
    try:
        return json.loads(s)
    except Exception:
        return {}


# ---------------------------------------------------------------------------
# OCR / information extraction
# ---------------------------------------------------------------------------

def _resolve_model_api_key() -> str:
    """Return the model API key from Django settings or the environment.

    Checks ``ZHIPU_API_KEY`` then ``ZAI_API_KEY``, in settings first and then
    in environment variables.

    Raises:
        RuntimeError: when no key is configured.
    """
    for name in ("ZHIPU_API_KEY", "ZAI_API_KEY"):
        value = getattr(settings, name, "") or os.environ.get(name, "")
        if value:
            return str(value)
    raise RuntimeError("缺少模型服务配置,请设置 ZHIPU_API_KEY")


def _parse_model_json(text):
    """Extract a JSON value from the model's reply.

    Tries the raw text, then a fenced ``json`` code block, then the text with
    single quotes swapped for double quotes. Returns ``None`` if all fail or
    the parsed value is empty.
    """
    if not text or not isinstance(text, str):
        return None
    candidates = [text]
    m = re.search(r"```json\n(.*?)```", text, re.DOTALL)
    if m:
        candidates.append(m.group(1))
    candidates.append(text.replace("'", '"'))
    for candidate in candidates:
        try:
            result = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if result:
            return result
    return None


def ocr_and_extract_info(image_path: str):
    """Send the image at *image_path* to the vision model and return parsed JSON.

    Returns the parsed JSON value, or ``None`` when the reply cannot be parsed.

    Raises:
        RuntimeError: when no API key is configured.
    """
    from zai import ZhipuAiClient

    with open(image_path, "rb") as f:
        base64_image = base64.b64encode(f.read()).decode("utf-8")

    # The key is read from configuration so that no secret lives in the source tree.
    client = ZhipuAiClient(api_key=_resolve_model_api_key())
    types = get_type_list()
    chat_completion = client.chat.completions.create(
        messages=[
            {"role": "system", "content": "你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。"},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": f"请识别这张图片中的信息,将你认为重要的数据转换为不包含嵌套的json,不要显示其它信息以便于解析,直接输出json结果即可。使用“数据类型”字段表示这个东西的大致类型,除此之外你可以自行决定使用哪些json字段。“数据类型”的内容有严格规定,请查看{json.dumps(types, ensure_ascii=False)}中是否包含你所需要的类型,确定不包含后你才可以填入你觉得合适的大致分类。"},
                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
                ],
            },
        ],
        model="glm-4.6v",
    )
    return _parse_model_json(chat_completion.choices[0].message.content)


# ---------------------------------------------------------------------------
# Upload / confirm flow
# ---------------------------------------------------------------------------

@require_http_methods(["GET"])
@ensure_csrf_cookie
def upload_page(request):
    """Render the upload page; anonymous sessions are redirected to login."""
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        return redirect(_LOGIN_URL)
    me = get_user_by_id(session_user_id) or {}
    context = {
        "user_id": request.GET.get("user_id") or session_user_id,
        "username": me.get("username"),
    }
    return render(request, "elastic/upload.html", context)


def _pdf_to_page_images(uploaded_file, images_dir):
    """Render each page of an uploaded PDF to an image file in *images_dir*.

    Returns a list of ``(abs_path, filename, display_name)`` tuples in page
    order. The temporary PDF copy is always removed.
    """
    pages = []
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
            tmp_path = tmp.name
            for chunk in uploaded_file.chunks():
                tmp.write(chunk)
        doc = fitz.open(tmp_path)
        try:
            for i in range(len(doc)):
                page = doc.load_page(i)
                # 150 DPI (down from 200) keeps accuracy while shrinking the images.
                pix = page.get_pixmap(dpi=150)
                img_filename = f"{uuid.uuid4()}_page_{i+1}.jpg"
                img_abs_path = os.path.join(images_dir, img_filename)
                pix.save(img_abs_path)
                pages.append((img_abs_path, img_filename, f"{uploaded_file.name}_p{i+1}"))
        finally:
            doc.close()
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)
    return pages


def _merge_ocr_results(data_list):
    """Merge per-image OCR dicts into one flat dict.

    The first non-empty value for a key wins. A different later value for the
    same key is kept under ``<key>_2``, ``<key>_3``, and so on.
    """
    merged = {}
    for item in data_list:
        if not isinstance(item, dict):
            continue
        for k, v in item.items():
            key = str(k).strip()
            if not key:
                continue
            if key not in merged or merged.get(key) in (None, ''):
                merged[key] = v
                continue
            if merged.get(key) == v:
                continue
            idx = 2
            while f"{key}_{idx}" in merged:
                idx += 1
            merged[f"{key}_{idx}"] = v
    return merged


# Upload and recognise (nothing is stored in ES yet).
@require_http_methods(["POST"])
def upload(request):
    """Save uploaded images/PDFs, run OCR on each, and return the merged fields."""
    if request.session.get("user_id") is None:
        # NOTE(security): trusting a client-supplied user_id lets any caller act as
        # that user. Kept because the front end relies on it; should be removed.
        fallback_uid = request.POST.get("user_id") or request.GET.get("user_id")
        if fallback_uid:
            request.session["user_id"] = fallback_uid
            request.session.setdefault("permission", 1)
        else:
            return _error("未登录", 401)

    files = request.FILES.getlist("file")
    if not files:
        one = request.FILES.get("file")
        if one:
            files = [one]
    if not files:
        return _error("未选择文件", 400)

    images_dir = os.path.join(settings.MEDIA_ROOT, "images")
    os.makedirs(images_dir, exist_ok=True)

    # Normalise every upload into image files on disk (PDFs become one image per page).
    processed_files = []
    for f in files:
        if f.name.lower().endswith('.pdf'):
            if not HAS_PDF_SUPPORT:
                return _error(f"服务器未安装PDF处理组件(PyMuPDF): {PDF_ERROR}", 500)
            try:
                processed_files.extend(_pdf_to_page_images(f, images_dir))
            except Exception as e:
                return _error(f"PDF转换失败: {str(e)}", 500)
        else:
            filename = f"{uuid.uuid4()}_{f.name}"
            abs_path = os.path.join(images_dir, filename)
            with open(abs_path, "wb") as dst:
                for chunk in f.chunks():
                    dst.write(chunk)
            processed_files.append((abs_path, filename, f.name))

    def run_ocr(file_info):
        """Run OCR for one file; return ``(result_or_exception, filename)``."""
        abs_path, filename, _original_name = file_info
        try:
            return (ocr_and_extract_info(abs_path), filename)
        except Exception as e:
            return (e, filename)

    # OCR calls run in parallel, capped at 8 workers to stay within API limits.
    # executor.map yields results in submission order, so page order is stable.
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        outcomes = list(executor.map(run_ocr, processed_files))

    rel_paths = []
    image_urls = []
    data_list = []
    for result, filename in outcomes:
        if isinstance(result, Exception):
            return _error(f"文件 {filename} 识别失败: {str(result)}", 500)
        if result:
            data_list.append(result)
            rel_path = f"images/{filename}"
            rel_paths.append(rel_path)
            image_urls.append(request.build_absolute_uri(settings.MEDIA_URL + rel_path))

    if not data_list:
        return _error("无法识别图片内容", 400)

    return JsonResponse({
        "status": "success",
        "message": "识别成功,请确认数据后点击录入",
        "data": _merge_ocr_results(data_list),
        "images": rel_paths,
        "image_urls": image_urls,
        "image": rel_paths[0] if rel_paths else "",
        "image_url": image_urls[0] if image_urls else "",
    })


# Confirm and store.
@require_http_methods(["POST"])
def confirm(request):
    """Store the user-confirmed record.

    Each referenced upload is converted to WEBP and saved to MinIO (or kept
    locally when MinIO is not configured) before the record is inserted.
    """
    payload = _json_object_body(request)

    if request.session.get("user_id") is None:
        # NOTE(security): trusting a client-supplied user_id lets any caller act as
        # that user. Kept because the front end relies on it; should be removed.
        fb_uid = (payload or {}).get("user_id")
        if fb_uid:
            request.session["user_id"] = fb_uid
            request.session.setdefault("permission", 1)
        else:
            return _error("未登录", 401)

    if payload is None:
        return _error("JSON无效", 400)

    edited = payload.get("data") or {}
    image_rel = payload.get("image") or ""
    if not isinstance(edited, dict) or not edited:
        return _error("数据不能为空", 400)

    ensure_type_in_list(edited.get("数据类型"))

    image_ref_to_store = ""
    temp_files_to_delete = []
    image_rels = _parse_image_refs(image_rel)
    if image_rels:
        images_dir = os.path.join(settings.MEDIA_ROOT, "images")
        os.makedirs(images_dir, exist_ok=True)
        image_refs = []
        for rel in image_rels:
            # The path comes from the client and the file is deleted afterwards,
            # so it must not be allowed to escape MEDIA_ROOT.
            src_abs = _resolve_media_path(rel)
            if src_abs is None:
                return _error("图片路径无效", 400)
            if not os.path.isfile(src_abs):
                return _error("图片文件不存在", 400)

            webp_name = f"{uuid.uuid4().hex}.webp"
            webp_abs = os.path.join(images_dir, webp_name)
            try:
                with Image.open(src_abs) as im:
                    # Keep alpha for modes that can carry transparency.
                    target_mode = "RGBA" if im.mode in ("RGBA", "LA", "P") else "RGB"
                    im.convert(target_mode).save(webp_abs, format="WEBP", quality=80)
            except Exception:
                try:
                    if os.path.isfile(webp_abs):
                        os.remove(webp_abs)
                except Exception:
                    pass
                return _error("图片转换WEBP失败", 500)

            try:
                object_name = f"images/{webp_name}"
                from minio_storage.minio_connect import upload_file, is_minio_configured
                if is_minio_configured():
                    upload_file(webp_abs, object_name, content_type="image/webp")
                    image_refs.append(f"minio:{object_name}")
                    temp_files_to_delete.extend([src_abs, webp_abs])
                else:
                    # Local fallback: keep the WEBP, drop only the original upload.
                    image_refs.append(f"local:{object_name}")
                    temp_files_to_delete.append(src_abs)
            except Exception as e:
                return _error(f"存储图片失败: {e}", 500)

        if len(image_refs) == 1:
            image_ref_to_store = image_refs[0]
        elif len(image_refs) > 1:
            image_ref_to_store = json_to_string(image_refs)

    to_store = {
        "writer_id": str(request.session.get("user_id")),
        "data": json_to_string(edited),
        "image": image_ref_to_store,
    }
    if not insert_data(to_store):
        return _error("写入ES失败", 500)

    # Best-effort cleanup; a leftover temp file must not fail the request.
    for p in temp_files_to_delete:
        try:
            if p and os.path.isfile(p):
                os.remove(p)
        except OSError:
            logger.warning("could not remove temporary file %s", p, exc_info=True)

    return JsonResponse({"status": "success", "message": "数据录入成功", "data": edited})


@require_http_methods(["GET"])
@ensure_csrf_cookie
def manage_page(request):
    """Render the record-management page; anonymous sessions go to login.

    The template context carries only the user's identity and admin flag, so
    no records are queried here.
    """
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        return redirect(_LOGIN_URL)
    me = get_user_by_id(session_user_id) or {}
    return render(
        request,
        "elastic/manage.html",
        {
            "is_admin": _session_is_admin(request),
            "user_id": session_user_id,
            "username": me.get("username"),
        },
    )


# ---------------------------------------------------------------------------
# Analytics views
# ---------------------------------------------------------------------------

@require_http_methods(["GET"])
def analytics_trend_view(request):
    """Return trend data for ``from``/``to``/``interval`` (default ``day``)."""
    try:
        data = es_analytics_trend(
            gte=request.GET.get("from"),
            lte=request.GET.get("to"),
            interval=request.GET.get("interval", "day"),
        )
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return _error(str(e), 500)


@require_http_methods(["GET"])
def analytics_types_view(request):
    """Return per-type data for ``from``/``to``; ``size`` defaults to 10."""
    try:
        data = es_analytics_types(
            gte=request.GET.get("from"),
            lte=request.GET.get("to"),
            size=_int_query_param(request, "size", 10),
        )
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return _error(str(e), 500)


@require_http_methods(["GET"])
def analytics_types_trend_view(request):
    """Return per-type trend data; ``interval`` defaults to ``week`` and ``size`` to 8."""
    try:
        data = es_analytics_types_trend(
            gte=request.GET.get("from"),
            lte=request.GET.get("to"),
            interval=request.GET.get("interval", "week"),
            size=_int_query_param(request, "size", 8),
        )
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return _error(str(e), 500)


@require_http_methods(["GET"])
def analytics_recent_view(request):
    """Return recent-activity data; ``limit`` defaults to 10."""
    try:
        data = es_analytics_recent(
            limit=_int_query_param(request, "limit", 10),
            gte=request.GET.get("from"),
            lte=request.GET.get("to"),
        )
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return _error(str(e), 500)


# ---------------------------------------------------------------------------
# Key / registration-code management
# ---------------------------------------------------------------------------

@require_http_methods(["POST"])
@csrf_protect
def remove_key_view(request):
    """Delete a key globally (admin only, like the other key views)."""
    denied = _require_admin(request)
    if denied is not None:
        return denied
    payload = _json_object_body(request)
    if payload is None:
        return _error("JSON无效", 400)
    try:
        key_to_remove = payload.get("key")
        if not key_to_remove:
            return _error("缺少key参数", 400)
        from .es_connect import delete_key_globally
        ok, count = delete_key_globally(key_to_remove)
        if ok:
            return JsonResponse({"status": "success", "message": f"已成功全局删除 Key '{key_to_remove}',并同步清理了 {count} 个注册码。"})
        return _error("删除失败", 500)
    except Exception as e:
        return _error(str(e), 500)


@require_http_methods(["GET"])
@ensure_csrf_cookie
def user_manage(request):
    """Render the user-management page with admin/tutor/student role flags."""
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        return redirect(_LOGIN_URL)
    is_admin = _session_is_admin(request)
    me = get_user_by_id(session_user_id) or {}
    has_manage = bool(me.get("manage_key"))
    context = {
        "user_id": request.GET.get("user_id") or session_user_id,
        "username": me.get("username"),
        "is_admin": is_admin,
        "is_tutor": (not is_admin) and has_manage,
        "is_student": (not is_admin) and (not has_manage),
    }
    return render(request, "elastic/users.html", context)


@require_http_methods(["GET"])
@ensure_csrf_cookie
def registration_code_manage_page(request):
    """Render the registration-code page (admins only; others are redirected)."""
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        return redirect(_LOGIN_URL)
    if not _session_is_admin(request):
        return redirect("/main/home/")
    me = get_user_by_id(session_user_id) or {}
    context = {
        "user_id": request.GET.get("user_id") or session_user_id,
        "username": me.get("username"),
    }
    return render(request, "elastic/registration_codes.html", context)


@require_http_methods(["GET"])
def get_keys_list_view(request):
    """Return the key list (admin only)."""
    denied = _require_admin(request)
    if denied is not None:
        return denied
    return JsonResponse({"status": "success", "data": get_keys_list()})


@require_http_methods(["POST"])
@csrf_protect
def add_key_view(request):
    """Add a key to the key list (admin only)."""
    denied = _require_admin(request)
    if denied is not None:
        return denied
    payload = _json_object_body(request)
    if payload is None:
        return _error("JSON无效", 400)
    key_name = str(payload.get("key") or "").strip()
    if not key_name:
        return _error("key不能为空", 400)
    if not ensure_key_in_list(key_name):
        return _error("key已存在或写入失败", 409)
    return JsonResponse({"status": "success"})


@require_http_methods(["POST"])
@csrf_protect
def generate_registration_code_view(request):
    """Generate a registration code (admin only); ``expires_in_days`` defaults to 30."""
    denied = _require_admin(request)
    if denied is not None:
        return denied
    payload = _json_object_body(request)
    if payload is None:
        return _error("JSON无效", 400)
    keys = list(payload.get("keys") or [])
    manage_keys = list(payload.get("manage_keys") or [])
    try:
        days = int(payload.get("expires_in_days", 30))
    except Exception:
        days = 30
    result = generate_registration_code(
        keys=keys,
        manage_keys=manage_keys,
        expires_in_days=days,
        created_by=request.session.get("user_id"),
    )
    if not result:
        return _error("生成失败", 500)
    return JsonResponse({"status": "success", "data": result})


@require_http_methods(["GET"])
def list_registration_codes_view(request):
    """List registration codes (admin only)."""
    denied = _require_admin(request)
    if denied is not None:
        return denied
    return JsonResponse({"status": "success", "data": list_registration_codes()})


@require_http_methods(["POST"])
@csrf_protect
def revoke_registration_code_view(request):
    """Revoke a registration code (admin only)."""
    denied = _require_admin(request)
    if denied is not None:
        return denied
    payload = _json_object_body(request)
    if payload is None:
        return _error("JSON无效", 400)
    code = str(payload.get("code") or "").strip()
    if not code:
        return _error("缺少code", 400)
    if not revoke_registration_code(code):
        return _error("作废失败", 500)
    return JsonResponse({"status": "success"})