diff --git a/elastic/apps.py b/elastic/apps.py
index b9cce81..27c3ffc 100644
--- a/elastic/apps.py
+++ b/elastic/apps.py
@@ -17,10 +17,9 @@ class ElasticConfig(AppConfig):
             return
 
         # Lazy import to avoid circular imports or loading too early
-        from .es_connect import create_index_with_mapping, get_type_list
+        from .es_connect import create_index_with_mapping, start_daily_analytics_scheduler
         try:
             create_index_with_mapping()
-            types = get_type_list()
-            print(f"🔎 type_list at startup: {types}")
+            start_daily_analytics_scheduler()
         except Exception as e:
             print(f"❌ ES initialization failed: {e}")
\ No newline at end of file
diff --git a/elastic/documents.py b/elastic/documents.py
index 111bf49..43d1577 100644
--- a/elastic/documents.py
+++ b/elastic/documents.py
@@ -15,6 +15,8 @@ GLOBAL_INDEX.settings(number_of_shards=1, number_of_replicas=0)
 class AchievementDocument(Document):
     """Award data document mapping"""
     writer_id = fields.TextField(fields={'keyword': {'type': 'keyword'}})
+    time = fields.DateField()
+
     data = fields.TextField(
         analyzer='ik_max_word',
         search_analyzer='ik_smart',
diff --git a/elastic/es_connect.py b/elastic/es_connect.py
index 47afdf5..2b7dacc 100644
--- a/elastic/es_connect.py
+++ b/elastic/es_connect.py
@@ -8,6 +8,8 @@ from .documents import AchievementDocument, UserDocument, GlobalDocument
 from .indexes import ACHIEVEMENT_INDEX_NAME, USER_INDEX_NAME, GLOBAL_INDEX_NAME
 import hashlib
 import time
+from datetime import datetime, timezone, timedelta
+import threading
 
 # Use Django's ES connection configuration
 connections.create_connection(hosts=['localhost:9200'])
@@ -132,11 +134,11 @@ def insert_data(data):
         bool: True if the insert succeeded, False otherwise
     """
     try:
-        # Insert data the django-elasticsearch-dsl way
         achievement = AchievementDocument(
             writer_id=data.get('writer_id', ''),
             data=data.get('data', ''),
-            image=data.get('image', '')
+            image=data.get('image', ''),
+            time=datetime.now(timezone.utc).isoformat()
         )
         achievement.save()
         print(f"Document written successfully, content: {data}")
@@ -303,6 +305,73 @@ def search_by_any_field(keyword):
         print(f"Fuzzy search failed: {str(e)}")
         return []
 
+ANALYTICS_CACHE = {"data": None, "ts": 0}
+
+def _compute_hist(range_gte: str, interval: str, fmt: str):
+    """Run a date_histogram over the time field; returns [{label, count}] per bucket."""
+    s = AchievementDocument.search()
+    s = s.filter('range', time={'gte': range_gte, 'lte': 'now'})
+    s = s.extra(size=0)  # aggregation only, no hits
+    s.aggs.bucket('b', 'date_histogram', field='time', calendar_interval=interval, format=fmt, min_doc_count=0)
+    resp = s.execute()
+    buckets = getattr(resp.aggs, 'b').buckets
+    return [{"label": b.key_as_string, "count": b.doc_count} for b in buckets]
+
+def _compute_type_counts(range_gte: str, types: list):
+    """Count documents per type within the given time range."""
+    counts = []
+    for t in types:
+        s = AchievementDocument.search()
+        s = s.filter('range', time={'gte': range_gte, 'lte': 'now'})
+        s = s.query('match_phrase', data=str(t))
+        total = s.count()
+        counts.append({"type": str(t), "count": int(total)})
+    return counts
+
+def compute_analytics():
+    """Build the daily/weekly/monthly histograms and the type distributions."""
+    types = get_type_list()
+    days = _compute_hist('now-10d/d', 'day', 'yyyy-MM-dd')
+    weeks = _compute_hist('now-10w/w', 'week', 'yyyy-ww')
+    months = _compute_hist('now-10M/M', 'month', 'yyyy-MM')
+    pie_1m = _compute_type_counts('now-1M/M', types)
+    pie_12m = _compute_type_counts('now-12M/M', types)
+    return {
+        "last_10_days": days[-10:],
+        "last_10_weeks": weeks[-10:],
+        "last_10_months": months[-10:],
+        "type_pie_1m": pie_1m,
+        "type_pie_12m": pie_12m,
+    }
+
+def get_analytics_overview(force: bool = False):
+    """Return cached analytics; recompute when forced or when older than one hour."""
+    now_ts = time.time()
+    if force or ANALYTICS_CACHE["data"] is None or (now_ts - ANALYTICS_CACHE["ts"]) > 3600:
+        ANALYTICS_CACHE["data"] = compute_analytics()
+        ANALYTICS_CACHE["ts"] = now_ts
+    return ANALYTICS_CACHE["data"]
+
+def _seconds_until_hour(h: int):
+    """Seconds from now until the next local occurrence of hour h."""
+    now = datetime.now()
+    tgt = now.replace(hour=h, minute=0, second=0, microsecond=0)
+    if tgt <= now:
+        tgt = tgt + timedelta(days=1)
+    return max(0, int((tgt - now).total_seconds()))
+
+def start_daily_analytics_scheduler():
+    """Refresh the analytics cache at 03:00 daily via a self-rescheduling timer thread."""
+    def _run_and_reschedule():
+        try:
+            get_analytics_overview(force=True)
+        except Exception as e:
+            print(f"Analytics task failed: {e}")
+        finally:
+            threading.Timer(24 * 3600, _run_and_reschedule).start()
+    delay = _seconds_until_hour(3)
+    threading.Timer(delay, _run_and_reschedule).start()
+
 def write_user_data(user_data):
     """
     Write user data to ES
@@ -375,6 +444,24 @@ def get_all_users():
         print(f"Failed to fetch all users: {str(e)}")
         return []
 
+def get_user_by_id(user_id):
+    """Look up a single user document by numeric user_id."""
+    try:
+        search = UserDocument.search()
+        search = search.query("term", user_id=int(user_id))
+        response = search.execute()
+        if response.hits:
+            hit = response.hits[0]
+            return {
+                "user_id": hit.user_id,
+                "username": hit.username,
+                "permission": hit.permission,
+            }
+        return None
+    except Exception as e:
+        print(f"Failed to fetch user data: {str(e)}")
+        return None
+
 def delete_user_by_username(username):
     """
     Delete a user by username
diff --git a/elastic/urls.py b/elastic/urls.py
index 5ff4aec..75a1222 100644
--- a/elastic/urls.py
+++ b/elastic/urls.py
@@ -17,6 +17,7 @@ urlpatterns = [
     path('search/', views.search, name='search'),
     path('fuzzy-search/', views.fuzzy_search, name='fuzzy_search'),
     path('all-data/', views.get_all_data, name='get_all_data'),
+    path('analytics/overview/', views.analytics_overview, name='analytics_overview'),
 
     # User management
     path('users/', views.get_users, name='get_users'),
diff --git a/elastic/views.py b/elastic/views.py
index 45d94f7..a6b71c8 100644
--- a/elastic/views.py
+++ b/elastic/views.py
@@ -79,6 +79,15 @@ def get_all_data(request):
     except Exception as e:
         return JsonResponse({"status": "error", "message": str(e)}, status=500)
 
+@require_http_methods(["GET"])
+def analytics_overview(request):
+    try:
+        force = request.GET.get("force") == "1"
+        data = get_analytics_overview(force=force)
+        return JsonResponse({"status": "success", "data": data})
+    except Exception as e:
+        return JsonResponse({"status": "error", "message": str(e)}, status=500)
+
 @require_http_methods(["DELETE"])
 @csrf_exempt