""" ES相关的API视图 """ import os import re import uuid import base64 import json import csv import io import mimetypes try: import openpyxl from openpyxl.utils import get_column_letter from openpyxl.drawing.image import Image as XLImage HAS_OPENPYXL = True except ImportError: HAS_OPENPYXL = False from datetime import datetime, timezone, timedelta import tempfile import concurrent.futures from django.conf import settings from django.http import JsonResponse, HttpResponse from django.shortcuts import render from django.views.decorators.http import require_http_methods from django.views.decorators.csrf import ensure_csrf_cookie from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie, csrf_protect from .es_connect import * from .es_connect import update_user_by_id as es_update_user_by_id, delete_user_by_id as es_delete_user_by_id from .es_connect import ( analytics_trend as es_analytics_trend, analytics_types as es_analytics_types, analytics_types_trend as es_analytics_types_trend, analytics_recent as es_analytics_recent, ) from PIL import Image try: import fitz # PyMuPDF HAS_PDF_SUPPORT = True PDF_ERROR = "" except ImportError as e: try: import pymupdf fitz = pymupdf HAS_PDF_SUPPORT = True PDF_ERROR = "" except ImportError: HAS_PDF_SUPPORT = False PDF_ERROR = str(e) MAX_SINGLE_UPLOAD_COUNT = int(getattr(settings, "MAX_SINGLE_UPLOAD_COUNT", 3)) def _filter_results_for_user(request, results): session_user_id = request.session.get("user_id") if session_user_id is None: return [] me = get_user_by_id(session_user_id) or {} is_admin = int(request.session.get("permission", 1)) == 0 if is_admin: return results uid = str(session_user_id) manage_keys = me.get("manage_key", []) or [] manage_keys_set = {str(k).strip() for k in manage_keys if str(k).strip()} writer_keys_by_id = None if manage_keys_set: try: users = get_all_users() or [] except Exception: users = [] writer_keys_by_id = {} for u in users: try: u_id = str(u.get("user_id", "")).strip() except Exception: u_id = "" if not u_id: continue try: u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()} except Exception: u_keys = set() writer_keys_by_id[u_id] = u_keys filtered = [] for r in results: # 1. 自己的提交 if str(r.get("writer_id", "")) == uid: filtered.append(r) continue # 2. 管理的提交:优先按“作者的 key 与我的 manage_key 交集”判断;缺失时回退为 data 字符串包含判断 if manage_keys_set: writer_id = str(r.get("writer_id", "")).strip() writer_keys = (writer_keys_by_id or {}).get(writer_id) if writer_keys and (writer_keys & manage_keys_set): filtered.append(r) continue r_data = str(r.get("data", "")) for mk in manage_keys_set: if mk and mk in r_data: filtered.append(r) break return filtered def _image_ref_to_url(request, image_ref: str) -> str: s = str(image_ref or '').strip() if not s: return '' if s.startswith('minio:'): object_name = s[len('minio:'):].lstrip('/') if not object_name: return '' try: from minio_storage.minio_connect import presigned_get_url return presigned_get_url(object_name, expires_seconds=8 * 60 * 60) except Exception: return '' if s.startswith('local:'): rel_path = s[len('local:'):].lstrip('/') if not rel_path: return '' return request.build_absolute_uri(settings.MEDIA_URL + rel_path) return '' def _parse_image_refs(image_ref): if not image_ref: return [] if isinstance(image_ref, (list, tuple)): return [str(x) for x in image_ref if str(x).strip()] if isinstance(image_ref, str): s = image_ref.strip() if not s: return [] parsed = None if s[:1] in ('[', '"'): try: parsed = json.loads(s) except Exception: parsed = None if isinstance(parsed, list): return [str(x) for x in parsed if str(x).strip()] if isinstance(parsed, str): s = parsed.strip() return [s] if s else [] return [] def _attach_image_urls(request, items): out = [] for it in list(items or []): try: d = dict(it or {}) except Exception: continue refs = _parse_image_refs(d.get('image', '')) urls = [_image_ref_to_url(request, r) for r in refs if str(r).strip()] urls = [u for u in urls if u] d['image_urls'] = urls d['image_url'] = urls[0] if urls else _image_ref_to_url(request, d.get('image', '')) out.append(d) return out def _attach_writer_names(items): users = get_all_users() name_by_id = {str(u.get("user_id")): u.get("username") for u in (users or [])} out = [] for it in list(items or []): try: d = dict(it or {}) except Exception: continue wid = d.get("writer_id") d["writer_name"] = name_by_id.get(str(wid), str(wid) if wid is not None else "") out.append(d) return out @require_http_methods(["GET", "POST"]) @csrf_exempt def init_index(request): """初始化ES索引""" print("⚠️ init_index 被调用了!") try: create_index_with_mapping() return JsonResponse({"status": "success", "message": "索引初始化成功"}) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["POST"]) @csrf_exempt def add_data(request): """添加数据到ES""" try: data = json.loads(request.body.decode('utf-8')) success = insert_data(data) if success: return JsonResponse({"status": "success", "message": "数据添加成功"}) else: return JsonResponse({"status": "error", "message": "数据添加失败"}, status=500) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["GET"]) def search(request): """搜索数据""" try: query = request.GET.get('q', '') if not query: return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400) results = search_data(query) results = _filter_results_for_user(request, results) data = _attach_writer_names(_attach_image_urls(request, results)) return JsonResponse({"status": "success", "data": data}) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["GET"]) def fuzzy_search(request): """模糊搜索""" try: keyword = request.GET.get('keyword', '') if not keyword: return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400) results = search_by_any_field(keyword) results = _filter_results_for_user(request, results) data = _attach_writer_names(_attach_image_urls(request, results)) return JsonResponse({"status": "success", "data": data}) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["GET"]) def get_all_data(request): """获取所有数据""" try: session_user_id = request.session.get("user_id") if session_user_id is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) results = search_all() results = _filter_results_for_user(request, results) data = _attach_writer_names(_attach_image_urls(request, results)) return JsonResponse({"status": "success", "data": data}) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["GET"]) def filter_by_key(request): try: session_user_id = request.session.get("user_id") if session_user_id is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) key = (request.GET.get("key") or "").strip() results = search_all() results = _filter_results_for_user(request, results) if not key: data = _attach_writer_names(_attach_image_urls(request, results)) return JsonResponse({"status": "success", "data": data}) selected = str(key).strip() try: users = get_all_users() or [] except Exception: users = [] writer_keys_by_id = {} for u in users: try: u_id = str(u.get("user_id", "")).strip() except Exception: u_id = "" if not u_id: continue try: u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()} except Exception: u_keys = set() writer_keys_by_id[u_id] = u_keys filtered = [] for r in results: writer_id = str(r.get("writer_id", "")).strip() writer_keys = writer_keys_by_id.get(writer_id) if writer_keys and selected in writer_keys: filtered.append(r) continue if selected and selected in str(r.get("data", "")): filtered.append(r) data = _attach_writer_names(_attach_image_urls(request, filtered)) return JsonResponse({"status": "success", "data": data}) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["DELETE"]) @csrf_exempt def delete_data(request, doc_id): """删除数据(需登录;管理员或作者本人)""" request_user=request.session.get("user_id") # request_admin=request.session.get("permisssion") if request_user is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) try: existing = get_by_id(doc_id) user_existing=get_user_by_id(request_user) if not existing: return JsonResponse({"status": "error", "message": "数据不存在"}, status=404) is_admin = int(user_existing.get('permission')) == 0 is_owner = str(existing.get("writer_id", "")) == str(request.session.get("user_id")) if not (is_admin or is_owner): return JsonResponse({"status": "error", "message": "无权限"}, status=403) success = delete_by_id(doc_id) if success: return JsonResponse({"status": "success", "message": "数据删除成功"}) else: return JsonResponse({"status": "error", "message": "数据删除失败"}, status=500) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["PUT"]) @csrf_exempt def update_data(request, doc_id): """更新数据(需登录;管理员或作者本人)""" request_user = request.session.get("user_id") if request_user is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) try: payload = json.loads(request.body.decode('utf-8')) except Exception: return JsonResponse({"status": "error", "message": "JSON无效"}, status=400) try: existing = get_by_id(doc_id) user_existing = get_user_by_id(request_user) if not existing: return JsonResponse({"status": "error", "message": "数据不存在"}, status=404) is_admin = int(user_existing.get('permission')) == 0 is_owner = str(existing.get("writer_id", "")) == str(request.session.get("user_id")) if not (is_admin or is_owner): return JsonResponse({"status": "error", "message": "无权限"}, status=403) updated = {} if "writer_id" in payload: updated["writer_id"] = payload["writer_id"] if "image" in payload: img_val = payload["image"] if isinstance(img_val, list): updated["image"] = json_to_string(img_val) else: updated["image"] = img_val if "data" in payload: v = payload["data"] if isinstance(v, dict): updated["data"] = json.dumps(v, ensure_ascii=False) else: try: obj = json.loads(str(v)) updated["data"] = json.dumps(obj, ensure_ascii=False) except Exception: updated["data"] = str(v) success = update_by_id(doc_id, updated) if success: return JsonResponse({"status": "success", "message": "数据更新成功"}) else: return JsonResponse({"status": "error", "message": "数据更新失败"}, status=500) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["GET"]) def get_data(request, doc_id): """获取单个数据""" try: result = get_by_id(doc_id) if result: wrapped = dict(result) wrapped['image_url'] = _image_ref_to_url(request, wrapped.get('image', '')) return JsonResponse({"status": "success", "data": wrapped}) else: return JsonResponse({"status": "error", "message": "数据不存在"}, status=404) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["POST"]) @csrf_protect def add_user(request): if request.session.get("user_id") is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) if int(request.session.get("permission", 1)) != 0: return JsonResponse({"status": "error", "message": "无权限"}, status=403) try: payload = json.loads(request.body.decode("utf-8")) except Exception: return JsonResponse({"status": "error", "message": "JSON无效"}, status=400) username = (payload.get("username") or "").strip() password = (payload.get("password") or "").strip() try: permission = int(payload.get("permission", 1)) except Exception: permission = 1 if not username: return JsonResponse({"status": "error", "message": "用户名不能为空"}, status=400) if password and len(password) < 6: return JsonResponse({"status": "error", "message": "密码长度至少为6位"}, status=400) existing = get_user_by_username(username) if existing: return JsonResponse({"status": "error", "message": "用户名已存在"}, status=409) users = get_all_users() next_id = (max([int(u.get("user_id", 0)) for u in users]) + 1) if users else 1 ok = write_user_data({ "user_id": next_id, "username": username, "password": password, "permission": permission, }) if not ok: return JsonResponse({"status": "error", "message": "用户添加失败"}, status=500) return JsonResponse({"status": "success", "message": "用户添加成功"}) @require_http_methods(["GET"]) def get_users(request): uid = request.session.get("user_id") if uid is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) try: is_admin = int(request.session.get("permission", 1)) == 0 requester = get_user_by_id(uid) or {} mgr_keys = set(requester.get("manage_key") or []) key_q = (request.GET.get("key") or "").strip() q = (request.GET.get("search") or "").strip() users = get_all_users() if is_admin: filtered = users elif mgr_keys: def match_manage(user): ukeys = set(user.get("key") or []) return bool(ukeys & mgr_keys) filtered = [u for u in users if match_manage(u)] else: filtered = [u for u in users if str(u.get("user_id")) == str(uid)] if key_q: k = str(key_q).strip() def match_key(user): try: ukeys = {str(x).strip() for x in (user.get("key") or []) if str(x).strip()} except Exception: ukeys = set() try: umkeys = {str(x).strip() for x in (user.get("manage_key") or []) if str(x).strip()} except Exception: umkeys = set() return (k in ukeys) or (k in umkeys) filtered = [u for u in filtered if match_key(u)] if q: filtered = [u for u in filtered if q in str(u.get("username", ""))] return JsonResponse({"status": "success", "data": filtered}) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["POST"]) @csrf_protect def update_user_by_id_view(request, user_id): uid = request.session.get("user_id") if uid is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) try: payload = json.loads(request.body.decode("utf-8")) except Exception: return JsonResponse({"status": "error", "message": "JSON无效"}, status=400) new_username = (payload.get("username") or "").strip() new_permission = payload.get("permission") new_password = (payload.get("password") or "").strip() raw_keys = payload.get("key", None) raw_manage_keys = payload.get("manage_key", None) if new_password and len(new_password) < 6: return JsonResponse({"status": "error", "message": "密码长度至少为6位"}, status=400) is_admin = int(request.session.get("permission", 1)) == 0 requester = get_user_by_id(uid) or {} target = get_user_by_id(user_id) or {} requester_mgr = set(requester.get("manage_key") or []) target_keys = set(target.get("key") or []) def normalize_keys(v): if v is None: return None if isinstance(v, list): parts = v elif isinstance(v, str): parts = re.split(r"[,,;;、\r\n]+", v) else: parts = [v] out = [] seen = set() for p in parts: s = str(p).strip().strip(";") if not s or s in seen: continue seen.add(s) out.append(s) return out new_keys = normalize_keys(raw_keys) new_manage_keys = normalize_keys(raw_manage_keys) if is_admin: if new_username: other = get_user_by_username(new_username) if other and int(other.get("user_id", -1)) != int(user_id): return JsonResponse({"status": "error", "message": "用户名已存在"}, status=409) ok = es_update_user_by_id( user_id, username=new_username if new_username else None, permission=int(new_permission) if new_permission is not None else None, password=new_password if new_password else None, key=new_keys, manage_key=new_manage_keys, ) return JsonResponse({"status": "success"}) if ok else JsonResponse({"status": "error", "message": "用户更新失败"}, status=500) if str(uid) == str(user_id): if not new_password: return JsonResponse({"status": "error", "message": "仅允许修改密码"}, status=400) ok = es_update_user_by_id(user_id, password=new_password) return JsonResponse({"status": "success"}) if ok else JsonResponse({"status": "error", "message": "用户更新失败"}, status=500) if requester_mgr and (target_keys & requester_mgr): if new_username or new_permission is not None or new_manage_keys is not None: return JsonResponse({"status": "error", "message": "无权限"}, status=403) if not new_password and new_keys is None: return JsonResponse({"status": "error", "message": "缺少更新内容"}, status=400) merged_keys = None if new_keys is not None: try: new_keys_set = set(new_keys) except Exception: return JsonResponse({"status": "error", "message": "无权限"}, status=403) requester_locked = set(normalize_keys(requester.get("key")) or []) if new_keys_set & requester_locked: return JsonResponse({"status": "error", "message": "无权限"}, status=403) existing_keys = [str(v).strip() for v in list(target.get("key") or []) if str(v).strip()] preserved = [k for k in existing_keys if k in requester_locked] merged_keys = preserved + [str(v).strip() for v in list(new_keys) if str(v).strip() and str(v).strip() not in requester_locked] seen = set() merged_keys = [k for k in merged_keys if not (k in seen or seen.add(k))] ok = es_update_user_by_id(user_id, password=new_password if new_password else None, key=merged_keys) return JsonResponse({"status": "success"}) if ok else JsonResponse({"status": "error", "message": "用户更新失败"}, status=500) return JsonResponse({"status": "error", "message": "无权限"}, status=403) @require_http_methods(["POST"]) @csrf_protect def delete_user_by_id_view(request, user_id): if request.session.get("user_id") is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) if int(request.session.get("permission", 1)) != 0: return JsonResponse({"status": "error", "message": "无权限"}, status=403) ok = es_delete_user_by_id(user_id) if not ok: return JsonResponse({"status": "error", "message": "用户删除失败"}, status=500) return JsonResponse({"status": "success", "message": "用户删除成功"}) # 辅助:JSON 转换 def json_to_string(obj): if isinstance(obj, str): # 如果已经是字符串,尝试解析一下验证是否为有效 JSON,或者是普通文本 try: json.loads(obj) return obj # 是有效 JSON 字符串,直接返回 except Exception: # 不是 JSON 字符串,可能是普通文本,将其封装成 JSON return json.dumps(obj, ensure_ascii=False) try: return json.dumps(obj, ensure_ascii=False) except Exception: return str(obj) def string_to_json(s): try: return json.loads(s) except Exception: return {} # 移植自 a.py 的核心:调用大模型进行 OCR/信息抽取 def ocr_and_extract_info(image_path: str): # from openai import OpenAI def encode_image(path: str) -> str: with open(path, "rb") as f: return base64.b64encode(f.read()).decode("utf-8") base64_image = encode_image(image_path) mime_type = mimetypes.guess_type(image_path)[0] or "image/jpeg" # api_key = getattr(settings, "AISTUDIO_API_KEY", "188f57db3766e02ed2c7e18373996d84f4112272") # base_url = getattr(settings, "OPENAI_BASE_URL", "https://aistudio.baidu.com/llm/lmapi/v3") # if not api_key: # raise RuntimeError("缺少 AISTUDIO_API_KEY,请在环境变量或 settings 中配置") # api_key = getattr(settings, "AISTUDIO_API_KEY", "") # base_url = getattr(settings, "OPENAI_BASE_URL", "") # if not api_key or not base_url: # raise RuntimeError("缺少模型服务配置,请设置 AISTUDIO_API_KEY 与 OPENAI_BASE_URL") # client = OpenAI(api_key=api_key, base_url=base_url) # types = get_type_list() # chat_completion = client.chat.completions.create( # messages=[ # {"role": "system", "content": "你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。"}, # { # "role": "user", # "content": [ # {"type": "text", "text": f"请识别这张图片中的信息,将你认为重要的数据转换为不包含嵌套的json,不要显示其它信息以便于解析,直接输出json结果即可。使用“数据类型”字段表示这个东西的大致类型,除此之外你可以自行决定使用哪些json字段。“数据类型”的内容有严格规定,请查看{json.dumps(types, ensure_ascii=False)}中是否包含你所需要的类型,确定不包含后你才可以填入你觉得合适的大致分类。"}, # {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}, # ], # }, # ], # model="glm-5", # ) # response_text = chat_completion.choices[0].message.content from zai import ZhipuAiClient import httpx # api_key = ( # getattr(settings, "ZHIPU_API_KEY", "") # or getattr(settings, "ZAI_API_KEY", "") # or getattr(settings, "AISTUDIO_API_KEY", "") # ) # if not api_key: # raise RuntimeError("缺少模型服务配置,请设置 ZHIPU_API_KEY") # base_url = ( # getattr(settings, "ZHIPU_BASE_URL", "") # or getattr(settings, "ZAI_BASE_URL", "") # or "https://open.bigmodel.cn/api/paas/v4/" # ) client = ZhipuAiClient(api_key="982e9dc9d27a4f2f9db7e9effdb3c484.OfiJCdncJnTn0Q9h") types = get_type_list() chat_completion = client.chat.completions.create( messages=[ {"role": "system", "content": "你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。"}, { "role": "user", "content": [ {"type": "text", "text": f"请识别这张图片中的信息,将你认为重要的数据转换为不包含嵌套的json,不要显示其它信息以便于解析,直接输出json结果即可。使用“数据类型”字段表示这个东西的大致类型,除此之外你可以自行决定使用哪些json字段。“数据类型”的内容有严格规定,请查看{json.dumps(types, ensure_ascii=False)}中是否包含你所需要的类型,确定不包含后你才可以填入你觉得合适的大致分类。"}, {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}}, ], }, ], model="glm-4.6v", ) response_text = chat_completion.choices[0].message.content def parse_response(text: str): try: result = json.loads(text) if result: return result except json.JSONDecodeError: pass m = re.search(r"```json\n(.*?)```", text, re.DOTALL) if m: try: result = json.loads(m.group(1)) if result: return result except json.JSONDecodeError: pass try: fixed = text.replace("'", '"') result = json.loads(fixed) if result: return result except json.JSONDecodeError: pass return None return parse_response(response_text) @require_http_methods(["GET"]) @ensure_csrf_cookie def upload_page(request): session_user_id = request.session.get("user_id") if session_user_id is None: from django.shortcuts import redirect return redirect("/accounts/login/") user_id_qs = request.GET.get("user_id") me = get_user_by_id(session_user_id) or {} context = { "user_id": user_id_qs or session_user_id, "username": me.get("username"), "max_single_upload_count": MAX_SINGLE_UPLOAD_COUNT, } return render(request, "elastic/upload.html", context) # 上传并识别(不入库) @require_http_methods(["POST"]) def upload(request): try: if request.session.get("user_id") is None: fallback_uid = request.POST.get("user_id") or request.GET.get("user_id") if fallback_uid: request.session["user_id"] = fallback_uid request.session.setdefault("permission", 1) else: return JsonResponse({"status": "error", "message": "未登录"}, status=401) files = request.FILES.getlist("file") if not files: one = request.FILES.get("file") if one: files = [one] if not files: return JsonResponse({"status": "error", "message": "未选择文件"}, status=400) if len(files) > MAX_SINGLE_UPLOAD_COUNT: return JsonResponse( { "status": "error", "message": f"单次最多上传 {MAX_SINGLE_UPLOAD_COUNT} 个文件,请分批上传", }, status=400, ) images_dir = os.path.join(settings.MEDIA_ROOT, "images") os.makedirs(images_dir, exist_ok=True) file_results = [] for f in files: group_images = [] is_pdf = f.name.lower().endswith('.pdf') if is_pdf: if not HAS_PDF_SUPPORT: return JsonResponse({"status": "error", "message": f"服务器未安装PDF处理组件(PyMuPDF): {PDF_ERROR}"}, status=500) with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp: for chunk in f.chunks(): tmp.write(chunk) tmp_path = tmp.name try: doc = fitz.open(tmp_path) for i in range(len(doc)): page = doc.load_page(i) pix = page.get_pixmap(dpi=150) img_filename = f"{uuid.uuid4()}_page_{i+1}.jpg" img_abs_path = os.path.join(images_dir, img_filename) pix.save(img_abs_path) group_images.append((img_abs_path, img_filename)) doc.close() except Exception as e: return JsonResponse({"status": "error", "message": f"PDF {f.name} 转换失败: {str(e)}"}, status=500) finally: if os.path.exists(tmp_path): os.remove(tmp_path) else: filename = f"{uuid.uuid4()}_{f.name}" abs_path = os.path.join(images_dir, filename) with open(abs_path, "wb") as dst: for chunk in f.chunks(): dst.write(chunk) group_images.append((abs_path, filename)) def run_ocr(img_info): abs_p, fname = img_info try: data = ocr_and_extract_info(abs_p) return data, None except Exception as e: return None, f"{fname}: {str(e)}" group_data_list = [] group_errors = [] with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(group_images), 8)) as executor: futures = [executor.submit(run_ocr, img_info) for img_info in group_images] for future in concurrent.futures.as_completed(futures): res, err = future.result() if res: group_data_list.append(res) elif err: group_errors.append(err) merged_group_data = {} for item in group_data_list: if not isinstance(item, dict): continue for k, v in item.items(): key = str(k).strip() if not key: continue if key not in merged_group_data or merged_group_data.get(key) in (None, ''): merged_group_data[key] = v elif merged_group_data.get(key) != v: base = key idx = 2 while f"{base}_{idx}" in merged_group_data: idx += 1 merged_group_data[f"{base}_{idx}"] = v if not merged_group_data: merged_group_data = { "文件名": f.name, "提示": "未识别到具体内容" if not group_errors else "识别失败", } if group_errors: merged_group_data["错误信息"] = ";".join(group_errors[:3]) rel_paths = [f"images/{img[1]}" for img in group_images] # 改进:如果配置了 MinIO,则在上传阶段就同步到 MinIO,确保在线版本待处理列表能显示图片 image_urls = [] from minio_storage.minio_connect import is_minio_configured, upload_file, presigned_get_url minio_enabled = is_minio_configured() for rp in rel_paths: abs_p = os.path.join(settings.MEDIA_ROOT, rp) if minio_enabled: try: # 上传到 MinIO upload_file(abs_p, rp) # 生成预签名 URL url = presigned_get_url(rp) image_urls.append(url) except Exception as e: print(f"上传临时图片到 MinIO 失败: {e}") image_urls.append(request.build_absolute_uri(settings.MEDIA_URL + rp)) else: image_urls.append(request.build_absolute_uri(settings.MEDIA_URL + rp)) file_results.append({ "name": f.name, "data": merged_group_data, "images": rel_paths, "image_urls": image_urls, }) return JsonResponse({ "status": "success", "message": f"成功处理 {len(file_results)} 个文件,请确认数据后点击录入", "items": file_results, }) except Exception as e: return JsonResponse({"status": "error", "message": str(e) or "上传失败"}, status=500) # 确认并入库 @require_http_methods(["POST"]) def confirm(request): if request.session.get("user_id") is None: try: payload_for_uid = json.loads(request.body.decode("utf-8")) except Exception: payload_for_uid = {} fb_uid = (payload_for_uid or {}).get("user_id") if fb_uid: request.session["user_id"] = fb_uid request.session.setdefault("permission", 1) else: return JsonResponse({"status": "error", "message": "未登录"}, status=401) try: payload = json.loads(request.body.decode("utf-8")) except json.JSONDecodeError: return JsonResponse({"status": "error", "message": "JSON无效"}, status=400) # 支持单项或批量入库 items = payload.get("items") if not items: # 兼容旧版本单项入库 items = [{ "data": payload.get("data"), "image": payload.get("image") }] success_count = 0 errors = [] for item in items: edited = item.get("data") or {} image_rel = item.get("image") or "" if not isinstance(edited, dict) or not edited: errors.append("数据项不能为空") continue ensure_type_in_list(edited.get("数据类型")) image_ref_to_store = "" temp_files_to_delete = [] image_rels = _parse_image_refs(image_rel) if image_rels: images_dir = os.path.join(settings.MEDIA_ROOT, "images") os.makedirs(images_dir, exist_ok=True) image_refs = [] for rel in image_rels: src_abs = os.path.join(settings.MEDIA_ROOT, rel) if not os.path.isfile(src_abs): errors.append(f"图片文件 {rel} 不存在") continue webp_name = f"{uuid.uuid4().hex}.webp" webp_abs = os.path.join(images_dir, webp_name) try: with Image.open(src_abs) as im: if im.mode in ("RGBA", "LA", "P"): im = im.convert("RGBA") else: im = im.convert("RGB") im.save(webp_abs, format="WEBP", quality=80) except Exception: errors.append(f"图片 {rel} 转换WEBP失败") continue try: object_name = f"images/{webp_name}" from minio_storage.minio_connect import upload_file, is_minio_configured if is_minio_configured(): upload_file(webp_abs, object_name, content_type="image/webp") image_refs.append(f"minio:{object_name}") temp_files_to_delete.extend([src_abs, webp_abs]) else: image_refs.append(f"local:{object_name}") temp_files_to_delete.append(src_abs) except Exception as e: errors.append(f"存储图片 {rel} 失败: {e}") continue if len(image_refs) == 1: image_ref_to_store = image_refs[0] elif len(image_refs) > 1: image_ref_to_store = json_to_string(image_refs) to_store = { "writer_id": str(request.session.get("user_id")), "data": json_to_string(edited), "image": image_ref_to_store, } ok = insert_data(to_store) if ok: success_count += 1 # 清理临时文件 for p in temp_files_to_delete: if p and os.path.isfile(p): try: os.remove(p) except: pass else: errors.append("写入ES失败") if success_count > 0: msg = f"成功录入 {success_count} 条数据" if errors: msg += f" (遇到 {len(errors)} 个错误)" return JsonResponse({"status": "success", "message": msg}) else: return JsonResponse({"status": "error", "message": "录入失败: " + "; ".join(errors[:3])}, status=500) @require_http_methods(["GET"]) @ensure_csrf_cookie def manage_page(request): session_user_id = request.session.get("user_id") if session_user_id is None: from django.shortcuts import redirect return redirect("/accounts/login/") me = get_user_by_id(session_user_id) or {} is_admin = int(request.session.get("permission", 1)) == 0 has_manage_key = bool(me.get("manage_key") or []) if is_admin: raw_results = search_all() else: uid = str(session_user_id) manage_keys = me.get("manage_key", []) or [] all_data = search_all() raw_results = [] for r in all_data: # 1. 自己的提交 if str(r.get("writer_id", "")) == uid: raw_results.append(r) continue # 2. 管理的提交 (检查 data 中是否包含 manage_key) if manage_keys: r_data = str(r.get("data", "")) for mk in manage_keys: if mk and str(mk) in r_data: raw_results.append(r) break return render( request, "elastic/manage.html", { "is_admin": is_admin, "has_manage_key": has_manage_key, "user_id": session_user_id, "username": me.get("username"), }, ) # 规范化键,避免模板点号访问下划线前缀字段 results = [] for r in raw_results: results.append({ "id": r.get("_id", ""), "writer_id": r.get("writer_id", ""), "image": r.get("image", ""), "data": r.get("data", ""), }) user_id_qs = request.GET.get("user_id") context = {"items": results, "user_id": user_id_qs or session_user_id} return render(request, "elastic/manage.html", context) @require_http_methods(["GET"]) def analytics_trend_view(request): try: gte = request.GET.get("from") lte = request.GET.get("to") interval = request.GET.get("interval", "day") data = es_analytics_trend(gte=gte, lte=lte, interval=interval) return JsonResponse({"status": "success", "data": data}) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["GET"]) def analytics_types_view(request): try: gte = request.GET.get("from") lte = request.GET.get("to") size = request.GET.get("size") try: size_int = int(size) if size is not None else 10 except Exception: size_int = 10 data = es_analytics_types(gte=gte, lte=lte, limit=size_int) return JsonResponse({"status": "success", "data": data}) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["GET"]) def analytics_types_trend_view(request): try: gte = request.GET.get("from") lte = request.GET.get("to") interval = request.GET.get("interval", "week") size = request.GET.get("size") try: size_int = int(size) if size is not None else 8 except Exception: size_int = 8 data = es_analytics_types_trend(gte=gte, lte=lte, interval=interval, size=size_int) return JsonResponse({"status": "success", "data": data}) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["GET"]) def analytics_recent_view(request): try: limit = request.GET.get("limit") gte = request.GET.get("from") lte = request.GET.get("to") try: limit_int = int(limit) if limit is not None else 10 except Exception: limit_int = 10 data = es_analytics_recent(limit=limit_int, gte=gte, lte=lte) return JsonResponse({"status": "success", "data": data}) except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["POST"]) @csrf_protect def remove_key_view(request): if request.session.get("user_id") is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) is_admin = int(request.session.get("permission", 1)) == 0 try: payload = json.loads(request.body.decode("utf-8")) key_to_remove = payload.get("key") if not key_to_remove: return JsonResponse({"status": "error", "message": "缺少key参数"}, status=400) key_to_remove = str(key_to_remove).strip() if not key_to_remove: return JsonResponse({"status": "error", "message": "缺少key参数"}, status=400) if not is_admin: me = get_user_by_id(request.session.get("user_id")) or {} if not (me.get("manage_key") or []): return JsonResponse({"status": "error", "message": "无权限"}, status=403) allowed = {str(x).strip() for x in list(request.session.get("tutor_added_manage_keys") or []) if str(x).strip()} if key_to_remove not in allowed: return JsonResponse({"status": "error", "message": "无权限"}, status=403) from .es_connect import delete_key_globally ok, count = delete_key_globally(key_to_remove) if ok: if not is_admin: cur = [str(x).strip() for x in list(request.session.get("tutor_added_manage_keys") or []) if str(x).strip()] cur = [k for k in cur if k != key_to_remove] request.session["tutor_added_manage_keys"] = cur try: request.session.modified = True except Exception: pass return JsonResponse({"status": "success", "message": f"已成功全局删除 Key '{key_to_remove}',并同步清理了 {count} 个注册码。"}) else: return JsonResponse({"status": "error", "message": "删除失败"}, status=500) except json.JSONDecodeError: return HttpResponseBadRequest("Invalid JSON") except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) @require_http_methods(["POST"]) @csrf_protect def unallow_tutor_added_key_view(request): if request.session.get("user_id") is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) if int(request.session.get("permission", 1)) == 0: return JsonResponse({"status": "error", "message": "无权限"}, status=403) me = get_user_by_id(request.session.get("user_id")) or {} if not (me.get("manage_key") or []): return JsonResponse({"status": "error", "message": "无权限"}, status=403) try: payload = json.loads(request.body.decode("utf-8")) except Exception: return JsonResponse({"status": "error", "message": "JSON无效"}, status=400) key_name = (payload.get("key") or "").strip() if not key_name: return JsonResponse({"status": "error", "message": "key不能为空"}, status=400) cur = list(request.session.get("tutor_added_manage_keys") or []) cur = [str(x).strip() for x in cur if str(x).strip()] if key_name not in cur: return JsonResponse({"status": "error", "message": "无权限"}, status=403) cur = [k for k in cur if k != key_name] request.session["tutor_added_manage_keys"] = cur try: request.session.modified = True except Exception: pass return JsonResponse({"status": "success"}) @require_http_methods(["GET"]) @ensure_csrf_cookie def user_manage(request): session_user_id = request.session.get("user_id") if session_user_id is None: from django.shortcuts import redirect return redirect("/accounts/login/") is_admin = int(request.session.get("permission", 1)) == 0 me = get_user_by_id(session_user_id) or {} has_manage = bool(me.get("manage_key")) manage_keys = list(me.get("manage_key") or []) raw_my_keys = me.get("key") or [] if isinstance(raw_my_keys, list): my_keys = raw_my_keys elif isinstance(raw_my_keys, str): my_keys = re.split(r"[,,;;、\r\n]+", raw_my_keys) else: my_keys = [raw_my_keys] my_keys = [str(x).strip() for x in my_keys if str(x).strip()] user_id_qs = request.GET.get("user_id") context = { "user_id": user_id_qs or session_user_id, "username": me.get("username"), "is_admin": is_admin, "is_tutor": (not is_admin) and has_manage, "is_student": (not is_admin) and (not has_manage), "manage_keys_json": json.dumps(manage_keys, ensure_ascii=False), "my_keys_json": json.dumps(my_keys, ensure_ascii=False), } return render(request, "elastic/users.html", context) @require_http_methods(["GET"]) @ensure_csrf_cookie def registration_code_manage_page(request): session_user_id = request.session.get("user_id") if session_user_id is None: from django.shortcuts import redirect return redirect("/accounts/login/") is_admin = int(request.session.get("permission", 1)) == 0 me = get_user_by_id(session_user_id) or {} has_manage = bool(me.get("manage_key")) can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1 if (not is_admin) and (not has_manage) and (not can_manage_reg): from django.shortcuts import redirect return redirect("/main/home/") user_id_qs = request.GET.get("user_id") raw_my_keys = me.get("key") or [] if isinstance(raw_my_keys, list): my_keys = raw_my_keys elif isinstance(raw_my_keys, str): my_keys = re.split(r"[,,;;、\r\n]+", raw_my_keys) else: my_keys = [raw_my_keys] my_keys = [str(x).strip() for x in my_keys if str(x).strip()] if can_manage_reg and (not has_manage) and (not is_admin): allowed_manage_keys = [str(x).strip() for x in list(me.get("registration_manage_keys") or []) if str(x).strip()] else: allowed_manage_keys = list(request.session.get("tutor_added_manage_keys") or []) allowed_manage_keys = [str(x).strip() for x in allowed_manage_keys if str(x).strip()] context = { "user_id": user_id_qs or session_user_id, "username": me.get("username"), "is_admin": is_admin, "has_manage_key": has_manage, "can_manage_registration_codes": can_manage_reg, "my_keys_json": json.dumps(my_keys, ensure_ascii=False), "manage_keys_json": json.dumps(list(me.get("manage_key") or []), ensure_ascii=False), "allowed_manage_keys_json": json.dumps(allowed_manage_keys, ensure_ascii=False), } return render(request, "elastic/registration_codes.html", context) @require_http_methods(["GET"]) def get_keys_list_view(request): if request.session.get("user_id") is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) is_admin = int(request.session.get("permission", 1)) == 0 if not is_admin: me = get_user_by_id(request.session.get("user_id")) or {} has_manage = bool(me.get("manage_key") or []) can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1 if (not has_manage) and (not can_manage_reg): return JsonResponse({"status": "error", "message": "无权限"}, status=403) if can_manage_reg and (not has_manage): lst = [str(x).strip() for x in list(me.get("registration_manage_keys") or []) if str(x).strip()] return JsonResponse({"status": "success", "data": lst}) lst = get_keys_list() return JsonResponse({"status": "success", "data": lst}) @require_http_methods(["POST"]) @csrf_protect def add_key_view(request): if request.session.get("user_id") is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) is_admin = int(request.session.get("permission", 1)) == 0 if not is_admin: me = get_user_by_id(request.session.get("user_id")) or {} has_manage = bool(me.get("manage_key") or []) can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1 if (not has_manage) and (not can_manage_reg): return JsonResponse({"status": "error", "message": "无权限"}, status=403) try: payload = json.loads(request.body.decode("utf-8")) except Exception: return JsonResponse({"status": "error", "message": "JSON无效"}, status=400) key_name = (payload.get("key") or "").strip() if not key_name: return JsonResponse({"status": "error", "message": "key不能为空"}, status=400) cur_global = set(get_keys_list() or []) if key_name in cur_global: return JsonResponse({"status": "error", "message": "key已存在"}, status=409) ok = ensure_key_in_list(key_name) if not ok: return JsonResponse({"status": "error", "message": "写入失败"}, status=500) if not is_admin: uid = request.session.get("user_id") if has_manage: cur = list(request.session.get("tutor_added_manage_keys") or []) cur = [str(x).strip() for x in cur if str(x).strip()] if key_name not in cur: cur.append(key_name) request.session["tutor_added_manage_keys"] = cur try: request.session.modified = True except Exception: pass cur_manage = [str(x).strip() for x in list((me or {}).get("manage_key") or []) if str(x).strip()] if key_name not in cur_manage: cur_manage.append(key_name) es_update_user_by_id(uid, manage_key=cur_manage) elif can_manage_reg: cur = [str(x).strip() for x in list((me or {}).get("registration_manage_keys") or []) if str(x).strip()] if key_name not in cur: cur.append(key_name) es_update_user_by_id(uid, registration_manage_keys=cur) return JsonResponse({"status": "success"}) @require_http_methods(["POST"]) @csrf_protect def generate_registration_code_view(request): if request.session.get("user_id") is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) uid = request.session.get("user_id") is_admin = int(request.session.get("permission", 1)) == 0 if not is_admin: me = get_user_by_id(uid) or {} has_manage = bool(me.get("manage_key") or []) can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1 if (not has_manage) and (not can_manage_reg): return JsonResponse({"status": "error", "message": "无权限"}, status=403) try: payload = json.loads(request.body.decode("utf-8")) except Exception: return JsonResponse({"status": "error", "message": "JSON无效"}, status=400) keys = list(payload.get("keys") or []) manage_keys = list(payload.get("manage_keys") or []) if not is_admin: if has_manage: my_keys = [] try: my_keys = [str(x).strip() for x in list((me or {}).get("key") or []) if str(x).strip()] except Exception: my_keys = [] allowed_manage = set() try: allowed_manage = {str(x).strip() for x in list(request.session.get("tutor_added_manage_keys") or []) if str(x).strip()} except Exception: allowed_manage = set() normalized_keys = [] seen = set() for v in list(keys or []): s = str(v).strip() if not s or s in seen: continue seen.add(s) normalized_keys.append(s) missing = [k for k in my_keys if k not in seen] if missing: return JsonResponse( {"status": "error", "message": f"必须选择导师原有的 key:{'、'.join(missing)}"}, status=400, ) keys = normalized_keys clean_manage = [] manage_seen = set() for v in list(manage_keys or []): s = str(v).strip() if not s or s in manage_seen: continue if s not in allowed_manage: return JsonResponse({"status": "error", "message": "无权限"}, status=403) manage_seen.add(s) clean_manage.append(s) manage_keys = clean_manage else: allowed = {str(x).strip() for x in list((me or {}).get("registration_manage_keys") or []) if str(x).strip()} norm_keys = [] seen = set() for v in list(keys or []): s = str(v).strip() if not s or s in seen: continue if s not in allowed: return JsonResponse({"status": "error", "message": "无权限"}, status=403) seen.add(s) norm_keys.append(s) if not norm_keys: return JsonResponse({"status": "error", "message": "至少选择一个 key"}, status=400) keys = norm_keys clean_manage = [] manage_seen = set() for v in list(manage_keys or []): s = str(v).strip() if not s or s in manage_seen: continue if s not in allowed: return JsonResponse({"status": "error", "message": "无权限"}, status=403) manage_seen.add(s) clean_manage.append(s) manage_keys = clean_manage try: days = int(payload.get("expires_in_days", 30)) except Exception: days = 30 result = generate_registration_code(keys=keys, manage_keys=manage_keys, expires_in_days=days, created_by=uid) if not result: return JsonResponse({"status": "error", "message": "生成失败"}, status=500) return JsonResponse({"status": "success", "data": result}) @require_http_methods(["GET"]) def list_registration_codes_view(request): if request.session.get("user_id") is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) uid = request.session.get("user_id") is_admin = int(request.session.get("permission", 1)) == 0 if not is_admin: me = get_user_by_id(uid) or {} if (not (me.get("manage_key") or [])) and (int(me.get("can_manage_registration_codes") or 0) != 1): return JsonResponse({"status": "error", "message": "无权限"}, status=403) data = list_registration_codes() if not is_admin: data = [it for it in (data or []) if str(it.get("created_by")) == str(uid)] return JsonResponse({"status": "success", "data": data}) @require_http_methods(["GET"]) def keys_for_filter_view(request): uid = request.session.get("user_id") if uid is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) is_admin = int(request.session.get("permission", 1)) == 0 try: users = get_all_users() or [] except Exception: users = [] def norm(v): return str(v).strip().strip(";") def add(out, seen, v): s = norm(v) if not s or s in seen: return seen.add(s) out.append(s) if is_admin: out = [] seen = set() for v in get_keys_list() or []: add(out, seen, v) for u in users: for v in list(u.get("manage_key") or []) + list(u.get("key") or []): add(out, seen, v) return JsonResponse({"status": "success", "data": out}) me = get_user_by_id(uid) or {} mgr_keys = {norm(x) for x in (me.get("manage_key") or []) if norm(x)} visible_users = [] if mgr_keys: for u in users: try: ukeys = {norm(x) for x in (u.get("key") or []) if norm(x)} except Exception: ukeys = set() if ukeys & mgr_keys: visible_users.append(u) if str(me.get("user_id")) not in {str(u.get("user_id")) for u in visible_users}: visible_users.append(me) else: visible_users = [me] out = [] seen = set() for v in list(me.get("manage_key") or []) + list(me.get("key") or []): add(out, seen, v) for u in visible_users: for v in list(u.get("manage_key") or []) + list(u.get("key") or []): add(out, seen, v) return JsonResponse({"status": "success", "data": out}) def _extract_type_from_data(value): s = value if s is None: return "" if not isinstance(s, str): try: s = json.dumps(s, ensure_ascii=False) except Exception: s = str(s) s = str(s) try: obj = json.loads(s) if isinstance(obj, dict): t = obj.get("数据类型") if isinstance(t, str) and t.strip(): return t.strip() except Exception: pass try: m = re.search(r'"数据类型"\s*:\s*"([^"]+)"', s) if m: return str(m.group(1)).strip() except Exception: pass return "" @require_http_methods(["GET"]) def types_for_filter_view(request): uid = request.session.get("user_id") if uid is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) try: types = [str(t).strip() for t in (get_type_list() or []) if str(t).strip()] except Exception: types = [] seen = set() out = [] for t in types: if t in seen: continue seen.add(t) out.append(t) return JsonResponse({"status": "success", "data": out}) @require_http_methods(["GET"]) def filter_view(request): session_user_id = request.session.get("user_id") if session_user_id is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) key = (request.GET.get("key") or "").strip() typ = (request.GET.get("type") or "").strip() results = search_all() results = _filter_results_for_user(request, results) filtered = list(results or []) if key: selected = str(key).strip() try: users = get_all_users() or [] except Exception: users = [] writer_keys_by_id = {} for u in users: try: u_id = str(u.get("user_id", "")).strip() except Exception: u_id = "" if not u_id: continue try: u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()} except Exception: u_keys = set() writer_keys_by_id[u_id] = u_keys tmp = [] for r in filtered: writer_id = str(r.get("writer_id", "")).strip() writer_keys = writer_keys_by_id.get(writer_id) if writer_keys and selected in writer_keys: tmp.append(r) continue if selected and selected in str(r.get("data", "")): tmp.append(r) filtered = tmp if typ: tsel = str(typ).strip() filtered = [r for r in filtered if _extract_type_from_data(r.get("data")) == tsel] data = _attach_writer_names(_attach_image_urls(request, filtered)) return JsonResponse({"status": "success", "data": data}) def _parse_dt(value): if not value: return None if hasattr(value, "isoformat"): try: dt = value if getattr(dt, "tzinfo", None) is None: dt = dt.replace(tzinfo=timezone.utc) return dt except Exception: pass s = str(value).strip() if not s: return None if s.endswith("Z"): s = s[:-1] + "+00:00" try: dt = datetime.fromisoformat(s) if getattr(dt, "tzinfo", None) is None: dt = dt.replace(tzinfo=timezone.utc) return dt except Exception: return None def _floor_dt(dt, interval: str): if not dt: return None iv = str(interval or "day").strip().lower() if iv == "month": return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0) if iv == "week": base = dt.replace(hour=0, minute=0, second=0, microsecond=0) return base - timedelta(days=base.weekday()) return dt.replace(hour=0, minute=0, second=0, microsecond=0) def _dt_label(dt, interval: str): if not dt: return "" iv = str(interval or "day").strip().lower() if iv == "month": return dt.strftime("%Y-%m") return dt.strftime("%Y-%m-%d") @require_http_methods(["GET"]) def report_view(request): try: session_user_id = request.session.get("user_id") if session_user_id is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) is_admin = int(request.session.get("permission", 1)) == 0 me = get_user_by_id(session_user_id) or {} has_manage_key = bool(me.get("manage_key") or []) if (not is_admin) and (not has_manage_key): return JsonResponse({"status": "error", "message": "无权限"}, status=403) gte = (request.GET.get("from") or "").strip() lte = (request.GET.get("to") or "").strip() interval = (request.GET.get("interval") or "day").strip() key = (request.GET.get("key") or "").strip() typ = (request.GET.get("type") or "").strip() gte_dt = _parse_dt(gte) if gte else None lte_dt = _parse_dt(lte) if lte else None results = search_all() results = _filter_results_for_user(request, results) filtered = list(results or []) if key: selected = str(key).strip() try: users = get_all_users() or [] except Exception: users = [] writer_keys_by_id = {} for u in users: try: u_id = str(u.get("user_id", "")).strip() except Exception: u_id = "" if not u_id: continue try: u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()} except Exception: u_keys = set() writer_keys_by_id[u_id] = u_keys tmp = [] for r in filtered: writer_id = str(r.get("writer_id", "")).strip() writer_keys = writer_keys_by_id.get(writer_id) if writer_keys and selected in writer_keys: tmp.append(r) continue if selected and selected in str(r.get("data", "")): tmp.append(r) filtered = tmp if typ: tsel = str(typ).strip() filtered = [r for r in filtered if _extract_type_from_data(r.get("data")) == tsel] ranged = [] for r in filtered: t = _parse_dt(r.get("time")) if (gte_dt or lte_dt) and (t is None): continue if gte_dt and t and t < gte_dt: continue if lte_dt and t and t > lte_dt: continue rr = dict(r) rr["_time_dt"] = t ranged.append(rr) by_type = {} by_bucket = {} for r in ranged: tval = _extract_type_from_data(r.get("data")) if tval: by_type[tval] = by_type.get(tval, 0) + 1 bucket_dt = _floor_dt(r.get("_time_dt"), interval) if bucket_dt: label = _dt_label(bucket_dt, interval) if label: by_bucket[label] = by_bucket.get(label, 0) + 1 by_type_arr = [{"type": k, "count": int(v)} for k, v in sorted(by_type.items(), key=lambda x: (-x[1], x[0]))] by_time_arr = [{"bucket": k, "count": int(v)} for k, v in sorted(by_bucket.items(), key=lambda x: x[0])] return JsonResponse( { "status": "success", "data": { "generated_at": datetime.now(timezone.utc).isoformat(), "range": {"from": gte or "", "to": lte or ""}, "filters": {"key": key or "", "type": typ or "", "interval": interval or "day"}, "total": len(ranged), "by_type": by_type_arr, "by_time": by_time_arr, }, } ) except Exception as e: return JsonResponse({"status": "error", "message": str(e) or "生成失败"}, status=500) @require_http_methods(["GET"]) def report_csv_view(request): resp = report_view(request) if getattr(resp, "status_code", 200) != 200: return resp try: payload = json.loads(resp.content.decode("utf-8")) except Exception: return JsonResponse({"status": "error", "message": "生成失败"}, status=500) data = (payload or {}).get("data") or {} buf = io.StringIO() w = csv.writer(buf) w.writerow(["统计报表"]) w.writerow(["生成时间", data.get("generated_at", "")]) rng = data.get("range") or {} w.writerow(["时间范围", f"{rng.get('from','')} ~ {rng.get('to','')}"]) flt = data.get("filters") or {} w.writerow(["筛查Key", flt.get("key", "")]) w.writerow(["筛查类型", flt.get("type", "")]) w.writerow(["时间粒度", flt.get("interval", "")]) w.writerow([]) w.writerow(["总数", data.get("total", 0)]) w.writerow([]) w.writerow(["按成果类型统计"]) w.writerow(["类型", "数量"]) for it in list(data.get("by_type") or []): w.writerow([it.get("type", ""), it.get("count", 0)]) w.writerow([]) w.writerow(["按时间统计"]) w.writerow(["时间", "数量"]) for it in list(data.get("by_time") or []): w.writerow([it.get("bucket", ""), it.get("count", 0)]) content = buf.getvalue() out = HttpResponse(content, content_type="text/csv; charset=utf-8") out["Content-Disposition"] = 'attachment; filename="report.csv"' return out @require_http_methods(["GET"]) def export_achievements_csv(request): """一键导出所有可见成果为 Excel (如果支持) 或 CSV""" try: session_user_id = request.session.get("user_id") if session_user_id is None: return HttpResponse("Unauthorized", status=401) # 1. 获取所有数据 results = search_all() # 2. 根据权限过滤 results = _filter_results_for_user(request, results) # 3. 补充录入人姓名 results = _attach_writer_names(results) if not results: return HttpResponse("No data to export", status=404) # 4. 解析数据并准备数据列表 parsed_data_list = [] all_data_keys = set() for item in results: raw_data = item.get("data", "{}") try: if isinstance(raw_data, str): parsed_dict = json.loads(raw_data) else: parsed_dict = raw_data except Exception: parsed_dict = {"原始数据": str(raw_data)} if not isinstance(parsed_dict, dict): parsed_dict = {"数据内容": str(parsed_dict)} # 展平基础字段和动态数据字段 flat_item = { "ID": item.get("_id", ""), "录入人": item.get("writer_name") or item.get("writer_id", ""), "时间": format_datetime_for_export(item.get("time")), } # 清理动态字段中的换行符 clean_parsed_dict = {} for k, v in parsed_dict.items(): if isinstance(v, str): clean_parsed_dict[k] = v.replace('\r', '').replace('\n', ' ') else: clean_parsed_dict[k] = v flat_item.update(clean_parsed_dict) # 保存原始图片引用以便导出 Excel 时使用 flat_item["_image_refs"] = _parse_image_refs(item.get("image", "")) parsed_data_list.append(flat_item) all_data_keys.update(clean_parsed_dict.keys()) # 确定表头:基础字段 + 动态字段(按字母排序) dynamic_headers = sorted(list(all_data_keys)) headers = ["ID", "录入人", "时间"] + dynamic_headers # 如果是 Excel 且支持图片,添加图片列 if HAS_OPENPYXL: headers.append("成果图片") filename_base = f"achievements_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # 5. 如果安装了 openpyxl,生成 Excel if HAS_OPENPYXL: wb = openpyxl.Workbook() ws = wb.active ws.title = "成果数据" # 写入表头 for col_num, header in enumerate(headers, 1): cell = ws.cell(row=1, column=col_num, value=header) cell.font = openpyxl.styles.Font(bold=True) cell.alignment = openpyxl.styles.Alignment(horizontal='center', vertical='center') # 写入数据 img_col_index = headers.index("成果图片") + 1 if "成果图片" in headers else None for row_num, row_data in enumerate(parsed_data_list, 2): for col_num, header in enumerate(headers, 1): if header == "成果图片": continue # 图片单独处理 ws.cell(row=row_num, column=col_num, value=row_data.get(header, "")) # 处理图片插入 if img_col_index and row_data.get("_image_refs"): first_ref = row_data["_image_refs"][0] img_bytes = _get_image_bytes(first_ref) if img_bytes: try: img = XLImage(img_bytes) # 调整图片大小以适应单元格 (假设高度 80 像素左右) aspect_ratio = img.width / img.height img.height = 80 img.width = 80 * aspect_ratio # 计算插入位置 cell_address = f"{get_column_letter(img_col_index)}{row_num}" ws.add_image(img, cell_address) # 设置行高以容纳图片 (80 像素约为 60 磅) ws.row_dimensions[row_num].height = 65 except Exception as e: ws.cell(row=row_num, column=img_col_index, value=f"图片加载失败: {str(e)}") # 自动调整列宽 for i, column_cells in enumerate(ws.columns, 1): header = headers[i-1] if header == "成果图片": ws.column_dimensions[get_column_letter(i)].width = 20 continue length = max(len(str(cell.value or "")) for cell in column_cells) ws.column_dimensions[get_column_letter(i)].width = min(length + 2, 50) response = HttpResponse( content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', ) response['Content-Disposition'] = f'attachment; filename="{filename_base}.xlsx"' wb.save(response) return response # 6. 否则回退到 CSV output = io.StringIO() output.write('\ufeff') # UTF-8 BOM # 增加 extrasaction='ignore' 以忽略 _image_refs 等内部辅助字段 writer = csv.DictWriter(output, fieldnames=headers, extrasaction='ignore') writer.writeheader() for row in parsed_data_list: writer.writerow(row) response = HttpResponse(output.getvalue(), content_type='text/csv; charset=utf-8') response['Content-Disposition'] = f'attachment; filename="{filename_base}.csv"' return response except Exception as e: import traceback traceback.print_exc() return HttpResponse(f"导出失败: {str(e)}", status=500) def format_datetime_for_export(t): if not t: return "" try: if isinstance(t, datetime): return t.strftime("%Y-%m-%d %H:%M:%S") d = datetime.fromisoformat(str(t).replace('Z', '+00:00')) return d.strftime("%Y-%m-%d %H:%M:%S") except Exception: return str(t) def _get_image_bytes(image_ref): """根据 image_ref 获取图片字节流,并确保转换为 Excel 支持的格式 (如 JPEG/PNG)""" s = str(image_ref or '').strip() if not s: return None img_raw_bytes = None if s.startswith('minio:'): object_name = s[len('minio:'):].lstrip('/') try: from minio_storage.minio_connect import get_minio_client, get_bucket_name client = get_minio_client() bucket = get_bucket_name() if client: response = client.get_object(bucket, object_name) img_raw_bytes = response.read() except Exception: pass elif s.startswith('local:'): rel_path = s[len('local:'):].lstrip('/') abs_path = os.path.join(settings.MEDIA_ROOT, rel_path) if os.path.isfile(abs_path): try: with open(abs_path, 'rb') as f: img_raw_bytes = f.read() except Exception: pass if not img_raw_bytes: return None # 处理 WebP 或其他 openpyxl 可能不支持的格式 try: from PIL import Image as PILImage img_io = io.BytesIO(img_raw_bytes) with PILImage.open(img_io) as pil_img: # 如果是 WebP 或带有透明通道的图片,转换为 RGB 格式并存为 JPEG 或 PNG # Excel 对 PNG 支持较好 output_io = io.BytesIO() if pil_img.format == 'WEBP' or pil_img.mode in ('RGBA', 'LA', 'P'): rgb_img = pil_img.convert('RGB') rgb_img.save(output_io, format='JPEG', quality=85) else: pil_img.save(output_io, format=pil_img.format or 'JPEG') output_io.seek(0) return output_io except Exception as e: print(f"图片转换失败: {str(e)}") return io.BytesIO(img_raw_bytes) # 尝试直接返回原始数据作为最后手段 @require_http_methods(["POST"]) @csrf_protect def revoke_registration_code_view(request): if request.session.get("user_id") is None: return JsonResponse({"status": "error", "message": "未登录"}, status=401) uid = request.session.get("user_id") is_admin = int(request.session.get("permission", 1)) == 0 if not is_admin: me = get_user_by_id(uid) or {} if (not (me.get("manage_key") or [])) and (int(me.get("can_manage_registration_codes") or 0) != 1): return JsonResponse({"status": "error", "message": "无权限"}, status=403) try: payload = json.loads(request.body.decode("utf-8")) except Exception: return JsonResponse({"status": "error", "message": "JSON无效"}, status=400) code = (payload.get("code") or "").strip() if not code: return JsonResponse({"status": "error", "message": "缺少code"}, status=400) if not is_admin: info = get_registration_code(code) if not info or str(info.get("created_by")) != str(uid): return JsonResponse({"status": "error", "message": "无权限"}, status=403) ok = revoke_registration_code(code) if not ok: return JsonResponse({"status": "error", "message": "作废失败"}, status=500) return JsonResponse({"status": "success"})