import time
from IPython import display
from enum import Enum
from pprint import pprint
from fastcore.test import *
from starlette.testclient import TestClient
from starlette.requests import Headers
from starlette.datastructures import UploadFile核心
Starlette 的 FastHTML 子类,以及它自动使用的 RouterX 和 RouteX 类。这是 fasthtml 的源代码。除非您想了解其内部构建原理,或需要某个特定 API 的完整详细信息,否则无需阅读此文档。该 notebook 使用 nbdev 转换为 Python 模块 fasthtml/core.py。
导入和工具函数
我们先编写源代码,然后再编写测试。测试既是确认代码有效性的手段,也作为实际可用的示例。第一个导出的函数 parsed_date 就是这种模式的一个例子。
parsed_date (解析日期)
parsed_date (s:str)
将 s 转换为 datetime 对象
parsed_date('2pm')datetime.datetime(2025, 7, 2, 14, 0)
isinstance(date.fromtimestamp(0), date)True
snake2hyphens (蛇形命名转连字符)
snake2hyphens (s:str)
将 s 从蛇形命名法(snake case)转换为连字符连接且首字母大写的形式
snake2hyphens("snake_case")'Snake-Case'
HtmxHeaders (Htmx 响应头)
HtmxHeaders (boosted:str|None=None, current_url:str|None=None, history_restore_request:str|None=None, prompt:str|None=None, request:str|None=None, target:str|None=None, trigger_name:str|None=None, trigger:str|None=None)
def test_request(url: str='/', headers: dict={}, method: str='get') -> Request:
scope = {
'type': 'http',
'method': method,
'path': url,
'headers': Headers(headers).raw,
'query_string': b'',
'scheme': 'http',
'client': ('127.0.0.1', 8000),
'server': ('127.0.0.1', 8000),
}
receive = lambda: {"body": b"", "more_body": False}
return Request(scope, receive)h = test_request(headers=Headers({'HX-Request':'1'}))
_get_htmx(h.headers)HtmxHeaders(boosted=None, current_url=None, history_restore_request=None, prompt=None, request='1', target=None, trigger_name=None, trigger=None)
请求和响应
test_eq(_fix_anno(Union[str,None], 'a'), 'a')
test_eq(_fix_anno(float, 0.9), 0.9)
test_eq(_fix_anno(int, '1'), 1)
test_eq(_fix_anno(int, ['1','2']), 2)
test_eq(_fix_anno(list[int], ['1','2']), [1,2])
test_eq(_fix_anno(list[int], '1'), [1])d = dict(k=int, l=List[int])
test_eq(_form_arg('k', "1", d), 1)
test_eq(_form_arg('l', "1", d), [1])
test_eq(_form_arg('l', ["1","2"], d), [1,2])HttpHeader (HTTP 响应头)
HttpHeader (k:str, v:str)
_to_htmx_header('trigger_after_settle')'HX-Trigger-After-Settle'
HtmxResponseHeaders (Htmx 响应头)
HtmxResponseHeaders (location=None, push_url=None, redirect=None, refresh=None, replace_url=None, reswap=None, retarget=None, reselect=None, trigger=None, trigger_after_settle=None, trigger_after_swap=None)
HTMX 响应头
HtmxResponseHeaders(trigger_after_settle='hi')HttpHeader(k='HX-Trigger-After-Settle', v='hi')
form2dict (表单转字典)
form2dict (form:starlette.datastructures.FormData)
将 starlette 的表单数据转换为字典
d = [('a',1),('a',2),('b',0)]
fd = FormData(d)
res = form2dict(fd)
test_eq(res['a'], [1,2])
test_eq(res['b'], 0)parse_form (解析表单)
parse_form (req:starlette.requests.Request)
Starlette 在处理空的多部分表单(multipart forms)时会出错,因此这个函数会检查这种情况
JSONResponse (JSON 响应)
JSONResponse (content:Any, status_code:int=200, headers:collections.abc.Mapping[str,str]|None=None, media_type:str|None=None, background:starlette.background.BackgroundTask|None=None)
与 starlette 的版本相同,但会自动将非序列化类型转换为字符串
async def f(req):
def _f(p:HttpHeader): ...
p = first(_params(_f).values())
result = await _from_body(req, p)
return JSONResponse(result.__dict__)
client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
d = dict(k='value1',v=['value2','value3'])
response = client.post('/', data=d)
print(response.json()){'k': 'value1', 'v': 'value3'}
async def f(req): return Response(str(req.query_params.getlist('x')))
client = TestClient(Starlette(routes=[Route('/', f, methods=['GET'])]))
client.get('/?x=1&x=2').text"['1', '2']"
def g(req, this:Starlette, a:str, b:HttpHeader): ...
async def f(req):
a = await _wrap_req(req, _params(g))
return Response(str(a))
client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
response = client.post('/?a=1', data=d)
print(response.text)[<starlette.requests.Request object>, <starlette.applications.Starlette object>, '1', HttpHeader(k='value1', v='value3')]
def g(req, this:Starlette, a:str, b:HttpHeader): ...
async def f(req):
a = await _wrap_req(req, _params(g))
return Response(str(a))
client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
response = client.post('/?a=1', data=d)
print(response.text)[<starlette.requests.Request object>, <starlette.applications.Starlette object>, '1', HttpHeader(k='value1', v='value3')]
缺失的请求参数
如果一个请求参数有默认值(例如 a:str=''),即使用户请求中没有包含该参数,请求也是有效的。
def g(req, this:Starlette, a:str=''): ...
async def f(req):
a = await _wrap_req(req, _params(g))
return Response(str(a))
client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
response = client.post('/', json={}) # no param in request
print(response.text)[<starlette.requests.Request object>, <starlette.applications.Starlette object>, '']
如果我们移除默认值并重新运行请求,应该会得到以下错误:Missing required field: a (缺少必需字段: a)。
def g(req, this:Starlette, a:str): ...
async def f(req):
a = await _wrap_req(req, _params(g))
return Response(str(a))
client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
response = client.post('/', json={}) # no param in request
print(response.text)Missing required field: a
flat_xt (扁平化 xt)
flat_xt (lst)
扁平化列表
x = ft('a',1)
test_eq(flat_xt([x, x, [x,x]]), (x,)*4)
test_eq(flat_xt(x), (x,))Beforeware (前置件)
Beforeware (f, skip=None)
初始化 self。请使用 help(type(self)) 查看准确的签名。
Websockets / SSE
def on_receive(self, msg:str): return f"Message text was: {msg}"
c = _ws_endp(on_receive)
cli = TestClient(Starlette(routes=[WebSocketRoute('/', _ws_endp(on_receive))]))
with cli.websocket_connect('/') as ws:
ws.send_text('{"msg":"Hi!"}')
data = ws.receive_text()
assert data == 'Message text was: Hi!'EventStream (事件流)
EventStream (s)
从 s 创建一个 text/event-stream 响应
signal_shutdown (关闭信号)
signal_shutdown ()
路由和应用
uri
uri (_arg, **kwargs)
decode_uri (解码 URI)
decode_uri (s)
StringConvertor.to_string (字符串转换器.to_string)
StringConvertor.to_string (value:str)
HTTPConnection.url_path_for (HTTP连接.url_path_for)
HTTPConnection.url_path_for (name:str, **path_params)
flat_tuple (扁平化元组)
flat_tuple (o)
扁平化列表
noop_body (空操作响应体)
noop_body (c, req)
默认的响应体包装函数,仅返回内容
respond (响应)
respond (req, heads, bdy)
默认的 FT 响应创建函数
如果 HX-Request 响应头存在且 HX-History-Restore-Request 响应头不存在,则渲染页面片段。
is_full_page (是否为完整页面)
is_full_page (req, resp)
Redirect (重定向)
Redirect (loc)
根据需要使用 HTMX 或 Starlette 的 RedirectResponse 重定向到 loc
FastHTML 的 exts 参数支持以下内容
print(' '.join(htmx_exts))morph head-support preload class-tools loading-states multi-swap path-deps remove-me ws chunked-transfer
get_key (获取键)
get_key (key=None, fname='.sesskey')
qp (查询参数)
qp (p:str, **kw)
将参数 kw 添加到路径 p
qp 将查询参数添加到路由路径字符串中
vals = {'a':5, 'b':False, 'c':[1,2], 'd':'bar', 'e':None, 'ab':42}res = qp('/foo', **vals)
test_eq(res, '/foo?a=5&b=&c=1&c=2&d=bar&e=&ab=42')qp 检查每个参数应作为查询参数还是作为路由的一部分发送,并进行正确编码。
path = '/foo/{a}/{d}/{ab:int}'
res = qp(path, **vals)
test_eq(res, '/foo/5/bar/42?b=&c=1&c=2&e=')def_hdrs (默认响应头)
def_hdrs (htmx=True, surreal=True)
FastHTML 应用的默认响应头
FastHTML
FastHTML (debug=False, routes=None, middleware=None, title:str='FastHTML page', exception_handlers=None, on_startup=None, on_shutdown=None, lifespan=None, hdrs=None, ftrs=None, exts=None, before=None, after=None, surreal=True, htmx=True, default_hdrs=True, sess_cls=<class 'starlette.middleware.sessions.SessionMiddleware'>, secret_key=None, session_cookie='session_', max_age=31536000, sess_path='/', same_site='lax', sess_https_only=False, sess_domain=None, key_fname='.sesskey', body_wrap=<function noop_body>, htmlkw=None, nb_hdrs=False, canonical=True, **bodykw)
创建一个 Starlette 应用。
FastHTML.add_route (FastHTML.添加路由)
FastHTML.add_route (route)
FastHTML.ws (FastHTML.ws)
FastHTML.ws (path:str, conn=None, disconn=None, name=None, middleware=None)
在 path 处添加一个 websocket 路由
nested_name (嵌套名称)
nested_name (f)
*获取函数 f 的名称,使用 '_' 连接嵌套函数名*
def f():
def g(): ...
return gfunc = f()
nested_name(func)'f_g'
FastHTML.route (FastHTML.路由)
FastHTML.route (path:str=None, methods=None, name=None, include_in_schema=True, body_wrap=None)
在 path 处添加一个路由
app = FastHTML()
@app.get
def foo(a:str, b:list[int]): ...
foo.to(a='bar', b=[1,2])'/foo?a=bar&b=1&b=2'
@app.get('/foo/{a}')
def foo(a:str, b:list[int]): ...
foo.to(a='bar', b=[1,2])'/foo/bar?b=1&b=2'
FastHTML.set_lifespan (FastHTML.设置生命周期)
FastHTML.set_lifespan (value)
serve (启动服务)
serve (appname=None, app='app', host='0.0.0.0', port=None, reload=True, reload_includes:list[str]|str|None=None, reload_excludes:list[str]|str|None=None)
在一个异步服务器中运行应用,默认启用实时重新加载。
| 类型 | 默认值 | 详情 | |
|---|---|---|---|
| appname | NoneType | None | 模块的名称 |
| app | str | app | 要运行的应用实例 |
| host | str | 0.0.0.0 | 如果 host 是 0.0.0.0,将转换为 localhost |
| port | NoneType | None | 如果 port 为 None,将默认为 5001 或 PORT 环境变量的值 |
| reload | bool | True | 默认在代码更改时重新加载应用 |
| reload_includes | list[str] | str | None | None | 需要监视其更改的额外文件 |
| reload_excludes | list[str] | str | None | None | 要忽略其更改的文件 |
Client (客户端)
Client (app, url='http://testserver')
一个简单的、不需要 async 的 httpx ASGI 客户端
app = FastHTML(routes=[Route('/', lambda _: Response('test'))])
cli = Client(app)
cli.get('/').text'test'
请注意,您也可以使用 Starlette 的 TestClient 来代替 FastHTML 的 Client。它们在很大程度上应该是可以互换的。
FastHTML 测试
def get_cli(app): return app,TestClient(app),app.routeapp,cli,rt = get_cli(FastHTML(secret_key='soopersecret'))app,cli,rt = get_cli(FastHTML(title="My Custom Title"))
@app.get
def foo(): return Div("Hello World")
print(app.routes)
response = cli.get('/foo')
assert '<title>My Custom Title</title>' in response.text
foo.to(param='value')[Route(path='/foo', name='foo', methods=['GET', 'HEAD'])]
'/foo?param=value'
app,cli,rt = get_cli(FastHTML())
@rt('/xt2')
def get(): return H1('bar')
txt = cli.get('/xt2').text
assert '<title>FastHTML page</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt@rt("/hi")
def get(): return 'Hi there'
r = cli.get('/hi')
r.text'Hi there'
@rt("/hi")
def post(): return 'Postal'
cli.post('/hi').text'Postal'
@app.get("/hostie")
def show_host(req): return req.headers['host']
cli.get('/hostie').text'testserver'
@app.get("/setsess")
def set_sess(session):
session['foo'] = 'bar'
return 'ok'
@app.ws("/ws")
def ws(self, msg:str, ws:WebSocket, session): return f"Message text was: {msg} with session {session.get('foo')}, from client: {ws.client}"
cli.get('/setsess')
with cli.websocket_connect('/ws') as ws:
ws.send_text('{"msg":"Hi!"}')
data = ws.receive_text()
assert 'Message text was: Hi! with session bar' in data
print(data)Message text was: Hi! with session bar, from client: Address(host='testclient', port=50000)
@rt
def yoyo(): return 'a yoyo'
cli.post('/yoyo').text'a yoyo'
@app.get
def autopost(): return Html(Div('Text.', hx_post=yoyo()))
print(cli.get('/autopost').text) <!doctype html>
<html>
<div hx-post="a yoyo">Text.</div>
</html>
@app.get
def autopost2(): return Html(Body(Div('Text.', cls='px-2', hx_post=show_host.to(a='b'))))
print(cli.get('/autopost2').text) <!doctype html>
<html>
<body>
<div class="px-2" hx-post="/hostie?a=b">Text.</div>
</body>
</html>
@app.get
def autoget2(): return Html(Div('Text.', hx_get=show_host))
print(cli.get('/autoget2').text) <!doctype html>
<html>
<div hx-get="/hostie">Text.</div>
</html>
@rt('/user/{nm}', name='gday')
def get(nm:str=''): return f"Good day to you, {nm}!"
cli.get('/user/Alexis').text'Good day to you, Alexis!'
@app.get
def autolink(): return Html(Div('Text.', link=uri('gday', nm='Alexis')))
print(cli.get('/autolink').text) <!doctype html>
<html>
<div href="/user/Alexis">Text.</div>
</html>
@rt('/link')
def get(req): return f"{req.url_for('gday', nm='Alexis')}; {req.url_for('show_host')}"
cli.get('/link').text'http://testserver/user/Alexis; http://testserver/hostie'
@app.get("/background")
async def background_task(request):
async def long_running_task():
await asyncio.sleep(0.1)
print("Background task completed!")
return P("Task started"), BackgroundTask(long_running_task)
response = cli.get("/background")Background task completed!
test_eq(app.router.url_path_for('gday', nm='Jeremy'), '/user/Jeremy')hxhdr = {'headers':{'hx-request':"1"}}
@rt('/ft')
def get(): return Title('Foo'),H1('bar')
txt = cli.get('/ft').text
assert '<title>Foo</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt
@rt('/xt2')
def get(): return H1('bar')
txt = cli.get('/xt2').text
assert '<title>FastHTML page</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt
assert cli.get('/xt2', **hxhdr).text.strip() == '<h1>bar</h1>'
@rt('/xt3')
def get(): return Html(Head(Title('hi')), Body(P('there')))
txt = cli.get('/xt3').text
assert '<title>FastHTML page</title>' not in txt and '<title>hi</title>' in txt and '<p>there</p>' in txt@rt('/oops')
def get(nope): return nope
test_warns(lambda: cli.get('/oops?nope=1'))def test_r(cli, path, exp, meth='get', hx=False, **kwargs):
if hx: kwargs['headers'] = {'hx-request':"1"}
test_eq(getattr(cli, meth)(path, **kwargs).text, exp)
ModelName = str_enum('ModelName', "alexnet", "resnet", "lenet")
fake_db = [{"name": "Foo"}, {"name": "Bar"}]@rt('/html/{idx}')
async def get(idx:int): return Body(H4(f'Next is {idx+1}.'))@rt("/models/{nm}")
def get(nm:ModelName): return nm
@rt("/files/{path}")
async def get(path: Path): return path.with_suffix('.txt')
@rt("/items/")
def get(idx:int|None = 0): return fake_db[idx]
@rt("/idxl/")
def get(idx:list[int]): return str(idx)r = cli.get('/html/1', headers={'hx-request':"1"})
assert '<h4>Next is 2.</h4>' in r.text
test_r(cli, '/models/alexnet', 'alexnet')
test_r(cli, '/files/foo', 'foo.txt')
test_r(cli, '/items/?idx=1', '{"name":"Bar"}')
test_r(cli, '/items/', '{"name":"Foo"}')
assert cli.get('/items/?idx=g').text=='404 Not Found'
assert cli.get('/items/?idx=g').status_code == 404
test_r(cli, '/idxl/?idx=1&idx=2', '[1, 2]')
assert cli.get('/idxl/?idx=1&idx=g').status_code == 404app = FastHTML()
rt = app.route
cli = TestClient(app)
@app.route(r'/static/{path:path}.jpg')
def index(path:str): return f'got {path}'
@app.route(r'/static/{path:path}')
def foo(path:str, a:int): return f'also got {path},{a}'
cli.get('/static/sub/a.b.jpg').text'got sub/a.b'
cli.get('/static/sub/a.b?a=1').text'also got sub/a.b,1'
app.chk = 'foo'@app.get("/booly/")
def _(coming:bool=True): return 'Coming' if coming else 'Not coming'
@app.get("/datie/")
def _(d:parsed_date): return d
@app.get("/ua")
async def _(user_agent:str): return user_agent
@app.get("/hxtest")
def _(htmx): return htmx.request
@app.get("/hxtest2")
def _(foo:HtmxHeaders, req): return foo.request
@app.get("/app")
def _(app): return app.chk
@app.get("/app2")
def _(foo:FastHTML): return foo.chk,HttpHeader("mykey", "myval")
@app.get("/app3")
def _(foo:FastHTML): return HtmxResponseHeaders(location="http://example.org")
@app.get("/app4")
def _(foo:FastHTML): return Redirect("http://example.org")test_r(cli, '/booly/?coming=true', 'Coming')
test_r(cli, '/booly/?coming=no', 'Not coming')
date_str = "17th of May, 2024, 2p"
test_r(cli, f'/datie/?d={date_str}', '2024-05-17 14:00:00')
test_r(cli, '/ua', 'FastHTML', headers={'User-Agent':'FastHTML'})
test_r(cli, '/hxtest' , '1', headers={'HX-Request':'1'})
test_r(cli, '/hxtest2', '1', headers={'HX-Request':'1'})
test_r(cli, '/app' , 'foo')r = cli.get('/app2', **hxhdr)
test_eq(r.text, 'foo')
test_eq(r.headers['mykey'], 'myval')r = cli.get('/app3')
test_eq(r.headers['HX-Location'], 'http://example.org')r = cli.get('/app4', follow_redirects=False)
test_eq(r.status_code, 303)r = cli.get('/app4', headers={'HX-Request':'1'})
test_eq(r.headers['HX-Redirect'], 'http://example.org')@rt
def meta():
return ((Title('hi'),H1('hi')),
(Meta(property='image'), Meta(property='site_name'))
)
t = cli.post('/meta').text
assert re.search(r'<body>\s*<h1>hi</h1>\s*</body>', t)
assert '<meta' in t@app.post('/profile/me')
def profile_update(username: str): return username
test_r(cli, '/profile/me', 'Alexis', 'post', data={'username' : 'Alexis'})
test_r(cli, '/profile/me', 'Missing required field: username', 'post', data={})# Example post request with parameter that has a default value
@app.post('/pet/dog')
def pet_dog(dogname: str = None): return dogname
# Working post request with optional parameter
test_r(cli, '/pet/dog', '', 'post', data={})@dataclass
class Bodie: a:int;b:str
@rt("/bodie/{nm}")
def post(nm:str, data:Bodie):
res = asdict(data)
res['nm'] = nm
return res
@app.post("/bodied/")
def bodied(data:dict): return data
nt = namedtuple('Bodient', ['a','b'])
@app.post("/bodient/")
def bodient(data:nt): return asdict(data)
class BodieTD(TypedDict): a:int;b:str='foo'
@app.post("/bodietd/")
def bodient(data:BodieTD): return data
class Bodie2:
a:int|None; b:str
def __init__(self, a, b='foo'): store_attr()
@rt("/bodie2/", methods=['get','post'])
def bodie(d:Bodie2): return f"a: {d.a}; b: {d.b}"from fasthtml.xtend import Titledd = dict(a=1, b='foo')
test_r(cli, '/bodie/me', '{"a":1,"b":"foo","nm":"me"}', 'post', data=dict(a=1, b='foo', nm='me'))
test_r(cli, '/bodied/', '{"a":"1","b":"foo"}', 'post', data=d)
test_r(cli, '/bodie2/', 'a: 1; b: foo', 'post', data={'a':1})
test_r(cli, '/bodie2/?a=1&b=foo&nm=me', 'a: 1; b: foo')
test_r(cli, '/bodient/', '{"a":"1","b":"foo"}', 'post', data=d)
test_r(cli, '/bodietd/', '{"a":1,"b":"foo"}', 'post', data=d)# Testing POST with Content-Type: application/json
@app.post("/")
def index(it: Bodie): return Titled("It worked!", P(f"{it.a}, {it.b}"))
s = json.dumps({"b": "Lorem", "a": 15})
response = cli.post('/', headers={"Content-Type": "application/json"}, data=s).text
assert "<title>It worked!</title>" in response and "<p>15, Lorem</p>" in response# Testing POST with Content-Type: application/json
@app.post("/bodytext")
def index(body): return body
response = cli.post('/bodytext', headers={"Content-Type": "application/json"}, data=s).text
test_eq(response, '{"b": "Lorem", "a": 15}')files = [ ('files', ('file1.txt', b'content1')),
('files', ('file2.txt', b'content2')) ]@rt("/uploads")
async def post(files:list[UploadFile]):
return ','.join([(await file.read()).decode() for file in files])
res = cli.post('/uploads', files=files)
print(res.status_code)
print(res.text)200
content1,content2
res = cli.post('/uploads', files=[files[0]])
print(res.status_code)
print(res.text)200
content1
@rt("/setsess")
def get(sess, foo:str=''):
now = datetime.now()
sess['auth'] = str(now)
return f'Set to {now}'
@rt("/getsess")
def get(sess): return f'Session time: {sess["auth"]}'
print(cli.get('/setsess').text)
time.sleep(0.01)
cli.get('/getsess').textSet to 2025-05-29 08:31:48.235262
'Session time: 2025-05-29 08:31:48.235262'
@rt("/sess-first")
def post(sess, name: str):
sess["name"] = name
return str(sess)
cli.post('/sess-first', data={'name': 2})
@rt("/getsess-all")
def get(sess): return sess['name']
test_eq(cli.get('/getsess-all').text, '2')@rt("/upload")
async def post(uf:UploadFile): return (await uf.read()).decode()
with open('../../CHANGELOG.md', 'rb') as f:
print(cli.post('/upload', files={'uf':f}, data={'msg':'Hello'}).text[:15])# Release notes
@rt("/form-submit/{list_id}")
def options(list_id: str):
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'POST',
'Access-Control-Allow-Headers': '*',
}
return Response(status_code=200, headers=headers)h = cli.options('/form-submit/2').headers
test_eq(h['Access-Control-Allow-Methods'], 'POST')from fasthtml.authmw import user_pwd_authdef _not_found(req, exc): return Div('nope')
app,cli,rt = get_cli(FastHTML(exception_handlers={404:_not_found}))
txt = cli.get('/').text
assert '<div>nope</div>' in txt
assert '<!doctype html>' in txtapp,cli,rt = get_cli(FastHTML())
@rt("/{name}/{age}")
def get(name: str, age: int):
return Titled(f"Hello {name.title()}, age {age}")
assert '<title>Hello Uma, age 5</title>' in cli.get('/uma/5').text
assert '404 Not Found' in cli.get('/uma/five').textauth = user_pwd_auth(testuser='spycraft')
app,cli,rt = get_cli(FastHTML(middleware=[auth]))
@rt("/locked")
def get(auth): return 'Hello, ' + auth
test_eq(cli.get('/locked').text, 'not authenticated')
test_eq(cli.get('/locked', auth=("testuser","spycraft")).text, 'Hello, testuser')auth = user_pwd_auth(testuser='spycraft')
app,cli,rt = get_cli(FastHTML(middleware=[auth]))
@rt("/locked")
def get(auth): return 'Hello, ' + auth
test_eq(cli.get('/locked').text, 'not authenticated')
test_eq(cli.get('/locked', auth=("testuser","spycraft")).text, 'Hello, testuser')APIRouter
RouteFuncs (路由函数)
RouteFuncs ()
初始化 self。请使用 help(type(self)) 查看准确的签名。
APIRouter
APIRouter (prefix:str|None=None, body_wrap=<function noop_body>)
向应用添加路由
ar = APIRouter()@ar("/hi")
def get(): return 'Hi there'
@ar("/hi")
def post(): return 'Postal'
@ar
def ho(): return 'Ho ho'
@ar("/hostie")
def show_host(req): return req.headers['host']
@ar
def yoyo(): return 'a yoyo'
@ar
def index(): return "home page"
@ar.ws("/ws")
def ws(self, msg:str): return f"Message text was: {msg}"app,cli,_ = get_cli(FastHTML())
ar.to_app(app)assert str(yoyo) == '/yoyo'
# ensure route functions are properly discoverable on `APIRouter` and `APIRouter.rt_funcs`
assert ar.prefix == ''
assert str(ar.rt_funcs.index) == '/'
assert str(ar.index) == '/'
with ExceptionExpected(): ar.blah()
with ExceptionExpected(): ar.rt_funcs.blah()
# ensure any route functions named using an HTTPMethod are not discoverable via `rt_funcs`
assert "get" not in ar.rt_funcs._funcs.keys()test_eq(cli.get('/hi').text, 'Hi there')
test_eq(cli.post('/hi').text, 'Postal')
test_eq(cli.get('/hostie').text, 'testserver')
test_eq(cli.post('/yoyo').text, 'a yoyo')
test_eq(cli.get('/ho').text, 'Ho ho')
test_eq(cli.post('/ho').text, 'Ho ho')with cli.websocket_connect('/ws') as ws:
ws.send_text('{"msg":"Hi!"}')
data = ws.receive_text()
assert data == 'Message text was: Hi!'ar2 = APIRouter("/products")@ar2("/hi")
def get(): return 'Hi there'
@ar2("/hi")
def post(): return 'Postal'
@ar2
def ho(): return 'Ho ho'
@ar2("/hostie")
def show_host(req): return req.headers['host']
@ar2
def yoyo(): return 'a yoyo'
@ar2
def index(): return "home page"
@ar2.ws("/ws")
def ws(self, msg:str): return f"Message text was: {msg}"app,cli,_ = get_cli(FastHTML())
ar2.to_app(app)assert str(yoyo) == '/products/yoyo'
assert ar2.prefix == '/products'
assert str(ar2.rt_funcs.index) == '/products/'
assert str(ar2.index) == '/products/'
assert str(ar.index) == '/'
with ExceptionExpected(): ar2.blah()
with ExceptionExpected(): ar2.rt_funcs.blah()
assert "get" not in ar2.rt_funcs._funcs.keys()test_eq(cli.get('/products/hi').text, 'Hi there')
test_eq(cli.post('/products/hi').text, 'Postal')
test_eq(cli.get('/products/hostie').text, 'testserver')
test_eq(cli.post('/products/yoyo').text, 'a yoyo')
test_eq(cli.get('/products/ho').text, 'Ho ho')
test_eq(cli.post('/products/ho').text, 'Ho ho')with cli.websocket_connect('/products/ws') as ws:
ws.send_text('{"msg":"Hi!"}')
data = ws.receive_text()
assert data == 'Message text was: Hi!'@ar.get
def hi2(): return 'Hi there'
@ar.get("/hi3")
def _(): return 'Hi there'
@ar.post("/post2")
def _(): return 'Postal'
@ar2.get
def hi2(): return 'Hi there'
@ar2.get("/hi3")
def _(): return 'Hi there'
@ar2.post("/post2")
def _(): return 'Postal'附加功能
app,cli,rt = get_cli(FastHTML(secret_key='soopersecret'))reg_re_param (注册正则参数)
reg_re_param (m, s)
FastHTML.static_route_exts (FastHTML.静态路由扩展)
FastHTML.static_route_exts (prefix='/', static_path='.', exts='static')
在 URL 路径 prefix 处添加一个静态路由,文件来自 static_path,扩展名由 reg_re_param() 定义
reg_re_param("imgext", "ico|gif|jpg|jpeg|webm|pdf")
@rt(r'/static/{path:path}{fn}.{ext:imgext}')
def get(fn:str, path:str, ext:str): return f"Getting {fn}.{ext} from /{path}"
test_r(cli, '/static/foo/jph.me.ico', 'Getting jph.me.ico from /foo/')app.static_route_exts()
assert 'These are the source notebooks for FastHTML' in cli.get('/README.txt').textFastHTML.static_route (FastHTML.静态路由)
FastHTML.static_route (ext='', prefix='/', static_path='.')
在 URL 路径 prefix 处添加一个静态路由,文件来自 static_path,使用单个 ext(包括‘.’)
app.static_route('.md', static_path='../..')
assert 'THIS FILE WAS AUTOGENERATED' in cli.get('/README.md').textMiddlewareBase (中间件基类)
MiddlewareBase ()
初始化 self。请使用 help(type(self)) 查看准确的签名。
FtResponse (Ft 响应)
FtResponse (content, status_code:int=200, headers=None, cls=<class 'starlette.responses.HTMLResponse'>, media_type:str|None=None, background:starlette.background.BackgroundTask|None=None)
用任何 Starlette Response 包装一个 FT 响应
@rt('/ftr')
def get():
cts = Title('Foo'),H1('bar')
return FtResponse(cts, status_code=201, headers={'Location':'/foo/1'})
r = cli.get('/ftr')
test_eq(r.status_code, 201)
test_eq(r.headers['location'], '/foo/1')
txt = r.text
assert '<title>Foo</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt测试单个后台任务
def my_slow_task():
print('Starting slow task')
time.sleep(0.001)
print('Finished slow task')
@rt('/background')
def get():
return P('BG Task'), BackgroundTask(my_slow_task)
r = cli.get('/background')
test_eq(r.status_code, 200)Starting slow task
Finished slow task
测试多个后台任务
def increment(amount):
amount = amount/1000
print(f'Sleeping for {amount}s')
time.sleep(amount)
print(f'Slept for {amount}s')@rt
def backgrounds():
tasks = BackgroundTasks()
for i in range(3): tasks.add_task(increment, i)
return P('BG Tasks'), tasks
r = cli.get('/backgrounds')
test_eq(r.status_code, 200)Sleeping for 0.0s
Slept for 0.0s
Sleeping for 0.001s
Slept for 0.001s
Sleeping for 0.002s
Slept for 0.002s
@rt
def backgrounds2():
tasks = [BackgroundTask(increment,i) for i in range(3)]
return P('BG Tasks'), *tasks
r = cli.get('/backgrounds2')
test_eq(r.status_code, 200)Sleeping for 0.0s
Slept for 0.0s
Sleeping for 0.001s
Slept for 0.001s
Sleeping for 0.002s
Slept for 0.002s
@rt
def backgrounds3():
tasks = [BackgroundTask(increment,i) for i in range(3)]
return {'status':'done'}, *tasks
r = cli.get('/backgrounds3')
test_eq(r.status_code, 200)
r.json()Sleeping for 0.0s
Slept for 0.0s
Sleeping for 0.001s
Slept for 0.001s
Sleeping for 0.002s
Slept for 0.002s
{'status': 'done'}
unqid (唯一 ID)
unqid (seeded=False)
FastHTML.setup_ws (FastHTML.设置 ws)
FastHTML.setup_ws (app:__main__.FastHTML, f=<function noop>)
FastHTML.devtools_json (FastHTML.开发者工具 json)
FastHTML.devtools_json (path=None, uuid=None)