Data visualization goes live
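
This commit adds a lightweight analytics layer on top of the existing Elasticsearch setup: `AchievementDocument` gains a `time` date field that `insert_data` now stamps with the current UTC time; `es_connect` gains cached overview computations (10-day/week/month histograms and per-type pie counts) plus a scheduler that refreshes them daily at 03:00; and a new `analytics/overview/` route and view expose the data. A `get_user_by_id` helper is added along the way.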
@@ -17,10 +17,9 @@ class ElasticConfig(AppConfig):
             return
 
         # Lazy import to avoid circular imports and premature loading
-        from .es_connect import create_index_with_mapping, get_type_list
+        from .es_connect import create_index_with_mapping, start_daily_analytics_scheduler
         try:
             create_index_with_mapping()
-            types = get_type_list()
-            print(f"🔎 type_list at startup: {types}")
+            start_daily_analytics_scheduler()
         except Exception as e:
             print(f"❌ ES initialization failed: {e}")
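For orientation, this hunk sits inside the app's `AppConfig.ready()` hook, which Django calls once the app registry is populated. A minimal sketch of the surrounding class; the `RUN_MAIN` guard is an assumption implied by the early `return`, since the dev server's autoreloader would otherwise run the startup work twice:

```python
import os
from django.apps import AppConfig


class ElasticConfig(AppConfig):
    name = 'elastic'  # app label assumed

    def ready(self):
        # Assumed guard: skip the autoreloader's parent process so the
        # index setup and the scheduler start exactly once.
        if os.environ.get('RUN_MAIN') != 'true':
            return

        # Lazy import to avoid circular imports and premature loading
        from .es_connect import create_index_with_mapping, start_daily_analytics_scheduler
        try:
            create_index_with_mapping()
            start_daily_analytics_scheduler()
        except Exception as e:
            print(f"❌ ES initialization failed: {e}")
```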
@@ -15,6 +15,8 @@ GLOBAL_INDEX.settings(number_of_shards=1, number_of_replicas=0)
 class AchievementDocument(Document):
     """Mapping for award/achievement documents"""
     writer_id = fields.TextField(fields={'keyword': {'type': 'keyword'}})
+    time = fields.DateField()
+
     data = fields.TextField(
         analyzer='ik_max_word',
         search_analyzer='ik_smart',
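`fields.DateField()` maps to Elasticsearch's `date` type, which is what the `date_histogram` aggregations added below require. Note that adding a field to the document class does not touch an index that already exists; with django-elasticsearch-dsl the mapping has to be pushed, for instance via `python manage.py search_index --rebuild` (destructive) or an in-place mapping update. A quick way to verify the field landed (a sketch; the literal index name is an assumption standing in for `ACHIEVEMENT_INDEX_NAME`):

```python
from elasticsearch_dsl import connections

es = connections.get_connection()
# 'achievement' stands in for the real ACHIEVEMENT_INDEX_NAME value.
mapping = es.indices.get_mapping(index='achievement')
print(mapping['achievement']['mappings']['properties'].get('time'))
# Expected after the mapping update: {'type': 'date'}
```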
@@ -8,6 +8,8 @@ from .documents import AchievementDocument, UserDocument, GlobalDocument
 from .indexes import ACHIEVEMENT_INDEX_NAME, USER_INDEX_NAME, GLOBAL_INDEX_NAME
 import hashlib
+import time
 from datetime import datetime, timezone, timedelta
+import threading
 
 # Use Django's ES connection configuration
 connections.create_connection(hosts=['localhost:9200'])
@@ -132,11 +134,11 @@ def insert_data(data):
         bool: True if the insert succeeded, False otherwise
     """
     try:
         # Insert the document the django-elasticsearch-dsl way
         achievement = AchievementDocument(
             writer_id=data.get('writer_id', ''),
             data=data.get('data', ''),
-            image=data.get('image', '')
+            image=data.get('image', ''),
+            time=datetime.now(timezone.utc).isoformat()
         )
         achievement.save()
         print(f"Document written successfully, content: {data}")
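With this change every document is stamped in UTC at write time. `DateField` accepts either a `datetime` or an ISO-8601 string like the one produced here, so passing `datetime.now(timezone.utc)` directly would also work. A minimal usage sketch (the payload values are hypothetical):

```python
# Hypothetical payload; insert_data fills the `time` field itself.
ok = insert_data({
    "writer_id": "w-001",
    "data": "Provincial robotics contest, first prize",
    "image": "",
})
print(ok)  # True on success, False otherwise
```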
@@ -303,6 +305,68 @@ def search_by_any_field(keyword):
         print(f"Fuzzy search failed: {str(e)}")
         return []
 
+ANALYTICS_CACHE = {"data": None, "ts": 0}
+
+def _compute_hist(range_gte: str, interval: str, fmt: str):
+    s = AchievementDocument.search()
+    s = s.filter('range', time={'gte': range_gte, 'lte': 'now'})
+    s = s.extra(size=0)
+    s.aggs.bucket('b', 'date_histogram', field='time', calendar_interval=interval, format=fmt, min_doc_count=0)
+    resp = s.execute()
+    buckets = resp.aggregations.b.buckets
+    return [{"label": b.key_as_string, "count": b.doc_count} for b in buckets]
+
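For reference, the `Search` object above serializes (via `s.to_dict()`) to roughly the following request body; the exact shape can vary slightly across elasticsearch-dsl versions:

```python
# Approximate s.to_dict() output for the daily histogram call,
# _compute_hist('now-10d/d', 'day', 'yyyy-MM-dd'):
request_body = {
    "query": {"bool": {"filter": [
        {"range": {"time": {"gte": "now-10d/d", "lte": "now"}}},
    ]}},
    "size": 0,
    "aggs": {"b": {"date_histogram": {
        "field": "time",
        "calendar_interval": "day",
        "format": "yyyy-MM-dd",
        "min_doc_count": 0,
    }}},
}
```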
+def _compute_type_counts(range_gte: str, types: list):
+    counts = []
+    for t in types:
+        s = AchievementDocument.search()
+        s = s.filter('range', time={'gte': range_gte, 'lte': 'now'})
+        s = s.query('match_phrase', data=str(t))
+        total = s.count()
+        counts.append({"type": str(t), "count": int(total)})
+    return counts
+
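This loop issues one `count()` round-trip per type. The same numbers could come back in a single request with a keyed `filters` aggregation; a sketch under the same assumptions, meant as a drop-in inside es_connect (`_compute_type_counts_single` is a hypothetical name, not part of the commit):

```python
def _compute_type_counts_single(range_gte: str, types: list):
    # One request: each type becomes a named filter bucket.
    s = AchievementDocument.search()
    s = s.filter('range', time={'gte': range_gte, 'lte': 'now'})
    s = s.extra(size=0)
    s.aggs.bucket('by_type', 'filters',
                  filters={str(t): {'match_phrase': {'data': str(t)}} for t in types})
    resp = s.execute()
    buckets = resp.aggregations.by_type.buckets
    return [{"type": str(t), "count": buckets[str(t)].doc_count} for t in types]
```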
+def compute_analytics():
+    types = get_type_list()
+    days = _compute_hist('now-10d/d', 'day', 'yyyy-MM-dd')
+    weeks = _compute_hist('now-10w/w', 'week', 'yyyy-ww')
+    months = _compute_hist('now-10M/M', 'month', 'yyyy-MM')
+    pie_1m = _compute_type_counts('now-1M/M', types)
+    pie_12m = _compute_type_counts('now-12M/M', types)
+    return {
+        "last_10_days": days[-10:],
+        "last_10_weeks": weeks[-10:],
+        "last_10_months": months[-10:],
+        "type_pie_1m": pie_1m,
+        "type_pie_12m": pie_12m,
+    }
+
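The range strings are Elasticsearch date math: `now-10d/d` means "ten days ago, rounded down to the start of the day", and `/w` and `/M` round to week and month boundaries. One subtlety: even with `min_doc_count=0`, a `date_histogram` only backfills empty buckets between the first and last non-empty ones, which is why the `[-10:]` slices cap rather than guarantee the bucket count. Pinning exactly ten daily buckets would take `extended_bounds` (a sketch, not in the commit):

```python
# Assumption: force buckets for the full window even when its edges are empty.
s.aggs.bucket('b', 'date_histogram', field='time',
              calendar_interval='day', format='yyyy-MM-dd',
              min_doc_count=0,
              extended_bounds={'min': 'now-10d/d', 'max': 'now/d'})
```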
+def get_analytics_overview(force: bool = False):
+    now_ts = time.time()
+    if force or ANALYTICS_CACHE["data"] is None or (now_ts - ANALYTICS_CACHE["ts"]) > 3600:
+        ANALYTICS_CACHE["data"] = compute_analytics()
+        ANALYTICS_CACHE["ts"] = now_ts
+    return ANALYTICS_CACHE["data"]
+
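`ANALYTICS_CACHE` is shared between request threads and the scheduler thread. The worst case here is only duplicated recomputation, since the dict mutations are atomic under the GIL, but a lock makes the intent explicit. A sketch, meant as a drop-in inside es_connect; the lock is an assumption, not in the commit:

```python
import threading

_ANALYTICS_LOCK = threading.Lock()  # hypothetical, not part of the commit

def get_analytics_overview(force: bool = False):
    now_ts = time.time()
    with _ANALYTICS_LOCK:  # at most one recomputation at a time
        if force or ANALYTICS_CACHE["data"] is None or (now_ts - ANALYTICS_CACHE["ts"]) > 3600:
            ANALYTICS_CACHE["data"] = compute_analytics()
            ANALYTICS_CACHE["ts"] = now_ts
        return ANALYTICS_CACHE["data"]
```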
+def _seconds_until_hour(h: int):
+    now = datetime.now()
+    tgt = now.replace(hour=h, minute=0, second=0, microsecond=0)
+    if tgt <= now:
+        tgt = tgt + timedelta(days=1)
+    return max(0, int((tgt - now).total_seconds()))
+
+def start_daily_analytics_scheduler():
+    def _run_and_reschedule():
+        try:
+            get_analytics_overview(force=True)
+        except Exception as e:
+            print(f"Analytics task failed: {e}")
+        finally:
+            threading.Timer(24 * 3600, _run_and_reschedule).start()
+    delay = _seconds_until_hour(3)
+    threading.Timer(delay, _run_and_reschedule).start()
+
 def write_user_data(user_data):
     """
     Write user data to ES
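The scheduler waits until the next 03:00 local time, refreshes the cache, and then re-arms itself every 24 hours. Two caveats: `_seconds_until_hour` uses naive local time, so the fixed 24-hour re-arm drifts off the 03:00 mark across DST changes, and `threading.Timer` threads are non-daemon by default, so a pending timer can keep the process from exiting. A small hardening sketch (the helper name is hypothetical):

```python
def _start_timer(delay_s: float, fn) -> threading.Timer:
    t = threading.Timer(delay_s, fn)
    t.daemon = True  # don't block interpreter shutdown on a pending timer
    t.start()
    return t
```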
@@ -375,6 +439,23 @@ def get_all_users():
         print(f"Failed to fetch all users: {str(e)}")
         return []
 
+def get_user_by_id(user_id):
+    """Return a single user as a dict by user_id, or None if not found."""
+    try:
+        search = UserDocument.search()
+        search = search.query("term", user_id=int(user_id))
+        response = search.execute()
+        if response.hits:
+            hit = response.hits[0]
+            return {
+                "user_id": hit.user_id,
+                "username": hit.username,
+                "permission": hit.permission,
+            }
+        return None
+    except Exception as e:
+        print(f"Failed to fetch user data: {str(e)}")
+        return None
+
 def delete_user_by_username(username):
     """
     Delete a user by username
@@ -17,6 +17,7 @@ urlpatterns = [
     path('search/', views.search, name='search'),
     path('fuzzy-search/', views.fuzzy_search, name='fuzzy_search'),
     path('all-data/', views.get_all_data, name='get_all_data'),
+    path('analytics/overview/', views.analytics_overview, name='analytics_overview'),
 
     # User management
     path('users/', views.get_users, name='get_users'),
@@ -79,6 +79,15 @@ def get_all_data(request):
     except Exception as e:
         return JsonResponse({"status": "error", "message": str(e)}, status=500)
 
+@require_http_methods(["GET"])
+def analytics_overview(request):
+    try:
+        force = request.GET.get("force") == "1"
+        data = get_analytics_overview(force=force)
+        return JsonResponse({"status": "success", "data": data})
+    except Exception as e:
+        return JsonResponse({"status": "error", "message": str(e)}, status=500)
+
 
 @require_http_methods(["DELETE"])
 @csrf_exempt
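The view returns the cached overview, and `?force=1` bypasses the hour-long cache. A quick smoke test with Django's test client; the URL prefix is an assumption, since it depends on how this app's `urls.py` is included:

```python
from django.test import Client

c = Client()
# '/elastic/' prefix assumed; adjust to the project's include() path.
resp = c.get('/elastic/analytics/overview/', {'force': '1'})
payload = resp.json()
print(payload['status'])                    # 'success'
print(payload['data']['last_10_days'][:2])  # e.g. [{'label': '2024-05-01', 'count': 3}, ...]
```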