Files
Achievement_Inputing/accounts/views.py
2025-11-22 11:45:09 +08:00

301 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import io
import json
import logging
import os
import random
import secrets
import smtplib
import string
import time
from datetime import datetime, timezone
from email.message import EmailMessage

from django.conf import settings
from django.http import JsonResponse, HttpResponseBadRequest
from django.shortcuts import render, redirect
from django.views.decorators.csrf import csrf_protect, ensure_csrf_cookie
from django.views.decorators.http import require_http_methods

from elastic.es_connect import get_registration_code, get_user_by_username as es_get_user_by_username, get_all_users as es_get_all_users, write_user_data

from .crypto import get_public_key_spki_b64, rsa_oaep_decrypt_b64, aes_gcm_decrypt_b64, verify_password, generate_rsa_private_pem_b64, public_spki_b64_from_private_pem_b64, rsa_oaep_decrypt_b64_with_private_pem
from .es_client import get_user_by_username


@require_http_methods(["GET"])
@ensure_csrf_cookie
def login_page(request):
    """Serve the login form (GET only).

    The view is wrapped in ``ensure_csrf_cookie`` so the rendered page is
    delivered together with a CSRF cookie for the follow-up POST requests.
    """
    template_name = "accounts/login.html"
    return render(request, template_name)
@require_http_methods(["GET"])
@ensure_csrf_cookie
def pubkey(request):
    """Return this session's RSA public key as JSON.

    A private key (base64-encoded PEM) is generated the first time the
    session asks for one and stored under ``rsa_private_pem_b64``; later
    calls reuse the stored key. The response has the single field
    ``public_key_spki`` derived from that private key.
    """
    session = request.session
    private_pem_b64 = session.get("rsa_private_pem_b64")
    if private_pem_b64:
        # A key pair already exists for this session; keep using it.
        pass
    else:
        private_pem_b64 = generate_rsa_private_pem_b64()
        session["rsa_private_pem_b64"] = private_pem_b64
    return JsonResponse(
        {"public_key_spki": public_spki_b64_from_private_pem_b64(private_pem_b64)}
    )
@require_http_methods(["GET"])
@ensure_csrf_cookie
def captcha(request):
    """Generate a fresh captcha image and remember its text in the session.

    Returns JSON ``{"ok": True, "image_b64": <base64 PNG>}`` on success, or
    ``{"ok": False, "message": "captcha unavailable"}`` with status 500 when
    the optional ``captcha`` package cannot be imported.

    The 5-character code (uppercase letters and digits) is stored under the
    session key ``captcha_code`` for the login view to compare against.
    """
    try:
        # Imported lazily so the rest of the module works without the package.
        from captcha.image import ImageCaptcha
    except ImportError:
        return JsonResponse({"ok": False, "message": "captcha unavailable"}, status=500)

    # The code is a security control, so draw it from the OS CSPRNG rather
    # than the predictable Mersenne Twister behind ``random.choice``.
    alphabet = string.ascii_uppercase + string.digits
    code = "".join(secrets.choice(alphabet) for _ in range(5))
    request.session["captcha_code"] = code

    image = ImageCaptcha(width=160, height=60).generate_image(code)
    with io.BytesIO() as buf:
        image.save(buf, format="PNG")
        image_b64 = base64.b64encode(buf.getvalue()).decode("ascii")
    return JsonResponse({"ok": True, "image_b64": image_b64})
@require_http_methods(["POST"])
@csrf_protect
def set_session_key(request):
    """Receive the client's RSA-wrapped symmetric key and store it in the session.

    Expects a JSON object body with the field ``encrypted_key``. The value is
    decrypted with the private key previously stored in the session under
    ``rsa_private_pem_b64`` and the resulting bytes are saved, base64-encoded,
    under ``session_enc_key_b64``.

    Responses:
        200 ``{"ok": True}`` on success.
        400 ``Invalid JSON`` when the body is not valid UTF-8 JSON or is not
            a JSON object.
        400 ``Missing fields`` when ``encrypted_key`` is absent or empty.
        400 ``Decrypt error`` when no private key is in the session or the
            decryption fails.
    """
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Invalid UTF-8 used to escape as an unhandled UnicodeDecodeError.
        return HttpResponseBadRequest("Invalid JSON")
    if not isinstance(payload, dict):
        # A JSON array/string/number has no ``.get``; reject it up front.
        return HttpResponseBadRequest("Invalid JSON")

    enc_key_b64 = payload.get("encrypted_key", "")
    if not enc_key_b64:
        return HttpResponseBadRequest("Missing fields")

    pem_b64 = request.session.get("rsa_private_pem_b64")
    if not pem_b64:
        return HttpResponseBadRequest("Decrypt error")
    try:
        key_bytes = rsa_oaep_decrypt_b64_with_private_pem(pem_b64, enc_key_b64)
    except Exception:
        # The helper's exception types are not visible from this module, and
        # any failure on attacker-supplied ciphertext must map to a plain 400.
        return HttpResponseBadRequest("Decrypt error")

    request.session["session_enc_key_b64"] = base64.b64encode(key_bytes).decode("ascii")
    return JsonResponse({"ok": True})
@require_http_methods(["POST"])
@csrf_protect
def secure_login_submit(request):
    """Authenticate a user from an AES-GCM encrypted credential payload.

    Expects a JSON object body with ``iv`` and ``ciphertext``. The ciphertext
    is decrypted with the key stored in the session under
    ``session_enc_key_b64`` and must contain a JSON object with ``username``,
    ``password`` and, once a previous attempt in this session has failed,
    ``captcha``.

    Responses:
        200 ``{"ok": True, "redirect_url": ...}`` on success. The session key
            is cycled, ``user_id``/``username``/``permission`` are stored and
            the handshake/captcha entries are removed from the session.
        400 for malformed bodies, a missing session key, decryption failures
            or missing credentials.
        401 ``{"ok": False, "message": ..., "captcha_required": True}`` for a
            wrong captcha or wrong credentials.
    """
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return HttpResponseBadRequest("Invalid JSON")
    if not isinstance(payload, dict):
        # Non-object JSON would otherwise crash on ``payload.get``.
        return HttpResponseBadRequest("Invalid JSON")

    iv_b64 = payload.get("iv", "")
    ct_b64 = payload.get("ciphertext", "")
    if not iv_b64 or not ct_b64:
        return HttpResponseBadRequest("Missing fields")

    session = request.session
    key_b64 = session.get("session_enc_key_b64")
    if not key_b64:
        return HttpResponseBadRequest("Session key missing")

    try:
        key_bytes = base64.b64decode(key_b64)
        plaintext = aes_gcm_decrypt_b64(key_bytes, iv_b64, ct_b64)
        obj = json.loads(plaintext.decode("utf-8"))
    except Exception:
        # Everything here runs on attacker-controlled data; any failure is
        # reported uniformly as a decrypt error.
        return HttpResponseBadRequest("Decrypt error")
    if not isinstance(obj, dict):
        return HttpResponseBadRequest("Decrypt error")

    raw_username = obj.get("username") or ""
    password = obj.get("password") or ""
    if not isinstance(raw_username, str) or not isinstance(password, str):
        # Non-string credentials previously raised AttributeError on .strip().
        return HttpResponseBadRequest("Missing credentials")
    username = raw_username.strip()
    if not username or not password:
        return HttpResponseBadRequest("Missing credentials")

    if session.get("login_failed_once"):
        answer = str(obj.get("captcha") or "").strip()
        # Captchas are single-use: pop the stored code so one solved image
        # cannot be replayed for an unlimited number of password guesses.
        expected = session.pop("captcha_code", None)
        if not answer or not expected or answer.lower() != str(expected).lower():
            return JsonResponse({"ok": False, "message": "验证码错误", "captcha_required": True}, status=401)

    user = get_user_by_username(username)
    credentials_ok = bool(user) and verify_password(
        password,
        user.get("password_salt") or "",
        user.get("password_hash") or "",
    )
    if not credentials_ok:
        session["login_failed_once"] = True
        # Same message for "unknown user" and "wrong password" so the
        # endpoint cannot be used to enumerate valid usernames.
        return JsonResponse({"ok": False, "message": "账户或密码错误", "captcha_required": True}, status=401)

    try:
        # Rotate the session key on privilege change (session fixation defence).
        session.cycle_key()
    except Exception:
        # Best-effort as before, but no longer silent.
        logging.getLogger(__name__).exception("Session key rotation failed during login")

    session["user_id"] = user["user_id"]
    session["username"] = user["username"]
    try:
        permission = int(user["permission"]) if user.get("permission") is not None else 1
    except (TypeError, ValueError):
        permission = 1
    session["permission"] = permission

    # The handshake secrets and captcha state are no longer needed.
    for stale_key in ("session_enc_key_b64", "rsa_private_pem_b64", "login_failed_once", "captcha_code"):
        session.pop(stale_key, None)

    return JsonResponse({"ok": True, "redirect_url": f"/main/home/?user_id={user['user_id']}"})
@require_http_methods(["GET"])
def home(request):
    """Render the placeholder home page.

    The template receives ``user_id``: the ``user_id`` query parameter when
    it is non-empty, otherwise the ``user_id`` stored in the session.

    NOTE(review): the query-string value takes precedence over the session
    value and no login check is performed here - confirm this is intended.
    """
    id_from_query = request.GET.get("user_id")
    id_from_session = request.session.get("user_id")
    shown_id = id_from_query if id_from_query else id_from_session
    return render(request, "accounts/home.html", {"user_id": shown_id})
@require_http_methods(["POST"])
@csrf_protect
def logout(request):
    """Log the user out and expire the session and CSRF cookies.

    Flushes the session (clears its data and rotates the key) and returns
    ``{"ok": True, "redirect_url": "/accounts/login/"}`` with ``Set-Cookie``
    headers that expire both cookies in the browser.
    """
    try:
        request.session.flush()
    except Exception:
        # Keep logout best-effort for the client, but record the failure
        # instead of hiding it: the server-side session may still be alive.
        logging.getLogger(__name__).exception("Failed to flush session during logout")

    resp = JsonResponse({"ok": True, "redirect_url": "/accounts/login/"})

    # ``HttpResponse.delete_cookie`` accepts key/path/domain/samesite only.
    # The previous ``secure=`` keyword raised TypeError, which a blanket
    # ``except Exception: pass`` swallowed, so neither cookie was deleted here.
    # Path and domain must match the values the cookie was set with, so take
    # them from settings.
    resp.delete_cookie(
        settings.SESSION_COOKIE_NAME,
        path=settings.SESSION_COOKIE_PATH,
        domain=settings.SESSION_COOKIE_DOMAIN,
        samesite=settings.SESSION_COOKIE_SAMESITE,
    )
    # Also drop the CSRF cookie to satisfy the "clear cookies" requirement.
    resp.delete_cookie(
        settings.CSRF_COOKIE_NAME,
        path=settings.CSRF_COOKIE_PATH,
        domain=settings.CSRF_COOKIE_DOMAIN,
        samesite=settings.CSRF_COOKIE_SAMESITE,
    )
    return resp
@require_http_methods(["GET"])
@ensure_csrf_cookie
def register_page(request):
    """Serve the registration form (GET only).

    Wrapped in ``ensure_csrf_cookie`` so the page arrives with a CSRF cookie
    for the registration POST requests.
    """
    template_name = "accounts/register.html"
    return render(request, template_name)
def _registration_code_expired(expires_at, now=None):
    """Return True when ``expires_at`` is at or before ``now`` (default: current UTC time).

    ``expires_at`` may be a ``datetime`` or anything whose ``str()`` is an
    ISO-8601 timestamp. A trailing ``Z`` is accepted, and timestamps without
    an offset are treated as UTC (assumption - confirm against how
    registration codes are written). A missing or unparseable value is
    treated as "does not expire", matching the previous behaviour, but an
    unparseable value is now logged.
    """
    if expires_at is None or expires_at == "":
        return False
    if now is None:
        now = datetime.now(timezone.utc)

    if isinstance(expires_at, datetime):
        expiry = expires_at
    else:
        text = str(expires_at).strip()
        if text.endswith(("Z", "z")):
            # ``fromisoformat`` rejects the "Z" suffix before Python 3.11.
            text = text[:-1] + "+00:00"
        try:
            expiry = datetime.fromisoformat(text)
        except ValueError:
            logging.getLogger(__name__).warning(
                "Unparseable registration code expiry: %r", expires_at
            )
            return False

    if expiry.tzinfo is None:
        # Comparing naive with aware datetimes raises TypeError; that error
        # used to be swallowed, silently skipping the expiry check.
        expiry = expiry.replace(tzinfo=timezone.utc)
    return expiry <= now


@require_http_methods(["POST"])
@csrf_protect
def register_submit(request):
    """Create a user account after e-mail verification.

    Expects a JSON object body with ``email``, ``email_code``, ``username``,
    ``password`` and an optional registration ``code``. The e-mail and its
    code are checked against the ``email_verify`` entry in the session; the
    optional registration code is looked up with ``get_registration_code``
    and supplies the new user's ``key`` / ``manage_key`` lists.

    Responses:
        200 ``{"ok": True, "redirect_url": "/accounts/login/"}`` on success.
        400 for malformed bodies, missing fields, an unverified e-mail, a
            wrong or expired e-mail code, or an invalid or expired
            registration code.
        409 when the username already exists.
        500 when ``write_user_data`` reports failure.
    """
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return HttpResponseBadRequest("Invalid JSON")
    if not isinstance(payload, dict):
        # Non-object JSON would otherwise crash on ``payload.get``.
        return HttpResponseBadRequest("Invalid JSON")

    # ``str(...)`` keeps numeric JSON values (e.g. an unquoted e-mail code)
    # from crashing on ``.strip()``.
    code = str(payload.get("code") or "").strip()
    email = str(payload.get("email") or "").strip()
    email_code = str(payload.get("email_code") or "").strip()
    username = str(payload.get("username") or "").strip()
    password = payload.get("password") or ""
    if not isinstance(password, str):
        return HttpResponseBadRequest("Missing fields")
    if not email or not email_code or not username or not password:
        return HttpResponseBadRequest("Missing fields")

    pending = request.session.get("email_verify") or {}
    if (pending.get("email") or "") != email:
        return JsonResponse({"ok": False, "message": "请先验证邮箱"}, status=400)
    try:
        expires_ts = int(pending.get("expires_at") or 0)
    except (TypeError, ValueError):
        expires_ts = 0
    if expires_ts < int(time.time()):
        return JsonResponse({"ok": False, "message": "验证码已过期"}, status=400)
    # NOTE(review): there is no limit on wrong guesses within the validity
    # window - consider invalidating the code after a few failures.
    if (pending.get("code") or "") != email_code:
        return JsonResponse({"ok": False, "message": "邮箱验证码错误"}, status=400)

    rc = None
    if code:
        rc = get_registration_code(code)
        if not rc:
            return JsonResponse({"ok": False, "message": "注册码无效"}, status=400)
        if _registration_code_expired(rc.get("expires_at")):
            return JsonResponse({"ok": False, "message": "注册码已过期"}, status=400)

    if es_get_user_by_username(username):
        return JsonResponse({"ok": False, "message": "用户名已存在"}, status=409)

    # NOTE(review): max+1 over all users is not atomic; two concurrent
    # registrations can obtain the same id.
    users = es_get_all_users() or []
    next_id = max((int(u.get("user_id") or 0) for u in users), default=0) + 1

    ok = write_user_data({
        "user_id": next_id,
        "username": username,
        # NOTE(review): passed through as received; presumably hashed inside
        # write_user_data - confirm.
        "password": password,
        "permission": 1,
        "email": email,
        "key": (rc.get("keys") if rc else []) or [],
        "manage_key": (rc.get("manage_keys") if rc else []) or [],
    })
    if not ok:
        return JsonResponse({"ok": False, "message": "注册失败"}, status=500)

    # The verification entry is single-use.
    request.session.pop("email_verify", None)
    return JsonResponse({"ok": True, "redirect_url": "/accounts/login/"})
@require_http_methods(["POST"])
@csrf_protect
def send_email_code(request):
    """Generate a 6-digit e-mail verification code and send it over SMTP.

    Expects a JSON object body with ``email``. The code, the address and an
    expiry timestamp are stored in the session under ``email_verify``. The
    lifetime comes from the ``SMTP_CODE_TTL`` environment variable (seconds,
    default 600, minimum 60).

    Responses:
        200 ``{"ok": True}`` when the mail was handed to the SMTP server.
        400 for malformed bodies, a missing address or an invalid address.
        500 ``{"ok": False, "message": ...}`` when sending fails.
    """
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return HttpResponseBadRequest("Invalid JSON")
    if not isinstance(payload, dict):
        # Non-object JSON would otherwise crash on ``payload.get``.
        return HttpResponseBadRequest("Invalid JSON")

    email = str(payload.get("email") or "").strip()
    if not email:
        return HttpResponseBadRequest("Missing email")
    # ``strip()`` only trims the ends; an embedded CR/LF would let the caller
    # smuggle extra headers into the outgoing message.
    if "@" not in email or "\r" in email or "\n" in email:
        return JsonResponse({"ok": False, "message": "邮箱格式不正确"}, status=400)

    # Verification codes are secrets: use the OS CSPRNG, not ``random``.
    verify_code = "".join(secrets.choice(string.digits) for _ in range(6))

    try:
        ttl = int(os.environ.get("SMTP_CODE_TTL") or 600)
    except ValueError:
        # A misconfigured value should not turn every request into a 500.
        ttl = 600
    request.session["email_verify"] = {
        "email": email,
        "code": verify_code,
        "expires_at": int(time.time()) + max(60, ttl),
    }

    ok, msg = _send_smtp_email(email, verify_code)
    if not ok:
        return JsonResponse({"ok": False, "message": msg or "验证码发送失败"}, status=500)
    return JsonResponse({"ok": True})
def _send_smtp_email(to_email: str, code: str):
    """Send the verification ``code`` to ``to_email`` using SMTP settings from the environment.

    Environment variables read: ``SMTP_HOST``, ``SMTP_PORT``,
    ``SMTP_USERNAME``/``SMTP_USER``, ``SMTP_PASSWORD``, ``SMTP_USE_TLS``,
    ``SMTP_USE_SSL``, ``SMTP_FROM_EMAIL``/``SMTP_FROM``, ``SMTP_SUBJECT``,
    ``SMTP_CODE_TTL`` (seconds, used for the validity text) and
    ``SMTP_TIMEOUT`` (seconds, default 10).

    Returns:
        ``(True, "")`` on success, otherwise ``(False, message)``. This
        function never raises.
    """
    env = os.environ
    truthy = ("1", "true", "yes")

    host = env.get("SMTP_HOST", "")
    try:
        port = int(env.get("SMTP_PORT") or 0)
    except ValueError:
        port = 0
    user = env.get("SMTP_USERNAME") or env.get("SMTP_USER") or ""
    password = env.get("SMTP_PASSWORD", "")
    use_tls = env.get("SMTP_USE_TLS", "").lower() in truthy
    use_ssl = env.get("SMTP_USE_SSL", "").lower() in truthy
    sender = env.get("SMTP_FROM_EMAIL") or env.get("SMTP_FROM") or user
    subject = env.get("SMTP_SUBJECT") or "邮箱验证码"
    if not host or not port or not sender:
        return False, "缺少SMTP配置"

    # The recipient is user input: refuse CR/LF so it cannot inject headers.
    if not to_email or "\r" in to_email or "\n" in to_email:
        return False, "邮箱格式不正确"

    # Mirror the TTL rule used when the code is issued (default 600 s,
    # minimum 60 s) so the text does not claim a fixed 10 minutes.
    try:
        ttl_seconds = max(60, int(env.get("SMTP_CODE_TTL") or 600))
    except ValueError:
        ttl_seconds = 600
    minutes = ttl_seconds // 60

    try:
        timeout = float(env.get("SMTP_TIMEOUT") or 10)
    except ValueError:
        timeout = 10.0

    try:
        # EmailMessage RFC 2047-encodes the non-ASCII subject and rejects
        # line breaks in header values.
        message = EmailMessage()
        message["From"] = sender
        message["To"] = to_email
        message["Subject"] = subject
        # A separator after the code keeps its digits from running into the
        # minute count (previously rendered like "12345610...").
        message.set_content(f"您的验证码是:{code}，{minutes}分钟内有效。")

        smtp_cls = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
        # The timeout stops an unresponsive server from hanging the request.
        server = smtp_cls(host, port, timeout=timeout)
        try:
            if use_tls and not use_ssl:
                # starttls/login/send issue EHLO themselves when needed.
                server.starttls()
            if user and password:
                server.login(user, password)
            server.send_message(message, from_addr=sender, to_addrs=[to_email])
        finally:
            # Always release the connection, including when login or send
            # failed; a failing QUIT must not mask the real outcome.
            try:
                server.quit()
            except (smtplib.SMTPException, OSError):
                server.close()
    except Exception as exc:
        # Boundary of the "never raises" contract: log, then report.
        logging.getLogger(__name__).exception("Failed to send verification email")
        return False, str(exc)
    return True, ""