1752 lines
67 KiB
Python
1752 lines
67 KiB
Python
"""
|
||
ES相关的API视图
|
||
"""
|
||
import os
|
||
import re
|
||
import uuid
|
||
import base64
|
||
import json
|
||
import csv
|
||
import io
|
||
from datetime import datetime, timezone, timedelta
|
||
import tempfile
|
||
import concurrent.futures
|
||
from django.conf import settings
|
||
from django.http import JsonResponse, HttpResponse
|
||
from django.shortcuts import render
|
||
from django.views.decorators.http import require_http_methods
|
||
from django.views.decorators.csrf import ensure_csrf_cookie
|
||
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie, csrf_protect
|
||
from .es_connect import *
|
||
from .es_connect import update_user_by_id as es_update_user_by_id, delete_user_by_id as es_delete_user_by_id
|
||
from .es_connect import (
|
||
analytics_trend as es_analytics_trend,
|
||
analytics_types as es_analytics_types,
|
||
analytics_types_trend as es_analytics_types_trend,
|
||
analytics_recent as es_analytics_recent,
|
||
)
|
||
from PIL import Image
|
||
# Optional PDF support. The PyMuPDF module is tried under the name ``fitz``
# first and ``pymupdf`` second; HAS_PDF_SUPPORT records whether either import
# worked and PDF_ERROR keeps the message of the first failure.
try:
    import fitz  # PyMuPDF
    HAS_PDF_SUPPORT = True
    PDF_ERROR = ""
except ImportError as e:
    try:
        import pymupdf
        fitz = pymupdf
        HAS_PDF_SUPPORT = True
        PDF_ERROR = ""
    except ImportError:
        # Bind the name even when no backend is available so that the module
        # always exposes ``fitz``; callers must still check HAS_PDF_SUPPORT.
        fitz = None
        HAS_PDF_SUPPORT = False
        PDF_ERROR = str(e)
|
||
|
||
|
||
def _filter_results_for_user(request, results):
|
||
session_user_id = request.session.get("user_id")
|
||
if session_user_id is None:
|
||
return []
|
||
|
||
me = get_user_by_id(session_user_id) or {}
|
||
is_admin = int(request.session.get("permission", 1)) == 0
|
||
|
||
if is_admin:
|
||
return results
|
||
|
||
uid = str(session_user_id)
|
||
manage_keys = me.get("manage_key", []) or []
|
||
manage_keys_set = {str(k).strip() for k in manage_keys if str(k).strip()}
|
||
writer_keys_by_id = None
|
||
if manage_keys_set:
|
||
try:
|
||
users = get_all_users() or []
|
||
except Exception:
|
||
users = []
|
||
writer_keys_by_id = {}
|
||
for u in users:
|
||
try:
|
||
u_id = str(u.get("user_id", "")).strip()
|
||
except Exception:
|
||
u_id = ""
|
||
if not u_id:
|
||
continue
|
||
try:
|
||
u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()}
|
||
except Exception:
|
||
u_keys = set()
|
||
writer_keys_by_id[u_id] = u_keys
|
||
|
||
filtered = []
|
||
for r in results:
|
||
# 1. 自己的提交
|
||
if str(r.get("writer_id", "")) == uid:
|
||
filtered.append(r)
|
||
continue
|
||
|
||
# 2. 管理的提交:优先按“作者的 key 与我的 manage_key 交集”判断;缺失时回退为 data 字符串包含判断
|
||
if manage_keys_set:
|
||
writer_id = str(r.get("writer_id", "")).strip()
|
||
writer_keys = (writer_keys_by_id or {}).get(writer_id)
|
||
if writer_keys and (writer_keys & manage_keys_set):
|
||
filtered.append(r)
|
||
continue
|
||
r_data = str(r.get("data", ""))
|
||
for mk in manage_keys_set:
|
||
if mk and mk in r_data:
|
||
filtered.append(r)
|
||
break
|
||
return filtered
|
||
|
||
def _image_ref_to_url(request, image_ref: str) -> str:
|
||
s = str(image_ref or '').strip()
|
||
if not s:
|
||
return ''
|
||
|
||
if s.startswith('minio:'):
|
||
object_name = s[len('minio:'):].lstrip('/')
|
||
if not object_name:
|
||
return ''
|
||
|
||
try:
|
||
from minio_storage.minio_connect import presigned_get_url
|
||
return presigned_get_url(object_name, expires_seconds=8 * 60 * 60)
|
||
except Exception:
|
||
return ''
|
||
|
||
if s.startswith('local:'):
|
||
rel_path = s[len('local:'):].lstrip('/')
|
||
if not rel_path:
|
||
return ''
|
||
return request.build_absolute_uri(settings.MEDIA_URL + rel_path)
|
||
|
||
return ''
|
||
|
||
|
||
def _parse_image_refs(image_ref):
|
||
if not image_ref:
|
||
return []
|
||
if isinstance(image_ref, (list, tuple)):
|
||
return [str(x) for x in image_ref if str(x).strip()]
|
||
if isinstance(image_ref, str):
|
||
s = image_ref.strip()
|
||
if not s:
|
||
return []
|
||
parsed = None
|
||
if s[:1] in ('[', '"'):
|
||
try:
|
||
parsed = json.loads(s)
|
||
except Exception:
|
||
parsed = None
|
||
if isinstance(parsed, list):
|
||
return [str(x) for x in parsed if str(x).strip()]
|
||
if isinstance(parsed, str):
|
||
s = parsed.strip()
|
||
return [s] if s else []
|
||
return []
|
||
|
||
|
||
def _attach_image_urls(request, items):
|
||
out = []
|
||
for it in list(items or []):
|
||
try:
|
||
d = dict(it or {})
|
||
except Exception:
|
||
continue
|
||
refs = _parse_image_refs(d.get('image', ''))
|
||
urls = [_image_ref_to_url(request, r) for r in refs if str(r).strip()]
|
||
urls = [u for u in urls if u]
|
||
d['image_urls'] = urls
|
||
d['image_url'] = urls[0] if urls else _image_ref_to_url(request, d.get('image', ''))
|
||
out.append(d)
|
||
return out
|
||
|
||
|
||
def _attach_writer_names(items):
|
||
users = get_all_users()
|
||
name_by_id = {str(u.get("user_id")): u.get("username") for u in (users or [])}
|
||
out = []
|
||
for it in list(items or []):
|
||
try:
|
||
d = dict(it or {})
|
||
except Exception:
|
||
continue
|
||
wid = d.get("writer_id")
|
||
d["writer_name"] = name_by_id.get(str(wid), str(wid) if wid is not None else "")
|
||
out.append(d)
|
||
return out
|
||
|
||
|
||
@require_http_methods(["GET", "POST"])
@csrf_exempt
def init_index(request):
    """Create the Elasticsearch index together with its mapping.

    Responds with a JSON success payload, or with HTTP 500 and the exception
    text when index creation raises.
    """
    print("⚠️ init_index 被调用了!")
    try:
        create_index_with_mapping()
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
    else:
        return JsonResponse({"status": "success", "message": "索引初始化成功"})
|
||
|
||
|
||
@require_http_methods(["POST"])
@csrf_exempt
def add_data(request):
    """Insert the JSON document from the request body into Elasticsearch.

    Responses:
        400 when the body is not valid UTF-8 JSON;
        500 when ``insert_data`` reports failure or raises;
        200 with a success payload otherwise.
    """
    # NOTE(review): this endpoint is CSRF-exempt and performs no session
    # check, so any client can insert documents - confirm this is intended.
    try:
        data = json.loads(request.body.decode('utf-8'))
    except ValueError:
        # Covers both UnicodeDecodeError and json.JSONDecodeError: a malformed
        # body is a client error, not a server failure.
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    try:
        success = insert_data(data)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)

    if success:
        return JsonResponse({"status": "success", "message": "数据添加成功"})
    return JsonResponse({"status": "error", "message": "数据添加失败"}, status=500)
|
||
|
||
|
||
@require_http_methods(["GET"])
def search(request):
    """Search records by the ``q`` query parameter.

    Responses:
        401 when the session has no user;
        400 when ``q`` is empty;
        200 with the matches visible to the session user, each enriched with
        image URLs and the writer's name;
        500 with the exception text on unexpected errors.
    """
    # Reject anonymous sessions up front, as get_all_data and filter_by_key
    # do. The visibility filter discards every hit for them anyway, so running
    # the Elasticsearch query would be wasted work.
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    try:
        query = request.GET.get('q', '')
        if not query:
            return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400)

        results = search_data(query)
        results = _filter_results_for_user(request, results)
        data = _attach_writer_names(_attach_image_urls(request, results))
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
|
||
|
||
|
||
@require_http_methods(["GET"])
def fuzzy_search(request):
    """Fuzzy-search records across fields by the ``keyword`` query parameter.

    Responses:
        401 when the session has no user;
        400 when ``keyword`` is empty;
        200 with the matches visible to the session user, each enriched with
        image URLs and the writer's name;
        500 with the exception text on unexpected errors.
    """
    # Reject anonymous sessions up front, as get_all_data and filter_by_key
    # do. The visibility filter discards every hit for them anyway, so running
    # the Elasticsearch query would be wasted work.
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    try:
        keyword = request.GET.get('keyword', '')
        if not keyword:
            return JsonResponse({"status": "error", "message": "搜索关键词不能为空"}, status=400)

        results = search_by_any_field(keyword)
        results = _filter_results_for_user(request, results)
        data = _attach_writer_names(_attach_image_urls(request, results))
        return JsonResponse({"status": "success", "data": data})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
|
||
|
||
@require_http_methods(["GET"])
def get_all_data(request):
    """List every record the session user may see.

    Responds 401 without a session user, 500 with the exception text on
    errors, otherwise the visible records enriched with image URLs and
    writer names.
    """
    try:
        if request.session.get("user_id") is None:
            return JsonResponse({"status": "error", "message": "未登录"}, status=401)

        visible = _filter_results_for_user(request, search_all())
        with_images = _attach_image_urls(request, visible)
        return JsonResponse({"status": "success", "data": _attach_writer_names(with_images)})
    except Exception as err:
        return JsonResponse({"status": "error", "message": str(err)}, status=500)
|
||
|
||
|
||
@require_http_methods(["GET"])
def filter_by_key(request):
    """List the visible records, optionally narrowed by the ``key`` parameter.

    Without ``key`` every visible record is returned. With ``key`` a record is
    kept when its writer holds that key, or when the key occurs in the
    record's ``data`` string. Responds 401 without a session user and 500
    with the exception text on errors.
    """
    try:
        if request.session.get("user_id") is None:
            return JsonResponse({"status": "error", "message": "未登录"}, status=401)

        wanted = (request.GET.get("key") or "").strip()
        visible = _filter_results_for_user(request, search_all())

        if wanted:
            # The user list is best effort: on failure only the data-substring
            # rule can match.
            try:
                all_users = get_all_users() or []
            except Exception:
                all_users = []

            keys_by_writer = {}
            for user in all_users:
                try:
                    user_id = str(user.get("user_id", "")).strip()
                except Exception:
                    user_id = ""
                if not user_id:
                    continue
                try:
                    held = {str(k).strip() for k in (user.get("key") or []) if str(k).strip()}
                except Exception:
                    held = set()
                keys_by_writer[user_id] = held

            def _matches(record):
                held = keys_by_writer.get(str(record.get("writer_id", "")).strip())
                if held and wanted in held:
                    return True
                return wanted in str(record.get("data", ""))

            visible = [record for record in visible if _matches(record)]

        payload = _attach_writer_names(_attach_image_urls(request, visible))
        return JsonResponse({"status": "success", "data": payload})
    except Exception as err:
        return JsonResponse({"status": "error", "message": str(err)}, status=500)
|
||
|
||
@require_http_methods(["DELETE"])
@csrf_exempt
def delete_data(request, doc_id):
    """Delete a record (login required; admin or the record's writer only).

    Responses:
        401 without a session user; 404 when the record does not exist;
        403 when the caller is neither an admin nor the writer;
        500 when deletion fails or an unexpected error occurs.
    """
    # NOTE(review): a session-authenticated destructive endpoint that is
    # CSRF-exempt - confirm the exemption is really required.
    request_user = request.session.get("user_id")
    if request_user is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    try:
        existing = get_by_id(doc_id)
        if not existing:
            return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)

        # The session may outlive its user record. Treat a missing record or
        # an unusable permission value as "not admin" instead of crashing, so
        # that the ownership check below still decides the request.
        user_existing = get_user_by_id(request_user) or {}
        try:
            is_admin = int(user_existing.get('permission')) == 0
        except (TypeError, ValueError):
            is_admin = False
        is_owner = str(existing.get("writer_id", "")) == str(request_user)

        if not (is_admin or is_owner):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)

        if delete_by_id(doc_id):
            return JsonResponse({"status": "success", "message": "数据删除成功"})
        return JsonResponse({"status": "error", "message": "数据删除失败"}, status=500)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
|
||
|
||
|
||
@require_http_methods(["PUT"])
@csrf_exempt
def update_data(request, doc_id):
    """Update a record (login required; admin or the record's writer only).

    The JSON body may carry ``writer_id``, ``image`` (a list is serialised to
    a JSON string) and ``data`` (stored as a JSON string when it is, or parses
    as, JSON; otherwise stored as plain text).

    Responses:
        401 without a session user; 400 when the body is not a JSON object;
        404 when the record does not exist; 403 when the caller is neither an
        admin nor the writer; 500 when the update fails or raises.
    """
    # NOTE(review): session-authenticated but CSRF-exempt, and a non-admin
    # writer may reassign ``writer_id`` - confirm both are intended.
    request_user = request.session.get("user_id")
    if request_user is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    try:
        payload = json.loads(request.body.decode('utf-8'))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    if not isinstance(payload, dict):
        # Field lookups below only make sense on a JSON object.
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    try:
        existing = get_by_id(doc_id)
        if not existing:
            return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)

        # Treat a missing user record or an unusable permission value as
        # "not admin" instead of crashing; ownership can still authorise.
        user_existing = get_user_by_id(request_user) or {}
        try:
            is_admin = int(user_existing.get('permission')) == 0
        except (TypeError, ValueError):
            is_admin = False
        is_owner = str(existing.get("writer_id", "")) == str(request_user)

        if not (is_admin or is_owner):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)

        updated = {}
        if "writer_id" in payload:
            updated["writer_id"] = payload["writer_id"]
        if "image" in payload:
            img_val = payload["image"]
            updated["image"] = json_to_string(img_val) if isinstance(img_val, list) else img_val
        if "data" in payload:
            v = payload["data"]
            if isinstance(v, dict):
                updated["data"] = json.dumps(v, ensure_ascii=False)
            else:
                # Re-serialise text that is valid JSON; keep anything else as is.
                try:
                    updated["data"] = json.dumps(json.loads(str(v)), ensure_ascii=False)
                except Exception:
                    updated["data"] = str(v)

        if update_by_id(doc_id, updated):
            return JsonResponse({"status": "success", "message": "数据更新成功"})
        return JsonResponse({"status": "error", "message": "数据更新失败"}, status=500)
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
|
||
|
||
|
||
@require_http_methods(["GET"])
def get_data(request, doc_id):
    """Return a single record, enriched with its image URLs.

    The same visibility rules as the list views apply (via
    ``_filter_results_for_user``), so a record cannot be read by id when it
    would not show up in the caller's listings.

    Responses:
        401 without a session user; 404 when the record does not exist;
        403 when the record is not visible to the caller;
        200 with the record plus ``image_urls`` / ``image_url``;
        500 with the exception text on unexpected errors.
    """
    try:
        if request.session.get("user_id") is None:
            return JsonResponse({"status": "error", "message": "未登录"}, status=401)

        result = get_by_id(doc_id)
        if not result:
            return JsonResponse({"status": "error", "message": "数据不存在"}, status=404)

        if not _filter_results_for_user(request, [result]):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)

        # Reuse the list-view enrichment so multi-image records (stored as a
        # JSON list of references) resolve too; resolving the raw field
        # directly only works for a single reference.
        enriched = _attach_image_urls(request, [result])
        if enriched:
            wrapped = enriched[0]
        else:
            wrapped = dict(result)
            wrapped['image_url'] = _image_ref_to_url(request, wrapped.get('image', ''))
        return JsonResponse({"status": "success", "data": wrapped})
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
|
||
|
||
|
||
@require_http_methods(["POST"])
@csrf_protect
def add_user(request):
    """Create a user (admin only).

    The JSON body carries ``username``, optional ``password`` (at least six
    characters when given) and optional ``permission`` (defaults to 1 when
    missing or not an integer). The new ``user_id`` is one more than the
    largest existing numeric id.

    Responses:
        401 without a session user; 403 for non-admins; 400 for an invalid
        body, empty username or short password; 409 when the username is
        taken; 500 when the user store fails.
    """
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    if int(request.session.get("permission", 1)) != 0:
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)

    try:
        payload = json.loads(request.body.decode("utf-8"))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    if not isinstance(payload, dict):
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    # str() keeps non-string JSON values (e.g. a numeric password) from
    # crashing on .strip().
    username = str(payload.get("username") or "").strip()
    password = str(payload.get("password") or "").strip()
    try:
        permission = int(payload.get("permission", 1))
    except (TypeError, ValueError):
        permission = 1

    if not username:
        return JsonResponse({"status": "error", "message": "用户名不能为空"}, status=400)
    # NOTE(review): an empty password is accepted here and the password is
    # passed to write_user_data unchanged - confirm both are intended.
    if password and len(password) < 6:
        return JsonResponse({"status": "error", "message": "密码长度至少为6位"}, status=400)

    def _numeric_id(user):
        """Read a user's id as int, counting unusable ids as 0."""
        try:
            return int(user.get("user_id", 0))
        except (AttributeError, TypeError, ValueError):
            return 0

    try:
        if get_user_by_username(username):
            return JsonResponse({"status": "error", "message": "用户名已存在"}, status=409)

        users = get_all_users() or []
        # NOTE(review): max+1 is not atomic; concurrent requests could be
        # handed the same id.
        next_id = max((_numeric_id(u) for u in users), default=0) + 1
        ok = write_user_data({
            "user_id": next_id,
            "username": username,
            "password": password,
            "permission": permission,
        })
    except Exception as e:
        # Keep the JSON error contract used by the other views.
        return JsonResponse({"status": "error", "message": str(e)}, status=500)

    if not ok:
        return JsonResponse({"status": "error", "message": "用户添加失败"}, status=500)
    return JsonResponse({"status": "success", "message": "用户添加成功"})
|
||
|
||
|
||
@require_http_methods(["GET"])
def get_users(request):
    """List the users the caller may see, with optional filters.

    Admins see everyone; a caller with ``manage_key`` entries sees users whose
    ``key`` list shares an entry with them; anyone else sees only themselves.
    ``?key=`` keeps users holding that key in ``key`` or ``manage_key``;
    ``?search=`` keeps users whose username contains the text. Responds 401
    without a session user and 500 with the exception text on errors.
    """
    uid = request.session.get("user_id")
    if uid is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    def _held(user, field):
        """Stripped, non-empty string values of ``user[field]``; empty on error."""
        try:
            return {str(x).strip() for x in (user.get(field) or []) if str(x).strip()}
        except Exception:
            return set()

    try:
        admin = int(request.session.get("permission", 1)) == 0
        me = get_user_by_id(uid) or {}
        managed = set(me.get("manage_key") or [])
        key_filter = (request.GET.get("key") or "").strip()
        name_filter = (request.GET.get("search") or "").strip()
        everyone = get_all_users()

        if admin:
            visible = everyone
        elif managed:
            visible = [u for u in everyone if set(u.get("key") or []) & managed]
        else:
            own_id = str(uid)
            visible = [u for u in everyone if str(u.get("user_id")) == own_id]

        if key_filter:
            visible = [
                u for u in visible
                if key_filter in _held(u, "key") or key_filter in _held(u, "manage_key")
            ]
        if name_filter:
            visible = [u for u in visible if name_filter in str(u.get("username", ""))]

        return JsonResponse({"status": "success", "data": visible})
    except Exception as err:
        return JsonResponse({"status": "error", "message": str(err)}, status=500)
|
||
|
||
|
||
@require_http_methods(["POST"])
@csrf_protect
def update_user_by_id_view(request, user_id):
    """Update a user, with rights depending on who is asking.

    * Admin (session ``permission`` 0): may change username, permission,
      password, ``key`` and ``manage_key``.
    * The user themself: may change only the password.
    * A manager (one of the target's ``key`` entries is in the caller's
      ``manage_key``): may change the password and the ``key`` list, but may
      not hand out keys the manager holds; such keys already on the target
      are preserved.

    Responses:
        401 without a session user; 400 for an invalid body, short password,
        invalid permission or missing changes; 403 when not allowed;
        409 when the new username is taken; 500 when the user store fails.
    """
    uid = request.session.get("user_id")
    if uid is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    try:
        payload = json.loads(request.body.decode("utf-8"))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    if not isinstance(payload, dict):
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    # str() keeps non-string JSON values from crashing on .strip().
    new_username = str(payload.get("username") or "").strip()
    new_permission = payload.get("permission")
    new_password = str(payload.get("password") or "").strip()
    if new_password and len(new_password) < 6:
        return JsonResponse({"status": "error", "message": "密码长度至少为6位"}, status=400)

    def normalize_keys(v):
        """Turn a list or a delimiter-separated string into unique stripped keys.

        ``None`` stays ``None`` so "field absent" is distinguishable from
        "set to an empty list".
        """
        if v is None:
            return None
        if isinstance(v, list):
            parts = v
        elif isinstance(v, str):
            parts = re.split(r"[,,;;、\r\n]+", v)
        else:
            parts = [v]
        out = []
        seen = set()
        for p in parts:
            s = str(p).strip().strip(";")
            if not s or s in seen:
                continue
            seen.add(s)
            out.append(s)
        return out

    def ids_differ(a, b):
        """Compare two user ids numerically, falling back to string comparison."""
        try:
            return int(a) != int(b)
        except (TypeError, ValueError):
            return str(a) != str(b)

    new_keys = normalize_keys(payload.get("key", None))
    new_manage_keys = normalize_keys(payload.get("manage_key", None))

    try:
        is_admin = int(request.session.get("permission", 1)) == 0

        if is_admin:
            permission_value = None
            if new_permission is not None:
                try:
                    permission_value = int(new_permission)
                except (TypeError, ValueError):
                    return JsonResponse({"status": "error", "message": "权限值无效"}, status=400)
            if new_username:
                other = get_user_by_username(new_username)
                if other and ids_differ(other.get("user_id", -1), user_id):
                    return JsonResponse({"status": "error", "message": "用户名已存在"}, status=409)
            ok = es_update_user_by_id(
                user_id,
                username=new_username if new_username else None,
                permission=permission_value,
                password=new_password if new_password else None,
                key=new_keys,
                manage_key=new_manage_keys,
            )
        elif str(uid) == str(user_id):
            if not new_password:
                return JsonResponse({"status": "error", "message": "仅允许修改密码"}, status=400)
            ok = es_update_user_by_id(user_id, password=new_password)
        else:
            requester = get_user_by_id(uid) or {}
            target = get_user_by_id(user_id) or {}
            requester_mgr = set(requester.get("manage_key") or [])
            target_keys = set(target.get("key") or [])
            if not (requester_mgr and (target_keys & requester_mgr)):
                return JsonResponse({"status": "error", "message": "无权限"}, status=403)

            if new_username or new_permission is not None or new_manage_keys is not None:
                return JsonResponse({"status": "error", "message": "无权限"}, status=403)
            if not new_password and new_keys is None:
                return JsonResponse({"status": "error", "message": "缺少更新内容"}, status=400)

            merged_keys = None
            if new_keys is not None:
                # Keys the manager holds are "locked": they cannot be granted,
                # and those already on the target are kept untouched.
                requester_locked = set(normalize_keys(requester.get("key")) or [])
                if set(new_keys) & requester_locked:
                    return JsonResponse({"status": "error", "message": "无权限"}, status=403)
                existing_keys = [str(v).strip() for v in list(target.get("key") or []) if str(v).strip()]
                preserved = [k for k in existing_keys if k in requester_locked]
                # dict.fromkeys removes duplicates while keeping first-seen order.
                merged_keys = list(dict.fromkeys(preserved + list(new_keys)))
            ok = es_update_user_by_id(
                user_id,
                password=new_password if new_password else None,
                key=merged_keys,
            )
    except Exception as e:
        # Keep the JSON error contract used by the other views.
        return JsonResponse({"status": "error", "message": str(e)}, status=500)

    if ok:
        return JsonResponse({"status": "success"})
    return JsonResponse({"status": "error", "message": "用户更新失败"}, status=500)
|
||
|
||
@require_http_methods(["POST"])
@csrf_protect
def delete_user_by_id_view(request, user_id):
    """Delete a user (admin only).

    Responds 401 without a session user, 403 for non-admins, 500 when the
    deletion reports failure, and a success payload otherwise.
    """
    session = request.session
    if session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    if int(session.get("permission", 1)) != 0:
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)

    if es_delete_user_by_id(user_id):
        return JsonResponse({"status": "success", "message": "用户删除成功"})
    return JsonResponse({"status": "error", "message": "用户删除失败"}, status=500)
|
||
|
||
|
||
# Helpers: JSON conversion
def json_to_string(obj):
    """Serialise ``obj`` to a JSON string.

    A string that already parses as JSON is returned unchanged; any other
    string is wrapped as a JSON string literal. Non-string objects go through
    ``json.dumps`` (non-ASCII kept as is), falling back to ``str(obj)`` when
    they cannot be serialised.
    """
    if not isinstance(obj, str):
        try:
            return json.dumps(obj, ensure_ascii=False)
        except Exception:
            return str(obj)

    try:
        json.loads(obj)
    except Exception:
        # Plain text rather than JSON: encode it as a JSON string.
        return json.dumps(obj, ensure_ascii=False)
    return obj
|
||
|
||
|
||
def string_to_json(s):
    """Parse ``s`` as JSON, returning ``{}`` when it cannot be parsed."""
    try:
        parsed = json.loads(s)
    except Exception:
        parsed = {}
    return parsed
|
||
|
||
|
||
# Ported from a.py: call the vision model for OCR / information extraction.
def ocr_and_extract_info(image_path: str):
    """Send an image to the Zhipu vision model and return the extracted fields.

    The image is base64-encoded into a data URL and sent with a prompt asking
    for a flat JSON object whose "数据类型" field should preferably be one of
    the types returned by ``get_type_list()``.

    The API key is read from ``settings.ZHIPU_API_KEY``, then
    ``settings.ZAI_API_KEY``, then the ``ZHIPU_API_KEY`` environment variable.
    It is no longer embedded in the source: the key that used to be
    hard-coded here should be considered leaked and be rotated.

    Args:
        image_path: Path of the image file to analyse.

    Returns:
        The parsed JSON value from the model's reply, or ``None`` when no
        non-empty JSON could be extracted.

    Raises:
        OSError: If ``image_path`` cannot be read.
        RuntimeError: If no API key is configured.
        Exception: Whatever the model client raises is propagated.
    """
    import mimetypes

    with open(image_path, "rb") as f:
        base64_image = base64.b64encode(f.read()).decode("utf-8")

    # Describe the payload by its real type (PDF pages are saved as .jpg)
    # instead of always claiming PNG; unknown extensions keep the old value.
    guessed_type = mimetypes.guess_type(image_path)[0]
    if guessed_type and guessed_type.startswith("image/"):
        mime_type = guessed_type
    else:
        mime_type = "image/png"

    api_key = (
        getattr(settings, "ZHIPU_API_KEY", "")
        or getattr(settings, "ZAI_API_KEY", "")
        or os.environ.get("ZHIPU_API_KEY", "")
    )
    if not api_key:
        raise RuntimeError("缺少模型服务配置,请设置 ZHIPU_API_KEY")

    from zai import ZhipuAiClient

    client = ZhipuAiClient(api_key=api_key)
    types = get_type_list()
    chat_completion = client.chat.completions.create(
        messages=[
            {"role": "system", "content": "你是一个能理解图片和文本的助手,请根据用户提供的信息进行回答。"},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": f"请识别这张图片中的信息,将你认为重要的数据转换为不包含嵌套的json,不要显示其它信息以便于解析,直接输出json结果即可。使用“数据类型”字段表示这个东西的大致类型,除此之外你可以自行决定使用哪些json字段。“数据类型”的内容有严格规定,请查看{json.dumps(types, ensure_ascii=False)}中是否包含你所需要的类型,确定不包含后你才可以填入你觉得合适的大致分类。"},
                    {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
                ],
            },
        ],
        model="glm-4.6v",
    )
    response_text = chat_completion.choices[0].message.content

    def parse_response(text):
        """Extract the first non-empty JSON value from the model's reply."""
        if not isinstance(text, str):
            return None

        # Candidates from strictest to most lenient: the whole reply, a fenced
        # code block, the outermost {...} span, and finally the reply with
        # single quotes swapped for double quotes.
        candidates = [text]
        fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL)
        if fenced:
            candidates.append(fenced.group(1))
        start, end = text.find("{"), text.rfind("}")
        if 0 <= start < end:
            candidates.append(text[start:end + 1])
        candidates.append(text.replace("'", '"'))

        for candidate in candidates:
            try:
                result = json.loads(candidate)
            except ValueError:
                continue
            if result:
                return result
        return None

    return parse_response(response_text)
|
||
|
||
|
||
@require_http_methods(["GET"])
@ensure_csrf_cookie
def upload_page(request):
    """Render the upload page, redirecting anonymous visitors to the login page.

    The template receives ``user_id`` (the ``user_id`` query parameter when
    given, otherwise the session user) and the session user's ``username``.
    """
    current_uid = request.session.get("user_id")
    if current_uid is None:
        from django.shortcuts import redirect
        return redirect("/accounts/login/")

    requested_uid = request.GET.get("user_id")
    profile = get_user_by_id(current_uid) or {}
    return render(
        request,
        "elastic/upload.html",
        {
            "user_id": requested_uid or current_uid,
            "username": profile.get("username"),
        },
    )
|
||
|
||
|
||
# Upload and recognise (nothing is written to Elasticsearch yet)
def _upload_render_pdf(uploaded, images_dir):
    """Save every page of an uploaded PDF into ``images_dir`` as an image.

    Returns a list of ``(absolute_path, filename)`` tuples in page order. The
    temporary PDF copy and the document handle are released even on failure.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
        for chunk in uploaded.chunks():
            tmp.write(chunk)
        tmp_path = tmp.name

    pages = []
    try:
        doc = fitz.open(tmp_path)
        try:
            for i in range(len(doc)):
                pix = doc.load_page(i).get_pixmap(dpi=150)
                img_filename = f"{uuid.uuid4()}_page_{i+1}.jpg"
                img_abs_path = os.path.join(images_dir, img_filename)
                pix.save(img_abs_path)
                pages.append((img_abs_path, img_filename))
        finally:
            doc.close()
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    return pages


def _upload_run_ocr(image_paths):
    """OCR the images concurrently; return the non-empty results in input order."""
    if not image_paths:
        # e.g. a PDF without pages; ThreadPoolExecutor rejects max_workers=0.
        return []

    def run_one(path):
        try:
            return ocr_and_extract_info(path)
        except Exception:
            # Best effort per page, but leave a trace instead of failing silently.
            import logging
            logging.getLogger(__name__).exception("OCR failed for %s", path)
            return None

    workers = min(len(image_paths), 8)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        # map() yields results in submission order, so pages are merged
        # deterministically (as_completed would follow finishing order).
        return [res for res in executor.map(run_one, image_paths) if res]


def _upload_merge_pages(page_results):
    """Merge per-page OCR dicts into one flat dict.

    The first non-empty value of a key wins; a later, different value for the
    same key is stored under ``<key>_2``, ``<key>_3``, ... Non-dict results
    and blank keys are ignored.
    """
    merged = {}
    for item in page_results:
        if not isinstance(item, dict):
            continue
        for k, v in item.items():
            key = str(k).strip()
            if not key:
                continue
            if merged.get(key) in (None, ''):
                merged[key] = v
            elif merged[key] != v:
                idx = 2
                while f"{key}_{idx}" in merged:
                    idx += 1
                merged[f"{key}_{idx}"] = v
    return merged


@require_http_methods(["POST"])
def upload(request):
    """Store uploaded images/PDFs and return the OCR result for each file.

    Every file in the ``file`` field is saved under ``MEDIA_ROOT/images``
    (PDFs are rendered to one image per page), sent through OCR, and reported
    back as ``{"name", "data", "images", "image_urls"}``. Nothing is written
    to Elasticsearch here.

    Only the session decides who the caller is. The session ``user_id`` is
    never taken from a request parameter, because that would let any client
    sign in as an arbitrary user.

    Responses:
        401 without a session user; 400 when no file was sent;
        500 when PDF support is missing or a PDF cannot be converted.
    """
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    files = request.FILES.getlist("file")
    if not files:
        return JsonResponse({"status": "error", "message": "未选择文件"}, status=400)

    def is_pdf(uploaded):
        return (uploaded.name or "").lower().endswith('.pdf')

    # Check PDF support before saving anything, so a rejected request does not
    # leave files from earlier entries behind.
    if not HAS_PDF_SUPPORT and any(is_pdf(f) for f in files):
        return JsonResponse({"status": "error", "message": f"服务器未安装PDF处理组件(PyMuPDF): {PDF_ERROR}"}, status=500)

    images_dir = os.path.join(settings.MEDIA_ROOT, "images")
    os.makedirs(images_dir, exist_ok=True)

    # One result entry per original file.
    file_results = []
    for f in files:
        if is_pdf(f):
            try:
                group_images = _upload_render_pdf(f, images_dir)
            except Exception as e:
                return JsonResponse({"status": "error", "message": f"PDF {f.name} 转换失败: {str(e)}"}, status=500)
        else:
            # basename() is defensive: the stored name must stay in images_dir.
            filename = f"{uuid.uuid4()}_{os.path.basename(f.name or '')}"
            abs_path = os.path.join(images_dir, filename)
            with open(abs_path, "wb") as dst:
                for chunk in f.chunks():
                    dst.write(chunk)
            group_images = [(abs_path, filename)]

        page_results = _upload_run_ocr([abs_p for abs_p, _ in group_images])
        merged_group_data = _upload_merge_pages(page_results)
        if not merged_group_data:
            # Nothing recognised: return a placeholder the user can edit.
            merged_group_data = {"文件名": f.name, "提示": "未识别到具体内容"}

        rel_paths = [f"images/{fname}" for _, fname in group_images]
        image_urls = [request.build_absolute_uri(settings.MEDIA_URL + rp) for rp in rel_paths]

        file_results.append({
            "name": f.name,
            "data": merged_group_data,
            "images": rel_paths,
            "image_urls": image_urls,
        })

    return JsonResponse({
        "status": "success",
        "message": f"成功处理 {len(file_results)} 个文件,请确认数据后点击录入",
        "items": file_results,
    })
|
||
|
||
|
||
# Confirm and store
def _confirm_resolve_image(rel, images_dir):
    """Map a client-supplied relative path to an existing file in ``images_dir``.

    Returns the resolved absolute path, or ``None`` when the file is missing
    or the path resolves outside ``images_dir`` (``..`` segments, absolute
    paths, symlinks). The caller later deletes this file, so the containment
    check matters.
    """
    root = os.path.realpath(images_dir)
    candidate = os.path.realpath(os.path.join(settings.MEDIA_ROOT, str(rel)))
    try:
        inside = os.path.commonpath([root, candidate]) == root
    except ValueError:
        # Raised e.g. for paths on different drives.
        inside = False
    if not inside or not os.path.isfile(candidate):
        return None
    return candidate


def _confirm_store_images(image_rels, images_dir):
    """Convert uploaded images to WEBP and hand them to storage.

    Each image is re-encoded as WEBP, then uploaded to MinIO when it is
    configured, otherwise kept locally. Images that fail are reported and
    skipped; the others are still stored.

    Returns:
        ``(refs, temp_files, errors)``: storage references (``minio:...`` /
        ``local:...``), files to delete once the record has been written, and
        error messages.
    """
    refs, temp_files, errors = [], [], []
    if not image_rels:
        return refs, temp_files, errors

    os.makedirs(images_dir, exist_ok=True)
    for rel in image_rels:
        src_abs = _confirm_resolve_image(rel, images_dir)
        if src_abs is None:
            errors.append(f"图片文件 {rel} 不存在")
            continue

        webp_name = f"{uuid.uuid4().hex}.webp"
        webp_abs = os.path.join(images_dir, webp_name)
        try:
            with Image.open(src_abs) as im:
                # Keep transparency for modes that may carry it.
                mode = "RGBA" if im.mode in ("RGBA", "LA", "P") else "RGB"
                im.convert(mode).save(webp_abs, format="WEBP", quality=80)
        except Exception:
            errors.append(f"图片 {rel} 转换WEBP失败")
            continue

        try:
            object_name = f"images/{webp_name}"
            from minio_storage.minio_connect import upload_file, is_minio_configured
            if is_minio_configured():
                upload_file(webp_abs, object_name, content_type="image/webp")
                refs.append(f"minio:{object_name}")
                temp_files.extend([src_abs, webp_abs])
            else:
                refs.append(f"local:{object_name}")
                temp_files.append(src_abs)
        except Exception as e:
            errors.append(f"存储图片 {rel} 失败: {e}")
            continue
    return refs, temp_files, errors


@require_http_methods(["POST"])
def confirm(request):
    """Store confirmed OCR results in Elasticsearch.

    The JSON body carries either ``items`` (a list of ``{"data", "image"}``)
    or, for older clients, a single top-level ``data`` / ``image`` pair. Each
    item's images are converted and stored, then the record is inserted with
    the session user as ``writer_id``.

    Only the session decides who the caller is. The session ``user_id`` is
    never taken from the request body, because that would let any client
    sign in as an arbitrary user.

    Responses:
        401 without a session user; 400 when the body is not a JSON object;
        200 when at least one item was stored (the message mentions the number
        of errors, if any); 500 when nothing could be stored.
    """
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    try:
        payload = json.loads(request.body.decode("utf-8"))
    except ValueError:
        # Covers UnicodeDecodeError and json.JSONDecodeError.
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    if not isinstance(payload, dict):
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    # Single item or batch.
    items = payload.get("items")
    if not items:
        # Backwards compatibility: single-item payload of older clients.
        items = [{
            "data": payload.get("data"),
            "image": payload.get("image")
        }]
    elif not isinstance(items, (list, tuple)):
        items = [items]

    images_dir = os.path.join(settings.MEDIA_ROOT, "images")
    success_count = 0
    errors = []

    for item in items:
        edited = (item.get("data") or {}) if isinstance(item, dict) else {}
        if not isinstance(edited, dict) or not edited:
            errors.append("数据项不能为空")
            continue

        ensure_type_in_list(edited.get("数据类型"))

        image_refs, temp_files_to_delete, image_errors = _confirm_store_images(
            _parse_image_refs(item.get("image") or ""), images_dir
        )
        errors.extend(image_errors)

        # One image is stored as a bare reference, several as a JSON list.
        if len(image_refs) == 1:
            image_ref_to_store = image_refs[0]
        elif len(image_refs) > 1:
            image_ref_to_store = json_to_string(image_refs)
        else:
            image_ref_to_store = ""

        to_store = {
            "writer_id": str(request.session.get("user_id")),
            "data": json_to_string(edited),
            "image": image_ref_to_store,
        }

        if insert_data(to_store):
            success_count += 1
            # Remove the temporary files; a leftover file is not fatal.
            for p in temp_files_to_delete:
                if p and os.path.isfile(p):
                    try:
                        os.remove(p)
                    except OSError:
                        pass
        else:
            errors.append("写入ES失败")

    if success_count > 0:
        msg = f"成功录入 {success_count} 条数据"
        if errors:
            msg += f" (遇到 {len(errors)} 个错误)"
        return JsonResponse({"status": "success", "message": msg})
    return JsonResponse({"status": "error", "message": "录入失败: " + "; ".join(errors[:3])}, status=500)
|
||
|
||
|
||
@require_http_methods(["GET"])
@ensure_csrf_cookie
def manage_page(request):
    """Render ``elastic/manage.html`` for the logged-in user.

    Visitors without a ``user_id`` in the session are redirected to the
    login page. The template context carries the admin flag, whether the
    user owns any manage key, the session user id and the username.
    """
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        from django.shortcuts import redirect
        return redirect("/accounts/login/")

    me = get_user_by_id(session_user_id) or {}
    # A session permission value of 0 is treated as administrator.
    is_admin = int(request.session.get("permission", 1)) == 0
    has_manage_key = bool(me.get("manage_key") or [])
    if is_admin:
        raw_results = search_all()
    else:
        uid = str(session_user_id)
        manage_keys = me.get("manage_key", []) or []

        all_data = search_all()
        raw_results = []
        for r in all_data:
            # 1. The user's own submissions.
            if str(r.get("writer_id", "")) == uid:
                raw_results.append(r)
                continue

            # 2. Managed submissions: the stringified ``data`` field
            #    contains one of the user's manage keys (substring match).
            if manage_keys:
                r_data = str(r.get("data", ""))
                for mk in manage_keys:
                    if mk and str(mk) in r_data:
                        raw_results.append(r)
                        break

    # NOTE(review): indentation was reconstructed from a flattened copy of
    # this file. Read this way, the return below is at function level, so
    # ``raw_results`` is never used and everything after it is unreachable.
    # Confirm against the real file before deleting the dead code.
    return render(
        request,
        "elastic/manage.html",
        {
            "is_admin": is_admin,
            "has_manage_key": has_manage_key,
            "user_id": session_user_id,
            "username": me.get("username"),
        },
    )
    # Normalize the keys so the template does not have to use dot access
    # on underscore-prefixed fields.
    results = []
    for r in raw_results:
        results.append({
            "id": r.get("_id", ""),
            "writer_id": r.get("writer_id", ""),
            "image": r.get("image", ""),
            "data": r.get("data", ""),
        })
    user_id_qs = request.GET.get("user_id")
    context = {"items": results, "user_id": user_id_qs or session_user_id}
    return render(request, "elastic/manage.html", context)
|
||
|
||
|
||
@require_http_methods(["GET"])
def analytics_trend_view(request):
    """Return the result of ``es_analytics_trend`` as JSON.

    The ``from`` / ``to`` / ``interval`` query parameters (interval
    defaults to ``"day"``) are forwarded unchanged. Any exception is
    reported as a 500 JSON error carrying the exception text.
    """
    try:
        query = request.GET
        series = es_analytics_trend(
            gte=query.get("from"),
            lte=query.get("to"),
            interval=query.get("interval", "day"),
        )
        return JsonResponse({"status": "success", "data": series})
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
|
||
|
||
|
||
@require_http_methods(["GET"])
def analytics_types_view(request):
    """Return the result of ``es_analytics_types`` as JSON.

    Query parameters: ``from``, ``to`` and ``size``. ``size`` falls back
    to 10 when absent or not convertible to ``int``. Any exception is
    reported as a 500 JSON error carrying the exception text.
    """
    try:
        query = request.GET
        raw_size = query.get("size")
        size_int = 10
        if raw_size is not None:
            try:
                size_int = int(raw_size)
            except Exception:
                size_int = 10
        payload = es_analytics_types(
            gte=query.get("from"),
            lte=query.get("to"),
            size=size_int,
        )
        return JsonResponse({"status": "success", "data": payload})
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
|
||
|
||
|
||
@require_http_methods(["GET"])
def analytics_types_trend_view(request):
    """Return the result of ``es_analytics_types_trend`` as JSON.

    Query parameters: ``from``, ``to``, ``interval`` (default ``"week"``)
    and ``size``. ``size`` falls back to 8 when absent or not convertible
    to ``int``. Any exception is reported as a 500 JSON error.
    """
    try:
        query = request.GET
        raw_size = query.get("size")
        size_int = 8
        if raw_size is not None:
            try:
                size_int = int(raw_size)
            except Exception:
                size_int = 8
        payload = es_analytics_types_trend(
            gte=query.get("from"),
            lte=query.get("to"),
            interval=query.get("interval", "week"),
            size=size_int,
        )
        return JsonResponse({"status": "success", "data": payload})
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
|
||
|
||
|
||
@require_http_methods(["GET"])
def analytics_recent_view(request):
    """Return the result of ``es_analytics_recent`` as JSON.

    Query parameters: ``limit``, ``from`` and ``to``. ``limit`` falls back
    to 10 when absent or not convertible to ``int``. Any exception is
    reported as a 500 JSON error carrying the exception text.
    """
    try:
        query = request.GET
        raw_limit = query.get("limit")
        limit_int = 10
        if raw_limit is not None:
            try:
                limit_int = int(raw_limit)
            except Exception:
                limit_int = 10
        payload = es_analytics_recent(
            limit=limit_int,
            gte=query.get("from"),
            lte=query.get("to"),
        )
        return JsonResponse({"status": "success", "data": payload})
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)}, status=500)
|
||
|
||
@require_http_methods(["POST"])
@csrf_protect
def remove_key_view(request):
    """Delete a key globally via ``delete_key_globally``.

    Expects a JSON body ``{"key": "<name>"}``.

    Responses:
        401 -- no ``user_id`` in the session.
        400 -- body is not valid JSON, or ``key`` is missing/blank.
        403 -- caller is not an admin and either owns no manage key or did
               not add this key during the current session
               (``tutor_added_manage_keys``).
        500 -- the deletion reported failure, or any other exception.
        200 -- success; the message includes the count returned by
               ``delete_key_globally``.
    """
    # Imported locally: the module's top-level ``django.http`` import only
    # brings in JsonResponse/HttpResponse, so without this the malformed-JSON
    # handler below would raise NameError instead of answering 400.
    from django.http import HttpResponseBadRequest

    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    # A session permission value of 0 is treated as administrator.
    is_admin = int(request.session.get("permission", 1)) == 0
    try:
        payload = json.loads(request.body.decode("utf-8"))
        key_to_remove = payload.get("key")

        if not key_to_remove:
            return JsonResponse({"status": "error", "message": "缺少key参数"}, status=400)
        key_to_remove = str(key_to_remove).strip()
        if not key_to_remove:
            return JsonResponse({"status": "error", "message": "缺少key参数"}, status=400)

        if not is_admin:
            me = get_user_by_id(request.session.get("user_id")) or {}
            if not (me.get("manage_key") or []):
                return JsonResponse({"status": "error", "message": "无权限"}, status=403)
            # Non-admins may only delete keys recorded in their own session.
            allowed = {
                str(x).strip()
                for x in list(request.session.get("tutor_added_manage_keys") or [])
                if str(x).strip()
            }
            if key_to_remove not in allowed:
                return JsonResponse({"status": "error", "message": "无权限"}, status=403)

        from .es_connect import delete_key_globally
        ok, count = delete_key_globally(key_to_remove)

        if not ok:
            return JsonResponse({"status": "error", "message": "删除失败"}, status=500)

        if not is_admin:
            # Drop the deleted key from the session's list of self-added keys.
            cur = [
                str(x).strip()
                for x in list(request.session.get("tutor_added_manage_keys") or [])
                if str(x).strip()
            ]
            cur = [k for k in cur if k != key_to_remove]
            request.session["tutor_added_manage_keys"] = cur
            try:
                request.session.modified = True
            except Exception:
                pass
        return JsonResponse({"status": "success", "message": f"已成功全局删除 Key '{key_to_remove}',并同步清理了 {count} 个注册码。"})

    except json.JSONDecodeError:
        return HttpResponseBadRequest("Invalid JSON")
    except Exception as e:
        return JsonResponse({"status": "error", "message": str(e)}, status=500)
|
||
|
||
@require_http_methods(["POST"])
@csrf_protect
def unallow_tutor_added_key_view(request):
    """Remove one key from the session's ``tutor_added_manage_keys`` list.

    Expects a JSON body ``{"key": "<name>"}``. Only logged-in non-admin
    users owning at least one manage key may call it, and only for a key
    currently present in their session list.

    Responses: 401 (not logged in), 403 (admin caller, no manage key, or
    key not in the session list), 400 (invalid JSON / blank key),
    200 ``{"status": "success"}``.
    """
    session = request.session
    if session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    if int(session.get("permission", 1)) == 0:
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)

    profile = get_user_by_id(session.get("user_id")) or {}
    if not (profile.get("manage_key") or []):
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)

    try:
        body = json.loads(request.body.decode("utf-8"))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    key_name = (body.get("key") or "").strip()
    if not key_name:
        return JsonResponse({"status": "error", "message": "key不能为空"}, status=400)

    tracked = [
        str(item).strip()
        for item in list(session.get("tutor_added_manage_keys") or [])
        if str(item).strip()
    ]
    if key_name not in tracked:
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)

    session["tutor_added_manage_keys"] = [name for name in tracked if name != key_name]
    try:
        session.modified = True
    except Exception:
        pass
    return JsonResponse({"status": "success"})
|
||
|
||
@require_http_methods(["GET"])
@ensure_csrf_cookie
def user_manage(request):
    """Render ``elastic/users.html`` for the logged-in user.

    Anonymous visitors are redirected to the login page. The context
    exposes role flags (admin / tutor / student, derived from the session
    permission and whether the user owns a manage key) plus the user's
    ``manage_key`` and ``key`` values serialised as JSON.
    """
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        from django.shortcuts import redirect
        return redirect("/accounts/login/")

    is_admin = int(request.session.get("permission", 1)) == 0
    me = get_user_by_id(session_user_id) or {}
    has_manage = bool(me.get("manage_key"))
    manage_keys = list(me.get("manage_key") or [])

    # ``key`` may be stored as a list, a delimiter-separated string, or a
    # single scalar; bring all three shapes to a list first.
    stored = me.get("key") or []
    if isinstance(stored, str):
        candidates = re.split(r"[,,;;、\r\n]+", stored)
    elif isinstance(stored, list):
        candidates = stored
    else:
        candidates = [stored]
    my_keys = [str(item).strip() for item in candidates if str(item).strip()]

    requested_id = request.GET.get("user_id")
    return render(
        request,
        "elastic/users.html",
        {
            "user_id": requested_id or session_user_id,
            "username": me.get("username"),
            "is_admin": is_admin,
            "is_tutor": (not is_admin) and has_manage,
            "is_student": not (is_admin or has_manage),
            "manage_keys_json": json.dumps(manage_keys, ensure_ascii=False),
            "my_keys_json": json.dumps(my_keys, ensure_ascii=False),
        },
    )
|
||
|
||
@require_http_methods(["GET"])
@ensure_csrf_cookie
def registration_code_manage_page(request):
    """Render ``elastic/registration_codes.html``.

    Anonymous visitors go to the login page. Logged-in users who are not
    admin, own no manage key and lack the
    ``can_manage_registration_codes`` flag are redirected to
    ``/main/home/``. The context carries role flags and three JSON lists:
    the user's keys, the user's manage keys, and the manage keys the user
    is allowed to hand out.
    """
    session_user_id = request.session.get("user_id")
    if session_user_id is None:
        from django.shortcuts import redirect
        return redirect("/accounts/login/")

    is_admin = int(request.session.get("permission", 1)) == 0
    me = get_user_by_id(session_user_id) or {}
    has_manage = bool(me.get("manage_key"))
    can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1
    if not (is_admin or has_manage or can_manage_reg):
        from django.shortcuts import redirect
        return redirect("/main/home/")

    requested_id = request.GET.get("user_id")

    # ``key`` may be stored as a list, a delimiter-separated string, or a
    # single scalar; bring all three shapes to a list first.
    stored = me.get("key") or []
    if isinstance(stored, str):
        candidates = re.split(r"[,,;;、\r\n]+", stored)
    elif isinstance(stored, list):
        candidates = stored
    else:
        candidates = [stored]
    my_keys = [str(item).strip() for item in candidates if str(item).strip()]

    # Users holding only the registration flag draw from their stored
    # list; everyone else draws from keys recorded in the session.
    if can_manage_reg and (not has_manage) and (not is_admin):
        allowed_source = me.get("registration_manage_keys")
    else:
        allowed_source = request.session.get("tutor_added_manage_keys")
    allowed_manage_keys = [
        str(item).strip() for item in list(allowed_source or []) if str(item).strip()
    ]

    return render(
        request,
        "elastic/registration_codes.html",
        {
            "user_id": requested_id or session_user_id,
            "username": me.get("username"),
            "is_admin": is_admin,
            "has_manage_key": has_manage,
            "can_manage_registration_codes": can_manage_reg,
            "my_keys_json": json.dumps(my_keys, ensure_ascii=False),
            "manage_keys_json": json.dumps(list(me.get("manage_key") or []), ensure_ascii=False),
            "allowed_manage_keys_json": json.dumps(allowed_manage_keys, ensure_ascii=False),
        },
    )
|
||
|
||
@require_http_methods(["GET"])
def get_keys_list_view(request):
    """Return the list of keys the caller may see, as JSON.

    Admins and users owning a manage key get ``get_keys_list()``. Users
    holding only the ``can_manage_registration_codes`` flag get their own
    ``registration_manage_keys``. Responses: 401 when not logged in, 403
    when the caller has neither a manage key nor the flag.
    """
    session = request.session
    if session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    if int(session.get("permission", 1)) != 0:
        profile = get_user_by_id(session.get("user_id")) or {}
        owns_manage_key = bool(profile.get("manage_key") or [])
        reg_flag = int(profile.get("can_manage_registration_codes") or 0) == 1
        if not (owns_manage_key or reg_flag):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)
        if reg_flag and not owns_manage_key:
            own_list = [
                str(item).strip()
                for item in list(profile.get("registration_manage_keys") or [])
                if str(item).strip()
            ]
            return JsonResponse({"status": "success", "data": own_list})

    return JsonResponse({"status": "success", "data": get_keys_list()})
|
||
|
||
@require_http_methods(["POST"])
@csrf_protect
def add_key_view(request):
    """Add a new key to the global key list.

    Expects a JSON body ``{"key": "<name>"}``.

    Responses: 401 (not logged in), 403 (non-admin with neither a manage
    key nor the ``can_manage_registration_codes`` flag), 400 (invalid JSON
    or blank key), 409 (key already in ``get_keys_list()``), 500
    (``ensure_key_in_list`` reported failure), 200 on success.

    For non-admin callers the new key is also recorded: in the session's
    ``tutor_added_manage_keys`` when the caller owns a manage key,
    otherwise in the user's stored ``registration_manage_keys``.
    """
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    # A session permission value of 0 is treated as administrator.
    is_admin = int(request.session.get("permission", 1)) == 0
    if not is_admin:
        me = get_user_by_id(request.session.get("user_id")) or {}
        has_manage = bool(me.get("manage_key") or [])
        can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1
        if (not has_manage) and (not can_manage_reg):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)
    key_name = (payload.get("key") or "").strip()
    if not key_name:
        return JsonResponse({"status": "error", "message": "key不能为空"}, status=400)
    # Reject duplicates before writing.
    cur_global = set(get_keys_list() or [])
    if key_name in cur_global:
        return JsonResponse({"status": "error", "message": "key已存在"}, status=409)
    ok = ensure_key_in_list(key_name)
    if not ok:
        return JsonResponse({"status": "error", "message": "写入失败"}, status=500)
    if not is_admin:
        # ``has_manage`` / ``can_manage_reg`` / ``me`` were bound in the
        # non-admin branch above.
        uid = request.session.get("user_id")
        if has_manage:
            cur = list(request.session.get("tutor_added_manage_keys") or [])
            cur = [str(x).strip() for x in cur if str(x).strip()]
            if key_name not in cur:
                cur.append(key_name)
            # NOTE(review): indentation was reconstructed from a flattened
            # copy; the session write is assumed to be unconditional here
            # (outside the ``if`` above) -- confirm against the real file.
            request.session["tutor_added_manage_keys"] = cur
            try:
                request.session.modified = True
            except Exception:
                pass
        elif can_manage_reg:
            cur = [str(x).strip() for x in list((me or {}).get("registration_manage_keys") or []) if str(x).strip()]
            if key_name not in cur:
                cur.append(key_name)
            # NOTE(review): same reconstruction caveat -- the user update is
            # assumed to run whether or not the key was newly appended.
            es_update_user_by_id(uid, registration_manage_keys=cur)
    return JsonResponse({"status": "success"})
|
||
|
||
@require_http_methods(["POST"])
@csrf_protect
def generate_registration_code_view(request):
    """Create a registration code from the posted ``keys`` / ``manage_keys``.

    JSON body fields: ``keys``, ``manage_keys`` and ``expires_in_days``
    (default 30, also used when the value is not convertible to ``int``).

    Admin input is passed through untouched. For other callers the lists
    are stripped and de-duplicated (order preserved) and validated:

    * caller owns a manage key: ``keys`` must include every one of the
      caller's own keys (400 otherwise); each ``manage_keys`` entry must be
      in the session's ``tutor_added_manage_keys`` (403 otherwise);
    * caller has only the registration flag: every entry of both lists
      must be in the user's ``registration_manage_keys`` (403 otherwise)
      and ``keys`` must not end up empty (400).

    Other responses: 401 (not logged in), 403 (no right at all), 400
    (invalid JSON), 500 (generation returned a falsy result).
    """
    def _forbidden():
        return JsonResponse({"status": "error", "message": "无权限"}, status=403)

    def _clean(values, permitted=None):
        # Strip and de-duplicate in order. Returns None as soon as a value
        # falls outside ``permitted`` (when a whitelist is supplied).
        kept = []
        known = set()
        for item in list(values or []):
            text = str(item).strip()
            if not text or text in known:
                continue
            if permitted is not None and text not in permitted:
                return None
            known.add(text)
            kept.append(text)
        return kept

    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    uid = request.session.get("user_id")
    is_admin = int(request.session.get("permission", 1)) == 0

    if not is_admin:
        me = get_user_by_id(uid) or {}
        has_manage = bool(me.get("manage_key") or [])
        can_manage_reg = int(me.get("can_manage_registration_codes") or 0) == 1
        if not (has_manage or can_manage_reg):
            return _forbidden()

    try:
        payload = json.loads(request.body.decode("utf-8"))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    keys = list(payload.get("keys") or [])
    manage_keys = list(payload.get("manage_keys") or [])

    if not is_admin and has_manage:
        try:
            my_keys = [str(x).strip() for x in list((me or {}).get("key") or []) if str(x).strip()]
        except Exception:
            my_keys = []
        try:
            allowed_manage = {
                str(x).strip()
                for x in list(request.session.get("tutor_added_manage_keys") or [])
                if str(x).strip()
            }
        except Exception:
            allowed_manage = set()

        keys = _clean(keys)
        chosen = set(keys)
        missing = [k for k in my_keys if k not in chosen]
        if missing:
            return JsonResponse(
                {"status": "error", "message": f"必须选择导师原有的 key:{'、'.join(missing)}"},
                status=400,
            )

        manage_keys = _clean(manage_keys, allowed_manage)
        if manage_keys is None:
            return _forbidden()
    elif not is_admin:
        allowed = {
            str(x).strip()
            for x in list((me or {}).get("registration_manage_keys") or [])
            if str(x).strip()
        }

        keys = _clean(keys, allowed)
        if keys is None:
            return _forbidden()
        if not keys:
            return JsonResponse({"status": "error", "message": "至少选择一个 key"}, status=400)

        manage_keys = _clean(manage_keys, allowed)
        if manage_keys is None:
            return _forbidden()

    try:
        days = int(payload.get("expires_in_days", 30))
    except Exception:
        days = 30

    result = generate_registration_code(
        keys=keys,
        manage_keys=manage_keys,
        expires_in_days=days,
        created_by=uid,
    )
    if not result:
        return JsonResponse({"status": "error", "message": "生成失败"}, status=500)
    return JsonResponse({"status": "success", "data": result})
|
||
|
||
@require_http_methods(["GET"])
def list_registration_codes_view(request):
    """Return registration codes as JSON.

    Admins receive everything ``list_registration_codes()`` returns; other
    permitted callers (manage-key owners or holders of the
    ``can_manage_registration_codes`` flag) only receive entries whose
    ``created_by`` equals their own user id. Responses: 401 when not
    logged in, 403 when the caller has neither right.
    """
    uid = request.session.get("user_id")
    if uid is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    is_admin = int(request.session.get("permission", 1)) == 0
    if not is_admin:
        profile = get_user_by_id(uid) or {}
        owns_manage_key = bool(profile.get("manage_key") or [])
        reg_flag = int(profile.get("can_manage_registration_codes") or 0) == 1
        if not (owns_manage_key or reg_flag):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)

    codes = list_registration_codes()
    if not is_admin:
        owner = str(uid)
        codes = [entry for entry in (codes or []) if str(entry.get("created_by")) == owner]
    return JsonResponse({"status": "success", "data": codes})
|
||
|
||
|
||
@require_http_methods(["GET"])
def keys_for_filter_view(request):
    """Return the de-duplicated keys offered in the filter UI, as JSON.

    Every value is stringified, stripped of surrounding whitespace and of
    leading/trailing ``;``; first occurrence wins.

    * Admin: the global key list followed by every user's ``manage_key``
      and ``key`` values.
    * Other users: their own ``manage_key`` + ``key`` values, followed by
      those of every user sharing a key with one of the caller's manage
      keys (and of the caller again if not already included).

    Responds 401 when not logged in. A failing ``get_all_users()`` is
    treated as an empty user list.
    """
    uid = request.session.get("user_id")
    if uid is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)
    is_admin = int(request.session.get("permission", 1)) == 0

    try:
        users = get_all_users() or []
    except Exception:
        users = []

    def _tidy(value):
        return str(value).strip().strip(";")

    def _keys_of(user):
        return list(user.get("manage_key") or []) + list(user.get("key") or [])

    collected = []
    known = set()

    def _collect(values):
        for raw in values:
            text = _tidy(raw)
            if text and text not in known:
                known.add(text)
                collected.append(text)

    if is_admin:
        _collect(get_keys_list() or [])
        for user in users:
            _collect(_keys_of(user))
        return JsonResponse({"status": "success", "data": collected})

    me = get_user_by_id(uid) or {}
    mgr_keys = {_tidy(item) for item in (me.get("manage_key") or []) if _tidy(item)}

    if not mgr_keys:
        visible_users = [me]
    else:
        visible_users = []
        for user in users:
            try:
                user_keys = {_tidy(item) for item in (user.get("key") or []) if _tidy(item)}
            except Exception:
                user_keys = set()
            if user_keys & mgr_keys:
                visible_users.append(user)
        visible_ids = {str(user.get("user_id")) for user in visible_users}
        if str(me.get("user_id")) not in visible_ids:
            visible_users.append(me)

    # The caller's own keys come first so they lead the returned list.
    _collect(_keys_of(me))
    for user in visible_users:
        _collect(_keys_of(user))
    return JsonResponse({"status": "success", "data": collected})
|
||
|
||
|
||
def _extract_type_from_data(value):
|
||
s = value
|
||
if s is None:
|
||
return ""
|
||
if not isinstance(s, str):
|
||
try:
|
||
s = json.dumps(s, ensure_ascii=False)
|
||
except Exception:
|
||
s = str(s)
|
||
s = str(s)
|
||
try:
|
||
obj = json.loads(s)
|
||
if isinstance(obj, dict):
|
||
t = obj.get("数据类型")
|
||
if isinstance(t, str) and t.strip():
|
||
return t.strip()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
m = re.search(r'"数据类型"\s*:\s*"([^"]+)"', s)
|
||
if m:
|
||
return str(m.group(1)).strip()
|
||
except Exception:
|
||
pass
|
||
return ""
|
||
|
||
|
||
@require_http_methods(["GET"])
def types_for_filter_view(request):
    """Return the de-duplicated, stripped entries of ``get_type_list()``.

    Order of first occurrence is preserved and blank entries are dropped.
    A failing ``get_type_list()`` yields an empty list. Responds 401 when
    not logged in.
    """
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    try:
        names = [str(entry).strip() for entry in (get_type_list() or []) if str(entry).strip()]
    except Exception:
        names = []

    # dict keys keep insertion order, so this drops repeats in place.
    unique_names = list(dict.fromkeys(names))
    return JsonResponse({"status": "success", "data": unique_names})
|
||
|
||
|
||
@require_http_methods(["GET"])
def filter_view(request):
    """Return the records visible to the caller, narrowed by key and type.

    Query parameters (both optional, stripped):

    * ``key``  -- keep a record when its writer owns that key, or when the
      key occurs as a substring of the record's stringified ``data``.
    * ``type`` -- keep a record when ``_extract_type_from_data`` of its
      ``data`` equals the value.

    The result is passed through ``_attach_image_urls`` and
    ``_attach_writer_names`` before being returned as JSON. Responds 401
    when not logged in.
    """
    if request.session.get("user_id") is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    key = (request.GET.get("key") or "").strip()
    typ = (request.GET.get("type") or "").strip()

    records = list(_filter_results_for_user(request, search_all()) or [])

    if key:
        try:
            users = get_all_users() or []
        except Exception:
            users = []

        # Map each writer id to the set of keys that writer owns.
        keys_by_writer = {}
        for user in users:
            try:
                writer = str(user.get("user_id", "")).strip()
            except Exception:
                writer = ""
            if not writer:
                continue
            try:
                owned = {str(k).strip() for k in (user.get("key") or []) if str(k).strip()}
            except Exception:
                owned = set()
            keys_by_writer[writer] = owned

        def _matches_key(record):
            owner_keys = keys_by_writer.get(str(record.get("writer_id", "")).strip())
            if owner_keys and key in owner_keys:
                return True
            return key in str(record.get("data", ""))

        records = [record for record in records if _matches_key(record)]

    if typ:
        records = [
            record for record in records
            if _extract_type_from_data(record.get("data")) == typ
        ]

    data = _attach_writer_names(_attach_image_urls(request, records))
    return JsonResponse({"status": "success", "data": data})
|
||
|
||
|
||
def _parse_dt(value):
|
||
if not value:
|
||
return None
|
||
if hasattr(value, "isoformat"):
|
||
try:
|
||
dt = value
|
||
if getattr(dt, "tzinfo", None) is None:
|
||
dt = dt.replace(tzinfo=timezone.utc)
|
||
return dt
|
||
except Exception:
|
||
pass
|
||
s = str(value).strip()
|
||
if not s:
|
||
return None
|
||
if s.endswith("Z"):
|
||
s = s[:-1] + "+00:00"
|
||
try:
|
||
dt = datetime.fromisoformat(s)
|
||
if getattr(dt, "tzinfo", None) is None:
|
||
dt = dt.replace(tzinfo=timezone.utc)
|
||
return dt
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _floor_dt(dt, interval: str):
|
||
if not dt:
|
||
return None
|
||
iv = str(interval or "day").strip().lower()
|
||
if iv == "month":
|
||
return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
|
||
if iv == "week":
|
||
base = dt.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
return base - timedelta(days=base.weekday())
|
||
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
|
||
|
||
def _dt_label(dt, interval: str):
|
||
if not dt:
|
||
return ""
|
||
iv = str(interval or "day").strip().lower()
|
||
if iv == "month":
|
||
return dt.strftime("%Y-%m")
|
||
return dt.strftime("%Y-%m-%d")
|
||
|
||
|
||
@require_http_methods(["GET"])
def report_view(request):
    """Return aggregate counts over the caller's visible records as JSON.

    Only admins and users owning a manage key may call it (401 when not
    logged in, 403 otherwise).

    Query parameters (all optional, stripped): ``from`` / ``to`` bound the
    record's ``time`` field, ``interval`` (default ``"day"``) selects the
    time bucket, ``key`` and ``type`` narrow the records the same way the
    filter endpoint does. When a bound is given, records without a
    parseable time are dropped.

    The payload holds the generation timestamp, the echoed range and
    filters, the total, counts per type (descending count, then name) and
    counts per time bucket (ascending label). Any exception becomes a 500
    JSON error.
    """
    try:
        session_user_id = request.session.get("user_id")
        if session_user_id is None:
            return JsonResponse({"status": "error", "message": "未登录"}, status=401)
        is_admin = int(request.session.get("permission", 1)) == 0
        me = get_user_by_id(session_user_id) or {}
        has_manage_key = bool(me.get("manage_key") or [])
        if not (is_admin or has_manage_key):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)

        query = request.GET
        gte = (query.get("from") or "").strip()
        lte = (query.get("to") or "").strip()
        interval = (query.get("interval") or "day").strip()
        key = (query.get("key") or "").strip()
        typ = (query.get("type") or "").strip()

        gte_dt = _parse_dt(gte) if gte else None
        lte_dt = _parse_dt(lte) if lte else None

        records = list(_filter_results_for_user(request, search_all()) or [])

        if key:
            try:
                users = get_all_users() or []
            except Exception:
                users = []

            # Map each writer id to the set of keys that writer owns.
            keys_by_writer = {}
            for user in users:
                try:
                    writer = str(user.get("user_id", "")).strip()
                except Exception:
                    writer = ""
                if not writer:
                    continue
                try:
                    owned = {str(k).strip() for k in (user.get("key") or []) if str(k).strip()}
                except Exception:
                    owned = set()
                keys_by_writer[writer] = owned

            def _matches_key(record):
                owner_keys = keys_by_writer.get(str(record.get("writer_id", "")).strip())
                if owner_keys and key in owner_keys:
                    return True
                return key in str(record.get("data", ""))

            records = [record for record in records if _matches_key(record)]

        if typ:
            records = [
                record for record in records
                if _extract_type_from_data(record.get("data")) == typ
            ]

        # Apply the time window and remember each record's parsed time.
        ranged = []
        for record in records:
            stamp = _parse_dt(record.get("time"))
            if (gte_dt or lte_dt) and (stamp is None):
                continue
            if gte_dt and stamp and stamp < gte_dt:
                continue
            if lte_dt and stamp and stamp > lte_dt:
                continue
            enriched = dict(record)
            enriched["_time_dt"] = stamp
            ranged.append(enriched)

        type_counts = {}
        bucket_counts = {}
        for record in ranged:
            kind = _extract_type_from_data(record.get("data"))
            if kind:
                type_counts[kind] = type_counts.get(kind, 0) + 1
            bucket_dt = _floor_dt(record.get("_time_dt"), interval)
            label = _dt_label(bucket_dt, interval) if bucket_dt else ""
            if label:
                bucket_counts[label] = bucket_counts.get(label, 0) + 1

        by_type_arr = [
            {"type": name, "count": int(total)}
            for name, total in sorted(type_counts.items(), key=lambda pair: (-pair[1], pair[0]))
        ]
        by_time_arr = [
            {"bucket": label, "count": int(total)}
            for label, total in sorted(bucket_counts.items(), key=lambda pair: pair[0])
        ]

        report = {
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "range": {"from": gte or "", "to": lte or ""},
            "filters": {"key": key or "", "type": typ or "", "interval": interval or "day"},
            "total": len(ranged),
            "by_type": by_type_arr,
            "by_time": by_time_arr,
        }
        return JsonResponse({"status": "success", "data": report})
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc) or "生成失败"}, status=500)
|
||
|
||
|
||
@require_http_methods(["GET"])
def report_csv_view(request):
    """Return the report produced by ``report_view`` as a CSV download.

    ``report_view`` is called with the same request; a non-200 response
    from it is returned unchanged. Otherwise its JSON payload is laid out
    as a header section (generation time, range, filters), the total, the
    per-type table and the per-time-bucket table, and served as
    ``report.csv``. An undecodable payload gives a 500 JSON error.
    """
    upstream = report_view(request)
    if getattr(upstream, "status_code", 200) != 200:
        return upstream
    try:
        payload = json.loads(upstream.content.decode("utf-8"))
    except Exception:
        return JsonResponse({"status": "error", "message": "生成失败"}, status=500)

    data = (payload or {}).get("data") or {}
    rng = data.get("range") or {}
    flt = data.get("filters") or {}

    rows = [
        ["统计报表"],
        ["生成时间", data.get("generated_at", "")],
        ["时间范围", f"{rng.get('from','')} ~ {rng.get('to','')}"],
        ["筛查Key", flt.get("key", "")],
        ["筛查类型", flt.get("type", "")],
        ["时间粒度", flt.get("interval", "")],
        [],
        ["总数", data.get("total", 0)],
        [],
        ["按成果类型统计"],
        ["类型", "数量"],
    ]
    for entry in list(data.get("by_type") or []):
        rows.append([entry.get("type", ""), entry.get("count", 0)])
    rows.extend([[], ["按时间统计"], ["时间", "数量"]])
    for entry in list(data.get("by_time") or []):
        rows.append([entry.get("bucket", ""), entry.get("count", 0)])

    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)

    out = HttpResponse(buffer.getvalue(), content_type="text/csv; charset=utf-8")
    out["Content-Disposition"] = 'attachment; filename="report.csv"'
    return out
|
||
|
||
@require_http_methods(["POST"])
@csrf_protect
def revoke_registration_code_view(request):
    """Revoke a registration code.

    Expects a JSON body ``{"code": "<value>"}``. Admins may revoke any
    code; other permitted callers (manage-key owners or holders of the
    ``can_manage_registration_codes`` flag) only codes whose ``created_by``
    equals their own user id.

    Responses: 401 (not logged in), 403 (no right, unknown code, or code
    created by someone else), 400 (invalid JSON / blank code), 500
    (revocation reported failure), 200 ``{"status": "success"}``.
    """
    uid = request.session.get("user_id")
    if uid is None:
        return JsonResponse({"status": "error", "message": "未登录"}, status=401)

    is_admin = int(request.session.get("permission", 1)) == 0
    if not is_admin:
        profile = get_user_by_id(uid) or {}
        owns_manage_key = bool(profile.get("manage_key") or [])
        reg_flag = int(profile.get("can_manage_registration_codes") or 0) == 1
        if not (owns_manage_key or reg_flag):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)

    try:
        body = json.loads(request.body.decode("utf-8"))
    except Exception:
        return JsonResponse({"status": "error", "message": "JSON无效"}, status=400)

    code = (body.get("code") or "").strip()
    if not code:
        return JsonResponse({"status": "error", "message": "缺少code"}, status=400)

    if not is_admin:
        # Non-admins may only revoke codes they created themselves.
        details = get_registration_code(code)
        if not details or str(details.get("created_by")) != str(uid):
            return JsonResponse({"status": "error", "message": "无权限"}, status=403)

    if not revoke_registration_code(code):
        return JsonResponse({"status": "error", "message": "作废失败"}, status=500)
    return JsonResponse({"status": "success"})
|