Merge pull request #2518 from InfinityPacer/main

This commit is contained in:
jxxghp 2024-07-07 08:58:39 +08:00 committed by GitHub
commit 14e28ed693
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 209 additions and 18 deletions

View File

@ -224,6 +224,8 @@ class Settings(BaseSettings):
PLUGIN_MARKET: str = "https://github.com/jxxghp/MoviePilot-Plugins,https://github.com/thsrite/MoviePilot-Plugins,https://github.com/honue/MoviePilot-Plugins,https://github.com/InfinityPacer/MoviePilot-Plugins"
# Github token提高请求api限流阈值 ghp_****
GITHUB_TOKEN: Optional[str] = None
# 指定的仓库Github token多个仓库使用,分隔,格式:{user1}/{repo1}:ghp_****,{user2}/{repo2}:github_pat_****
GITHUB_TOKEN_FOR_REPO: Optional[str] = None
# Github代理服务器格式https://mirror.ghproxy.com/
GITHUB_PROXY: Optional[str] = ''
# 自动检查和更新站点资源包(站点索引、认证等)
@ -362,6 +364,37 @@ class Settings(BaseSettings):
}
return {}
def GITHUB_HEADERS_FOR_REPO(self, repo: str = None):
"""
Github指定的仓库请求头
:param repo: 指定的仓库名称格式为 "user/repo"如果为空或者没有找到指定仓库请求头则返回默认的请求头信息
:return: Github请求头
"""
# 如果没有传入指定的仓库名称或没有配置指定的仓库Token则返回默认的请求头信息
if not repo or not self.GITHUB_TOKEN_FOR_REPO:
return self.GITHUB_HEADERS
headers = {}
# 格式:{user1}/{repo1}:ghp_****,{user2}/{repo2}:github_pat_****
token_pairs = self.GITHUB_TOKEN_FOR_REPO.split(",")
for token_pair in token_pairs:
try:
parts = token_pair.split(":")
if len(parts) != 2:
print(f"无效的令牌格式: {token_pair}")
continue
repo_info = parts[0].strip()
token = parts[1].strip()
if not repo_info or not token:
print(f"无效的令牌或仓库信息: {token_pair}")
continue
headers[repo_info] = {
"Authorization": f"Bearer {token}"
}
except Exception as e:
print(f"处理令牌对 '{token_pair}' 时出错: {e}")
# 如果传入了指定的仓库名称,则返回该仓库的请求头信息,否则返回默认请求头
return headers.get(repo, self.GITHUB_HEADERS)
@property
def DEFAULT_DOWNLOADER(self):
"""

View File

@ -2,11 +2,12 @@ import concurrent
import concurrent.futures
import importlib.util
import inspect
import os
import threading
import time
import traceback
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
@ -21,6 +22,7 @@ from app.helper.plugin import PluginHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.crypto import RSAUtils
from app.utils.object import ObjectUtils
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
@ -159,11 +161,12 @@ class PluginManager(metaclass=Singleton):
if pid and plugin_id != pid:
continue
try:
# 如果插件具有认证级别且当前认证级别不足,则不进行实例化
if hasattr(plugin, "auth_level"):
plugin.auth_level = plugin.auth_level
if self.siteshelper.auth_level < plugin.auth_level:
continue
# 判断插件是否满足认证要求,如不满足则不进行实例化
if not self.__set_and_check_auth_level(plugin=plugin):
# 如果是插件热更新实例,这里则进行替换
if plugin_id in self._plugins:
self._plugins[plugin_id] = plugin
continue
# 存储Class
self._plugins[plugin_id] = plugin
# 未安装的不加载
@ -601,11 +604,12 @@ class PluginManager(metaclass=Singleton):
if plugin_obj and hasattr(plugin_obj, "get_page"):
if ObjectUtils.check_method(plugin_obj.get_page):
plugin.has_page = True
# 公钥
if plugin_info.get("key"):
plugin.plugin_public_key = plugin_info.get("key")
# 权限
if plugin_info.get("level"):
plugin.auth_level = plugin_info.get("level")
if self.siteshelper.auth_level < plugin.auth_level:
continue
if not self.__set_and_check_auth_level(plugin=plugin, source=plugin_info):
continue
# 名称
if plugin_info.get("name"):
plugin.plugin_name = plugin_info.get("name")
@ -708,11 +712,12 @@ class PluginManager(metaclass=Singleton):
plugin.has_page = True
else:
plugin.has_page = False
# 公钥
if hasattr(plugin_class, "plugin_public_key"):
plugin.plugin_public_key = plugin_class.plugin_public_key
# 权限
if hasattr(plugin_class, "auth_level"):
plugin.auth_level = plugin_class.auth_level
if self.siteshelper.auth_level < plugin.auth_level:
continue
if not self.__set_and_check_auth_level(plugin=plugin, source=plugin_class):
continue
# 名称
if hasattr(plugin_class, "plugin_name"):
plugin.plugin_name = plugin_class.plugin_name
@ -762,3 +767,57 @@ class PluginManager(metaclass=Singleton):
except Exception as e:
logger.debug(f"获取插件是否在本地包中存在失败,{e}")
return False
def __set_and_check_auth_level(self, plugin: Union[schemas.Plugin, Type[Any]],
source: Optional[Union[dict, Type[Any]]] = None) -> bool:
"""
设置并检查插件的认证级别
:param plugin: 插件对象或包含 auth_level 属性的对象
:param source: 可选的字典对象或类对象可能包含 "level" "auth_level"
:return: 如果插件的认证级别有效且当前环境的认证级别满足要求返回 True否则返回 False
"""
# 检查并赋值 source 中的 level 或 auth_level
if source:
if isinstance(source, dict) and "level" in source:
plugin.auth_level = source.get("level")
elif hasattr(source, "auth_level"):
plugin.auth_level = source.auth_level
# 如果 source 为空且 plugin 本身没有 auth_level直接返回 True
elif not hasattr(plugin, "auth_level"):
return True
# auth_level 级别说明
# 1 - 所有用户可见
# 2 - 站点认证用户可见
# 3 - 站点&密钥认证可见
# 99 - 站点&特殊密钥认证可见
# 如果当前站点认证级别大于 1 且插件级别为 99并存在插件公钥说明为特殊密钥认证通过密钥匹配进行认证
if self.siteshelper.auth_level > 1 and plugin.auth_level == 99 and hasattr(plugin, "plugin_public_key"):
plugin_id = plugin.id if isinstance(plugin, schemas.Plugin) else plugin.__name__
public_key = plugin.plugin_public_key
if public_key:
private_key = PluginManager.__get_plugin_private_key(plugin_id)
verify = RSAUtils.verify_rsa_keys(public_key=public_key, private_key=private_key)
return verify
# 如果当前站点认证级别小于插件级别,则返回 False
if self.siteshelper.auth_level < plugin.auth_level:
return False
return True
@staticmethod
def __get_plugin_private_key(plugin_id: str) -> Optional[str]:
"""
根据插件标识获取对应的私钥
:param plugin_id: 插件标识
:return: 对应的插件私钥如果未找到则返回 None
"""
try:
# 将插件标识转换为大写并构建环境变量名称
env_var_name = f"PLUGIN_{plugin_id.upper()}_PRIVATE_KEY"
private_key = os.environ.get(env_var_name)
if private_key is None:
logger.debug(f"环境变量 {env_var_name} 未找到。")
return private_key
except Exception as e:
logger.debug(f"获取插件 {plugin_id} 的私钥时发生错误:{e}")
return None

View File

@ -51,7 +51,8 @@ class PluginHelper(metaclass=Singleton):
if not user or not repo:
return {}
raw_url = self._base_url % (user, repo)
res = RequestUtils(proxies=self.proxies, headers=settings.GITHUB_HEADERS,
res = RequestUtils(proxies=self.proxies,
headers=settings.GITHUB_HEADERS_FOR_REPO(repo=f"{user}/{repo}"),
timeout=10).get_res(f"{raw_url}package.json")
if res:
try:
@ -137,12 +138,16 @@ class PluginHelper(metaclass=Singleton):
if not user or not repo:
return False, "不支持的插件仓库地址格式"
user_repo = f"{user}/{repo}"
def __get_filelist(_p: str) -> Tuple[Optional[list], Optional[str]]:
"""
获取插件的文件列表
"""
file_api = f"https://api.github.com/repos/{user}/{repo}/contents/plugins/{_p}"
r = RequestUtils(proxies=settings.PROXY, headers=settings.GITHUB_HEADERS, timeout=30).get_res(file_api)
file_api = f"https://api.github.com/repos/{user_repo}/contents/plugins/{_p}"
r = RequestUtils(proxies=settings.PROXY,
headers=settings.GITHUB_HEADERS_FOR_REPO(repo=user_repo),
timeout=30).get_res(file_api)
if r is None:
return None, "连接仓库失败"
elif r.status_code != 200:
@ -164,7 +169,8 @@ class PluginHelper(metaclass=Singleton):
download_url = f"{settings.GITHUB_PROXY}{item.get('download_url')}"
# 下载插件文件
res = RequestUtils(proxies=self.proxies,
headers=settings.GITHUB_HEADERS, timeout=60).get_res(download_url)
headers=settings.GITHUB_HEADERS_FOR_REPO(repo=user_repo),
timeout=60).get_res(download_url)
if not res:
return False, f"文件 {item.get('name')} 下载失败!"
elif res.status_code != 200:

View File

@ -46,6 +46,8 @@ class Plugin(BaseModel):
history: Optional[dict] = {}
# 添加时间,值越小表示越靠后发布
add_time: Optional[int] = 0
# 插件公钥
plugin_public_key: Optional[str] = None
class PluginDashboard(Plugin):

91
app/utils/crypto.py Normal file
View File

@ -0,0 +1,91 @@
import base64
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
class RSAUtils:
@staticmethod
def generate_rsa_key_pair() -> (str, str):
"""
生成RSA密钥对并返回Base64编码的公钥和私钥DER格式
:return: Tuple containing Base64 encoded public key and private key
"""
# 生成RSA密钥对
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
public_key = private_key.public_key()
# 导出私钥为DER格式
private_key_der = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# 导出公钥为DER格式
public_key_der = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# 将DER格式的密钥编码为Base64
private_key_b64 = base64.b64encode(private_key_der).decode('utf-8')
public_key_b64 = base64.b64encode(public_key_der).decode('utf-8')
return private_key_b64, public_key_b64
@staticmethod
def verify_rsa_keys(private_key: str, public_key: str) -> bool:
"""
使用 RSA 验证公钥和私钥是否匹配
:param private_key: 私钥字符串 (Base64 编码无标识符)
:param public_key: 公钥字符串 (Base64 编码无标识符)
:return: 如果匹配则返回 True否则返回 False
"""
if not private_key or not public_key:
return False
try:
# 解码 Base64 编码的公钥和私钥
public_key_bytes = base64.b64decode(public_key)
private_key_bytes = base64.b64decode(private_key)
# 加载公钥
public_key = serialization.load_der_public_key(public_key_bytes, backend=default_backend())
# 加载私钥
private_key = serialization.load_der_private_key(private_key_bytes, password=None,
backend=default_backend())
# 测试加解密
message = b'test'
encrypted_message = public_key.encrypt(
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
decrypted_message = private_key.decrypt(
encrypted_message,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return message == decrypted_message
except Exception as e:
print(f"RSA 密钥验证失败: {e}")
return False