2023-11-10 20:51:09 +08:00

44 lines
1.3 KiB
Python

from typing import Any
from fastapi import APIRouter, BackgroundTasks, Request, Depends
from app import schemas
from app.chain.webhook import WebhookChain
from app.core.config import settings
from app.core.security import verify_uri_token
router = APIRouter()
def start_webhook_chain(body: Any, form: Any, args: Any):
"""
启动链式任务
"""
WebhookChain().message(body=body, form=form, args=args)
@router.post("/", summary="Webhook消息响应", response_model=schemas.Response)
async def webhook_message(background_tasks: BackgroundTasks,
request: Request,
_: str = Depends(verify_uri_token)
) -> Any:
"""
Webhook响应
"""
body = await request.body()
form = await request.form()
args = request.query_params
background_tasks.add_task(start_webhook_chain, body, form, args)
return schemas.Response(success=True)
@router.get("/", summary="Webhook消息响应", response_model=schemas.Response)
async def webhook_message(background_tasks: BackgroundTasks,
request: Request, _: str = Depends(verify_uri_token)) -> Any:
"""
Webhook响应
"""
args = request.query_params
background_tasks.add_task(start_webhook_chain, None, None, args)
return schemas.Response(success=True)