后台任务
这对于用户可以快速得到响应,但无需等待操作完成的场景非常有用。典型场景包括:
- 在复杂系统中进行用户设置,你可以稍后通过电子邮件通知用户和其他人其账户已创建完成
- 需要大量时间的批处理过程(例如批量发送电子邮件或 API 调用)
- 任何其他可以通过电子邮件、websocket、webhook 或弹窗稍后通知用户的过程
FastHTML 中的后台任务构建于 Starlette 的后台任务之上,并增加了一些便利性。Starlette 的后台任务设计是对 Python 的 async 和 threading 库的易用封装。后台任务能让应用对最终用户的响应更快捷,并通常能提高应用的运行速度。
一个简单的后台任务示例
在此示例中,我们通过 `background` 参数将一个任务附加到 `FtResponse` 上。当页面被访问时,它会几乎立即显示“简单的后台任务示例”,同时在终端中会从 0 开始缓慢向上计数。
main.py
from fasthtml.common import *
from starlette.background import BackgroundTask
from time import sleep
app, rt = fast_app()
1def counter(loops:int):
"Slowly print integers to the terminal"
for i in range(loops):
print(i)
sleep(i)
@rt
def index():
2 task = BackgroundTask(counter, loops=5)
3 return Titled('Simple Background Task Example'), task
serve()- 1
-
counter是我们的任务函数。它本身没什么特别之处,但一个好的实践是使其参数可以序列化为 JSON。 - 2
-
我们使用
starlette.background.BackgroundTask将counter()转换为后台任务。 - 3
- 要将后台任务添加到处理函数中,我们将其添加到响应顶层的返回值中。
一个更实际的例子
让我们想象一下,我们正在访问一个处理缓慢但至关重要的服务。我们不希望用户必须等待。虽然我们可以设置 SSE 在完成时通知用户,但我们决定改为定期检查其记录状态是否已更改。
模拟慢速 API 服务
首先,创建一个非常简单的慢速时间戳 API。它所做的只是将请求延迟几秒钟,然后返回包含时间戳的 JSON。
# slow_api.py
from fasthtml.common import *
from time import sleep, time
app, rt = fast_app()
@rt('/slow')
def slow(ts: int):
1 sleep(3)
2 return dict(request_time=ts, response_time=int(time()))
serve(port=8123)- 1
- 这代表了慢速处理过程。
- 2
- 返回任务的原始时间戳和完成后的时间
主 FastHTML 应用
现在,让我们创建一个面向用户的应用,该应用使用此 API 从这个极其缓慢的服务中获取时间戳。
# main.py
from fasthtml.common import *
from starlette.background import BackgroundTask
import time
import httpx
app, rt = fast_app()
db = database(':memory:')
1class TStamp: request_time: int; response_time: int
tstamps = db.create(TStamp, pk='request_time')
2def task_submit(request_time: int):
client = httpx.Client()
3 response = client.post(f'http://127.0.0.1:8123/slow?ts={request_time}')
4 tstamps.insert(**response.json())
@rt
def submit():
"Route that initiates a background task and returns immediately."
request_time = int(time.time())
5 task = BackgroundTask(task_submit, request_time=request_time)
6 return P(f'Request submitted at: {request_time}'), task
@rt
7def show_tstamps(): return Ul(map(Li, tstamps()))
@rt
def index():
return Titled('Background Task Dashboard',
8 P(Button('Press to call slow service',
hx_post=submit, hx_target='#res')),
H2('Responses from Tasks'),
P('', id='res'),
Div(Ul(map(Li, tstamps())),
9 hx_get=show_tstamps, hx_trigger='every 5s'),
)
serve()- 1
- 跟踪请求发送和响应接收的时间
- 2
- 在路由处理函数的后台运行的、调用慢速服务的任务函数。通常的做法是给任务函数加上“task_”前缀,但这不是必须的。
- 3
- 调用慢速 API 服务(模拟一个耗时的操作)
- 4
- 将两个时间戳都存储到我们的数据库中
- 5
- 通过将函数及任何参数传递给 `BackgroundTask` 对象来创建后台任务。
- 6
- 在 `FtResponse` 中,使用 `background` 关键字参数来设置在 HTTP 响应生成后运行的任务。
- 7
- 显示所有已记录时间戳对的端点。
- 8
- 当按下此按钮时,“submit”处理函数将立即响应。`task_submit` 函数将在稍后把慢速 API 的响应插入数据库。
- 9
- 每 5 秒获取一次存储在数据库中的时间戳。
在上面的示例中,我们在同步处理函数的 FtResponse 中设置了一个同步的后台任务函数。但是,我们也可以使用异步函数和异步处理函数。
在处理函数中处理多个后台任务
可以向一个 `FtResponse` 添加多个后台任务。
一个后台任务中的多个任务是按顺序执行的。如果某个任务引发了异常,其后的任务将没有机会被执行。
from starlette.background import BackgroundTasks
@rt
async def signup(email, username):
tasks = BackgroundTasks()
tasks.add_task(send_welcome_email, to_address=email)
tasks.add_task(send_admin_notification, username=username)
return Titled('Signup successful!'), tasks
async def send_welcome_email(to_address):
...
async def send_admin_notification(username):
...大规模后台任务
后台任务通过异步处理阻塞过程来提升用户和应用的性能,即使这些任务被定义为同步函数也是如此。
当 FastHTML 的后台任务不够用,并且你的应用在服务器上运行缓慢时,一个选择是手动将进程卸载到 multiprocessing 库。通过这样做,你可以利用多个核心并绕过 GIL,以增加复杂性为代价,显著提高速度和性能。
有时服务器会达到其处理极限,这时就像 Celery 和 Dramatiq 这样的分布式任务队列系统就派上用场了。它们被设计用来将任务分配到多台服务器上,提供更好的可观察性、重试机制和持久性,但代价是复杂性大大增加。
然而,大多数应用使用像 FastHTML 中那样的内置后台任务就能很好地工作,我们建议首先尝试这种方式。编写这些函数时使用 JSON 可序列化的参数,可以确保在需要时能直接转换为其他并发方法。