Files
Achievement_Inputing/elastic/views.py
DSQ 9e3fe7150b
All checks were successful
CI / docker-ci (push) Successful in 26s
[0.2.7.8][ci]
2026-03-23 11:02:00 +08:00

1754 lines
67 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ES相关的API视图
"""
import os
import re
import uuid
import base64
import json
import csv
import io
from datetime import datetime, timezone, timedelta
import tempfile
import concurrent.futures
from django.conf import settings
from django.http import JsonResponse, HttpResponse
from django.shortcuts import render
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import ensure_csrf_cookie
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie, csrf_protect
from .es_connect import *
from .es_connect import update_user_by_id as es_update_user_by_id, delete_user_by_id as es_delete_user_by_id
from .es_connect import (
analytics_trend as es_analytics_trend,
analytics_types as es_analytics_types,
analytics_types_trend as es_analytics_types_trend,
analytics_recent as es_analytics_recent,
)
from PIL import Image
try:
import fitz # PyMuPDF
HAS_PDF_SUPPORT = True
PDF_ERROR = ""
except ImportError as e:
try:
import pymupdf
fitz = pymupdf
HAS_PDF_SUPPORT = True
PDF_ERROR = ""
except ImportError:
HAS_PDF_SUPPORT = False
PDF_ERROR = str(e)
def _filter_results_for_user(request, results):
session_user_id = request.session.get("user_id")
if session_user_id is None:
return []
me = get_user_by_id(session_user_id) or {}
is_admin = int(request.session.get("permission", 1)) == 0
if is_admin:
return results
uid = str(session_user_id)
manage_keys = me.get("manage_key", []) or []
manage_keys_set = {str(k).strip() for k in manage_keys if str(k).strip()}
writer_keys_by_id = None
if manage_keys_set:
try:
users = get_all_users() or []
except Exception:
users = []
writer_keys_by_id = {}
for u in users:
try:
u_id = str(u.get("user_id", "")).strip()
except Exception:
u_id = ""
if not u_id:
continue
try:
u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()}
except Exception:
u_keys = set()
writer_keys_by_id[u_id] = u_keys
filtered = []
for r in results:
# 1. 自己的提交
if str(r.get("writer_id", "")) == uid:
filtered.append(r)
continue
# 2. 管理的提交:优先按“作者的 key 与我的 manage_key 交集”判断;缺失时回退为 data 字符串包含判断
if manage_keys_set:
writer_id = str(r.get("writer_id", "")).strip()
writer_keys = (writer_keys_by_id or {}).get(writer_id)
if writer_keys and (writer_keys & manage_keys_set):
filtered.append(r)
continue
r_data = str(r.get("data", ""))
for mk in manage_keys_set:
if mk and mk in r_data:
filtered.append(r)
break
return filtered
def _image_ref_to_url(request, image_ref: str) -> str:
s = str(image_ref or '').strip()
if not s:
return ''
if s.startswith('minio:'):
object_name = s[len('minio:'):].lstrip('/')
if not object_name:
return ''
try:
from minio_storage.minio_connect import presigned_get_url
return presigned_get_url(object_name, expires_seconds=8 * 60 * 60)
except Exception:
return ''
if s.startswith('local:'):
rel_path = s[len('local:'):].lstrip('/')
if not rel_path:
return ''
return request.build_absolute_uri(settings.MEDIA_URL + rel_path)
return ''
def _parse_image_refs(image_ref):
if not image_ref:
return []
if isinstance(image_ref, (list, tuple)):
return [str(x) for x in image_ref if str(x).strip()]
if isinstance(image_ref, str):
s = image_ref.strip()
if not s:
return []
parsed = None
if s[:1] in ('[', '"'):
try:
parsed = json.loads(s)
except Exception:
parsed = None
if isinstance(parsed, list):
return [str(x) for x in parsed if str(x).strip()]
if isinstance(parsed, str):
s = parsed.strip()
return [s] if s else []
return []
def _attach_image_urls(request, items):
out = []
for it in list(items or []):
try:
d = dict(it or {})
except Exception:
continue
refs = _parse_image_refs(d.get('image', ''))
urls = [_image_ref_to_url(request, r) for r in refs if str(r).strip()]
urls = [u for u in urls if u]
d['image_urls'] = urls
d['image_url'] = urls[0] if urls else _image_ref_to_url(request, d.get('image', ''))
out.append(d)
return out
def _attach_writer_names(items):
users = get_all_users()
name_by_id = {str(u.get("user_id")): u.get("username") for u in (users or [])}
out = []
for it in list(items or []):
try:
d = dict(it or {})
except Exception:
continue
wid = d.get("writer_id")
d["writer_name"] = name_by_id.get(str(wid), str(wid) if wid is not None else "")
out.append(d)
return out
@require_http_methods(["GET", "POST"])
@csrf_exempt
def init_index(request):
"""初始化ES索引"""
print("⚠️ init_index 被调用了!")
try:
create_index_with_mapping()
return JsonResponse({"status": "success", "message": "索引初始化成功"})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["POST"])
@csrf_exempt
def add_data(request):
"""添加数据到ES"""
try:
data = json.loads(request.body.decode('utf-8'))
success = insert_data(data)
if success:
return JsonResponse({"status": "success", "message": "数据添加成功"})
else:
return JsonResponse({"status": "error", "message": "数据添加失败"}, status=500)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def search(request):
"""搜索数据"""
try:
query = request.GET.get('q', '')
if not query:
return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400)
results = search_data(query)
results = _filter_results_for_user(request, results)
data = _attach_writer_names(_attach_image_urls(request, results))
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def fuzzy_search(request):
"""模糊搜索"""
try:
keyword = request.GET.get('keyword', '')
if not keyword:
return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400)
results = search_by_any_field(keyword)
results = _filter_results_for_user(request, results)
data = _attach_writer_names(_attach_image_urls(request, results))
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def get_all_data(request):
"""获取所有数据"""
try:
session_user_id = request.session.get("user_id")
if session_user_id is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
results = search_all()
results = _filter_results_for_user(request, results)
data = _attach_writer_names(_attach_image_urls(request, results))
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def filter_by_key(request):
try:
session_user_id = request.session.get("user_id")
if session_user_id is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
key = (request.GET.get("key") or "").strip()
results = search_all()
results = _filter_results_for_user(request, results)
if not key:
data = _attach_writer_names(_attach_image_urls(request, results))
return JsonResponse({"status": "success", "data": data})
selected = str(key).strip()
try:
users = get_all_users() or []
except Exception:
users = []
writer_keys_by_id = {}
for u in users:
try:
u_id = str(u.get("user_id", "")).strip()
except Exception:
u_id = ""
if not u_id:
continue
try:
u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()}
except Exception:
u_keys = set()
writer_keys_by_id[u_id] = u_keys
filtered = []
for r in results:
writer_id = str(r.get("writer_id", "")).strip()
writer_keys = writer_keys_by_id.get(writer_id)
if writer_keys and selected in writer_keys:
filtered.append(r)
continue
if selected and selected in str(r.get("data", "")):
filtered.append(r)
data = _attach_writer_names(_attach_image_urls(request, filtered))
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["DELETE"])
@csrf_exempt
def delete_data(request, doc_id):
"""删除数据(需登录;管理员或作者本人)"""
request_user=request.session.get("user_id")
# request_admin=request.session.get("permisssion")
if request_user is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
try:
existing = get_by_id(doc_id)
user_existing=get_user_by_id(request_user)
if not existing:
return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)
is_admin = int(user_existing.get('permission')) == 0
is_owner = str(existing.get("writer_id", "")) == str(request.session.get("user_id"))
if not (is_admin or is_owner):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
success = delete_by_id(doc_id)
if success:
return JsonResponse({"status": "success", "message": "数据删除成功"})
else:
return JsonResponse({"status": "error", "message": "数据删除失败"}, status=500)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["PUT"])
@csrf_exempt
def update_data(request, doc_id):
"""更新数据(需登录;管理员或作者本人)"""
request_user = request.session.get("user_id")
if request_user is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
try:
payload = json.loads(request.body.decode('utf-8'))
except Exception:
return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
try:
existing = get_by_id(doc_id)
user_existing = get_user_by_id(request_user)
if not existing:
return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)
is_admin = int(user_existing.get('permission')) == 0
is_owner = str(existing.get("writer_id", "")) == str(request.session.get("user_id"))
if not (is_admin or is_owner):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
updated = {}
if "writer_id" in payload:
updated["writer_id"] = payload["writer_id"]
if "image" in payload:
img_val = payload["image"]
if isinstance(img_val, list):
updated["image"] = json_to_string(img_val)
else:
updated["image"] = img_val
if "data" in payload:
v = payload["data"]
if isinstance(v, dict):
updated["data"] = json.dumps(v, ensure_ascii=False)
else:
try:
obj = json.loads(str(v))
updated["data"] = json.dumps(obj, ensure_ascii=False)
except Exception:
updated["data"] = str(v)
success = update_by_id(doc_id, updated)
if success:
return JsonResponse({"status": "success", "message": "数据更新成功"})
else:
return JsonResponse({"status": "error", "message": "数据更新失败"}, status=500)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def get_data(request, doc_id):
"""获取单个数据"""
try:
result = get_by_id(doc_id)
if result:
wrapped = dict(result)
wrapped['image_url'] = _image_ref_to_url(request, wrapped.get('image', ''))
return JsonResponse({"status": "success", "data": wrapped})
else:
return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["POST"])
@csrf_protect
def add_user(request):
if request.session.get("user_id") is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
if int(request.session.get("permission", 1)) != 0:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
try:
payload = json.loads(request.body.decode("utf-8"))
except Exception:
return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
username = (payload.get("username") or "").strip()
password = (payload.get("password") or "").strip()
try:
permission = int(payload.get("permission", 1))
except Exception:
permission = 1
if not username:
return JsonResponse({"status": "error", "message": "用户名不能为空"}, status=400)
if password and len(password) < 6:
return JsonResponse({"status": "error", "message": "密码长度至少为6位"}, status=400)
existing = get_user_by_username(username)
if existing:
return JsonResponse({"status": "error", "message": "用户名已存在"}, status=409)
users = get_all_users()
next_id = (max([int(u.get("user_id", 0)) for u in users]) + 1) if users else 1
ok = write_user_data({
"user_id": next_id,
"username": username,
"password": password,
"permission": permission,
})
if not ok:
return JsonResponse({"status": "error", "message": "用户添加失败"}, status=500)
return JsonResponse({"status": "success", "message": "用户添加成功"})
@require_http_methods(["GET"])
def get_users(request):
uid = request.session.get("user_id")
if uid is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
try:
is_admin = int(request.session.get("permission", 1)) == 0
requester = get_user_by_id(uid) or {}
mgr_keys = set(requester.get("manage_key") or [])
key_q = (request.GET.get("key") or "").strip()
q = (request.GET.get("search") or "").strip()
users = get_all_users()
if is_admin:
filtered = users
elif mgr_keys:
def match_manage(user):
ukeys = set(user.get("key") or [])
return bool(ukeys & mgr_keys)
filtered = [u for u in users if match_manage(u)]
else:
filtered = [u for u in users if str(u.get("user_id")) == str(uid)]
if key_q:
k = str(key_q).strip()
def match_key(user):
try:
ukeys = {str(x).strip() for x in (user.get("key") or []) if str(x).strip()}
except Exception:
ukeys = set()
try:
umkeys = {str(x).strip() for x in (user.get("manage_key") or []) if str(x).strip()}
except Exception:
umkeys = set()
return (k in ukeys) or (k in umkeys)
filtered = [u for u in filtered if match_key(u)]
if q:
filtered = [u for u in filtered if q in str(u.get("username", ""))]
return JsonResponse({"status": "success", "data": filtered})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["POST"])
@csrf_protect
def update_user_by_id_view(request, user_id):
uid = request.session.get("user_id")
if uid is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
try:
payload = json.loads(request.body.decode("utf-8"))
except Exception:
return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
new_username = (payload.get("username") or "").strip()
new_permission = payload.get("permission")
new_password = (payload.get("password") or "").strip()
raw_keys = payload.get("key", None)
raw_manage_keys = payload.get("manage_key", None)
if new_password and len(new_password) < 6:
return JsonResponse({"status": "error", "message": "密码长度至少为6位"}, status=400)
is_admin = int(request.session.get("permission", 1)) == 0
requester = get_user_by_id(uid) or {}
target = get_user_by_id(user_id) or {}
requester_mgr = set(requester.get("manage_key") or [])
target_keys = set(target.get("key") or [])
def normalize_keys(v):
if v is None:
return None
if isinstance(v, list):
parts = v
elif isinstance(v, str):
parts = re.split(r"[,;;、\r\n]+", v)
else:
parts = [v]
out = []
seen = set()
for p in parts:
s = str(p).strip().strip(";")
if not s or s in seen:
continue
seen.add(s)
out.append(s)
return out
new_keys = normalize_keys(raw_keys)
new_manage_keys = normalize_keys(raw_manage_keys)
if is_admin:
if new_username:
other = get_user_by_username(new_username)
if other and int(other.get("user_id", -1)) != int(user_id):
return JsonResponse({"status": "error", "message": "用户名已存在"}, status=409)
ok = es_update_user_by_id(
user_id,
username=new_username if new_username else None,
permission=int(new_permission) if new_permission is not None else None,
password=new_password if new_password else None,
key=new_keys,
manage_key=new_manage_keys,
)
return JsonResponse({"status": "success"}) if ok else JsonResponse({"status": "error", "message": "用户更新失败"}, status=500)
if str(uid) == str(user_id):
if not new_password:
return JsonResponse({"status": "error", "message": "仅允许修改密码"}, status=400)
ok = es_update_user_by_id(user_id, password=new_password)
return JsonResponse({"status": "success"}) if ok else JsonResponse({"status": "error", "message": "用户更新失败"}, status=500)
if requester_mgr and (target_keys & requester_mgr):
if new_username or new_permission is not None or new_manage_keys is not None:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
if not new_password and new_keys is None:
return JsonResponse({"status": "error", "message": "缺少更新内容"}, status=400)
merged_keys = None
if new_keys is not None:
try:
new_keys_set = set(new_keys)
except Exception:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
requester_locked = set(normalize_keys(requester.get("key")) or [])
if new_keys_set & requester_locked:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
existing_keys = [str(v).strip() for v in list(target.get("key") or []) if str(v).strip()]
preserved = [k for k in existing_keys if k in requester_locked]
merged_keys = preserved + [str(v).strip() for v in list(new_keys) if str(v).strip() and str(v).strip() not in requester_locked]
seen = set()
merged_keys = [k for k in merged_keys if not (k in seen or seen.add(k))]
ok = es_update_user_by_id(user_id, password=new_password if new_password else None, key=merged_keys)
return JsonResponse({"status": "success"}) if ok else JsonResponse({"status": "error", "message": "用户更新失败"}, status=500)
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
@require_http_methods(["POST"])
@csrf_protect
def delete_user_by_id_view(request, user_id):
if request.session.get("user_id") is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
if int(request.session.get("permission", 1)) != 0:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
ok = es_delete_user_by_id(user_id)
if not ok:
return JsonResponse({"status": "error", "message": "用户删除失败"}, status=500)
return JsonResponse({"status": "success", "message": "用户删除成功"})
# 辅助JSON 转换
def json_to_string(obj):
if isinstance(obj, str):
# 如果已经是字符串,尝试解析一下验证是否为有效 JSON或者是普通文本
try:
json.loads(obj)
return obj # 是有效 JSON 字符串,直接返回
except Exception:
# 不是 JSON 字符串,可能是普通文本,将其封装成 JSON
return json.dumps(obj, ensure_ascii=False)
try:
return json.dumps(obj, ensure_ascii=False)
except Exception:
return str(obj)
def string_to_json(s):
try:
return json.loads(s)
except Exception:
return {}
# 移植自 a.py 的核心:调用大模型进行 OCR/信息抽取
def ocr_and_extract_info(image_path: str):
# from openai import OpenAI
def encode_image(path: str) -> str:
with open(path, "rb") as f:
return base64.b64encode(f.read()).decode("utf-8")
base64_image = encode_image(image_path)
# api_key = getattr(settings, "AISTUDIO_API_KEY", "188f57db3766e02ed2c7e18373996d84f4112272")
# base_url = getattr(settings, "OPENAI_BASE_URL", "https://aistudio.baidu.com/llm/lmapi/v3")
# if not api_key:
# raise RuntimeError("缺少 AISTUDIO_API_KEY请在环境变量或 settings 中配置")
# api_key = getattr(settings, "AISTUDIO_API_KEY", "")
# base_url = getattr(settings, "OPENAI_BASE_URL", "")
# if not api_key or not base_url:
# raise RuntimeError("缺少模型服务配置,请设置 AISTUDIO_API_KEY 与 OPENAI_BASE_URL")
# client = OpenAI(api_key=api_key, base_url=base_url)
# types = get_type_list()
# chat_completion = client.chat.completions.create(
# messages=[
# {"role": "system", "content": "你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。"},
# {
# "role": "user",
# "content": [
# {"type": "text", "text": f"请识别这张图片中的信息将你认为重要的数据转换为不包含嵌套的json不要显示其它信息以便于解析直接输出json结果即可。使用“数据类型”字段表示这个东西的大致类型除此之外你可以自行决定使用哪些json字段。“数据类型”的内容有严格规定请查看{json.dumps(types, ensure_ascii=False)}中是否包含你所需要的类型,确定不包含后你才可以填入你觉得合适的大致分类。"},
# {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
# ],
# },
# ],
# model="glm-5",
# )
# response_text = chat_completion.choices[0].message.content
from zai import ZhipuAiClient
import httpx
# api_key = (
# getattr(settings, "ZHIPU_API_KEY", "")
# or getattr(settings, "ZAI_API_KEY", "")
# or getattr(settings, "AISTUDIO_API_KEY", "")
# )
# if not api_key:
# raise RuntimeError("缺少模型服务配置,请设置 ZHIPU_API_KEY")
# base_url = (
# getattr(settings, "ZHIPU_BASE_URL", "")
# or getattr(settings, "ZAI_BASE_URL", "")
# or "https://open.bigmodel.cn/api/paas/v4/"
# )
client = ZhipuAiClient(api_key="fb83a3f91e8c4e45af811236548765a2.cX4kUhigHm7VNowf")
types = get_type_list()
chat_completion = client.chat.completions.create(
messages=[
{"role": "system", "content": "你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。"},
{
"role": "user",
"content": [
{"type": "text", "text": f"请识别这张图片中的信息将你认为重要的数据转换为不包含嵌套的json不要显示其它信息以便于解析直接输出json结果即可。使用“数据类型”字段表示这个东西的大致类型除此之外你可以自行决定使用哪些json字段。“数据类型”的内容有严格规定请查看{json.dumps(types, ensure_ascii=False)}中是否包含你所需要的类型,确定不包含后你才可以填入你觉得合适的大致分类。"},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
],
},
],
model="glm-4.6v",
)
response_text = chat_completion.choices[0].message.content
def parse_response(text: str):
try:
result = json.loads(text)
if result:
return result
except json.JSONDecodeError:
pass
m = re.search(r"```json\n(.*?)```", text, re.DOTALL)
if m:
try:
result = json.loads(m.group(1))
if result:
return result
except json.JSONDecodeError:
pass
try:
fixed = text.replace("'", '"')
result = json.loads(fixed)
if result:
return result
except json.JSONDecodeError:
pass
return None
return parse_response(response_text)
@require_http_methods(["GET"])
@ensure_csrf_cookie
def upload_page(request):
session_user_id = request.session.get("user_id")
if session_user_id is None:
from django.shortcuts import redirect
return redirect("/accounts/login/")
user_id_qs = request.GET.get("user_id")
me = get_user_by_id(session_user_id) or {}
context = {
"user_id": user_id_qs or session_user_id,
"username": me.get("username"),
}
return render(request, "elastic/upload.html", context)
# 上传并识别(不入库)
@require_http_methods(["POST"])
def upload(request):
try:
if request.session.get("user_id") is None:
fallback_uid = request.POST.get("user_id") or request.GET.get("user_id")
if fallback_uid:
request.session["user_id"] = fallback_uid
request.session.setdefault("permission", 1)
else:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
files = request.FILES.getlist("file")
if not files:
one = request.FILES.get("file")
if one:
files = [one]
if not files:
return JsonResponse({"status": "error", "message": "未选择文件"}, status=400)
images_dir = os.path.join(settings.MEDIA_ROOT, "images")
os.makedirs(images_dir, exist_ok=True)
file_results = []
for f in files:
group_images = []
is_pdf = f.name.lower().endswith('.pdf')
if is_pdf:
if not HAS_PDF_SUPPORT:
return JsonResponse({"status": "error", "message": f"服务器未安装PDF处理组件(PyMuPDF): {PDF_ERROR}"}, status=500)
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
for chunk in f.chunks():
tmp.write(chunk)
tmp_path = tmp.name
try:
doc = fitz.open(tmp_path)
for i in range(len(doc)):
page = doc.load_page(i)
pix = page.get_pixmap(dpi=150)
img_filename = f"{uuid.uuid4()}_page_{i+1}.jpg"
img_abs_path = os.path.join(images_dir, img_filename)
pix.save(img_abs_path)
group_images.append((img_abs_path, img_filename))
doc.close()
except Exception as e:
return JsonResponse({"status": "error", "message": f"PDF {f.name} 转换失败: {str(e)}"}, status=500)
finally:
if os.path.exists(tmp_path):
os.remove(tmp_path)
else:
filename = f"{uuid.uuid4()}_{f.name}"
abs_path = os.path.join(images_dir, filename)
with open(abs_path, "wb") as dst:
for chunk in f.chunks():
dst.write(chunk)
group_images.append((abs_path, filename))
def run_ocr(img_info):
abs_p, fname = img_info
try:
data = ocr_and_extract_info(abs_p)
return data
except Exception:
return None
group_data_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(group_images), 8)) as executor:
futures = [executor.submit(run_ocr, img_info) for img_info in group_images]
for future in concurrent.futures.as_completed(futures):
res = future.result()
if res:
group_data_list.append(res)
merged_group_data = {}
for item in group_data_list:
if not isinstance(item, dict):
continue
for k, v in item.items():
key = str(k).strip()
if not key:
continue
if key not in merged_group_data or merged_group_data.get(key) in (None, ''):
merged_group_data[key] = v
elif merged_group_data.get(key) != v:
base = key
idx = 2
while f"{base}_{idx}" in merged_group_data:
idx += 1
merged_group_data[f"{base}_{idx}"] = v
if not merged_group_data:
merged_group_data = {"文件名": f.name, "提示": "未识别到具体内容"}
rel_paths = [f"images/{img[1]}" for img in group_images]
image_urls = [request.build_absolute_uri(settings.MEDIA_URL + rp) for rp in rel_paths]
file_results.append({
"name": f.name,
"data": merged_group_data,
"images": rel_paths,
"image_urls": image_urls,
})
return JsonResponse({
"status": "success",
"message": f"成功处理 {len(file_results)} 个文件,请确认数据后点击录入",
"items": file_results,
})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e) or "上传失败"}, status=500)
# 确认并入库
@require_http_methods(["POST"])
def confirm(request):
if request.session.get("user_id") is None:
try:
payload_for_uid = json.loads(request.body.decode("utf-8"))
except Exception:
payload_for_uid = {}
fb_uid = (payload_for_uid or {}).get("user_id")
if fb_uid:
request.session["user_id"] = fb_uid
request.session.setdefault("permission", 1)
else:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
try:
payload = json.loads(request.body.decode("utf-8"))
except json.JSONDecodeError:
return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
# 支持单项或批量入库
items = payload.get("items")
if not items:
# 兼容旧版本单项入库
items = [{
"data": payload.get("data"),
"image": payload.get("image")
}]
success_count = 0
errors = []
for item in items:
edited = item.get("data") or {}
image_rel = item.get("image") or ""
if not isinstance(edited, dict) or not edited:
errors.append("数据项不能为空")
continue
ensure_type_in_list(edited.get("数据类型"))
image_ref_to_store = ""
temp_files_to_delete = []
image_rels = _parse_image_refs(image_rel)
if image_rels:
images_dir = os.path.join(settings.MEDIA_ROOT, "images")
os.makedirs(images_dir, exist_ok=True)
image_refs = []
for rel in image_rels:
src_abs = os.path.join(settings.MEDIA_ROOT, rel)
if not os.path.isfile(src_abs):
errors.append(f"图片文件 {rel} 不存在")
continue
webp_name = f"{uuid.uuid4().hex}.webp"
webp_abs = os.path.join(images_dir, webp_name)
try:
with Image.open(src_abs) as im:
if im.mode in ("RGBA", "LA", "P"):
im = im.convert("RGBA")
else:
im = im.convert("RGB")
im.save(webp_abs, format="WEBP", quality=80)
except Exception:
errors.append(f"图片 {rel} 转换WEBP失败")
continue
try:
object_name = f"images/{webp_name}"
from minio_storage.minio_connect import upload_file, is_minio_configured
if is_minio_configured():
upload_file(webp_abs, object_name, content_type="image/webp")
image_refs.append(f"minio:{object_name}")
temp_files_to_delete.extend([src_abs, webp_abs])
else:
image_refs.append(f"local:{object_name}")
temp_files_to_delete.append(src_abs)
except Exception as e:
errors.append(f"存储图片 {rel} 失败: {e}")
continue
if len(image_refs) == 1:
image_ref_to_store = image_refs[0]
elif len(image_refs) > 1:
image_ref_to_store = json_to_string(image_refs)
to_store = {
"writer_id": str(request.session.get("user_id")),
"data": json_to_string(edited),
"image": image_ref_to_store,
}
ok = insert_data(to_store)
if ok:
success_count += 1
# 清理临时文件
for p in temp_files_to_delete:
if p and os.path.isfile(p):
try: os.remove(p)
except: pass
else:
errors.append("写入ES失败")
if success_count > 0:
msg = f"成功录入 {success_count} 条数据"
if errors:
msg += f" (遇到 {len(errors)} 个错误)"
return JsonResponse({"status": "success", "message": msg})
else:
return JsonResponse({"status": "error", "message": "录入失败: " + "; ".join(errors[:3])}, status=500)
@require_http_methods(["GET"])
@ensure_csrf_cookie
def manage_page(request):
session_user_id = request.session.get("user_id")
if session_user_id is None:
from django.shortcuts import redirect
return redirect("/accounts/login/")
me = get_user_by_id(session_user_id) or {}
is_admin = int(request.session.get("permission", 1)) == 0
has_manage_key = bool(me.get("manage_key") or [])
if is_admin:
raw_results = search_all()
else:
uid = str(session_user_id)
manage_keys = me.get("manage_key", []) or []
all_data = search_all()
raw_results = []
for r in all_data:
# 1. 自己的提交
if str(r.get("writer_id", "")) == uid:
raw_results.append(r)
continue
# 2. 管理的提交 (检查 data 中是否包含 manage_key)
if manage_keys:
r_data = str(r.get("data", ""))
for mk in manage_keys:
if mk and str(mk) in r_data:
raw_results.append(r)
break
return render(
request,
"elastic/manage.html",
{
"is_admin": is_admin,
"has_manage_key": has_manage_key,
"user_id": session_user_id,
"username": me.get("username"),
},
)
# 规范化键,避免模板点号访问下划线前缀字段
results = []
for r in raw_results:
results.append({
"id": r.get("_id", ""),
"writer_id": r.get("writer_id", ""),
"image": r.get("image", ""),
"data": r.get("data", ""),
})
user_id_qs = request.GET.get("user_id")
context = {"items": results, "user_id": user_id_qs or session_user_id}
return render(request, "elastic/manage.html", context)
@require_http_methods(["GET"])
def analytics_trend_view(request):
try:
gte = request.GET.get("from")
lte = request.GET.get("to")
interval = request.GET.get("interval", "day")
data = es_analytics_trend(gte=gte, lte=lte, interval=interval)
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def analytics_types_view(request):
try:
gte = request.GET.get("from")
lte = request.GET.get("to")
size = request.GET.get("size")
try:
size_int = int(size) if size is not None else 10
except Exception:
size_int = 10
data = es_analytics_types(gte=gte, lte=lte, size=size_int)
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def analytics_types_trend_view(request):
try:
gte = request.GET.get("from")
lte = request.GET.get("to")
interval = request.GET.get("interval", "week")
size = request.GET.get("size")
try:
size_int = int(size) if size is not None else 8
except Exception:
size_int = 8
data = es_analytics_types_trend(gte=gte, lte=lte, interval=interval, size=size_int)
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def analytics_recent_view(request):
try:
limit = request.GET.get("limit")
gte = request.GET.get("from")
lte = request.GET.get("to")
try:
limit_int = int(limit) if limit is not None else 10
except Exception:
limit_int = 10
data = es_analytics_recent(limit=limit_int, gte=gte, lte=lte)
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["POST"])
@csrf_protect
def remove_key_view(request):
if request.session.get("user_id") is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
is_admin = int(request.session.get("permission", 1)) == 0
try:
payload = json.loads(request.body.decode("utf-8"))
key_to_remove = payload.get("key")
if not key_to_remove:
return JsonResponse({"status": "error", "message": "缺少key参数"}, status=400)
key_to_remove = str(key_to_remove).strip()
if not key_to_remove:
return JsonResponse({"status": "error", "message": "缺少key参数"}, status=400)
if not is_admin:
me = get_user_by_id(request.session.get("user_id")) or {}
if not (me.get("manage_key") or []):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
allowed = {str(x).strip() for x in list(request.session.get("tutor_added_manage_keys") or []) if str(x).strip()}
if key_to_remove not in allowed:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
from .es_connect import delete_key_globally
ok, count = delete_key_globally(key_to_remove)
if ok:
if not is_admin:
cur = [str(x).strip() for x in list(request.session.get("tutor_added_manage_keys") or []) if str(x).strip()]
cur = [k for k in cur if k != key_to_remove]
request.session["tutor_added_manage_keys"] = cur
try:
request.session.modified = True
except Exception:
pass
return JsonResponse({"status": "success", "message": f"已成功全局删除 Key '{key_to_remove}',并同步清理了 {count} 个注册码。"})
else:
return JsonResponse({"status": "error", "message": "删除失败"}, status=500)
except json.JSONDecodeError:
return HttpResponseBadRequest("Invalid JSON")
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["POST"])
@csrf_protect
def unallow_tutor_added_key_view(request):
if request.session.get("user_id") is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
if int(request.session.get("permission", 1)) == 0:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
me = get_user_by_id(request.session.get("user_id")) or {}
if not (me.get("manage_key") or []):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
try:
payload = json.loads(request.body.decode("utf-8"))
except Exception:
return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
key_name = (payload.get("key") or "").strip()
if not key_name:
return JsonResponse({"status": "error", "message": "key不能为空"}, status=400)
cur = list(request.session.get("tutor_added_manage_keys") or [])
cur = [str(x).strip() for x in cur if str(x).strip()]
if key_name not in cur:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
cur = [k for k in cur if k != key_name]
request.session["tutor_added_manage_keys"] = cur
try:
request.session.modified = True
except Exception:
pass
return JsonResponse({"status": "success"})
@require_http_methods(["GET"])
@ensure_csrf_cookie
def user_manage(request):
session_user_id = request.session.get("user_id")
if session_user_id is None:
from django.shortcuts import redirect
return redirect("/accounts/login/")
is_admin = int(request.session.get("permission", 1)) == 0
me = get_user_by_id(session_user_id) or {}
has_manage = bool(me.get("manage_key"))
manage_keys = list(me.get("manage_key") or [])
raw_my_keys = me.get("key") or []
if isinstance(raw_my_keys, list):
my_keys = raw_my_keys
elif isinstance(raw_my_keys, str):
my_keys = re.split(r"[,;;、\r\n]+", raw_my_keys)
else:
my_keys = [raw_my_keys]
my_keys = [str(x).strip() for x in my_keys if str(x).strip()]
user_id_qs = request.GET.get("user_id")
context = {
"user_id": user_id_qs or session_user_id,
"username": me.get("username"),
"is_admin": is_admin,
"is_tutor": (not is_admin) and has_manage,
"is_student": (not is_admin) and (not has_manage),
"manage_keys_json": json.dumps(manage_keys, ensure_ascii=False),
"my_keys_json": json.dumps(my_keys, ensure_ascii=False),
}
return render(request, "elastic/users.html", context)
@require_http_methods(["GET"])
@ensure_csrf_cookie
def registration_code_manage_page(request):
session_user_id = request.session.get("user_id")
if session_user_id is None:
from django.shortcuts import redirect
return redirect("/accounts/login/")
is_admin = int(request.session.get("permission", 1)) == 0
me = get_user_by_id(session_user_id) or {}
has_manage = bool(me.get("manage_key"))
can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1
if (not is_admin) and (not has_manage) and (not can_manage_reg):
from django.shortcuts import redirect
return redirect("/main/home/")
user_id_qs = request.GET.get("user_id")
raw_my_keys = me.get("key") or []
if isinstance(raw_my_keys, list):
my_keys = raw_my_keys
elif isinstance(raw_my_keys, str):
my_keys = re.split(r"[,;;、\r\n]+", raw_my_keys)
else:
my_keys = [raw_my_keys]
my_keys = [str(x).strip() for x in my_keys if str(x).strip()]
if can_manage_reg and (not has_manage) and (not is_admin):
allowed_manage_keys = [str(x).strip() for x in list(me.get("registration_manage_keys") or []) if str(x).strip()]
else:
allowed_manage_keys = list(request.session.get("tutor_added_manage_keys") or [])
allowed_manage_keys = [str(x).strip() for x in allowed_manage_keys if str(x).strip()]
context = {
"user_id": user_id_qs or session_user_id,
"username": me.get("username"),
"is_admin": is_admin,
"has_manage_key": has_manage,
"can_manage_registration_codes": can_manage_reg,
"my_keys_json": json.dumps(my_keys, ensure_ascii=False),
"manage_keys_json": json.dumps(list(me.get("manage_key") or []), ensure_ascii=False),
"allowed_manage_keys_json": json.dumps(allowed_manage_keys, ensure_ascii=False),
}
return render(request, "elastic/registration_codes.html", context)
@require_http_methods(["GET"])
def get_keys_list_view(request):
if request.session.get("user_id") is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
is_admin = int(request.session.get("permission", 1)) == 0
if not is_admin:
me = get_user_by_id(request.session.get("user_id")) or {}
has_manage = bool(me.get("manage_key") or [])
can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1
if (not has_manage) and (not can_manage_reg):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
if can_manage_reg and (not has_manage):
lst = [str(x).strip() for x in list(me.get("registration_manage_keys") or []) if str(x).strip()]
return JsonResponse({"status": "success", "data": lst})
lst = get_keys_list()
return JsonResponse({"status": "success", "data": lst})
@require_http_methods(["POST"])
@csrf_protect
def add_key_view(request):
if request.session.get("user_id") is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
is_admin = int(request.session.get("permission", 1)) == 0
if not is_admin:
me = get_user_by_id(request.session.get("user_id")) or {}
has_manage = bool(me.get("manage_key") or [])
can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1
if (not has_manage) and (not can_manage_reg):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
try:
payload = json.loads(request.body.decode("utf-8"))
except Exception:
return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
key_name = (payload.get("key") or "").strip()
if not key_name:
return JsonResponse({"status": "error", "message": "key不能为空"}, status=400)
cur_global = set(get_keys_list() or [])
if key_name in cur_global:
return JsonResponse({"status": "error", "message": "key已存在"}, status=409)
ok = ensure_key_in_list(key_name)
if not ok:
return JsonResponse({"status": "error", "message": "写入失败"}, status=500)
if not is_admin:
uid = request.session.get("user_id")
if has_manage:
cur = list(request.session.get("tutor_added_manage_keys") or [])
cur = [str(x).strip() for x in cur if str(x).strip()]
if key_name not in cur:
cur.append(key_name)
request.session["tutor_added_manage_keys"] = cur
try:
request.session.modified = True
except Exception:
pass
elif can_manage_reg:
cur = [str(x).strip() for x in list((me or {}).get("registration_manage_keys") or []) if str(x).strip()]
if key_name not in cur:
cur.append(key_name)
es_update_user_by_id(uid, registration_manage_keys=cur)
return JsonResponse({"status": "success"})
@require_http_methods(["POST"])
@csrf_protect
def generate_registration_code_view(request):
if request.session.get("user_id") is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
uid = request.session.get("user_id")
is_admin = int(request.session.get("permission", 1)) == 0
if not is_admin:
me = get_user_by_id(uid) or {}
has_manage = bool(me.get("manage_key") or [])
can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1
if (not has_manage) and (not can_manage_reg):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
try:
payload = json.loads(request.body.decode("utf-8"))
except Exception:
return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
keys = list(payload.get("keys") or [])
manage_keys = list(payload.get("manage_keys") or [])
if not is_admin:
if has_manage:
my_keys = []
try:
my_keys = [str(x).strip() for x in list((me or {}).get("key") or []) if str(x).strip()]
except Exception:
my_keys = []
allowed_manage = set()
try:
allowed_manage = {str(x).strip() for x in list(request.session.get("tutor_added_manage_keys") or []) if str(x).strip()}
except Exception:
allowed_manage = set()
normalized_keys = []
seen = set()
for v in list(keys or []):
s = str(v).strip()
if not s or s in seen:
continue
seen.add(s)
normalized_keys.append(s)
missing = [k for k in my_keys if k not in seen]
if missing:
return JsonResponse(
{"status": "error", "message": f"必须选择导师原有的 key{''.join(missing)}"},
status=400,
)
keys = normalized_keys
clean_manage = []
manage_seen = set()
for v in list(manage_keys or []):
s = str(v).strip()
if not s or s in manage_seen:
continue
if s not in allowed_manage:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
manage_seen.add(s)
clean_manage.append(s)
manage_keys = clean_manage
else:
allowed = {str(x).strip() for x in list((me or {}).get("registration_manage_keys") or []) if str(x).strip()}
norm_keys = []
seen = set()
for v in list(keys or []):
s = str(v).strip()
if not s or s in seen:
continue
if s not in allowed:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
seen.add(s)
norm_keys.append(s)
if not norm_keys:
return JsonResponse({"status": "error", "message": "至少选择一个 key"}, status=400)
keys = norm_keys
clean_manage = []
manage_seen = set()
for v in list(manage_keys or []):
s = str(v).strip()
if not s or s in manage_seen:
continue
if s not in allowed:
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
manage_seen.add(s)
clean_manage.append(s)
manage_keys = clean_manage
try:
days = int(payload.get("expires_in_days", 30))
except Exception:
days = 30
result = generate_registration_code(keys=keys, manage_keys=manage_keys, expires_in_days=days, created_by=uid)
if not result:
return JsonResponse({"status": "error", "message": "生成失败"}, status=500)
return JsonResponse({"status": "success", "data": result})
@require_http_methods(["GET"])
def list_registration_codes_view(request):
if request.session.get("user_id") is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
uid = request.session.get("user_id")
is_admin = int(request.session.get("permission", 1)) == 0
if not is_admin:
me = get_user_by_id(uid) or {}
if (not (me.get("manage_key") or [])) and (int(me.get("can_manage_registration_codes") or 0) != 1):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
data = list_registration_codes()
if not is_admin:
data = [it for it in (data or []) if str(it.get("created_by")) == str(uid)]
return JsonResponse({"status": "success", "data": data})
@require_http_methods(["GET"])
def keys_for_filter_view(request):
uid = request.session.get("user_id")
if uid is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
is_admin = int(request.session.get("permission", 1)) == 0
try:
users = get_all_users() or []
except Exception:
users = []
def norm(v):
return str(v).strip().strip(";")
def add(out, seen, v):
s = norm(v)
if not s or s in seen:
return
seen.add(s)
out.append(s)
if is_admin:
out = []
seen = set()
for v in get_keys_list() or []:
add(out, seen, v)
for u in users:
for v in list(u.get("manage_key") or []) + list(u.get("key") or []):
add(out, seen, v)
return JsonResponse({"status": "success", "data": out})
me = get_user_by_id(uid) or {}
mgr_keys = {norm(x) for x in (me.get("manage_key") or []) if norm(x)}
visible_users = []
if mgr_keys:
for u in users:
try:
ukeys = {norm(x) for x in (u.get("key") or []) if norm(x)}
except Exception:
ukeys = set()
if ukeys & mgr_keys:
visible_users.append(u)
if str(me.get("user_id")) not in {str(u.get("user_id")) for u in visible_users}:
visible_users.append(me)
else:
visible_users = [me]
out = []
seen = set()
for v in list(me.get("manage_key") or []) + list(me.get("key") or []):
add(out, seen, v)
for u in visible_users:
for v in list(u.get("manage_key") or []) + list(u.get("key") or []):
add(out, seen, v)
return JsonResponse({"status": "success", "data": out})
def _extract_type_from_data(value):
s = value
if s is None:
return ""
if not isinstance(s, str):
try:
s = json.dumps(s, ensure_ascii=False)
except Exception:
s = str(s)
s = str(s)
try:
obj = json.loads(s)
if isinstance(obj, dict):
t = obj.get("数据类型")
if isinstance(t, str) and t.strip():
return t.strip()
except Exception:
pass
try:
m = re.search(r'"数据类型"\s*:\s*"([^"]+)"', s)
if m:
return str(m.group(1)).strip()
except Exception:
pass
return ""
@require_http_methods(["GET"])
def types_for_filter_view(request):
uid = request.session.get("user_id")
if uid is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
try:
types = [str(t).strip() for t in (get_type_list() or []) if str(t).strip()]
except Exception:
types = []
seen = set()
out = []
for t in types:
if t in seen:
continue
seen.add(t)
out.append(t)
return JsonResponse({"status": "success", "data": out})
@require_http_methods(["GET"])
def filter_view(request):
session_user_id = request.session.get("user_id")
if session_user_id is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
key = (request.GET.get("key") or "").strip()
typ = (request.GET.get("type") or "").strip()
results = search_all()
results = _filter_results_for_user(request, results)
filtered = list(results or [])
if key:
selected = str(key).strip()
try:
users = get_all_users() or []
except Exception:
users = []
writer_keys_by_id = {}
for u in users:
try:
u_id = str(u.get("user_id", "")).strip()
except Exception:
u_id = ""
if not u_id:
continue
try:
u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()}
except Exception:
u_keys = set()
writer_keys_by_id[u_id] = u_keys
tmp = []
for r in filtered:
writer_id = str(r.get("writer_id", "")).strip()
writer_keys = writer_keys_by_id.get(writer_id)
if writer_keys and selected in writer_keys:
tmp.append(r)
continue
if selected and selected in str(r.get("data", "")):
tmp.append(r)
filtered = tmp
if typ:
tsel = str(typ).strip()
filtered = [r for r in filtered if _extract_type_from_data(r.get("data")) == tsel]
data = _attach_writer_names(_attach_image_urls(request, filtered))
return JsonResponse({"status": "success", "data": data})
def _parse_dt(value):
if not value:
return None
if hasattr(value, "isoformat"):
try:
dt = value
if getattr(dt, "tzinfo", None) is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
pass
s = str(value).strip()
if not s:
return None
if s.endswith("Z"):
s = s[:-1] + "+00:00"
try:
dt = datetime.fromisoformat(s)
if getattr(dt, "tzinfo", None) is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
return None
def _floor_dt(dt, interval: str):
if not dt:
return None
iv = str(interval or "day").strip().lower()
if iv == "month":
return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
if iv == "week":
base = dt.replace(hour=0, minute=0, second=0, microsecond=0)
return base - timedelta(days=base.weekday())
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
def _dt_label(dt, interval: str):
if not dt:
return ""
iv = str(interval or "day").strip().lower()
if iv == "month":
return dt.strftime("%Y-%m")
return dt.strftime("%Y-%m-%d")
@require_http_methods(["GET"])
def report_view(request):
try:
session_user_id = request.session.get("user_id")
if session_user_id is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
is_admin = int(request.session.get("permission", 1)) == 0
me = get_user_by_id(session_user_id) or {}
has_manage_key = bool(me.get("manage_key") or [])
if (not is_admin) and (not has_manage_key):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
gte = (request.GET.get("from") or "").strip()
lte = (request.GET.get("to") or "").strip()
interval = (request.GET.get("interval") or "day").strip()
key = (request.GET.get("key") or "").strip()
typ = (request.GET.get("type") or "").strip()
gte_dt = _parse_dt(gte) if gte else None
lte_dt = _parse_dt(lte) if lte else None
results = search_all()
results = _filter_results_for_user(request, results)
filtered = list(results or [])
if key:
selected = str(key).strip()
try:
users = get_all_users() or []
except Exception:
users = []
writer_keys_by_id = {}
for u in users:
try:
u_id = str(u.get("user_id", "")).strip()
except Exception:
u_id = ""
if not u_id:
continue
try:
u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()}
except Exception:
u_keys = set()
writer_keys_by_id[u_id] = u_keys
tmp = []
for r in filtered:
writer_id = str(r.get("writer_id", "")).strip()
writer_keys = writer_keys_by_id.get(writer_id)
if writer_keys and selected in writer_keys:
tmp.append(r)
continue
if selected and selected in str(r.get("data", "")):
tmp.append(r)
filtered = tmp
if typ:
tsel = str(typ).strip()
filtered = [r for r in filtered if _extract_type_from_data(r.get("data")) == tsel]
ranged = []
for r in filtered:
t = _parse_dt(r.get("time"))
if (gte_dt or lte_dt) and (t is None):
continue
if gte_dt and t and t < gte_dt:
continue
if lte_dt and t and t > lte_dt:
continue
rr = dict(r)
rr["_time_dt"] = t
ranged.append(rr)
by_type = {}
by_bucket = {}
for r in ranged:
tval = _extract_type_from_data(r.get("data"))
if tval:
by_type[tval] = by_type.get(tval, 0) + 1
bucket_dt = _floor_dt(r.get("_time_dt"), interval)
if bucket_dt:
label = _dt_label(bucket_dt, interval)
if label:
by_bucket[label] = by_bucket.get(label, 0) + 1
by_type_arr = [{"type": k, "count": int(v)} for k, v in sorted(by_type.items(), key=lambda x: (-x[1], x[0]))]
by_time_arr = [{"bucket": k, "count": int(v)} for k, v in sorted(by_bucket.items(), key=lambda x: x[0])]
return JsonResponse(
{
"status": "success",
"data": {
"generated_at": datetime.now(timezone.utc).isoformat(),
"range": {"from": gte or "", "to": lte or ""},
"filters": {"key": key or "", "type": typ or "", "interval": interval or "day"},
"total": len(ranged),
"by_type": by_type_arr,
"by_time": by_time_arr,
},
}
)
except Exception as e:
return JsonResponse({"status": "error", "message": str(e) or "生成失败"}, status=500)
@require_http_methods(["GET"])
def report_csv_view(request):
resp = report_view(request)
if getattr(resp, "status_code", 200) != 200:
return resp
try:
payload = json.loads(resp.content.decode("utf-8"))
except Exception:
return JsonResponse({"status": "error", "message": "生成失败"}, status=500)
data = (payload or {}).get("data") or {}
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["统计报表"])
w.writerow(["生成时间", data.get("generated_at", "")])
rng = data.get("range") or {}
w.writerow(["时间范围", f"{rng.get('from','')} ~ {rng.get('to','')}"])
flt = data.get("filters") or {}
w.writerow(["筛查Key", flt.get("key", "")])
w.writerow(["筛查类型", flt.get("type", "")])
w.writerow(["时间粒度", flt.get("interval", "")])
w.writerow([])
w.writerow(["总数", data.get("total", 0)])
w.writerow([])
w.writerow(["按成果类型统计"])
w.writerow(["类型", "数量"])
for it in list(data.get("by_type") or []):
w.writerow([it.get("type", ""), it.get("count", 0)])
w.writerow([])
w.writerow(["按时间统计"])
w.writerow(["时间", "数量"])
for it in list(data.get("by_time") or []):
w.writerow([it.get("bucket", ""), it.get("count", 0)])
content = buf.getvalue()
out = HttpResponse(content, content_type="text/csv; charset=utf-8")
out["Content-Disposition"] = 'attachment; filename="report.csv"'
return out
@require_http_methods(["POST"])
@csrf_protect
def revoke_registration_code_view(request):
if request.session.get("user_id") is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
uid = request.session.get("user_id")
is_admin = int(request.session.get("permission", 1)) == 0
if not is_admin:
me = get_user_by_id(uid) or {}
if (not (me.get("manage_key") or [])) and (int(me.get("can_manage_registration_codes") or 0) != 1):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
try:
payload = json.loads(request.body.decode("utf-8"))
except Exception:
return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
code = (payload.get("code") or "").strip()
if not code:
return JsonResponse({"status": "error", "message": "缺少code"}, status=400)
if not is_admin:
info = get_registration_code(code)
if not info or str(info.get("created_by")) != str(uid):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
ok = revoke_registration_code(code)
if not ok:
return JsonResponse({"status": "error", "message": "作废失败"}, status=500)
return JsonResponse({"status": "success"})