fix user auth
This commit is contained in:
@ -1,6 +1,13 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Union, Optional
|
||||
import jwt
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from passlib.context import CryptContext
|
||||
from app.core.config import settings
|
||||
@ -37,7 +44,7 @@ def get_password_hash(password: str) -> str:
|
||||
return pwd_context.hash(password)
|
||||
|
||||
|
||||
def decrypt(data, key) -> Optional[bytes]:
|
||||
def decrypt(data: bytes, key: bytes) -> Optional[bytes]:
|
||||
"""
|
||||
解密二进制数据
|
||||
"""
|
||||
@ -47,3 +54,81 @@ def decrypt(data, key) -> Optional[bytes]:
|
||||
except Exception as e:
|
||||
print(str(e))
|
||||
return None
|
||||
|
||||
|
||||
def encrypt_message(message: str, key: bytes):
|
||||
"""
|
||||
使用给定的key对消息进行加密,并返回加密后的字符串
|
||||
"""
|
||||
f = Fernet(key)
|
||||
encrypted_message = f.encrypt(message.encode())
|
||||
return encrypted_message.decode()
|
||||
|
||||
|
||||
def hash_sha256(message):
|
||||
"""
|
||||
对字符串做hash运算
|
||||
"""
|
||||
return hashlib.sha256(message.encode()).hexdigest()
|
||||
|
||||
|
||||
def aes_decrypt(data, key):
|
||||
"""
|
||||
AES解密
|
||||
"""
|
||||
if not data:
|
||||
return ""
|
||||
data = base64.b64decode(data)
|
||||
iv = data[:16]
|
||||
encrypted = data[16:]
|
||||
# 使用AES-256-CBC解密
|
||||
cipher = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv)
|
||||
result = cipher.decrypt(encrypted)
|
||||
# 去除填充
|
||||
padding = result[-1]
|
||||
if padding < 1 or padding > AES.block_size:
|
||||
return ""
|
||||
result = result[:-padding]
|
||||
return result.decode('utf-8')
|
||||
|
||||
|
||||
def aes_encrypt(data, key):
|
||||
"""
|
||||
AES加密
|
||||
"""
|
||||
if not data:
|
||||
return ""
|
||||
# 使用AES-256-CBC加密
|
||||
cipher = AES.new(key.encode('utf-8'), AES.MODE_CBC)
|
||||
# 填充
|
||||
padding = AES.block_size - len(data) % AES.block_size
|
||||
data += chr(padding) * padding
|
||||
result = cipher.encrypt(data.encode('utf-8'))
|
||||
# 使用base64编码
|
||||
return base64.b64encode(cipher.iv + result).decode('utf-8')
|
||||
|
||||
|
||||
def nexusphp_encrypt(data_str: str, key):
|
||||
"""
|
||||
NexusPHP加密
|
||||
"""
|
||||
# 生成16字节长的随机字符串
|
||||
iv = os.urandom(16)
|
||||
# 对向量进行 Base64 编码
|
||||
iv_base64 = base64.b64encode(iv)
|
||||
# 加密数据
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
ciphertext = cipher.encrypt(pad(data_str.encode(), AES.block_size))
|
||||
ciphertext_base64 = base64.b64encode(ciphertext)
|
||||
# 对向量的字符串表示进行签名
|
||||
mac = hmac.new(key, msg=iv_base64 + ciphertext_base64, digestmod=hashlib.sha256).hexdigest()
|
||||
# 构造 JSON 字符串
|
||||
json_str = json.dumps({
|
||||
'iv': iv_base64.decode(),
|
||||
'value': ciphertext_base64.decode(),
|
||||
'mac': mac,
|
||||
'tag': ''
|
||||
})
|
||||
|
||||
# 对 JSON 字符串进行 Base64 编码
|
||||
return base64.b64encode(json_str.encode()).decode()
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -52,6 +52,8 @@ def start_module():
|
||||
Scheduler()
|
||||
# 启动事件消费
|
||||
Command()
|
||||
# 用户认证
|
||||
SitesHelper().check_user()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -27,6 +27,8 @@ class EventType(Enum):
|
||||
class SystemConfigKey(Enum):
|
||||
# 用户已安装的插件
|
||||
UserInstalledPlugins = "UserInstalledPlugins"
|
||||
# 用户认证参数
|
||||
UserSiteAuthParams = "UserSiteAuthParams"
|
||||
|
||||
|
||||
# 站点框架
|
||||
|
Reference in New Issue
Block a user