552 lines
22 KiB
Python
552 lines
22 KiB
Python
import base64
|
||
import json
|
||
import os
|
||
import io
|
||
import random
|
||
import string
|
||
import time
|
||
import smtplib
|
||
|
||
from django.http import JsonResponse, HttpResponseBadRequest
|
||
from django.shortcuts import render, redirect
|
||
from django.views.decorators.http import require_http_methods
|
||
from django.views.decorators.csrf import csrf_protect, ensure_csrf_cookie
|
||
from django.conf import settings
|
||
|
||
from .es_client import get_user_by_username
|
||
from .crypto import get_public_key_spki_b64, rsa_oaep_decrypt_b64, aes_gcm_decrypt_b64, verify_password, generate_rsa_private_pem_b64, public_spki_b64_from_private_pem_b64, rsa_oaep_decrypt_b64_with_private_pem
|
||
from elastic.es_connect import get_registration_code, get_user_by_username as es_get_user_by_username, get_all_users as es_get_all_users, write_user_data, update_user_by_id, get_user_by_id, create_registration_code_manage_request, find_pending_registration_code_manage_request, list_registration_code_manage_requests, decide_registration_code_manage_request, get_registration_code_manage_request
|
||
|
||
|
||
@require_http_methods(["GET"])
|
||
@ensure_csrf_cookie
|
||
def login_page(request):
|
||
return render(request, "accounts/login.html")
|
||
|
||
|
||
@require_http_methods(["GET"])
|
||
@ensure_csrf_cookie
|
||
def pubkey(request):
|
||
pem_b64 = request.session.get("rsa_private_pem_b64")
|
||
if not pem_b64:
|
||
pem_b64 = generate_rsa_private_pem_b64()
|
||
request.session["rsa_private_pem_b64"] = pem_b64
|
||
pk_b64 = public_spki_b64_from_private_pem_b64(pem_b64)
|
||
return JsonResponse({"public_key_spki": pk_b64})
|
||
|
||
@require_http_methods(["GET"])
|
||
@ensure_csrf_cookie
|
||
def captcha(request):
|
||
try:
|
||
from captcha.image import ImageCaptcha
|
||
except Exception:
|
||
return JsonResponse({"ok": False, "message": "captcha unavailable"}, status=500)
|
||
code = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(5))
|
||
request.session["captcha_code"] = code
|
||
img = ImageCaptcha(width=160, height=60)
|
||
image = img.generate_image(code)
|
||
buf = io.BytesIO()
|
||
image.save(buf, format="PNG")
|
||
b64 = base64.b64encode(buf.getvalue()).decode("ascii")
|
||
return JsonResponse({"ok": True, "image_b64": b64})
|
||
|
||
|
||
@require_http_methods(["POST"])
|
||
@csrf_protect
|
||
def set_session_key(request):
|
||
try:
|
||
payload = json.loads(request.body.decode("utf-8"))
|
||
except json.JSONDecodeError:
|
||
return HttpResponseBadRequest("Invalid JSON")
|
||
enc_key_b64 = payload.get("encrypted_key", "")
|
||
if not enc_key_b64:
|
||
return HttpResponseBadRequest("Missing fields")
|
||
try:
|
||
pem_b64 = request.session.get("rsa_private_pem_b64")
|
||
if not pem_b64:
|
||
return HttpResponseBadRequest("Decrypt error")
|
||
key_bytes = rsa_oaep_decrypt_b64_with_private_pem(pem_b64, enc_key_b64)
|
||
except Exception:
|
||
return HttpResponseBadRequest("Decrypt error")
|
||
request.session["session_enc_key_b64"] = base64.b64encode(key_bytes).decode("ascii")
|
||
return JsonResponse({"ok": True})
|
||
|
||
def _build_profile_context(request):
|
||
session_user_id = request.session.get("user_id")
|
||
if session_user_id is None:
|
||
return None
|
||
user = get_user_by_id(session_user_id)
|
||
if not user:
|
||
return None
|
||
from elastic.es_connect import search_all
|
||
from elastic.views import _attach_image_urls
|
||
raw_results = [r for r in search_all() if str(r.get("writer_id", "")) == str(session_user_id)]
|
||
achievements = _attach_image_urls(request, raw_results)
|
||
permission_name = "管理员" if int(user.get("permission", 1)) == 0 else "普通用户"
|
||
return {
|
||
"username": request.session.get("username"),
|
||
"profile_user": user,
|
||
"permission_name": permission_name,
|
||
"achievements": achievements,
|
||
}
|
||
|
||
@require_http_methods(["GET"])
|
||
@ensure_csrf_cookie
|
||
def profile_page(request):
|
||
context = _build_profile_context(request)
|
||
if context is None:
|
||
return redirect("/accounts/login/")
|
||
context["subpage"] = ""
|
||
return render(request, "accounts/profile.html", context)
|
||
|
||
@require_http_methods(["GET"])
|
||
@ensure_csrf_cookie
|
||
def profile_username_page(request):
|
||
context = _build_profile_context(request)
|
||
if context is None:
|
||
return redirect("/accounts/login/")
|
||
context["subpage"] = "username"
|
||
context["subpage_title"] = "修改用户名"
|
||
return render(request, "accounts/profile.html", context)
|
||
|
||
@require_http_methods(["GET"])
|
||
@ensure_csrf_cookie
|
||
def profile_password_page(request):
|
||
context = _build_profile_context(request)
|
||
if context is None:
|
||
return redirect("/accounts/login/")
|
||
context["subpage"] = "password"
|
||
context["subpage_title"] = "修改密码"
|
||
return render(request, "accounts/profile.html", context)
|
||
|
||
@require_http_methods(["GET"])
|
||
@ensure_csrf_cookie
|
||
def profile_registration_code_page(request):
|
||
context = _build_profile_context(request)
|
||
if context is None:
|
||
return redirect("/accounts/login/")
|
||
context["subpage"] = "registration-code"
|
||
context["subpage_title"] = "替换注册码"
|
||
return render(request, "accounts/profile.html", context)
|
||
|
||
@require_http_methods(["POST"])
|
||
@csrf_protect
|
||
def secure_login_submit(request):
|
||
try:
|
||
payload = json.loads(request.body.decode("utf-8"))
|
||
except json.JSONDecodeError:
|
||
return HttpResponseBadRequest("Invalid JSON")
|
||
iv_b64 = payload.get("iv", "")
|
||
ct_b64 = payload.get("ciphertext", "")
|
||
if not iv_b64 or not ct_b64:
|
||
return HttpResponseBadRequest("Missing fields")
|
||
key_b64 = request.session.get("session_enc_key_b64")
|
||
if not key_b64:
|
||
return HttpResponseBadRequest("Session key missing")
|
||
try:
|
||
key_bytes = base64.b64decode(key_b64)
|
||
pt = aes_gcm_decrypt_b64(key_bytes, iv_b64, ct_b64)
|
||
obj = json.loads(pt.decode("utf-8"))
|
||
except Exception:
|
||
return HttpResponseBadRequest("Decrypt error")
|
||
username = (obj.get("username") or "").strip()
|
||
password = (obj.get("password") or "")
|
||
if not username or not password:
|
||
return HttpResponseBadRequest("Missing credentials")
|
||
if bool(request.session.get("login_failed_once")):
|
||
ans = (obj.get("captcha") or "").strip()
|
||
code = request.session.get("captcha_code")
|
||
if not ans or not code or ans.lower() != str(code).lower():
|
||
return JsonResponse({"ok": False, "message": "验证码错误", "captcha_required": True}, status=401)
|
||
user = get_user_by_username(username)
|
||
if not user:
|
||
request.session["login_failed_once"] = True
|
||
return JsonResponse({"ok": False, "message": "用户不存在", "captcha_required": True}, status=401)
|
||
if not verify_password(password, user.get("password_salt") or "", user.get("password_hash") or ""):
|
||
request.session["login_failed_once"] = True
|
||
return JsonResponse({"ok": False, "message": "账户或密码错误", "captcha_required": True}, status=401)
|
||
try:
|
||
request.session.cycle_key()
|
||
except Exception:
|
||
pass
|
||
request.session["user_id"] = user["user_id"]
|
||
request.session["username"] = user["username"]
|
||
try:
|
||
request.session["permission"] = int(user["permission"]) if user.get("permission") is not None else 1
|
||
except Exception:
|
||
request.session["permission"] = 1
|
||
if "session_enc_key_b64" in request.session:
|
||
del request.session["session_enc_key_b64"]
|
||
if "rsa_private_pem_b64" in request.session:
|
||
del request.session["rsa_private_pem_b64"]
|
||
if "login_failed_once" in request.session:
|
||
del request.session["login_failed_once"]
|
||
if "captcha_code" in request.session:
|
||
del request.session["captcha_code"]
|
||
return JsonResponse({"ok": True, "redirect_url": f"/main/home/?user_id={user['user_id']}"})
|
||
|
||
|
||
@require_http_methods(["GET"])
|
||
def home(request):
|
||
# Minimal placeholder page per requirement
|
||
# Ensure user_id is passed via query and session contains id
|
||
user_id = request.GET.get("user_id")
|
||
session_user_id = request.session.get("user_id")
|
||
context = {
|
||
"user_id": user_id or session_user_id,
|
||
}
|
||
return render(request, "accounts/home.html", context)
|
||
|
||
|
||
@require_http_methods(["POST"])
|
||
@csrf_protect
|
||
def logout(request):
|
||
# Flush the session to clear all data and rotate the key
|
||
try:
|
||
request.session.flush()
|
||
except Exception:
|
||
pass
|
||
|
||
# Return a response that also deletes cookies client-side
|
||
resp = JsonResponse({"ok": True, "redirect_url": "/accounts/login/"})
|
||
try:
|
||
# Delete session cookie
|
||
resp.delete_cookie(
|
||
settings.SESSION_COOKIE_NAME,
|
||
path='/',
|
||
samesite=settings.SESSION_COOKIE_SAMESITE,
|
||
secure=settings.SESSION_COOKIE_SECURE,
|
||
)
|
||
# Optionally delete CSRF cookie to satisfy "清除cookie" 的要求
|
||
resp.delete_cookie(
|
||
settings.CSRF_COOKIE_NAME,
|
||
path='/',
|
||
samesite=settings.CSRF_COOKIE_SAMESITE,
|
||
secure=settings.CSRF_COOKIE_SECURE,
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
return resp
|
||
|
||
@require_http_methods(["GET"])
|
||
@ensure_csrf_cookie
|
||
def register_page(request):
|
||
return render(request, "accounts/register.html")
|
||
|
||
@require_http_methods(["POST"])
|
||
@csrf_protect
|
||
def register_submit(request):
|
||
try:
|
||
payload = json.loads(request.body.decode("utf-8"))
|
||
except json.JSONDecodeError:
|
||
return HttpResponseBadRequest("Invalid JSON")
|
||
code = (payload.get("code") or "").strip()
|
||
email = (payload.get("email") or "").strip()
|
||
email_code = (payload.get("email_code") or "").strip()
|
||
username = (payload.get("username") or "").strip()
|
||
password = (payload.get("password") or "")
|
||
if not email or not email_code or not username or not password:
|
||
return HttpResponseBadRequest("Missing fields")
|
||
v = request.session.get("email_verify") or {}
|
||
if (v.get("email") or "") != email:
|
||
return JsonResponse({"ok": False, "message": "请先验证邮箱"}, status=400)
|
||
try:
|
||
exp_ts = int(v.get("expires_at") or 0)
|
||
except Exception:
|
||
exp_ts = 0
|
||
if exp_ts < int(time.time()):
|
||
return JsonResponse({"ok": False, "message": "验证码已过期"}, status=400)
|
||
if (v.get("code") or "") != email_code:
|
||
return JsonResponse({"ok": False, "message": "邮箱验证码错误"}, status=400)
|
||
rc = None
|
||
if code:
|
||
rc = get_registration_code(code)
|
||
if not rc:
|
||
return JsonResponse({"ok": False, "message": "注册码无效"}, status=400)
|
||
try:
|
||
exp = rc.get("expires_at")
|
||
now = __import__("datetime").datetime.now(__import__("datetime").timezone.utc)
|
||
if hasattr(exp, 'isoformat'):
|
||
exp_dt = exp
|
||
else:
|
||
exp_dt = __import__("datetime").datetime.fromisoformat(str(exp))
|
||
if exp_dt <= now:
|
||
return JsonResponse({"ok": False, "message": "注册码已过期"}, status=400)
|
||
except Exception:
|
||
pass
|
||
existing = es_get_user_by_username(username)
|
||
if existing:
|
||
return JsonResponse({"ok": False, "message": "用户名已存在"}, status=409)
|
||
users = es_get_all_users()
|
||
next_id = (max([int(u.get("user_id", 0)) for u in users]) + 1) if users else 1
|
||
ok = write_user_data({
|
||
"user_id": next_id,
|
||
"username": username,
|
||
"password": password,
|
||
"permission": 1,
|
||
"email": email,
|
||
"key": (rc.get("keys") if rc else []) or [],
|
||
"manage_key": (rc.get("manage_keys") if rc else []) or [],
|
||
"registration_code": (rc.get("code") if rc else None),
|
||
})
|
||
if not ok:
|
||
return JsonResponse({"ok": False, "message": "注册失败"}, status=500)
|
||
try:
|
||
if "email_verify" in request.session:
|
||
del request.session["email_verify"]
|
||
except Exception:
|
||
pass
|
||
return JsonResponse({"ok": True, "redirect_url": "/accounts/login/"})
|
||
|
||
@require_http_methods(["POST"])
|
||
@csrf_protect
|
||
def replace_registration_code_view(request):
|
||
session_user_id = request.session.get("user_id")
|
||
if session_user_id is None:
|
||
return JsonResponse({"ok": False, "message": "未登录"}, status=401)
|
||
try:
|
||
payload = json.loads(request.body.decode("utf-8"))
|
||
except json.JSONDecodeError:
|
||
return HttpResponseBadRequest("Invalid JSON")
|
||
code = (payload.get("code") or "").strip()
|
||
if not code:
|
||
return JsonResponse({"ok": False, "message": "请输入注册码"}, status=400)
|
||
rc = get_registration_code(code)
|
||
if not rc:
|
||
return JsonResponse({"ok": False, "message": "注册码无效"}, status=400)
|
||
try:
|
||
exp = rc.get("expires_at")
|
||
now = __import__("datetime").datetime.now(__import__("datetime").timezone.utc)
|
||
if hasattr(exp, 'isoformat'):
|
||
exp_dt = exp
|
||
else:
|
||
exp_dt = __import__("datetime").datetime.fromisoformat(str(exp))
|
||
if exp_dt <= now:
|
||
return JsonResponse({"ok": False, "message": "注册码已过期"}, status=400)
|
||
except Exception:
|
||
pass
|
||
keys = list(rc.get("keys") or [])
|
||
manage_keys = list(rc.get("manage_keys") or [])
|
||
ok = update_user_by_id(session_user_id, key=keys, manage_key=manage_keys, registration_code=code)
|
||
if not ok:
|
||
return JsonResponse({"ok": False, "message": "替换失败"}, status=500)
|
||
return JsonResponse({"ok": True})
|
||
|
||
@require_http_methods(["POST"])
|
||
@csrf_protect
|
||
def update_profile_username_view(request):
|
||
session_user_id = request.session.get("user_id")
|
||
if session_user_id is None:
|
||
return JsonResponse({"ok": False, "message": "未登录"}, status=401)
|
||
try:
|
||
payload = json.loads(request.body.decode("utf-8"))
|
||
except json.JSONDecodeError:
|
||
return JsonResponse({"ok": False, "message": "JSON无效"}, status=400)
|
||
new_username = (payload.get("username") or "").strip()
|
||
if not new_username:
|
||
return JsonResponse({"ok": False, "message": "请输入用户名"}, status=400)
|
||
if len(new_username) > 50:
|
||
return JsonResponse({"ok": False, "message": "用户名过长"}, status=400)
|
||
me = get_user_by_id(session_user_id) or {}
|
||
if str(me.get("username", "")).strip() == new_username:
|
||
request.session["username"] = new_username
|
||
return JsonResponse({"ok": True, "username": new_username})
|
||
existing = es_get_user_by_username(new_username)
|
||
if existing and str(existing.get("user_id")) != str(session_user_id):
|
||
return JsonResponse({"ok": False, "message": "用户名已存在"}, status=409)
|
||
ok = update_user_by_id(session_user_id, username=new_username)
|
||
if not ok:
|
||
return JsonResponse({"ok": False, "message": "修改失败"}, status=500)
|
||
request.session["username"] = new_username
|
||
return JsonResponse({"ok": True, "username": new_username})
|
||
|
||
@require_http_methods(["GET"])
|
||
def registration_code_preview_view(request):
|
||
session_user_id = request.session.get("user_id")
|
||
if session_user_id is None:
|
||
return JsonResponse({"ok": False, "message": "未登录"}, status=401)
|
||
code = (request.GET.get("code") or "").strip()
|
||
if not code:
|
||
return JsonResponse({"ok": False, "message": "请输入注册码"}, status=400)
|
||
rc = get_registration_code(code)
|
||
if not rc:
|
||
return JsonResponse({"ok": False, "message": "注册码无效"}, status=400)
|
||
try:
|
||
exp = rc.get("expires_at")
|
||
now = __import__("datetime").datetime.now(__import__("datetime").timezone.utc)
|
||
if hasattr(exp, 'isoformat'):
|
||
exp_dt = exp
|
||
else:
|
||
exp_dt = __import__("datetime").datetime.fromisoformat(str(exp))
|
||
if exp_dt <= now:
|
||
return JsonResponse({"ok": False, "message": "注册码已过期"}, status=400)
|
||
except Exception:
|
||
pass
|
||
return JsonResponse(
|
||
{
|
||
"ok": True,
|
||
"data": {
|
||
"code": rc.get("code"),
|
||
"keys": list(rc.get("keys") or []),
|
||
"manage_keys": list(rc.get("manage_keys") or []),
|
||
"expires_at": rc.get("expires_at"),
|
||
},
|
||
}
|
||
)
|
||
|
||
@require_http_methods(["POST"])
|
||
@csrf_protect
|
||
def submit_registration_code_request_view(request):
|
||
session_user_id = request.session.get("user_id")
|
||
if session_user_id is None:
|
||
return JsonResponse({"ok": False, "message": "未登录"}, status=401)
|
||
try:
|
||
perm = int(request.session.get("permission", 1))
|
||
except Exception:
|
||
perm = 1
|
||
if perm == 0:
|
||
return JsonResponse({"ok": False, "message": "无权限"}, status=403)
|
||
me = get_user_by_id(session_user_id) or {}
|
||
if (me.get("manage_key") or []) or int(me.get("can_manage_registration_codes") or 0) == 1:
|
||
return JsonResponse({"ok": False, "message": "无需申请"}, status=400)
|
||
if str(me.get("registration_code") or "").strip():
|
||
return JsonResponse({"ok": False, "message": "已有注册码,无法申请"}, status=400)
|
||
try:
|
||
payload = json.loads(request.body.decode("utf-8"))
|
||
except json.JSONDecodeError:
|
||
return HttpResponseBadRequest("Invalid JSON")
|
||
reason = (payload.get("reason") or "").strip()
|
||
if not reason:
|
||
return JsonResponse({"ok": False, "message": "请填写申请理由"}, status=400)
|
||
pending = find_pending_registration_code_manage_request(session_user_id)
|
||
if pending:
|
||
return JsonResponse({"ok": True, "message": "已提交申请"})
|
||
rid = create_registration_code_manage_request(session_user_id, me.get("username"), reason)
|
||
if not rid:
|
||
return JsonResponse({"ok": False, "message": "提交失败"}, status=500)
|
||
return JsonResponse({"ok": True})
|
||
|
||
@require_http_methods(["GET"])
|
||
@ensure_csrf_cookie
|
||
def registration_code_requests_page(request):
|
||
session_user_id = request.session.get("user_id")
|
||
if session_user_id is None:
|
||
return redirect("/accounts/login/")
|
||
try:
|
||
perm = int(request.session.get("permission", 1))
|
||
except Exception:
|
||
perm = 1
|
||
if perm != 0:
|
||
return redirect("/main/home/")
|
||
me = get_user_by_id(session_user_id) or {}
|
||
return render(request, "accounts/registration_code_requests.html", {"username": me.get("username")})
|
||
|
||
@require_http_methods(["GET"])
|
||
def list_registration_code_requests_view(request):
|
||
session_user_id = request.session.get("user_id")
|
||
if session_user_id is None:
|
||
return JsonResponse({"ok": False, "message": "未登录"}, status=401)
|
||
try:
|
||
perm = int(request.session.get("permission", 1))
|
||
except Exception:
|
||
perm = 1
|
||
if perm != 0:
|
||
return JsonResponse({"ok": False, "message": "无权限"}, status=403)
|
||
status = (request.GET.get("status") or "").strip() or None
|
||
data = list_registration_code_manage_requests(status=status)
|
||
return JsonResponse({"ok": True, "data": data})
|
||
|
||
@require_http_methods(["POST"])
|
||
@csrf_protect
|
||
def decide_registration_code_request_view(request):
|
||
session_user_id = request.session.get("user_id")
|
||
if session_user_id is None:
|
||
return JsonResponse({"ok": False, "message": "未登录"}, status=401)
|
||
try:
|
||
perm = int(request.session.get("permission", 1))
|
||
except Exception:
|
||
perm = 1
|
||
if perm != 0:
|
||
return JsonResponse({"ok": False, "message": "无权限"}, status=403)
|
||
try:
|
||
payload = json.loads(request.body.decode("utf-8"))
|
||
except json.JSONDecodeError:
|
||
return HttpResponseBadRequest("Invalid JSON")
|
||
request_id = (payload.get("request_id") or "").strip()
|
||
action = (payload.get("action") or "").strip().lower()
|
||
note = (payload.get("note") or "").strip()
|
||
if not request_id or action not in ("approve", "reject"):
|
||
return JsonResponse({"ok": False, "message": "参数错误"}, status=400)
|
||
req = get_registration_code_manage_request(request_id)
|
||
if not req:
|
||
return JsonResponse({"ok": False, "message": "申请不存在"}, status=404)
|
||
status = "approved" if action == "approve" else "rejected"
|
||
ok = decide_registration_code_manage_request(request_id, status=status, reviewed_by=session_user_id, reviewer_note=note)
|
||
if not ok:
|
||
return JsonResponse({"ok": False, "message": "操作失败"}, status=500)
|
||
if status == "approved":
|
||
uid = req.get("user_id")
|
||
update_user_by_id(uid, can_manage_registration_codes=1, registration_manage_keys=[])
|
||
return JsonResponse({"ok": True})
|
||
|
||
@require_http_methods(["POST"])
|
||
@csrf_protect
|
||
def send_email_code(request):
|
||
try:
|
||
payload = json.loads(request.body.decode("utf-8"))
|
||
except json.JSONDecodeError:
|
||
return HttpResponseBadRequest("Invalid JSON")
|
||
email = (payload.get("email") or "").strip()
|
||
if not email:
|
||
return HttpResponseBadRequest("Missing email")
|
||
if "@" not in email:
|
||
return JsonResponse({"ok": False, "message": "邮箱格式不正确"}, status=400)
|
||
verify_code = "".join(random.choice(string.digits) for _ in range(6))
|
||
ttl = int(os.environ.get("SMTP_CODE_TTL", "600") or 600)
|
||
request.session["email_verify"] = {"email": email, "code": verify_code, "expires_at": int(time.time()) + max(60, ttl)}
|
||
ok, msg = _send_smtp_email(email, verify_code)
|
||
if not ok:
|
||
return JsonResponse({"ok": False, "message": msg or "验证码发送失败"}, status=500)
|
||
return JsonResponse({"ok": True})
|
||
|
||
def _send_smtp_email(to_email: str, code: str):
|
||
host = os.environ.get("SMTP_HOST", "")
|
||
port_raw = os.environ.get("SMTP_PORT", "")
|
||
try:
|
||
port = int(port_raw) if port_raw else 0
|
||
except Exception:
|
||
port = 0
|
||
user = os.environ.get("SMTP_USERNAME") or os.environ.get("SMTP_USER") or ""
|
||
password = os.environ.get("SMTP_PASSWORD", "")
|
||
use_tls = str(os.environ.get("SMTP_USE_TLS", "")).lower() in ("1", "true", "yes")
|
||
use_ssl = str(os.environ.get("SMTP_USE_SSL", "")).lower() in ("1", "true", "yes")
|
||
sender = os.environ.get("SMTP_FROM_EMAIL") or os.environ.get("SMTP_FROM") or user or ""
|
||
subject = os.environ.get("SMTP_SUBJECT") or "邮箱验证码"
|
||
if not host or not port or not sender:
|
||
return False, "缺少SMTP配置"
|
||
body = f"您的验证码是:{code},10分钟内有效。"
|
||
msg = f"From: {sender}\r\nTo: {to_email}\r\nSubject: {subject}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n{body}"
|
||
try:
|
||
if use_ssl:
|
||
server = smtplib.SMTP_SSL(host, port)
|
||
else:
|
||
server = smtplib.SMTP(host, port)
|
||
server.ehlo()
|
||
if use_tls and not use_ssl:
|
||
server.starttls()
|
||
server.ehlo()
|
||
if user and password:
|
||
server.login(user, password)
|
||
server.sendmail(sender, [to_email], msg.encode("utf-8"))
|
||
try:
|
||
server.quit()
|
||
except Exception:
|
||
try:
|
||
server.close()
|
||
except Exception:
|
||
pass
|
||
return True, ""
|
||
except Exception as e:
|
||
return False, str(e)
|