Overhauled the data visualization

2025-11-17 01:07:52 +08:00
parent ec7bc64bfa
commit cb28d45cd1
6 changed files with 369 additions and 175 deletions

View File

@@ -17,9 +17,8 @@ class ElasticConfig(AppConfig):
return
# Lazy import, to avoid circular imports or loading too early
from .es_connect import create_index_with_mapping, start_daily_analytics_scheduler
from .es_connect import create_index_with_mapping
try:
create_index_with_mapping()
start_daily_analytics_scheduler()
except Exception as e:
print(f"❌ ES 初始化失败: {e}")

View File

@@ -9,8 +9,8 @@ from .documents import AchievementDocument, UserDocument, GlobalDocument
from .indexes import ACHIEVEMENT_INDEX_NAME, USER_INDEX_NAME, GLOBAL_INDEX_NAME
import hashlib
import time
from datetime import datetime, timezone, timedelta
import threading
from datetime import datetime, timezone
import json
import re  # used by analytics_recent's regex fallback (skip if already imported elsewhere in this module)
# Configure the ES connection via an environment variable; defaults to the local machine
_ES_URL = os.environ.get('ELASTICSEARCH_URL', 'http://localhost:9200')
@@ -309,67 +309,193 @@ def search_by_any_field(keyword):
print(f"模糊搜索失败: {str(e)}")
return []
ANALYTICS_CACHE = {"data": None, "ts": 0}
def _compute_hist(range_gte: str, interval: str, fmt: str):
from elasticsearch_dsl import Search
s = AchievementDocument.search()
s = s.filter('range', time={'gte': range_gte, 'lte': 'now'})
s = s.extra(size=0)
s.aggs.bucket('b', 'date_histogram', field='time', calendar_interval=interval, format=fmt, min_doc_count=0)
resp = s.execute()
buckets = getattr(resp.aggs, 'b').buckets
return [{"label": b.key_as_string, "count": b.doc_count} for b in buckets]
def _compute_type_counts(range_gte: str, types: list):
counts = []
def _type_filters_from_list(limit: int = None):
try:
types = get_type_list()
except Exception:
types = ['软著', '专利', '奖状']  # fallback types: software copyright, patent, award certificate
if isinstance(limit, int) and limit > 0:
types = types[:limit]
filters = {}
for t in types:
s = AchievementDocument.search()
s = s.filter('range', time={'gte': range_gte, 'lte': 'now'})
s = s.query('match_phrase', data=str(t))
total = s.count()
counts.append({"type": str(t), "count": int(total)})
return counts
key = str(t)
# Match the key together with its value, so the same value appearing under another key cannot cause a false hit
pattern = f'*"数据类型": "{key}"*'
filters[key] = {"wildcard": {"data.keyword": {"value": pattern}}}
return filters
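
A minimal sketch of what the new helper hands to Elasticsearch, assuming get_type_list() returns the fallback types (values illustrative): each named filter becomes one bucket of a `filters` aggregation, and the wildcard pattern pins the "数据类型" key to its value inside the serialized data string.

filters = _type_filters_from_list(limit=2)
# e.g. {'软著': {'wildcard': {'data.keyword': {'value': '*"数据类型": "软著"*'}}},
#       '专利': {'wildcard': {'data.keyword': {'value': '*"数据类型": "专利"*'}}}}
body = {"size": 0, "aggs": {"by_type": {"filters": {"filters": filters}}}}

Wildcard queries on a keyword field scan terms rather than doing a direct inverted-index lookup, which is fine at this index's scale but worth revisiting if the data grows.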
def compute_analytics():
types = get_type_list()
days = _compute_hist('now-10d/d', 'day', 'yyyy-MM-dd')
weeks = _compute_hist('now-10w/w', 'week', 'yyyy-ww')
months = _compute_hist('now-10M/M', 'month', 'yyyy-MM')
pie_1m = _compute_type_counts('now-1M/M', types)
pie_12m = _compute_type_counts('now-12M/M', types)
return {
"last_10_days": days[-10:],
"last_10_weeks": weeks[-10:],
"last_10_months": months[-10:],
"type_pie_1m": pie_1m,
"type_pie_12m": pie_12m,
}
def analytics_trend(gte: str = None, lte: str = None, interval: str = "day"):
try:
search = AchievementDocument.search()
body = {
"size": 0,
"aggs": {
"trend": {
"date_histogram": {
"field": "time",
"calendar_interval": interval,
"min_doc_count": 0
}
}
}
}
if gte or lte:
rng = {}
if gte:
rng["gte"] = gte
if lte:
rng["lte"] = lte
body["query"] = {"range": {"time": rng}}
search = search.update_from_dict(body)
resp = search.execute()
buckets = resp.aggregations.trend.buckets if hasattr(resp, 'aggregations') else []
return [{"key_as_string": b.key_as_string, "key": b.key, "doc_count": b.doc_count} for b in buckets]
except Exception as e:
print(f"分析趋势失败: {str(e)}")
return []
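
A hypothetical call, using Elasticsearch date math for the range (parameter values are examples, not from the commit):

points = analytics_trend(gte="now-30d/d", lte="now", interval="day")
for p in points:
    print(p["key_as_string"], p["doc_count"])  # one zero-filled row per calendar day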
def get_analytics_overview(force: bool = False):
now_ts = time.time()
if force or ANALYTICS_CACHE["data"] is None or (now_ts - ANALYTICS_CACHE["ts"]) > 3600:
ANALYTICS_CACHE["data"] = compute_analytics()
ANALYTICS_CACHE["ts"] = now_ts
return ANALYTICS_CACHE["data"]
def analytics_types(gte: str = None, lte: str = None, size: int = 10):
try:
filters = _type_filters_from_list(limit=size)
body = {
"size": 0,
"aggs": {
"by_type": {
"filters": {
"filters": filters
}
}
}
}
if gte or lte:
rng = {}
if gte:
rng["gte"] = gte
if lte:
rng["lte"] = lte
body["query"] = {"range": {"time": rng}}
resp = es.search(index=DATA_INDEX_NAME, body=body)
buckets = resp.get("aggregations", {}).get("by_type", {}).get("buckets", {})
out = []
for k, v in buckets.items():
try:
out.append({"key": k, "doc_count": int(v.get("doc_count", 0))})
except Exception:
out.append({"key": str(k), "doc_count": 0})
return out
except Exception as e:
print(f"分析类型占比失败: {str(e)}")
return []
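
The filters aggregation returns its buckets keyed by name, which the loop above flattens into a list (counts invented for illustration):

# [{"key": "软著", "doc_count": 12},
#  {"key": "专利", "doc_count": 5},
#  {"key": "奖状", "doc_count": 3}]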
def _seconds_until_hour(h: int):
now = datetime.now()
tgt = now.replace(hour=h, minute=0, second=0, microsecond=0)
if tgt <= now:
tgt = tgt + timedelta(days=1)
return max(0, int((tgt - now).total_seconds()))
def analytics_types_trend(gte: str = None, lte: str = None, interval: str = "week", size: int = 8):
try:
filters = _type_filters_from_list(limit=size)
body = {
"size": 0,
"aggs": {
"by_interval": {
"date_histogram": {
"field": "time",
"calendar_interval": interval,
"min_doc_count": 0
},
"aggs": {
"by_type": {
"filters": {"filters": filters}
}
}
}
}
}
if gte or lte:
rng = {}
if gte:
rng["gte"] = gte
if lte:
rng["lte"] = lte
body["query"] = {"range": {"time": rng}}
resp = es.search(index=DATA_INDEX_NAME, body=body)
by_interval = resp.get("aggregations", {}).get("by_interval", {}).get("buckets", [])
out = []
for ib in by_interval:
t_buckets = ib.get("by_type", {}).get("buckets", {})
types_arr = []
for k, v in t_buckets.items():
types_arr.append({"key": k, "doc_count": int(v.get("doc_count", 0))})
out.append({
"key_as_string": ib.get("key_as_string"),
"key": ib.get("key"),
"doc_count": ib.get("doc_count", 0),
"types": types_arr
})
return out
except Exception as e:
print(f"分析类型变化失败: {str(e)}")
return []
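
Here each date-histogram bucket carries a nested per-type breakdown, so one element of the returned list looks roughly like this (values invented):

# {"key_as_string": "2025-11-10", "key": 1731196800000, "doc_count": 7,
#  "types": [{"key": "软著", "doc_count": 4}, {"key": "专利", "doc_count": 3}]}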
def start_daily_analytics_scheduler():
def _run_and_reschedule():
try:
get_analytics_overview(force=True)
except Exception as e:
print(f"分析任务失败: {e}")
finally:
threading.Timer(24 * 3600, _run_and_reschedule).start()
delay = _seconds_until_hour(3)
threading.Timer(delay, _run_and_reschedule).start()
def analytics_recent(limit: int = 10, gte: str = None, lte: str = None):
try:
def _extract_type(s: str):
if not s:
return ""
try:
obj = json.loads(s)
if isinstance(obj, dict):
v = obj.get("数据类型")
if isinstance(v, str) and v:
return v
except Exception:
pass
try:
m = re.search(r'"数据类型"\s*:\s*"([^"]+)"', s)
if m:
return m.group(1)
except Exception:
pass
return ""
search = AchievementDocument.search()
body = {
"size": max(1, min(limit, 100)),
"sort": [{"time": {"order": "desc"}}]
}
if gte or lte:
rng = {}
if gte:
rng["gte"] = gte
if lte:
rng["lte"] = lte
body["query"] = {"range": {"time": rng}}
search = search.update_from_dict(body)
resp = search.execute()
results = []
for hit in resp:
w = getattr(hit, 'writer_id', '')
uname = None  # writer ids may be stored as str or int, so try the lookup both ways
try:
uname_lookup = get_user_by_id(w)
uname = (uname_lookup or {}).get("username")
except Exception:
uname = None
if not uname:
try:
uname_lookup = get_user_by_id(int(w))
uname = (uname_lookup or {}).get("username")
except Exception:
uname = None
tval = _extract_type(getattr(hit, 'data', ''))
results.append({
"_id": hit.meta.id,
"writer_id": w,
"username": uname or "",
"type": tval or "",
"time": getattr(hit, 'time', None)
})
return results
except Exception as e:
print(f"获取最近活动失败: {str(e)}")
return []
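
How the inner _extract_type helper behaves on well-formed and slightly broken payloads (sample strings invented, helper shown as if it were module-level):

_extract_type('{"数据类型": "专利", "名称": "x"}')   # -> "专利", via json.loads
_extract_type('{"数据类型": "专利", not valid json')  # -> "专利", via the regex fallback
_extract_type('')                                     # -> ""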
def write_user_data(user_data):
"""

View File

@@ -1,5 +1,5 @@
INDEX_NAME = "wordsearch266666"
INDEX_NAME = "wordsearch2666661"
USER_NAME = "users11111"
ACHIEVEMENT_INDEX_NAME = INDEX_NAME
USER_INDEX_NAME = USER_NAME
GLOBAL_INDEX_NAME = "global11111"
GLOBAL_INDEX_NAME = "global11111111211"

View File

@@ -17,7 +17,6 @@ urlpatterns = [
path('search/', views.search, name='search'),
path('fuzzy-search/', views.fuzzy_search, name='fuzzy_search'),
path('all-data/', views.get_all_data, name='get_all_data'),
path('analytics/overview/', views.analytics_overview, name='analytics_overview'),
# User management
path('users/', views.get_users, name='get_users'),
@@ -33,4 +32,10 @@ urlpatterns = [
# Admin pages
path('manage/', views.manage_page, name='manage_page'),
path('user_manage/', views.user_manage, name='user_manage'),
# Analytics endpoints
path('analytics/trend/', views.analytics_trend_view, name='analytics_trend'),
path('analytics/types/', views.analytics_types_view, name='analytics_types'),
path('analytics/types_trend/', views.analytics_types_trend_view, name='analytics_types_trend'),
path('analytics/recent/', views.analytics_recent_view, name='analytics_recent'),
]
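
A quick smoke test of the new routes with Django's test client, as a sketch: it assumes this urlconf is mounted at the project root, so adjust the prefix if the app is included under a path.

from django.test import Client

c = Client()
for path in ("analytics/trend/", "analytics/types/",
             "analytics/types_trend/", "analytics/recent/"):
    r = c.get(f"/{path}", {"from": "now-10d/d"})
    print(path, r.status_code, r.json().get("status"))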

View File

@@ -14,6 +14,12 @@ from django.views.decorators.csrf import ensure_csrf_cookie
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie, csrf_protect
from .es_connect import *
from .es_connect import update_user_by_id as es_update_user_by_id, delete_user_by_id as es_delete_user_by_id
from .es_connect import (
analytics_trend as es_analytics_trend,
analytics_types as es_analytics_types,
analytics_types_trend as es_analytics_types_trend,
analytics_recent as es_analytics_recent,
)
from PIL import Image
@@ -80,16 +86,6 @@ def get_all_data(request):
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def analytics_overview(request):
try:
force = request.GET.get("force") == "1"
data = get_analytics_overview(force=force)
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["DELETE"])
@csrf_exempt
def delete_data(request, doc_id):
@@ -159,7 +155,11 @@ def update_data(request, doc_id):
if isinstance(v, dict):
updated["data"] = json.dumps(v, ensure_ascii=False)
else:
updated["data"] = str(v)
# Re-serialize strings that contain JSON so the stored payload is canonical and keeps non-ASCII readable
try:
obj = json.loads(str(v))
updated["data"] = json.dumps(obj, ensure_ascii=False)
except Exception:
updated["data"] = str(v)
success = update_by_id(doc_id, updated)
if success:
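
The json.loads/json.dumps round-trip matters mainly for non-ASCII payloads: re-serializing with ensure_ascii=False stores readable Chinese keys instead of \uXXXX escapes. For example:

import json
json.dumps({"类型": "专利"})                      # '{"\\u7c7b\\u578b": "\\u4e13\\u5229"}'
json.dumps({"类型": "专利"}, ensure_ascii=False)  # '{"类型": "专利"}'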
@@ -511,6 +511,67 @@ def manage_page(request):
context = {"items": results, "user_id": user_id_qs or session_user_id}
return render(request, "elastic/manage.html", context)
@require_http_methods(["GET"])
def analytics_trend_view(request):
try:
gte = request.GET.get("from")
lte = request.GET.get("to")
interval = request.GET.get("interval", "day")
data = es_analytics_trend(gte=gte, lte=lte, interval=interval)
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def analytics_types_view(request):
try:
gte = request.GET.get("from")
lte = request.GET.get("to")
size = request.GET.get("size")
try:
size_int = int(size) if size is not None else 10
except Exception:
size_int = 10
data = es_analytics_types(gte=gte, lte=lte, size=size_int)
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def analytics_types_trend_view(request):
try:
gte = request.GET.get("from")
lte = request.GET.get("to")
interval = request.GET.get("interval", "week")
size = request.GET.get("size")
try:
size_int = int(size) if size is not None else 8
except Exception:
size_int = 8
data = es_analytics_types_trend(gte=gte, lte=lte, interval=interval, size=size_int)
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
def analytics_recent_view(request):
try:
limit = request.GET.get("limit")
gte = request.GET.get("from")
lte = request.GET.get("to")
try:
limit_int = int(limit) if limit is not None else 10
except Exception:
limit_int = 10
data = es_analytics_recent(limit=limit_int, gte=gte, lte=lte)
return JsonResponse({"status": "success", "data": data})
except Exception as e:
return JsonResponse({"status": "error", "message": str(e)}, status=500)
@require_http_methods(["GET"])
@ensure_csrf_cookie
def user_manage(request):