2023-09-03 18:36:06 +08:00

201 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import tailer
from datetime import datetime
from typing import Union
from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app import schemas
from app.chain.search import SearchChain
from app.core.config import settings
from app.core.security import verify_token
from app.db import get_db
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.message import MessageHelper
from app.helper.progress import ProgressHelper
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from version import APP_VERSION
router = APIRouter()
@router.get("/env", summary="查询系统环境变量", response_model=schemas.Response)
def get_setting(_: schemas.TokenPayload = Depends(verify_token)):
"""
查询系统环境变量,包括当前版本号
"""
info = settings.dict(
exclude={"SECRET_KEY", "SUPERUSER_PASSWORD", "API_TOKEN"}
)
info.update({
"VERSION": APP_VERSION
})
return schemas.Response(success=True,
data=info)
@router.get("/progress/{process_type}", summary="实时进度")
def get_progress(process_type: str, token: str):
"""
实时获取处理进度返回格式为SSE
"""
if not token or not verify_token(token):
raise HTTPException(
status_code=403,
detail="认证失败!",
)
progress = ProgressHelper()
def event_generator():
while True:
detail = progress.get(process_type)
yield 'data: %s\n\n' % json.dumps(detail)
time.sleep(0.2)
return StreamingResponse(event_generator(), media_type="text/event-stream")
@router.get("/setting/{key}", summary="查询系统设置", response_model=schemas.Response)
def get_setting(key: str,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)):
"""
查询系统设置
"""
return schemas.Response(success=True, data={
"value": SystemConfigOper(db).get(key)
})
@router.post("/setting/{key}", summary="更新系统设置", response_model=schemas.Response)
def set_setting(key: str, value: Union[list, dict, str, int] = None,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)):
"""
更新系统设置
"""
SystemConfigOper(db).set(key, value)
return schemas.Response(success=True)
@router.get("/message", summary="实时消息")
def get_progress(token: str):
"""
实时获取系统消息返回格式为SSE
"""
if not token or not verify_token(token):
raise HTTPException(
status_code=403,
detail="认证失败!",
)
message = MessageHelper()
def event_generator():
while True:
detail = message.get()
yield 'data: %s\n\n' % (detail or '')
time.sleep(3)
return StreamingResponse(event_generator(), media_type="text/event-stream")
@router.get("/logging", summary="实时日志")
def get_logging(token: str):
"""
实时获取系统日志返回格式为SSE
"""
if not token or not verify_token(token):
raise HTTPException(
status_code=403,
detail="认证失败!",
)
def log_generator():
log_path = settings.LOG_PATH / 'moviepilot.log'
# 读取文件末尾50行不使用tailer模块
with open(log_path, 'r', encoding='utf-8') as f:
for line in f.readlines()[-50:]:
yield 'data: %s\n\n' % line
while True:
for text in tailer.follow(open(log_path, 'r', encoding='utf-8')):
yield 'data: %s\n\n' % (text or '')
time.sleep(1)
return StreamingResponse(log_generator(), media_type="text/event-stream")
@router.get("/nettest", summary="测试网络连通性")
def nettest(url: str,
proxy: bool,
_: schemas.TokenPayload = Depends(verify_token)):
"""
测试网络连通性
"""
# 记录开始的毫秒数
start_time = datetime.now()
url = url.replace("{TMDBAPIKEY}", settings.TMDB_API_KEY)
result = RequestUtils(proxies=settings.PROXY if proxy else None,
ua=settings.USER_AGENT).get_res(url)
# 计时结束的毫秒数
end_time = datetime.now()
# 计算相关秒数
if result and result.status_code == 200:
return schemas.Response(success=True, data={
"time": round((end_time - start_time).microseconds / 1000)
})
elif result:
return schemas.Response(success=False, message=f"错误码:{result.status_code}", data={
"time": round((end_time - start_time).microseconds / 1000)
})
else:
return schemas.Response(success=False, message="网络连接失败!")
@router.get("/versions", summary="查询Github所有Release版本", response_model=schemas.Response)
def latest_version(_: schemas.TokenPayload = Depends(verify_token)):
"""
查询Github所有Release版本
"""
version_res = RequestUtils().get_res(f"https://api.github.com/repos/jxxghp/MoviePilot/releases")
if version_res:
ver_json = version_res.json()
if ver_json:
return schemas.Response(success=True, data=ver_json)
return schemas.Response(success=False)
@router.get("/ruletest", summary="过滤规则测试", response_model=schemas.Response)
def ruletest(title: str,
subtitle: str = None,
ruletype: str = None,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)):
"""
过滤规则测试,规则类型 1-订阅2-洗版
"""
torrent = schemas.TorrentInfo(
title=title,
description=subtitle,
)
if ruletype == "2":
rule_string = SystemConfigOper(db).get(SystemConfigKey.FilterRules2)
else:
rule_string = SystemConfigOper(db).get(SystemConfigKey.FilterRules)
if not rule_string:
return schemas.Response(success=False, message="过滤规则未设置!")
# 过滤
result = SearchChain(db).filter_torrents(rule_string=rule_string,
torrent_list=[torrent])
if not result:
return schemas.Response(success=False, message="不符合过滤规则!")
return schemas.Response(success=True, data={
"priority": 100 - result[0].pri_order + 1
})