211 lines
7.8 KiB
Python
211 lines
7.8 KiB
Python
import base64
|
|
import json
|
|
import os
|
|
import io
|
|
import random
|
|
import string
|
|
|
|
from django.http import JsonResponse, HttpResponseBadRequest
|
|
from django.shortcuts import render, redirect
|
|
from django.views.decorators.http import require_http_methods
|
|
from django.views.decorators.csrf import csrf_protect, ensure_csrf_cookie
|
|
from django.conf import settings
|
|
|
|
from .es_client import get_user_by_username
|
|
from .crypto import get_public_key_spki_b64, rsa_oaep_decrypt_b64, aes_gcm_decrypt_b64, verify_password
|
|
from elastic.es_connect import get_registration_code, get_user_by_username as es_get_user_by_username, get_all_users as es_get_all_users, write_user_data
|
|
|
|
|
|
@require_http_methods(["GET"])
|
|
@ensure_csrf_cookie
|
|
def login_page(request):
|
|
return render(request, "accounts/login.html")
|
|
|
|
|
|
@require_http_methods(["GET"])
|
|
@ensure_csrf_cookie
|
|
def pubkey(request):
|
|
pk_b64 = get_public_key_spki_b64()
|
|
return JsonResponse({"public_key_spki": pk_b64})
|
|
|
|
@require_http_methods(["GET"])
|
|
@ensure_csrf_cookie
|
|
def captcha(request):
|
|
try:
|
|
from captcha.image import ImageCaptcha
|
|
except Exception:
|
|
return JsonResponse({"ok": False, "message": "captcha unavailable"}, status=500)
|
|
code = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(5))
|
|
request.session["captcha_code"] = code
|
|
img = ImageCaptcha(width=160, height=60)
|
|
image = img.generate_image(code)
|
|
buf = io.BytesIO()
|
|
image.save(buf, format="PNG")
|
|
b64 = base64.b64encode(buf.getvalue()).decode("ascii")
|
|
return JsonResponse({"ok": True, "image_b64": b64})
|
|
|
|
|
|
@require_http_methods(["POST"])
|
|
@csrf_protect
|
|
def set_session_key(request):
|
|
try:
|
|
payload = json.loads(request.body.decode("utf-8"))
|
|
except json.JSONDecodeError:
|
|
return HttpResponseBadRequest("Invalid JSON")
|
|
enc_key_b64 = payload.get("encrypted_key", "")
|
|
if not enc_key_b64:
|
|
return HttpResponseBadRequest("Missing fields")
|
|
try:
|
|
key_bytes = rsa_oaep_decrypt_b64(enc_key_b64)
|
|
except Exception:
|
|
return HttpResponseBadRequest("Decrypt error")
|
|
request.session["session_enc_key_b64"] = base64.b64encode(key_bytes).decode("ascii")
|
|
return JsonResponse({"ok": True})
|
|
|
|
@require_http_methods(["POST"])
|
|
@csrf_protect
|
|
def secure_login_submit(request):
|
|
try:
|
|
payload = json.loads(request.body.decode("utf-8"))
|
|
except json.JSONDecodeError:
|
|
return HttpResponseBadRequest("Invalid JSON")
|
|
iv_b64 = payload.get("iv", "")
|
|
ct_b64 = payload.get("ciphertext", "")
|
|
if not iv_b64 or not ct_b64:
|
|
return HttpResponseBadRequest("Missing fields")
|
|
key_b64 = request.session.get("session_enc_key_b64")
|
|
if not key_b64:
|
|
return HttpResponseBadRequest("Session key missing")
|
|
try:
|
|
key_bytes = base64.b64decode(key_b64)
|
|
pt = aes_gcm_decrypt_b64(key_bytes, iv_b64, ct_b64)
|
|
obj = json.loads(pt.decode("utf-8"))
|
|
except Exception:
|
|
return HttpResponseBadRequest("Decrypt error")
|
|
username = (obj.get("username") or "").strip()
|
|
password = (obj.get("password") or "")
|
|
if not username or not password:
|
|
return HttpResponseBadRequest("Missing credentials")
|
|
if bool(request.session.get("login_failed_once")):
|
|
ans = (obj.get("captcha") or "").strip()
|
|
code = request.session.get("captcha_code")
|
|
if not ans or not code or ans.lower() != str(code).lower():
|
|
return JsonResponse({"ok": False, "message": "验证码错误", "captcha_required": True}, status=401)
|
|
user = get_user_by_username(username)
|
|
if not user:
|
|
request.session["login_failed_once"] = True
|
|
return JsonResponse({"ok": False, "message": "用户不存在", "captcha_required": True}, status=401)
|
|
if not verify_password(password, user.get("password_salt") or "", user.get("password_hash") or ""):
|
|
request.session["login_failed_once"] = True
|
|
return JsonResponse({"ok": False, "message": "账户或密码错误", "captcha_required": True}, status=401)
|
|
try:
|
|
request.session.cycle_key()
|
|
except Exception:
|
|
pass
|
|
request.session["user_id"] = user["user_id"]
|
|
request.session["username"] = user["username"]
|
|
try:
|
|
request.session["permission"] = int(user["permission"]) if user.get("permission") is not None else 1
|
|
except Exception:
|
|
request.session["permission"] = 1
|
|
if "session_enc_key_b64" in request.session:
|
|
del request.session["session_enc_key_b64"]
|
|
if "login_failed_once" in request.session:
|
|
del request.session["login_failed_once"]
|
|
if "captcha_code" in request.session:
|
|
del request.session["captcha_code"]
|
|
return JsonResponse({"ok": True, "redirect_url": f"/main/home/?user_id={user['user_id']}"})
|
|
|
|
|
|
@require_http_methods(["GET"])
|
|
def home(request):
|
|
# Minimal placeholder page per requirement
|
|
# Ensure user_id is passed via query and session contains id
|
|
user_id = request.GET.get("user_id")
|
|
session_user_id = request.session.get("user_id")
|
|
context = {
|
|
"user_id": user_id or session_user_id,
|
|
}
|
|
return render(request, "accounts/home.html", context)
|
|
|
|
|
|
@require_http_methods(["POST"])
|
|
@csrf_protect
|
|
def logout(request):
|
|
# Flush the session to clear all data and rotate the key
|
|
try:
|
|
request.session.flush()
|
|
except Exception:
|
|
pass
|
|
|
|
# Return a response that also deletes cookies client-side
|
|
resp = JsonResponse({"ok": True, "redirect_url": "/accounts/login/"})
|
|
try:
|
|
# Delete session cookie
|
|
resp.delete_cookie(
|
|
settings.SESSION_COOKIE_NAME,
|
|
path='/',
|
|
samesite=settings.SESSION_COOKIE_SAMESITE,
|
|
secure=settings.SESSION_COOKIE_SECURE,
|
|
)
|
|
# Optionally delete CSRF cookie to satisfy "清除cookie" 的要求
|
|
resp.delete_cookie(
|
|
settings.CSRF_COOKIE_NAME,
|
|
path='/',
|
|
samesite=settings.CSRF_COOKIE_SAMESITE,
|
|
secure=settings.CSRF_COOKIE_SECURE,
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
return resp
|
|
|
|
@require_http_methods(["GET"])
|
|
@ensure_csrf_cookie
|
|
def register_page(request):
|
|
return render(request, "accounts/register.html")
|
|
|
|
@require_http_methods(["POST"])
|
|
@csrf_protect
|
|
def register_submit(request):
|
|
try:
|
|
payload = json.loads(request.body.decode("utf-8"))
|
|
except json.JSONDecodeError:
|
|
return HttpResponseBadRequest("Invalid JSON")
|
|
code = (payload.get("code") or "").strip()
|
|
email = (payload.get("email") or "").strip()
|
|
username = (payload.get("username") or "").strip()
|
|
password = (payload.get("password") or "")
|
|
if not code or not email or not username or not password:
|
|
return HttpResponseBadRequest("Missing fields")
|
|
rc = get_registration_code(code)
|
|
if not rc:
|
|
return JsonResponse({"ok": False, "message": "注册码无效"}, status=400)
|
|
try:
|
|
exp = rc.get("expires_at")
|
|
now = __import__("datetime").datetime.now(__import__("datetime").timezone.utc)
|
|
if hasattr(exp, 'isoformat'):
|
|
exp_dt = exp
|
|
else:
|
|
exp_dt = __import__("datetime").datetime.fromisoformat(str(exp))
|
|
if exp_dt <= now:
|
|
return JsonResponse({"ok": False, "message": "注册码已过期"}, status=400)
|
|
except Exception:
|
|
pass
|
|
existing = es_get_user_by_username(username)
|
|
if existing:
|
|
return JsonResponse({"ok": False, "message": "用户名已存在"}, status=409)
|
|
users = es_get_all_users()
|
|
next_id = (max([int(u.get("user_id", 0)) for u in users]) + 1) if users else 1
|
|
ok = write_user_data({
|
|
"user_id": next_id,
|
|
"username": username,
|
|
"password": password,
|
|
"permission": 1,
|
|
"email": email,
|
|
"key": rc.get("keys") or [],
|
|
"manage_key": rc.get("manage_keys") or [],
|
|
})
|
|
if not ok:
|
|
return JsonResponse({"ok": False, "message": "注册失败"}, status=500)
|
|
return JsonResponse({"ok": True, "redirect_url": "/accounts/login/"}) |