2023-07-26 17:47:27 +08:00

528 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import traceback
from datetime import datetime, timedelta
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing.pool import ThreadPool
from typing import Any, List, Dict, Tuple, Optional
from urllib.parse import urljoin
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from ruamel.yaml import CommentedMap
from app import schemas
from app.core.config import settings
from app.core.event import EventManager, eventmanager, Event
from app.helper.browser import PlaywrightHelper
from app.helper.cloudflare import under_challenge
from app.helper.module import ModuleHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType, NotificationType
from app.utils.http import RequestUtils
from app.utils.site import SiteUtils
from app.utils.string import StringUtils
from app.utils.timer import TimerUtils
class AutoSignIn(_PluginBase):
# 插件名称
plugin_name = "站点自动签到"
# 插件描述
plugin_desc = "自动模拟登录站点并签到。"
# 插件图标
plugin_icon = "signin.png"
# 主题色
plugin_color = "#4179F4"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "autosignin_"
# 加载顺序
plugin_order = 0
# 可使用的用户级别
auth_level = 2
# 私有属性
sites: SitesHelper = None
# 事件管理器
event: EventManager = None
# 定时器
_scheduler: Optional[BackgroundScheduler] = None
# 加载的模块
_site_schema: list = []
# 配置属性
_enabled: bool = False
_cron: str = ""
_notify: bool = False
_queue_cnt: int = 5
_sign_sites: list = []
def init_plugin(self, config: dict = None):
self.sites = SitesHelper()
self.event = EventManager()
# 停止现有任务
self.stop_service()
# 配置
if config:
self._enabled = config.get("enabled")
self._cron = config.get("cron")
self._notify = config.get("notify")
self._queue_cnt = config.get("queue_cnt")
self._sign_sites = config.get("sign_sites")
# 加载模块
if self._enabled:
self._site_schema = ModuleHelper.load('app.plugins.autosignin.sites',
filter_func=lambda _, obj: hasattr(obj, 'match'))
# 定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
if self._cron:
try:
self._scheduler.add_job(func=self.sign_in,
trigger=CronTrigger.from_crontab(self._cron),
name="站点自动签到")
except Exception as err:
logger.error(f"定时任务配置错误:{err}")
# 推送实时消息
self.systemmessage.put(f"执行周期配置错误:{err}")
else:
# 随机时间
triggers = TimerUtils.random_scheduler(num_executions=2,
begin_hour=9,
end_hour=23,
max_interval=12 * 60,
min_interval=6 * 60)
for trigger in triggers:
self._scheduler.add_job(self.sign_in, "cron",
hour=trigger.hour, minute=trigger.minute,
name="站点自动签到")
# 启动任务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
self._scheduler.start()
@staticmethod
def get_command() -> List[Dict[str, Any]]:
"""
定义远程控制命令
:return: 命令关键字、事件、描述、附带数据
"""
return [{
"cmd": "/site_signin",
"event": EventType.SiteSignin,
"desc": "站点签到",
"data": {}
}]
def get_api(self) -> List[Dict[str, Any]]:
"""
获取插件API
[{
"path": "/xx",
"endpoint": self.xxx,
"methods": ["GET", "POST"],
"summary": "API说明"
}]
"""
return [{
"path": "/signin_by_domain",
"endpoint": self.signin_by_domain,
"methods": ["GET"],
"summary": "站点签到",
"description": "使用站点域名签到站点",
}]
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
# 站点的可选项
site_options = [{"title": site.get("name"), "value": site.get("id")}
for site in self.sites.get_indexers()]
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'notify',
'label': '发送通知',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'cron',
'label': '执行周期',
'placeholder': '5位cron表达式留空自动'
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'queue_cnt',
'label': '队列数量'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'content': [
{
'component': 'VSelect',
'props': {
'chips': True,
'multiple': True,
'model': 'sign_sites',
'label': '签到站点',
'items': site_options
}
}
]
}
]
}
]
}
], {
"enabled": False,
"notify": True,
"cron": "",
"queue_cnt": 5,
"sign_sites": []
}
def get_page(self) -> List[dict]:
"""
拼装插件详情页面,需要返回页面配置,同时附带数据
"""
# 最近两天的日期数组
date_list = [(datetime.now() - timedelta(days=i)).date() for i in range(2)]
# 最近一天的签到数据
current_day = ""
sign_data = []
for day in date_list:
current_day = f"{day.month}{day.day}"
sign_data = self.get_data(current_day)
if sign_data:
break
if sign_data:
contents = [
{
'component': 'tr',
'props': {
'class': 'text-sm'
},
'content': [
{
'component': 'td',
'props': {
'class': 'whitespace-nowrap break-keep'
},
'text': current_day
},
{
'component': 'td',
'text': data.get("site")
},
{
'component': 'td',
'text': data.get("status")
}
]
} for data in sign_data
]
else:
contents = [
{
'component': 'tr',
'props': {
'class': 'text-sm'
},
'content': [
{
'component': 'td',
'props': {
'colspan': 3,
'class': 'text-center'
},
'text': '暂无数据'
}
]
}
]
return [
{
'component': 'VTable',
'props': {
'hover': True
},
'content': [
{
'component': 'thead',
'content': [
{
'component': 'th',
'text': '日期'
},
{
'component': 'th',
'text': '站点'
},
{
'component': 'th',
'text': '状态'
}
]
},
{
'component': 'tbody',
'content': contents
}
]
}
]
@eventmanager.register(EventType.SiteSignin)
def sign_in(self, event: Event = None):
"""
自动签到
"""
if event:
logger.info("收到命令,开始站点签到 ...")
self.post_message(channel=event.event_data.get("channel"),
title="开始站点签到 ...",
userid=event.event_data.get("user"))
# 查询签到站点
sign_sites = [site for site in self.sites.get_indexers() if not site.get("public")]
# 过滤掉没有选中的站点
if self._sign_sites:
sign_sites = [site for site in sign_sites if site.get("id") in self._sign_sites]
if not sign_sites:
logger.info("没有需要签到的站点")
return
# 执行签到
logger.info("开始执行签到任务 ...")
with ThreadPool(min(len(sign_sites), self._queue_cnt)) as p:
status = p.map(self.signin_site, sign_sites)
if status:
logger.info("站点签到任务完成!")
# 获取今天的日期
key = f"{datetime.now().month}{datetime.now().day}"
# 保存数据
self.save_data(key, [{
"site": s[0],
"status": s[1]
} for s in status])
# 发送通知
if self._notify:
self.post_message(title="站点自动签到",
mtype=NotificationType.SiteMessage,
text="\n".join([f'{s[0]}{s[1]}' for s in status if s]))
if event:
self.post_message(channel=event.event_data.get("channel"),
title="站点签到完成!", userid=event.event_data.get("user"))
else:
logger.error("站点签到任务失败!")
if event:
self.post_message(channel=event.event_data.get("channel"),
title="站点签到任务失败!", userid=event.event_data.get("user"))
def __build_class(self, url) -> Any:
for site_schema in self._site_schema:
try:
if site_schema.match(url):
return site_schema
except Exception as e:
logger.error("站点模块加载失败:%s" % str(e))
return None
def signin_by_domain(self, url: str) -> schemas.Response:
"""
签到一个站点可由API调用
"""
domain = StringUtils.get_url_domain(url)
site_info = self.sites.get_indexer(domain)
if not site_info:
return schemas.Response(
success=True,
message=f"站点【{url}】不存在"
)
else:
return schemas.Response(
success=True,
message=self.signin_site(site_info)
)
def signin_site(self, site_info: CommentedMap) -> Tuple[str, str]:
"""
签到一个站点
"""
site_module = self.__build_class(site_info.get("url"))
if site_module and hasattr(site_module, "signin"):
try:
_, msg = site_module().signin(site_info)
# 特殊站点直接返回签到信息,防止仿真签到、模拟登陆有歧义
return site_info.get("name"), msg or ""
except Exception as e:
traceback.print_exc()
return site_info.get("name"), f"签到失败:{str(e)}"
else:
return site_info.get("name"), self.__signin_base(site_info)
@staticmethod
def __signin_base(site_info: CommentedMap) -> str:
"""
通用签到处理
:param site_info: 站点信息
:return: 签到结果信息
"""
if not site_info:
return ""
site = site_info.get("name")
site_url = site_info.get("url")
site_cookie = site_info.get("cookie")
ua = site_info.get("ua")
render = site_info.get("render")
proxies = settings.PROXY if site_info.get("proxy") else None
proxy_server = settings.PROXY_SERVER if site_info.get("proxy") else None
if not site_url or not site_cookie:
logger.warn(f"未配置 {site} 的站点地址或Cookie无法签到")
return ""
# 模拟登录
try:
# 访问链接
checkin_url = site_url
if site_url.find("attendance.php") == -1:
# 拼登签到地址
checkin_url = urljoin(site_url, "attendance.php")
logger.info(f"开始站点签到:{site},地址:{checkin_url}...")
if render:
page_source = PlaywrightHelper().get_page_source(url=checkin_url,
cookies=site_cookie,
ua=ua,
proxies=proxy_server)
if not SiteUtils.is_logged_in(page_source):
if under_challenge(page_source):
return f"无法通过Cloudflare"
return f"仿真登录失败Cookie已失效"
else:
res = RequestUtils(cookies=site_cookie,
ua=ua,
proxies=proxies
).get_res(url=checkin_url)
if not res and site_url != checkin_url:
logger.info(f"开始站点模拟登录:{site},地址:{site_url}...")
res = RequestUtils(cookies=site_cookie,
ua=ua,
proxies=proxies
).get_res(url=site_url)
# 判断登录状态
if res and res.status_code in [200, 500, 403]:
if not SiteUtils.is_logged_in(res.text):
if under_challenge(res.text):
msg = "站点被Cloudflare防护请打开站点浏览器仿真"
elif res.status_code == 200:
msg = "Cookie已失效"
else:
msg = f"状态码:{res.status_code}"
logger.warn(f"{site} 签到失败,{msg}")
return f"签到失败,{msg}"
else:
logger.info(f"{site} 签到成功")
return f"签到成功"
elif res is not None:
logger.warn(f"{site} 签到失败,状态码:{res.status_code}")
return f"签到失败,状态码:{res.status_code}"
else:
logger.warn(f"{site} 签到失败,无法打开网站")
return f"签到失败,无法打开网站!"
except Exception as e:
logger.warn("%s 签到失败:%s" % (site, str(e)))
traceback.print_exc()
return f"签到失败:{str(e)}"
def stop_service(self):
"""
退出插件
"""
try:
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error("退出插件失败:%s" % str(e))