Files
Achievement_Inputing/accounts/views.py

220 lines
8.4 KiB
Python

import base64
import json
import os
import io
import random
import string
from django.http import JsonResponse, HttpResponseBadRequest
from django.shortcuts import render, redirect
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_protect, ensure_csrf_cookie
from django.conf import settings
from .es_client import get_user_by_username
from .crypto import get_public_key_spki_b64, rsa_oaep_decrypt_b64, aes_gcm_decrypt_b64, verify_password, generate_rsa_private_pem_b64, public_spki_b64_from_private_pem_b64, rsa_oaep_decrypt_b64_with_private_pem
from elastic.es_connect import get_registration_code, get_user_by_username as es_get_user_by_username, get_all_users as es_get_all_users, write_user_data
@require_http_methods(["GET"])
@ensure_csrf_cookie
def login_page(request):
    """Serve the login form (GET only).

    ``ensure_csrf_cookie`` is applied so the response always carries the CSRF
    cookie, which the later POST endpoints in this module require.
    """
    template_name = "accounts/login.html"
    return render(request, template_name)
@require_http_methods(["GET"])
@ensure_csrf_cookie
def pubkey(request):
    """Return the public half of this session's RSA key pair as JSON.

    The base64 private-key PEM is kept in the session under
    ``rsa_private_pem_b64``; it is generated on first use and reused on later
    calls. The response body is ``{"public_key_spki": <base64 string>}``.
    """
    session = request.session
    private_pem_b64 = session.get("rsa_private_pem_b64")
    if not private_pem_b64:
        # First request in this session: create a key pair and remember it.
        private_pem_b64 = generate_rsa_private_pem_b64()
        session["rsa_private_pem_b64"] = private_pem_b64
    body = {"public_key_spki": public_spki_b64_from_private_pem_b64(private_pem_b64)}
    return JsonResponse(body)
@require_http_methods(["GET"])
@ensure_csrf_cookie
def captcha(request):
    """Generate a 5-character CAPTCHA, store it in the session, return the image.

    Returns:
        JsonResponse ``{"ok": True, "image_b64": <base64 PNG>}`` on success, or
        ``{"ok": False, "message": "captcha unavailable"}`` with status 500 when
        the ``captcha`` package cannot be imported.
    """
    # Function-scope import: the CAPTCHA answer is a security control, so it is
    # drawn from the OS CSPRNG rather than the predictable ``random`` module.
    import secrets

    try:
        from captcha.image import ImageCaptcha
    except Exception:
        # Deliberately broad: any failure while loading the optional
        # dependency is reported to the client as "unavailable".
        return JsonResponse({"ok": False, "message": "captcha unavailable"}, status=500)

    alphabet = string.ascii_uppercase + string.digits
    code = "".join(secrets.choice(alphabet) for _ in range(5))
    # The expected answer lives in the session; only the rendered image is sent.
    request.session["captcha_code"] = code

    image = ImageCaptcha(width=160, height=60).generate_image(code)
    buf = io.BytesIO()
    image.save(buf, format="PNG")
    b64 = base64.b64encode(buf.getvalue()).decode("ascii")
    return JsonResponse({"ok": True, "image_b64": b64})
@require_http_methods(["POST"])
@csrf_protect
def set_session_key(request):
    """Receive an RSA-OAEP-wrapped symmetric key and store it in the session.

    Expects a JSON object body with an ``encrypted_key`` field. The value is
    decrypted with the private key stored in the session under
    ``rsa_private_pem_b64`` and the resulting bytes are saved, base64-encoded,
    under ``session_enc_key_b64``.

    Returns:
        ``{"ok": True}`` on success; HTTP 400 with ``Invalid JSON``,
        ``Missing fields`` or ``Decrypt error`` otherwise.
    """
    try:
        payload = json.loads(request.body.decode("utf-8"))
    except ValueError:
        # Covers both UnicodeDecodeError (body is not UTF-8) and
        # json.JSONDecodeError; both are ValueError subclasses.
        return HttpResponseBadRequest("Invalid JSON")
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, string, number, ...) has no fields.
        return HttpResponseBadRequest("Invalid JSON")

    enc_key_b64 = payload.get("encrypted_key", "")
    if not enc_key_b64:
        return HttpResponseBadRequest("Missing fields")

    pem_b64 = request.session.get("rsa_private_pem_b64")
    if not pem_b64:
        # No key pair was issued for this session, so nothing can be decrypted.
        return HttpResponseBadRequest("Decrypt error")
    try:
        key_bytes = rsa_oaep_decrypt_b64_with_private_pem(pem_b64, enc_key_b64)
    except Exception:
        # Any decoding or decryption failure is reported uniformly so the
        # response does not reveal why the ciphertext was rejected.
        return HttpResponseBadRequest("Decrypt error")

    request.session["session_enc_key_b64"] = base64.b64encode(key_bytes).decode("ascii")
    return JsonResponse({"ok": True})
@require_http_methods(["POST"])
@csrf_protect
def secure_login_submit(request):
    """Authenticate a user from an AES-GCM encrypted credential payload.

    The request body is a JSON object with ``iv`` and ``ciphertext``. The
    ciphertext is decrypted with the key stored in the session under
    ``session_enc_key_b64`` and must decode to a JSON object containing
    ``username``, ``password`` and, once a previous attempt in this session
    has failed, ``captcha``.

    On success the session key is cycled, ``user_id`` / ``username`` /
    ``permission`` are stored in the session, the temporary login material is
    removed, and a JSON body with ``redirect_url`` is returned.

    Returns:
        * 400 for malformed input (``Invalid JSON``, ``Missing fields``,
          ``Session key missing``, ``Decrypt error``, ``Missing credentials``).
        * 401 JSON with ``captcha_required: True`` for a wrong CAPTCHA, an
          unknown user, or a wrong password.
        * 200 JSON ``{"ok": True, "redirect_url": ...}`` on success.

    Note:
        A CAPTCHA code is single-use: it is removed from the session as soon
        as it is checked, so the client must fetch a new CAPTCHA after every
        401 response.
    """
    # Function-scope import: this module does not set up a logger of its own.
    import logging

    try:
        payload = json.loads(request.body.decode("utf-8"))
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
        return HttpResponseBadRequest("Invalid JSON")
    if not isinstance(payload, dict):
        return HttpResponseBadRequest("Invalid JSON")

    iv_b64 = payload.get("iv", "")
    ct_b64 = payload.get("ciphertext", "")
    if not iv_b64 or not ct_b64:
        return HttpResponseBadRequest("Missing fields")

    session = request.session
    key_b64 = session.get("session_enc_key_b64")
    if not key_b64:
        return HttpResponseBadRequest("Session key missing")

    try:
        key_bytes = base64.b64decode(key_b64)
        pt = aes_gcm_decrypt_b64(key_bytes, iv_b64, ct_b64)
        obj = json.loads(pt.decode("utf-8"))
    except Exception:
        # Uniform error for any decode / decrypt / parse failure.
        return HttpResponseBadRequest("Decrypt error")
    if not isinstance(obj, dict):
        # Decrypted fine but is not a JSON object: treat as an unusable payload.
        return HttpResponseBadRequest("Decrypt error")

    username = obj.get("username") or ""
    password = obj.get("password") or ""
    if not isinstance(username, str) or not isinstance(password, str):
        # Non-string credentials would otherwise crash on .strip() below.
        return HttpResponseBadRequest("Missing credentials")
    username = username.strip()
    if not username or not password:
        return HttpResponseBadRequest("Missing credentials")

    if session.get("login_failed_once"):
        ans = str(obj.get("captcha") or "").strip()
        # pop() makes the code single-use; leaving it in the session would let
        # one solved CAPTCHA be replayed for any number of password guesses.
        code = session.pop("captcha_code", None)
        if not ans or not code or ans.lower() != str(code).lower():
            return JsonResponse({"ok": False, "message": "验证码错误", "captcha_required": True}, status=401)

    # NOTE(review): the two failure messages below differ, which lets a caller
    # tell existing usernames from unknown ones. Left unchanged because the
    # front end may depend on the text - consider unifying them.
    user = get_user_by_username(username)
    if not user:
        session["login_failed_once"] = True
        return JsonResponse({"ok": False, "message": "用户不存在", "captcha_required": True}, status=401)
    if not verify_password(password, user.get("password_salt") or "", user.get("password_hash") or ""):
        session["login_failed_once"] = True
        return JsonResponse({"ok": False, "message": "账户或密码错误", "captcha_required": True}, status=401)

    try:
        # New session key after authentication (session-fixation defence).
        session.cycle_key()
    except Exception:
        # Still best-effort so login is not blocked, but no longer silent.
        logging.getLogger(__name__).exception("Failed to cycle session key on login")

    session["user_id"] = user["user_id"]
    session["username"] = user["username"]
    try:
        permission = int(user["permission"]) if user.get("permission") is not None else 1
    except (TypeError, ValueError, OverflowError):
        # Unusable stored permission value: fall back to level 1.
        permission = 1
    session["permission"] = permission

    # Drop the one-time login material now that authentication succeeded.
    for stale_key in ("session_enc_key_b64", "rsa_private_pem_b64", "login_failed_once", "captcha_code"):
        session.pop(stale_key, None)

    return JsonResponse({"ok": True, "redirect_url": f"/main/home/?user_id={user['user_id']}"})
@require_http_methods(["GET"])
def home(request):
    """Render the minimal placeholder home page.

    The template receives ``user_id``: the ``user_id`` query parameter when it
    is present and non-empty, otherwise the ``user_id`` stored in the session.
    No check is made here that the two agree or that a user is logged in.
    """
    query_user_id = request.GET.get("user_id")
    stored_user_id = request.session.get("user_id")
    shown_user_id = query_user_id if query_user_id else stored_user_id
    return render(request, "accounts/home.html", {"user_id": shown_user_id})
@require_http_methods(["POST"])
@csrf_protect
def logout(request):
    """Log the user out: flush the session and expire the auth-related cookies.

    Returns:
        JsonResponse ``{"ok": True, "redirect_url": "/accounts/login/"}`` whose
        headers also expire the session cookie and the CSRF cookie.
    """
    # Function-scope import: this module does not set up a logger of its own.
    import logging

    logger = logging.getLogger(__name__)

    # Flush the session to clear all data and rotate the key.
    try:
        request.session.flush()
    except Exception:
        # Best-effort: still answer the client, but record the failure.
        logger.exception("Failed to flush session during logout")

    resp = JsonResponse({"ok": True, "redirect_url": "/accounts/login/"})
    try:
        # delete_cookie() accepts only key/path/domain/samesite. Passing
        # ``secure=`` raises TypeError, which previously was swallowed and
        # meant neither cookie was ever explicitly deleted. Path and domain
        # come from settings so they match how the cookies were issued.
        resp.delete_cookie(
            settings.SESSION_COOKIE_NAME,
            path=settings.SESSION_COOKIE_PATH,
            domain=settings.SESSION_COOKIE_DOMAIN,
            samesite=settings.SESSION_COOKIE_SAMESITE,
        )
        # Also delete the CSRF cookie to satisfy the "clear cookies" requirement.
        resp.delete_cookie(
            settings.CSRF_COOKIE_NAME,
            path=settings.CSRF_COOKIE_PATH,
            domain=settings.CSRF_COOKIE_DOMAIN,
            samesite=settings.CSRF_COOKIE_SAMESITE,
        )
    except Exception:
        # Cookie cleanup must not turn a completed logout into an error response.
        logger.exception("Failed to delete cookies during logout")
    return resp
@require_http_methods(["GET"])
@ensure_csrf_cookie
def register_page(request):
    """Serve the registration form (GET only).

    ``ensure_csrf_cookie`` guarantees the CSRF cookie is set on the response so
    the registration POST can pass CSRF validation.
    """
    template_name = "accounts/register.html"
    return render(request, template_name)
@require_http_methods(["POST"])
@csrf_protect
def register_submit(request):
    """Create a user account from a registration code.

    Expects a JSON object body with non-empty string fields ``code``,
    ``email``, ``username`` and ``password``.

    Returns:
        * 400 ``Invalid JSON`` / ``Missing fields`` for malformed input.
        * 400 JSON when the registration code is unknown or expired.
        * 409 JSON when the username already exists.
        * 500 JSON when ``write_user_data`` reports failure.
        * 200 JSON ``{"ok": True, "redirect_url": "/accounts/login/"}`` on success.
    """
    # Function-scope imports: the file's import block provides neither name.
    import logging
    from datetime import datetime, timezone

    try:
        payload = json.loads(request.body.decode("utf-8"))
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
        return HttpResponseBadRequest("Invalid JSON")
    if not isinstance(payload, dict):
        return HttpResponseBadRequest("Invalid JSON")

    fields = {}
    for name in ("code", "email", "username", "password"):
        value = payload.get(name) or ""
        if not isinstance(value, str):
            # A non-string value would otherwise crash on .strip() below.
            return HttpResponseBadRequest("Missing fields")
        fields[name] = value
    code = fields["code"].strip()
    email = fields["email"].strip()
    username = fields["username"].strip()
    password = fields["password"]  # not stripped: whitespace may be intentional
    if not code or not email or not username or not password:
        return HttpResponseBadRequest("Missing fields")

    rc = get_registration_code(code)
    if not rc:
        return JsonResponse({"ok": False, "message": "注册码无效"}, status=400)

    # Expiry check. A missing or empty ``expires_at`` means "no expiry", as before.
    exp = rc.get("expires_at")
    if exp is not None and exp != "":
        try:
            if isinstance(exp, datetime):
                exp_dt = exp
            else:
                text = str(exp).strip()
                if text.endswith(("Z", "z")):
                    # datetime.fromisoformat() rejects a trailing "Z" before
                    # Python 3.11; previously that error silently skipped the
                    # expiry check.
                    text = text[:-1] + "+00:00"
                exp_dt = datetime.fromisoformat(text)
            if exp_dt.utcoffset() is None:
                # Naive timestamp: assumed to be UTC so it can be compared with
                # the aware "now" below - TODO confirm how expires_at is stored.
                exp_dt = exp_dt.replace(tzinfo=timezone.utc)
        except (TypeError, ValueError):
            # An unparseable value is still accepted (unchanged behaviour) but
            # is no longer silent.
            logging.getLogger(__name__).warning(
                "Unparseable expires_at on registration code: %r", exp
            )
            exp_dt = None
        if exp_dt is not None and exp_dt <= datetime.now(timezone.utc):
            return JsonResponse({"ok": False, "message": "注册码已过期"}, status=400)

    if es_get_user_by_username(username):
        return JsonResponse({"ok": False, "message": "用户名已存在"}, status=409)

    # NOTE(review): max+1 allocation is not atomic; two concurrent
    # registrations can compute the same user_id.
    users = es_get_all_users() or ()
    next_id = max((int(u.get("user_id") or 0) for u in users), default=0) + 1

    # NOTE(review): the raw password is handed to write_user_data; presumably
    # it salts and hashes before storing (login reads password_salt /
    # password_hash) - confirm.
    ok = write_user_data({
        "user_id": next_id,
        "username": username,
        "password": password,
        "permission": 1,
        "email": email,
        "key": rc.get("keys") or [],
        "manage_key": rc.get("manage_keys") or [],
    })
    if not ok:
        return JsonResponse({"ok": False, "message": "注册失败"}, status=500)
    return JsonResponse({"ok": True, "redirect_url": "/accounts/login/"})