数据管理添加时间的显示和统计报表的功能[0.2.7.6][ci]
All checks were successful
CI / docker-ci (push) Successful in 35s

This commit is contained in:
DSQ
2026-03-19 16:38:16 +08:00
parent 5e38ebf856
commit fe7f08ed1c
4 changed files with 476 additions and 10 deletions

View File

@@ -6,10 +6,12 @@ import re
import uuid
import base64
import json
import csv
import io
import tempfile
import concurrent.futures
from django.conf import settings
from django.http import JsonResponse
from django.http import JsonResponse, HttpResponse
from django.shortcuts import render
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import ensure_csrf_cookie
@@ -953,6 +955,7 @@ def manage_page(request):
me = get_user_by_id(session_user_id) or {}
is_admin = int(request.session.get("permission", 1)) == 0
has_manage_key = bool(me.get("manage_key") or [])
if is_admin:
raw_results = search_all()
else:
@@ -980,6 +983,7 @@ def manage_page(request):
"elastic/manage.html",
{
"is_admin": is_admin,
"has_manage_key": has_manage_key,
"user_id": session_user_id,
"username": me.get("username"),
},
@@ -1428,6 +1432,293 @@ def keys_for_filter_view(request):
add(out, seen, v)
return JsonResponse({"status": "success", "data": out})
def _extract_type_from_data(value):
s = value
if s is None:
return ""
if not isinstance(s, str):
try:
s = json.dumps(s, ensure_ascii=False)
except Exception:
s = str(s)
s = str(s)
try:
obj = json.loads(s)
if isinstance(obj, dict):
t = obj.get("数据类型")
if isinstance(t, str) and t.strip():
return t.strip()
except Exception:
pass
try:
m = re.search(r'"数据类型"\s*:\s*"([^"]+)"', s)
if m:
return str(m.group(1)).strip()
except Exception:
pass
return ""
@require_http_methods(["GET"])
def types_for_filter_view(request):
uid = request.session.get("user_id")
if uid is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
try:
types = [str(t).strip() for t in (get_type_list() or []) if str(t).strip()]
except Exception:
types = []
seen = set()
out = []
for t in types:
if t in seen:
continue
seen.add(t)
out.append(t)
return JsonResponse({"status": "success", "data": out})
@require_http_methods(["GET"])
def filter_view(request):
session_user_id = request.session.get("user_id")
if session_user_id is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
key = (request.GET.get("key") or "").strip()
typ = (request.GET.get("type") or "").strip()
results = search_all()
results = _filter_results_for_user(request, results)
filtered = list(results or [])
if key:
selected = str(key).strip()
try:
users = get_all_users() or []
except Exception:
users = []
writer_keys_by_id = {}
for u in users:
try:
u_id = str(u.get("user_id", "")).strip()
except Exception:
u_id = ""
if not u_id:
continue
try:
u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()}
except Exception:
u_keys = set()
writer_keys_by_id[u_id] = u_keys
tmp = []
for r in filtered:
writer_id = str(r.get("writer_id", "")).strip()
writer_keys = writer_keys_by_id.get(writer_id)
if writer_keys and selected in writer_keys:
tmp.append(r)
continue
if selected and selected in str(r.get("data", "")):
tmp.append(r)
filtered = tmp
if typ:
tsel = str(typ).strip()
filtered = [r for r in filtered if _extract_type_from_data(r.get("data")) == tsel]
data = _attach_writer_names(_attach_image_urls(request, filtered))
return JsonResponse({"status": "success", "data": data})
def _parse_dt(value):
if not value:
return None
if hasattr(value, "isoformat"):
try:
dt = value
if getattr(dt, "tzinfo", None) is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
pass
s = str(value).strip()
if not s:
return None
if s.endswith("Z"):
s = s[:-1] + "+00:00"
try:
dt = datetime.fromisoformat(s)
if getattr(dt, "tzinfo", None) is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception:
return None
def _floor_dt(dt, interval: str):
if not dt:
return None
iv = str(interval or "day").strip().lower()
if iv == "month":
return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
if iv == "week":
base = dt.replace(hour=0, minute=0, second=0, microsecond=0)
return base - timedelta(days=base.weekday())
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
def _dt_label(dt, interval: str):
if not dt:
return ""
iv = str(interval or "day").strip().lower()
if iv == "month":
return dt.strftime("%Y-%m")
return dt.strftime("%Y-%m-%d")
@require_http_methods(["GET"])
def report_view(request):
session_user_id = request.session.get("user_id")
if session_user_id is None:
return JsonResponse({"status": "error", "message": "未登录"}, status=401)
is_admin = int(request.session.get("permission", 1)) == 0
me = get_user_by_id(session_user_id) or {}
has_manage_key = bool(me.get("manage_key") or [])
if (not is_admin) and (not has_manage_key):
return JsonResponse({"status": "error", "message": "无权限"}, status=403)
gte = (request.GET.get("from") or "").strip()
lte = (request.GET.get("to") or "").strip()
interval = (request.GET.get("interval") or "day").strip()
key = (request.GET.get("key") or "").strip()
typ = (request.GET.get("type") or "").strip()
gte_dt = _parse_dt(gte) if gte else None
lte_dt = _parse_dt(lte) if lte else None
results = search_all()
results = _filter_results_for_user(request, results)
filtered = list(results or [])
if key:
selected = str(key).strip()
try:
users = get_all_users() or []
except Exception:
users = []
writer_keys_by_id = {}
for u in users:
try:
u_id = str(u.get("user_id", "")).strip()
except Exception:
u_id = ""
if not u_id:
continue
try:
u_keys = {str(k).strip() for k in (u.get("key") or []) if str(k).strip()}
except Exception:
u_keys = set()
writer_keys_by_id[u_id] = u_keys
tmp = []
for r in filtered:
writer_id = str(r.get("writer_id", "")).strip()
writer_keys = writer_keys_by_id.get(writer_id)
if writer_keys and selected in writer_keys:
tmp.append(r)
continue
if selected and selected in str(r.get("data", "")):
tmp.append(r)
filtered = tmp
if typ:
tsel = str(typ).strip()
filtered = [r for r in filtered if _extract_type_from_data(r.get("data")) == tsel]
ranged = []
for r in filtered:
t = _parse_dt(r.get("time"))
if (gte_dt or lte_dt) and (t is None):
continue
if gte_dt and t and t < gte_dt:
continue
if lte_dt and t and t > lte_dt:
continue
rr = dict(r)
rr["_time_dt"] = t
ranged.append(rr)
by_type = {}
by_bucket = {}
for r in ranged:
tval = _extract_type_from_data(r.get("data"))
if tval:
by_type[tval] = by_type.get(tval, 0) + 1
bucket_dt = _floor_dt(r.get("_time_dt"), interval)
if bucket_dt:
label = _dt_label(bucket_dt, interval)
if label:
by_bucket[label] = by_bucket.get(label, 0) + 1
by_type_arr = [{"type": k, "count": int(v)} for k, v in sorted(by_type.items(), key=lambda x: (-x[1], x[0]))]
by_time_arr = [{"bucket": k, "count": int(v)} for k, v in sorted(by_bucket.items(), key=lambda x: x[0])]
return JsonResponse(
{
"status": "success",
"data": {
"generated_at": datetime.now(timezone.utc).isoformat(),
"range": {"from": gte or "", "to": lte or ""},
"filters": {"key": key or "", "type": typ or "", "interval": interval or "day"},
"total": len(ranged),
"by_type": by_type_arr,
"by_time": by_time_arr,
},
}
)
@require_http_methods(["GET"])
def report_csv_view(request):
resp = report_view(request)
if getattr(resp, "status_code", 200) != 200:
return resp
try:
payload = json.loads(resp.content.decode("utf-8"))
except Exception:
return JsonResponse({"status": "error", "message": "生成失败"}, status=500)
data = (payload or {}).get("data") or {}
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["统计报表"])
w.writerow(["生成时间", data.get("generated_at", "")])
rng = data.get("range") or {}
w.writerow(["时间范围", f"{rng.get('from','')} ~ {rng.get('to','')}"])
flt = data.get("filters") or {}
w.writerow(["筛查Key", flt.get("key", "")])
w.writerow(["筛查类型", flt.get("type", "")])
w.writerow(["时间粒度", flt.get("interval", "")])
w.writerow([])
w.writerow(["总数", data.get("total", 0)])
w.writerow([])
w.writerow(["按成果类型统计"])
w.writerow(["类型", "数量"])
for it in list(data.get("by_type") or []):
w.writerow([it.get("type", ""), it.get("count", 0)])
w.writerow([])
w.writerow(["按时间统计"])
w.writerow(["时间", "数量"])
for it in list(data.get("by_time") or []):
w.writerow([it.get("bucket", ""), it.get("count", 0)])
content = buf.getvalue()
out = HttpResponse(content, content_type="text/csv; charset=utf-8")
out["Content-Disposition"] = 'attachment; filename="report.csv"'
return out
@require_http_methods(["POST"])
@csrf_protect
def revoke_registration_code_view(request):