修改登录逻辑,使用RSA-OAEP 包裹每会话独立 AES-GCM 密钥 + 加密提交凭据
This commit is contained in:
@@ -1,5 +1,23 @@
|
||||
import hashlib
|
||||
import hmac
|
||||
import os
|
||||
import base64
|
||||
from typing import Tuple
|
||||
|
||||
try:
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
||||
from cryptography.hazmat.primitives import serialization, hashes
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
except Exception:
|
||||
rsa = None
|
||||
padding = None
|
||||
serialization = None
|
||||
hashes = None
|
||||
Cipher = None
|
||||
algorithms = None
|
||||
modes = None
|
||||
default_backend = None
|
||||
|
||||
|
||||
def salt_for_username(username: str) -> bytes:
|
||||
@@ -17,4 +35,59 @@ def derive_password(password_plain: str, salt: bytes, iterations: int = 100_000,
|
||||
|
||||
def hmac_sha256(key: bytes, message: bytes) -> bytes:
|
||||
"""Compute HMAC-SHA256 signature for the given message using key bytes."""
|
||||
return hmac.new(key, message, hashlib.sha256).digest()
|
||||
return hmac.new(key, message, hashlib.sha256).digest()
|
||||
|
||||
|
||||
_RSA_PRIVATE = None
|
||||
_RSA_PUBLIC = None
|
||||
|
||||
def _ensure_rsa_keys():
|
||||
global _RSA_PRIVATE, _RSA_PUBLIC
|
||||
if _RSA_PRIVATE is None:
|
||||
if rsa is None:
|
||||
raise RuntimeError("cryptography library is required for RSA operations")
|
||||
_RSA_PRIVATE = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
||||
_RSA_PUBLIC = _RSA_PRIVATE.public_key()
|
||||
|
||||
def get_public_key_spki_b64() -> str:
|
||||
_ensure_rsa_keys()
|
||||
spki = _RSA_PUBLIC.public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo)
|
||||
return base64.b64encode(spki).decode('ascii')
|
||||
|
||||
def rsa_oaep_decrypt_b64(ciphertext_b64: str) -> bytes:
|
||||
_ensure_rsa_keys()
|
||||
ct = base64.b64decode(ciphertext_b64)
|
||||
return _RSA_PRIVATE.decrypt(ct, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
|
||||
|
||||
def aes_gcm_decrypt_b64(key_bytes: bytes, iv_b64: str, ciphertext_b64: str) -> bytes:
|
||||
if Cipher is None:
|
||||
raise RuntimeError("cryptography library is required for AES operations")
|
||||
iv = base64.b64decode(iv_b64)
|
||||
data = base64.b64decode(ciphertext_b64)
|
||||
if len(data) < 16:
|
||||
raise ValueError("ciphertext too short")
|
||||
ct = data[:-16]
|
||||
tag = data[-16:]
|
||||
decryptor = Cipher(algorithms.AES(key_bytes), modes.GCM(iv, tag), backend=default_backend()).decryptor()
|
||||
pt = decryptor.update(ct) + decryptor.finalize()
|
||||
return pt
|
||||
|
||||
def gen_salt(length: int = 16) -> bytes:
|
||||
return os.urandom(length)
|
||||
|
||||
def hash_password_with_salt(password_plain: str, salt: bytes, iterations: int = 200_000, dklen: int = 32) -> bytes:
|
||||
return hashlib.pbkdf2_hmac('sha256', password_plain.encode('utf-8'), salt, iterations, dklen=dklen)
|
||||
|
||||
def hash_password_random_salt(password_plain: str) -> Tuple[str, str]:
|
||||
salt = gen_salt(16)
|
||||
h = hash_password_with_salt(password_plain, salt)
|
||||
return base64.b64encode(salt).decode('ascii'), base64.b64encode(h).decode('ascii')
|
||||
|
||||
def verify_password(password_plain: str, salt_b64: str, hash_b64: str) -> bool:
|
||||
try:
|
||||
salt = base64.b64decode(salt_b64)
|
||||
expected = base64.b64decode(hash_b64)
|
||||
actual = hash_password_with_salt(password_plain, salt)
|
||||
return hmac.compare_digest(actual, expected)
|
||||
except Exception:
|
||||
return False
|
||||
Reference in New Issue
Block a user