support message sse

This commit is contained in:
jxxghp 2023-07-04 20:14:31 +08:00
parent 609c820584
commit 01c88c7dc8
2 changed files with 50 additions and 2 deletions

View File

@ -1,21 +1,29 @@
import asyncio
import json
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from app import schemas
from app.core.config import settings
from app.core.security import verify_token
from app.helper.message import MessageHelper
from app.helper.progress import ProgressHelper
router = APIRouter()
@router.get("/progress/{process_type}", summary="实时进度")
async def get_progress(process_type: str, _: schemas.TokenPayload = Depends(verify_token)):
async def get_progress(process_type: str, token: str):
"""
实时获取处理进度返回格式为SSE
"""
if not token or not verify_token(token):
raise HTTPException(
status_code=403,
detail="认证失败!",
)
progress = ProgressHelper()
async def event_generator():
@ -25,3 +33,25 @@ async def get_progress(process_type: str, _: schemas.TokenPayload = Depends(veri
await asyncio.sleep(0.2)
return StreamingResponse(event_generator(), media_type="text/event-stream")
@router.get("/message", summary="实时消息")
async def get_progress(token: str):
"""
实时获取系统消息返回格式为SSE
"""
if not token or not verify_token(token):
raise HTTPException(
status_code=403,
detail="认证失败!",
)
message = MessageHelper()
async def event_generator():
while True:
detail = message.get()
yield 'data: %s\n\n' % detail
await asyncio.sleep(1)
return StreamingResponse(event_generator(), media_type="text/event-stream")

18
app/helper/message.py Normal file
View File

@ -0,0 +1,18 @@
import queue
class MessageHelper:
"""
消息队列管理器
"""
def __init__(self):
self.queue = queue.Queue()
def put(self, message: str):
self.queue.put(message)
def get(self):
if not self.queue.empty():
return self.queue.get(block=True)
else:
return None