diff --git a/elastic/apps.py b/elastic/apps.py index 27c3ffc..d9cf102 100644 --- a/elastic/apps.py +++ b/elastic/apps.py @@ -17,9 +17,8 @@ class ElasticConfig(AppConfig): return # 延迟导入,避免循环导入或过早加载 - from .es_connect import create_index_with_mapping, start_daily_analytics_scheduler + from .es_connect import create_index_with_mapping try: create_index_with_mapping() - start_daily_analytics_scheduler() except Exception as e: print(f"❌ ES 初始化失败: {e}") \ No newline at end of file diff --git a/elastic/es_connect.py b/elastic/es_connect.py index c0b1f3d..908f31c 100644 --- a/elastic/es_connect.py +++ b/elastic/es_connect.py @@ -9,8 +9,8 @@ from .documents import AchievementDocument, UserDocument, GlobalDocument from .indexes import ACHIEVEMENT_INDEX_NAME, USER_INDEX_NAME, GLOBAL_INDEX_NAME import hashlib import time -from datetime import datetime, timezone, timedelta -import threading +from datetime import datetime, timezone +import json # 使用环境变量配置ES连接,默认为本机 _ES_URL = os.environ.get('ELASTICSEARCH_URL', 'http://localhost:9200') @@ -309,67 +309,193 @@ def search_by_any_field(keyword): print(f"模糊搜索失败: {str(e)}") return [] -ANALYTICS_CACHE = {"data": None, "ts": 0} - -def _compute_hist(range_gte: str, interval: str, fmt: str): - from elasticsearch_dsl import Search - s = AchievementDocument.search() - s = s.filter('range', time={'gte': range_gte, 'lte': 'now'}) - s = s.extra(size=0) - s.aggs.bucket('b', 'date_histogram', field='time', calendar_interval=interval, format=fmt, min_doc_count=0) - resp = s.execute() - buckets = getattr(resp.aggs, 'b').buckets - return [{"label": b.key_as_string, "count": b.doc_count} for b in buckets] - -def _compute_type_counts(range_gte: str, types: list): - counts = [] + +def _type_filters_from_list(limit: int = None): + try: + types = get_type_list() + except Exception: + types = ['软著', '专利', '奖状'] + if isinstance(limit, int) and limit > 0: + types = types[:limit] + filters = {} for t in types: - s = AchievementDocument.search() - s = 
s.filter('range', time={'gte': range_gte, 'lte': 'now'}) - s = s.query('match_phrase', data=str(t)) - total = s.count() - counts.append({"type": str(t), "count": int(total)}) - return counts + key = str(t) + # 精确匹配键与值之间的关系,避免其它字段中的同名值造成误匹配 + pattern = f'*"数据类型": "{key}"*' + filters[key] = {"wildcard": {"data.keyword": {"value": pattern}}} + return filters -def compute_analytics(): - types = get_type_list() - days = _compute_hist('now-10d/d', 'day', 'yyyy-MM-dd') - weeks = _compute_hist('now-10w/w', 'week', 'yyyy-ww') - months = _compute_hist('now-10M/M', 'month', 'yyyy-MM') - pie_1m = _compute_type_counts('now-1M/M', types) - pie_12m = _compute_type_counts('now-12M/M', types) - return { - "last_10_days": days[-10:], - "last_10_weeks": weeks[-10:], - "last_10_months": months[-10:], - "type_pie_1m": pie_1m, - "type_pie_12m": pie_12m, - } +def analytics_trend(gte: str = None, lte: str = None, interval: str = "day"): + try: + search = AchievementDocument.search() + body = { + "size": 0, + "aggs": { + "trend": { + "date_histogram": { + "field": "time", + "calendar_interval": interval, + "min_doc_count": 0 + } + } + } + } + if gte or lte: + rng = {} + if gte: + rng["gte"] = gte + if lte: + rng["lte"] = lte + body["query"] = {"range": {"time": rng}} + search = search.update_from_dict(body) + resp = search.execute() + buckets = resp.aggregations.trend.buckets if hasattr(resp, 'aggregations') else [] + return [{"key_as_string": b.key_as_string, "key": b.key, "doc_count": b.doc_count} for b in buckets] + except Exception as e: + print(f"分析趋势失败: {str(e)}") + return [] -def get_analytics_overview(force: bool = False): - now_ts = time.time() - if force or ANALYTICS_CACHE["data"] is None or (now_ts - ANALYTICS_CACHE["ts"]) > 3600: - ANALYTICS_CACHE["data"] = compute_analytics() - ANALYTICS_CACHE["ts"] = now_ts - return ANALYTICS_CACHE["data"] +def analytics_types(gte: str = None, lte: str = None, size: int = 10): + try: + filters = _type_filters_from_list(limit=size) + body = { 
+ "size": 0, + "aggs": { + "by_type": { + "filters": { + "filters": filters + } + } + } + } + if gte or lte: + rng = {} + if gte: + rng["gte"] = gte + if lte: + rng["lte"] = lte + body["query"] = {"range": {"time": rng}} + resp = es.search(index=ACHIEVEMENT_INDEX_NAME, body=body) + buckets = resp.get("aggregations", {}).get("by_type", {}).get("buckets", {}) + out = [] + for k, v in buckets.items(): + try: + out.append({"key": k, "doc_count": int(v.get("doc_count", 0))}) + except Exception: + out.append({"key": str(k), "doc_count": 0}) + return out + except Exception as e: + print(f"分析类型占比失败: {str(e)}") + return [] -def _seconds_until_hour(h: int): - now = datetime.now() - tgt = now.replace(hour=h, minute=0, second=0, microsecond=0) - if tgt <= now: - tgt = tgt + timedelta(days=1) - return max(0, int((tgt - now).total_seconds())) +def analytics_types_trend(gte: str = None, lte: str = None, interval: str = "week", size: int = 8): + try: + filters = _type_filters_from_list(limit=size) + body = { + "size": 0, + "aggs": { + "by_interval": { + "date_histogram": { + "field": "time", + "calendar_interval": interval, + "min_doc_count": 0 + }, + "aggs": { + "by_type": { + "filters": {"filters": filters} + } + } + } + } + } + if gte or lte: + rng = {} + if gte: + rng["gte"] = gte + if lte: + rng["lte"] = lte + body["query"] = {"range": {"time": rng}} + resp = es.search(index=ACHIEVEMENT_INDEX_NAME, body=body) + by_interval = resp.get("aggregations", {}).get("by_interval", {}).get("buckets", []) + out = [] + for ib in by_interval: + t_buckets = ib.get("by_type", {}).get("buckets", {}) + types_arr = [] + for k, v in t_buckets.items(): + types_arr.append({"key": k, "doc_count": int(v.get("doc_count", 0))}) + out.append({ + "key_as_string": ib.get("key_as_string"), + "key": ib.get("key"), + "doc_count": ib.get("doc_count", 0), + "types": types_arr + }) + return out + except Exception as e: + print(f"分析类型变化失败: {str(e)}") + return [] -def start_daily_analytics_scheduler(): - def 
_run_and_reschedule(): - try: - get_analytics_overview(force=True) - except Exception as e: - print(f"分析任务失败: {e}") - finally: - threading.Timer(24 * 3600, _run_and_reschedule).start() - delay = _seconds_until_hour(3) - threading.Timer(delay, _run_and_reschedule).start() +def analytics_recent(limit: int = 10, gte: str = None, lte: str = None): + try: + def _extract_type(s: str): + if not s: + return "" + try: + obj = json.loads(s) + if isinstance(obj, dict): + v = obj.get("数据类型") + if isinstance(v, str) and v: + return v + except Exception: + pass + try: + m = re.search(r'"数据类型"\s*:\s*"([^"]+)"', s) + if m: + return m.group(1) + except Exception: + pass + return "" + + search = AchievementDocument.search() + body = { + "size": max(1, min(limit, 100)), + "sort": [{"time": {"order": "desc"}}] + } + if gte or lte: + rng = {} + if gte: + rng["gte"] = gte + if lte: + rng["lte"] = lte + body["query"] = {"range": {"time": rng}} + search = search.update_from_dict(body) + resp = search.execute() + results = [] + for hit in resp: + w = getattr(hit, 'writer_id', '') + uname = None + try: + uname_lookup = get_user_by_id(w) + uname = (uname_lookup or {}).get("username") + except Exception: + uname = None + if not uname: + try: + uname_lookup = get_user_by_id(int(w)) + uname = (uname_lookup or {}).get("username") + except Exception: + uname = None + tval = _extract_type(getattr(hit, 'data', '')) + results.append({ + "_id": hit.meta.id, + "writer_id": w, + "username": uname or "", + "type": tval or "", + "time": getattr(hit, 'time', None) + }) + return results + except Exception as e: + print(f"获取最近活动失败: {str(e)}") + return [] def write_user_data(user_data): """ diff --git a/elastic/indexes.py b/elastic/indexes.py index 2bd3a5a..6c75be1 100644 --- a/elastic/indexes.py +++ b/elastic/indexes.py @@ -1,5 +1,5 @@ -INDEX_NAME = "wordsearch266666" +INDEX_NAME = "wordsearch2666661" USER_NAME = "users11111" ACHIEVEMENT_INDEX_NAME = INDEX_NAME USER_INDEX_NAME = USER_NAME -GLOBAL_INDEX_NAME 
= "global11111" +GLOBAL_INDEX_NAME = "global11111111211" diff --git a/elastic/urls.py b/elastic/urls.py index 785a437..dc2690b 100644 --- a/elastic/urls.py +++ b/elastic/urls.py @@ -17,7 +17,6 @@ urlpatterns = [ path('search/', views.search, name='search'), path('fuzzy-search/', views.fuzzy_search, name='fuzzy_search'), path('all-data/', views.get_all_data, name='get_all_data'), - path('analytics/overview/', views.analytics_overview, name='analytics_overview'), # 用户管理 path('users/', views.get_users, name='get_users'), @@ -33,4 +32,10 @@ urlpatterns = [ # 管理页面 path('manage/', views.manage_page, name='manage_page'), path('user_manage/', views.user_manage, name='user_manage'), + + # 分析接口 + path('analytics/trend/', views.analytics_trend_view, name='analytics_trend'), + path('analytics/types/', views.analytics_types_view, name='analytics_types'), + path('analytics/types_trend/', views.analytics_types_trend_view, name='analytics_types_trend'), + path('analytics/recent/', views.analytics_recent_view, name='analytics_recent'), ] diff --git a/elastic/views.py b/elastic/views.py index 940d00a..66b7872 100644 --- a/elastic/views.py +++ b/elastic/views.py @@ -14,6 +14,12 @@ from django.views.decorators.csrf import ensure_csrf_cookie from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie, csrf_protect from .es_connect import * from .es_connect import update_user_by_id as es_update_user_by_id, delete_user_by_id as es_delete_user_by_id +from .es_connect import ( + analytics_trend as es_analytics_trend, + analytics_types as es_analytics_types, + analytics_types_trend as es_analytics_types_trend, + analytics_recent as es_analytics_recent, +) from PIL import Image @@ -80,16 +86,6 @@ def get_all_data(request): except Exception as e: return JsonResponse({"status": "error", "message": str(e)}, status=500) -@require_http_methods(["GET"]) -def analytics_overview(request): - try: - force = request.GET.get("force") == "1" - data = get_analytics_overview(force=force) - 
return JsonResponse({"status": "success", "data": data}) - except Exception as e: - return JsonResponse({"status": "error", "message": str(e)}, status=500) - - @require_http_methods(["DELETE"]) @csrf_exempt def delete_data(request, doc_id): @@ -159,7 +155,11 @@ def update_data(request, doc_id): if isinstance(v, dict): updated["data"] = json.dumps(v, ensure_ascii=False) else: - updated["data"] = str(v) + try: + obj = json.loads(str(v)) + updated["data"] = json.dumps(obj, ensure_ascii=False) + except Exception: + updated["data"] = str(v) success = update_by_id(doc_id, updated) if success: @@ -511,6 +511,67 @@ def manage_page(request): context = {"items": results, "user_id": user_id_qs or session_user_id} return render(request, "elastic/manage.html", context) + +@require_http_methods(["GET"]) +def analytics_trend_view(request): + try: + gte = request.GET.get("from") + lte = request.GET.get("to") + interval = request.GET.get("interval", "day") + data = es_analytics_trend(gte=gte, lte=lte, interval=interval) + return JsonResponse({"status": "success", "data": data}) + except Exception as e: + return JsonResponse({"status": "error", "message": str(e)}, status=500) + + +@require_http_methods(["GET"]) +def analytics_types_view(request): + try: + gte = request.GET.get("from") + lte = request.GET.get("to") + size = request.GET.get("size") + try: + size_int = int(size) if size is not None else 10 + except Exception: + size_int = 10 + data = es_analytics_types(gte=gte, lte=lte, size=size_int) + return JsonResponse({"status": "success", "data": data}) + except Exception as e: + return JsonResponse({"status": "error", "message": str(e)}, status=500) + + +@require_http_methods(["GET"]) +def analytics_types_trend_view(request): + try: + gte = request.GET.get("from") + lte = request.GET.get("to") + interval = request.GET.get("interval", "week") + size = request.GET.get("size") + try: + size_int = int(size) if size is not None else 8 + except Exception: + size_int = 8 + data = 
es_analytics_types_trend(gte=gte, lte=lte, interval=interval, size=size_int) + return JsonResponse({"status": "success", "data": data}) + except Exception as e: + return JsonResponse({"status": "error", "message": str(e)}, status=500) + + +@require_http_methods(["GET"]) +def analytics_recent_view(request): + try: + limit = request.GET.get("limit") + gte = request.GET.get("from") + lte = request.GET.get("to") + try: + limit_int = int(limit) if limit is not None else 10 + except Exception: + limit_int = 10 + data = es_analytics_recent(limit=limit_int, gte=gte, lte=lte) + return JsonResponse({"status": "success", "data": data}) + except Exception as e: + return JsonResponse({"status": "error", "message": str(e)}, status=500) + @require_http_methods(["GET"]) @ensure_csrf_cookie def user_manage(request): diff --git a/main/templates/main/home.html b/main/templates/main/home.html index 79d3a5c..eb7c949 100644 --- a/main/templates/main/home.html +++ b/main/templates/main/home.html @@ -3,6 +3,7 @@ 数据管理系统 +