import base64 import json import os import io import random import string import time import smtplib from django.http import JsonResponse, HttpResponseBadRequest from django.shortcuts import render, redirect from django.views.decorators.http import require_http_methods from django.views.decorators.csrf import csrf_protect, ensure_csrf_cookie from django.conf import settings from .es_client import get_user_by_username from .crypto import get_public_key_spki_b64, rsa_oaep_decrypt_b64, aes_gcm_decrypt_b64, verify_password, generate_rsa_private_pem_b64, public_spki_b64_from_private_pem_b64, rsa_oaep_decrypt_b64_with_private_pem from elastic.es_connect import get_registration_code, get_user_by_username as es_get_user_by_username, get_all_users as es_get_all_users, write_user_data, update_user_by_id, get_user_by_id, create_registration_code_manage_request, find_pending_registration_code_manage_request, list_registration_code_manage_requests, decide_registration_code_manage_request, get_registration_code_manage_request @require_http_methods(["GET"]) @ensure_csrf_cookie def login_page(request): return render(request, "accounts/login.html") @require_http_methods(["GET"]) @ensure_csrf_cookie def pubkey(request): pem_b64 = request.session.get("rsa_private_pem_b64") if not pem_b64: pem_b64 = generate_rsa_private_pem_b64() request.session["rsa_private_pem_b64"] = pem_b64 pk_b64 = public_spki_b64_from_private_pem_b64(pem_b64) return JsonResponse({"public_key_spki": pk_b64}) @require_http_methods(["GET"]) @ensure_csrf_cookie def captcha(request): try: from captcha.image import ImageCaptcha except Exception: return JsonResponse({"ok": False, "message": "captcha unavailable"}, status=500) code = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(5)) request.session["captcha_code"] = code img = ImageCaptcha(width=160, height=60) image = img.generate_image(code) buf = io.BytesIO() image.save(buf, format="PNG") b64 = base64.b64encode(buf.getvalue()).decode("ascii") return JsonResponse({"ok": True, "image_b64": b64}) @require_http_methods(["POST"]) @csrf_protect def set_session_key(request): try: payload = json.loads(request.body.decode("utf-8")) except json.JSONDecodeError: return HttpResponseBadRequest("Invalid JSON") enc_key_b64 = payload.get("encrypted_key", "") if not enc_key_b64: return HttpResponseBadRequest("Missing fields") try: pem_b64 = request.session.get("rsa_private_pem_b64") if not pem_b64: return HttpResponseBadRequest("Decrypt error") key_bytes = rsa_oaep_decrypt_b64_with_private_pem(pem_b64, enc_key_b64) except Exception: return HttpResponseBadRequest("Decrypt error") request.session["session_enc_key_b64"] = base64.b64encode(key_bytes).decode("ascii") return JsonResponse({"ok": True}) @require_http_methods(["GET"]) @ensure_csrf_cookie def profile_page(request): session_user_id = request.session.get("user_id") if session_user_id is None: return redirect("/accounts/login/") # 获取用户信息 user = get_user_by_id(session_user_id) if not user: return redirect("/accounts/login/") # 获取个人提交的成就(图片) from elastic.es_connect import search_all from elastic.views import _attach_image_urls raw_results = [r for r in search_all() if str(r.get("writer_id", "")) == str(session_user_id)] achievements = _attach_image_urls(request, raw_results) permission_name = "管理员" if int(user.get("permission", 1)) == 0 else "普通用户" context = { "username": request.session.get("username"), "profile_user": user, "permission_name": permission_name, "achievements": achievements, } return render(request, "accounts/profile.html", context) @require_http_methods(["POST"]) @csrf_protect def secure_login_submit(request): try: payload = json.loads(request.body.decode("utf-8")) except json.JSONDecodeError: return HttpResponseBadRequest("Invalid JSON") iv_b64 = payload.get("iv", "") ct_b64 = payload.get("ciphertext", "") if not iv_b64 or not ct_b64: return HttpResponseBadRequest("Missing fields") key_b64 = request.session.get("session_enc_key_b64") if not key_b64: return HttpResponseBadRequest("Session key missing") try: key_bytes = base64.b64decode(key_b64) pt = aes_gcm_decrypt_b64(key_bytes, iv_b64, ct_b64) obj = json.loads(pt.decode("utf-8")) except Exception: return HttpResponseBadRequest("Decrypt error") username = (obj.get("username") or "").strip() password = (obj.get("password") or "") if not username or not password: return HttpResponseBadRequest("Missing credentials") if bool(request.session.get("login_failed_once")): ans = (obj.get("captcha") or "").strip() code = request.session.get("captcha_code") if not ans or not code or ans.lower() != str(code).lower(): return JsonResponse({"ok": False, "message": "验证码错误", "captcha_required": True}, status=401) user = get_user_by_username(username) if not user: request.session["login_failed_once"] = True return JsonResponse({"ok": False, "message": "用户不存在", "captcha_required": True}, status=401) if not verify_password(password, user.get("password_salt") or "", user.get("password_hash") or ""): request.session["login_failed_once"] = True return JsonResponse({"ok": False, "message": "账户或密码错误", "captcha_required": True}, status=401) try: request.session.cycle_key() except Exception: pass request.session["user_id"] = user["user_id"] request.session["username"] = user["username"] try: request.session["permission"] = int(user["permission"]) if user.get("permission") is not None else 1 except Exception: request.session["permission"] = 1 if "session_enc_key_b64" in request.session: del request.session["session_enc_key_b64"] if "rsa_private_pem_b64" in request.session: del request.session["rsa_private_pem_b64"] if "login_failed_once" in request.session: del request.session["login_failed_once"] if "captcha_code" in request.session: del request.session["captcha_code"] return JsonResponse({"ok": True, "redirect_url": f"/main/home/?user_id={user['user_id']}"}) @require_http_methods(["GET"]) def home(request): # Minimal placeholder page per requirement # Ensure user_id is passed via query and session contains id user_id = request.GET.get("user_id") session_user_id = request.session.get("user_id") context = { "user_id": user_id or session_user_id, } return render(request, "accounts/home.html", context) @require_http_methods(["POST"]) @csrf_protect def logout(request): # Flush the session to clear all data and rotate the key try: request.session.flush() except Exception: pass # Return a response that also deletes cookies client-side resp = JsonResponse({"ok": True, "redirect_url": "/accounts/login/"}) try: # Delete session cookie resp.delete_cookie( settings.SESSION_COOKIE_NAME, path='/', samesite=settings.SESSION_COOKIE_SAMESITE, secure=settings.SESSION_COOKIE_SECURE, ) # Optionally delete CSRF cookie to satisfy "清除cookie" 的要求 resp.delete_cookie( settings.CSRF_COOKIE_NAME, path='/', samesite=settings.CSRF_COOKIE_SAMESITE, secure=settings.CSRF_COOKIE_SECURE, ) except Exception: pass return resp @require_http_methods(["GET"]) @ensure_csrf_cookie def register_page(request): return render(request, "accounts/register.html") @require_http_methods(["POST"]) @csrf_protect def register_submit(request): try: payload = json.loads(request.body.decode("utf-8")) except json.JSONDecodeError: return HttpResponseBadRequest("Invalid JSON") code = (payload.get("code") or "").strip() email = (payload.get("email") or "").strip() email_code = (payload.get("email_code") or "").strip() username = (payload.get("username") or "").strip() password = (payload.get("password") or "") if not email or not email_code or not username or not password: return HttpResponseBadRequest("Missing fields") v = request.session.get("email_verify") or {} if (v.get("email") or "") != email: return JsonResponse({"ok": False, "message": "请先验证邮箱"}, status=400) try: exp_ts = int(v.get("expires_at") or 0) except Exception: exp_ts = 0 if exp_ts < int(time.time()): return JsonResponse({"ok": False, "message": "验证码已过期"}, status=400) if (v.get("code") or "") != email_code: return JsonResponse({"ok": False, "message": "邮箱验证码错误"}, status=400) rc = None if code: rc = get_registration_code(code) if not rc: return JsonResponse({"ok": False, "message": "注册码无效"}, status=400) try: exp = rc.get("expires_at") now = __import__("datetime").datetime.now(__import__("datetime").timezone.utc) if hasattr(exp, 'isoformat'): exp_dt = exp else: exp_dt = __import__("datetime").datetime.fromisoformat(str(exp)) if exp_dt <= now: return JsonResponse({"ok": False, "message": "注册码已过期"}, status=400) except Exception: pass existing = es_get_user_by_username(username) if existing: return JsonResponse({"ok": False, "message": "用户名已存在"}, status=409) users = es_get_all_users() next_id = (max([int(u.get("user_id", 0)) for u in users]) + 1) if users else 1 ok = write_user_data({ "user_id": next_id, "username": username, "password": password, "permission": 1, "email": email, "key": (rc.get("keys") if rc else []) or [], "manage_key": (rc.get("manage_keys") if rc else []) or [], "registration_code": (rc.get("code") if rc else None), }) if not ok: return JsonResponse({"ok": False, "message": "注册失败"}, status=500) try: if "email_verify" in request.session: del request.session["email_verify"] except Exception: pass return JsonResponse({"ok": True, "redirect_url": "/accounts/login/"}) @require_http_methods(["POST"]) @csrf_protect def replace_registration_code_view(request): session_user_id = request.session.get("user_id") if session_user_id is None: return JsonResponse({"ok": False, "message": "未登录"}, status=401) try: payload = json.loads(request.body.decode("utf-8")) except json.JSONDecodeError: return HttpResponseBadRequest("Invalid JSON") code = (payload.get("code") or "").strip() if not code: return JsonResponse({"ok": False, "message": "请输入注册码"}, status=400) rc = get_registration_code(code) if not rc: return JsonResponse({"ok": False, "message": "注册码无效"}, status=400) try: exp = rc.get("expires_at") now = __import__("datetime").datetime.now(__import__("datetime").timezone.utc) if hasattr(exp, 'isoformat'): exp_dt = exp else: exp_dt = __import__("datetime").datetime.fromisoformat(str(exp)) if exp_dt <= now: return JsonResponse({"ok": False, "message": "注册码已过期"}, status=400) except Exception: pass keys = list(rc.get("keys") or []) manage_keys = list(rc.get("manage_keys") or []) ok = update_user_by_id(session_user_id, key=keys, manage_key=manage_keys, registration_code=code) if not ok: return JsonResponse({"ok": False, "message": "替换失败"}, status=500) return JsonResponse({"ok": True}) @require_http_methods(["GET"]) def registration_code_preview_view(request): session_user_id = request.session.get("user_id") if session_user_id is None: return JsonResponse({"ok": False, "message": "未登录"}, status=401) code = (request.GET.get("code") or "").strip() if not code: return JsonResponse({"ok": False, "message": "请输入注册码"}, status=400) rc = get_registration_code(code) if not rc: return JsonResponse({"ok": False, "message": "注册码无效"}, status=400) try: exp = rc.get("expires_at") now = __import__("datetime").datetime.now(__import__("datetime").timezone.utc) if hasattr(exp, 'isoformat'): exp_dt = exp else: exp_dt = __import__("datetime").datetime.fromisoformat(str(exp)) if exp_dt <= now: return JsonResponse({"ok": False, "message": "注册码已过期"}, status=400) except Exception: pass return JsonResponse( { "ok": True, "data": { "code": rc.get("code"), "keys": list(rc.get("keys") or []), "manage_keys": list(rc.get("manage_keys") or []), "expires_at": rc.get("expires_at"), }, } ) @require_http_methods(["POST"]) @csrf_protect def submit_registration_code_request_view(request): session_user_id = request.session.get("user_id") if session_user_id is None: return JsonResponse({"ok": False, "message": "未登录"}, status=401) try: perm = int(request.session.get("permission", 1)) except Exception: perm = 1 if perm == 0: return JsonResponse({"ok": False, "message": "无权限"}, status=403) me = get_user_by_id(session_user_id) or {} if (me.get("manage_key") or []) or int(me.get("can_manage_registration_codes") or 0) == 1: return JsonResponse({"ok": False, "message": "无需申请"}, status=400) if str(me.get("registration_code") or "").strip(): return JsonResponse({"ok": False, "message": "已有注册码,无法申请"}, status=400) try: payload = json.loads(request.body.decode("utf-8")) except json.JSONDecodeError: return HttpResponseBadRequest("Invalid JSON") reason = (payload.get("reason") or "").strip() if not reason: return JsonResponse({"ok": False, "message": "请填写申请理由"}, status=400) pending = find_pending_registration_code_manage_request(session_user_id) if pending: return JsonResponse({"ok": True, "message": "已提交申请"}) rid = create_registration_code_manage_request(session_user_id, me.get("username"), reason) if not rid: return JsonResponse({"ok": False, "message": "提交失败"}, status=500) return JsonResponse({"ok": True}) @require_http_methods(["GET"]) @ensure_csrf_cookie def registration_code_requests_page(request): session_user_id = request.session.get("user_id") if session_user_id is None: return redirect("/accounts/login/") try: perm = int(request.session.get("permission", 1)) except Exception: perm = 1 if perm != 0: return redirect("/main/home/") me = get_user_by_id(session_user_id) or {} return render(request, "accounts/registration_code_requests.html", {"username": me.get("username")}) @require_http_methods(["GET"]) def list_registration_code_requests_view(request): session_user_id = request.session.get("user_id") if session_user_id is None: return JsonResponse({"ok": False, "message": "未登录"}, status=401) try: perm = int(request.session.get("permission", 1)) except Exception: perm = 1 if perm != 0: return JsonResponse({"ok": False, "message": "无权限"}, status=403) status = (request.GET.get("status") or "").strip() or None data = list_registration_code_manage_requests(status=status) return JsonResponse({"ok": True, "data": data}) @require_http_methods(["POST"]) @csrf_protect def decide_registration_code_request_view(request): session_user_id = request.session.get("user_id") if session_user_id is None: return JsonResponse({"ok": False, "message": "未登录"}, status=401) try: perm = int(request.session.get("permission", 1)) except Exception: perm = 1 if perm != 0: return JsonResponse({"ok": False, "message": "无权限"}, status=403) try: payload = json.loads(request.body.decode("utf-8")) except json.JSONDecodeError: return HttpResponseBadRequest("Invalid JSON") request_id = (payload.get("request_id") or "").strip() action = (payload.get("action") or "").strip().lower() note = (payload.get("note") or "").strip() if not request_id or action not in ("approve", "reject"): return JsonResponse({"ok": False, "message": "参数错误"}, status=400) req = get_registration_code_manage_request(request_id) if not req: return JsonResponse({"ok": False, "message": "申请不存在"}, status=404) status = "approved" if action == "approve" else "rejected" ok = decide_registration_code_manage_request(request_id, status=status, reviewed_by=session_user_id, reviewer_note=note) if not ok: return JsonResponse({"ok": False, "message": "操作失败"}, status=500) if status == "approved": uid = req.get("user_id") update_user_by_id(uid, can_manage_registration_codes=1, registration_manage_keys=[]) return JsonResponse({"ok": True}) @require_http_methods(["POST"]) @csrf_protect def send_email_code(request): try: payload = json.loads(request.body.decode("utf-8")) except json.JSONDecodeError: return HttpResponseBadRequest("Invalid JSON") email = (payload.get("email") or "").strip() if not email: return HttpResponseBadRequest("Missing email") if "@" not in email: return JsonResponse({"ok": False, "message": "邮箱格式不正确"}, status=400) verify_code = "".join(random.choice(string.digits) for _ in range(6)) ttl = int(os.environ.get("SMTP_CODE_TTL", "600") or 600) request.session["email_verify"] = {"email": email, "code": verify_code, "expires_at": int(time.time()) + max(60, ttl)} ok, msg = _send_smtp_email(email, verify_code) if not ok: return JsonResponse({"ok": False, "message": msg or "验证码发送失败"}, status=500) return JsonResponse({"ok": True}) def _send_smtp_email(to_email: str, code: str): host = os.environ.get("SMTP_HOST", "") port_raw = os.environ.get("SMTP_PORT", "") try: port = int(port_raw) if port_raw else 0 except Exception: port = 0 user = os.environ.get("SMTP_USERNAME") or os.environ.get("SMTP_USER") or "" password = os.environ.get("SMTP_PASSWORD", "") use_tls = str(os.environ.get("SMTP_USE_TLS", "")).lower() in ("1", "true", "yes") use_ssl = str(os.environ.get("SMTP_USE_SSL", "")).lower() in ("1", "true", "yes") sender = os.environ.get("SMTP_FROM_EMAIL") or os.environ.get("SMTP_FROM") or user or "" subject = os.environ.get("SMTP_SUBJECT") or "邮箱验证码" if not host or not port or not sender: return False, "缺少SMTP配置" body = f"您的验证码是:{code},10分钟内有效。" msg = f"From: {sender}\r\nTo: {to_email}\r\nSubject: {subject}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n{body}" try: if use_ssl: server = smtplib.SMTP_SSL(host, port) else: server = smtplib.SMTP(host, port) server.ehlo() if use_tls and not use_ssl: server.starttls() server.ehlo() if user and password: server.login(user, password) server.sendmail(sender, [to_email], msg.encode("utf-8")) try: server.quit() except Exception: try: server.close() except Exception: pass return True, "" except Exception as e: return False, str(e)