核心

StarletteFastHTML 子类,以及它自动使用的 RouterXRouteX 类。

这是 fasthtml 的源代码。除非您想了解其内部构建原理,或需要某个特定 API 的完整详细信息,否则无需阅读此文档。该 notebook 使用 nbdev 转换为 Python 模块 fasthtml/core.py

导入和工具函数

import time

from IPython import display
from enum import Enum
from pprint import pprint

from fastcore.test import *
from starlette.testclient import TestClient
from starlette.requests import Headers
from starlette.datastructures import UploadFile

我们编写源代码,然后再编写测试。测试既是确认代码有效性的手段,也作为实际可用的示例。第一个导出的函数 parsed_date 就是这种模式的一个例子。


源代码

parsed_date (解析日期)

 parsed_date (s:str)

s 转换为 datetime 对象

parsed_date('2pm')
datetime.datetime(2025, 7, 2, 14, 0)
isinstance(date.fromtimestamp(0), date)
True

源代码

snake2hyphens (蛇形命名转连字符)

 snake2hyphens (s:str)

s 从蛇形命名法(snake case)转换为连字符连接且首字母大写的形式

snake2hyphens("snake_case")
'Snake-Case'

源代码

HtmxHeaders (Htmx 响应头)

 HtmxHeaders (boosted:str|None=None, current_url:str|None=None,
              history_restore_request:str|None=None, prompt:str|None=None,
              request:str|None=None, target:str|None=None,
              trigger_name:str|None=None, trigger:str|None=None)
def test_request(url: str='/', headers: dict={}, method: str='get') -> Request:
    scope = {
        'type': 'http',
        'method': method,
        'path': url,
        'headers': Headers(headers).raw,
        'query_string': b'',
        'scheme': 'http',
        'client': ('127.0.0.1', 8000),
        'server': ('127.0.0.1', 8000),
    }
    receive = lambda: {"body": b"", "more_body": False}
    return Request(scope, receive)
h = test_request(headers=Headers({'HX-Request':'1'}))
_get_htmx(h.headers)
HtmxHeaders(boosted=None, current_url=None, history_restore_request=None, prompt=None, request='1', target=None, trigger_name=None, trigger=None)

请求和响应

test_eq(_fix_anno(Union[str,None], 'a'), 'a')
test_eq(_fix_anno(float, 0.9), 0.9)
test_eq(_fix_anno(int, '1'), 1)
test_eq(_fix_anno(int, ['1','2']), 2)
test_eq(_fix_anno(list[int], ['1','2']), [1,2])
test_eq(_fix_anno(list[int], '1'), [1])
d = dict(k=int, l=List[int])
test_eq(_form_arg('k', "1", d), 1)
test_eq(_form_arg('l', "1", d), [1])
test_eq(_form_arg('l', ["1","2"], d), [1,2])

源代码

HttpHeader (HTTP 响应头)

 HttpHeader (k:str, v:str)
_to_htmx_header('trigger_after_settle')
'HX-Trigger-After-Settle'

源代码

HtmxResponseHeaders (Htmx 响应头)

 HtmxResponseHeaders (location=None, push_url=None, redirect=None,
                      refresh=None, replace_url=None, reswap=None,
                      retarget=None, reselect=None, trigger=None,
                      trigger_after_settle=None, trigger_after_swap=None)

HTMX 响应头

HtmxResponseHeaders(trigger_after_settle='hi')
HttpHeader(k='HX-Trigger-After-Settle', v='hi')

源代码

form2dict (表单转字典)

 form2dict (form:starlette.datastructures.FormData)

将 starlette 的表单数据转换为字典

d = [('a',1),('a',2),('b',0)]
fd = FormData(d)
res = form2dict(fd)
test_eq(res['a'], [1,2])
test_eq(res['b'], 0)

源代码

parse_form (解析表单)

 parse_form (req:starlette.requests.Request)

Starlette 在处理空的多部分表单(multipart forms)时会出错,因此这个函数会检查这种情况


源代码

JSONResponse (JSON 响应)

 JSONResponse (content:Any, status_code:int=200,
               headers:collections.abc.Mapping[str,str]|None=None,
               media_type:str|None=None,
               background:starlette.background.BackgroundTask|None=None)

与 starlette 的版本相同,但会自动将非序列化类型转换为字符串

async def f(req):
    def _f(p:HttpHeader): ...
    p = first(_params(_f).values())
    result = await _from_body(req, p)
    return JSONResponse(result.__dict__)

client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))

d = dict(k='value1',v=['value2','value3'])
response = client.post('/', data=d)
print(response.json())
{'k': 'value1', 'v': 'value3'}
async def f(req): return Response(str(req.query_params.getlist('x')))
client = TestClient(Starlette(routes=[Route('/', f, methods=['GET'])]))
client.get('/?x=1&x=2').text
"['1', '2']"
def g(req, this:Starlette, a:str, b:HttpHeader): ...

async def f(req):
    a = await _wrap_req(req, _params(g))
    return Response(str(a))

client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
response = client.post('/?a=1', data=d)
print(response.text)
[<starlette.requests.Request object>, <starlette.applications.Starlette object>, '1', HttpHeader(k='value1', v='value3')]
def g(req, this:Starlette, a:str, b:HttpHeader): ...

async def f(req):
    a = await _wrap_req(req, _params(g))
    return Response(str(a))

client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
response = client.post('/?a=1', data=d)
print(response.text)
[<starlette.requests.Request object>, <starlette.applications.Starlette object>, '1', HttpHeader(k='value1', v='value3')]

缺失的请求参数

如果一个请求参数有默认值(例如 a:str=''),即使用户请求中没有包含该参数,请求也是有效的。

def g(req, this:Starlette, a:str=''): ...

async def f(req):
    a = await _wrap_req(req, _params(g))
    return Response(str(a))

client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
response = client.post('/', json={}) # no param in request
print(response.text)
[<starlette.requests.Request object>, <starlette.applications.Starlette object>, '']

如果我们移除默认值并重新运行请求,应该会得到以下错误:Missing required field: a (缺少必需字段: a)。

def g(req, this:Starlette, a:str): ...

async def f(req):
    a = await _wrap_req(req, _params(g))
    return Response(str(a))

client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
response = client.post('/', json={}) # no param in request
print(response.text)
Missing required field: a

源代码

flat_xt (扁平化 xt)

 flat_xt (lst)

扁平化列表

x = ft('a',1)
test_eq(flat_xt([x, x, [x,x]]), (x,)*4)
test_eq(flat_xt(x), (x,))

源代码

Beforeware (前置件)

 Beforeware (f, skip=None)

初始化 self。请使用 help(type(self)) 查看准确的签名。

Websockets / SSE

def on_receive(self, msg:str): return f"Message text was: {msg}"
c = _ws_endp(on_receive)
cli = TestClient(Starlette(routes=[WebSocketRoute('/', _ws_endp(on_receive))]))
with cli.websocket_connect('/') as ws:
    ws.send_text('{"msg":"Hi!"}')
    data = ws.receive_text()
    assert data == 'Message text was: Hi!'

源代码

EventStream (事件流)

 EventStream (s)

s 创建一个 text/event-stream 响应


源代码

signal_shutdown (关闭信号)

 signal_shutdown ()

路由和应用


源代码

uri

 uri (_arg, **kwargs)

源代码

decode_uri (解码 URI)

 decode_uri (s)

源代码

StringConvertor.to_string (字符串转换器.to_string)

 StringConvertor.to_string (value:str)

源代码

HTTPConnection.url_path_for (HTTP连接.url_path_for)

 HTTPConnection.url_path_for (name:str, **path_params)

源代码

flat_tuple (扁平化元组)

 flat_tuple (o)

扁平化列表


源代码

noop_body (空操作响应体)

 noop_body (c, req)

默认的响应体包装函数,仅返回内容


源代码

respond (响应)

 respond (req, heads, bdy)

默认的 FT 响应创建函数

如果 HX-Request 响应头存在HX-History-Restore-Request 响应头不存在,则渲染页面片段。


源代码

is_full_page (是否为完整页面)

 is_full_page (req, resp)

源代码

Redirect (重定向)

 Redirect (loc)

根据需要使用 HTMX 或 Starlette 的 RedirectResponse 重定向到 loc

FastHTML 的 exts 参数支持以下内容

print(' '.join(htmx_exts))
morph head-support preload class-tools loading-states multi-swap path-deps remove-me ws chunked-transfer

源代码

get_key (获取键)

 get_key (key=None, fname='.sesskey')

源代码

qp (查询参数)

 qp (p:str, **kw)

将参数 kw 添加到路径 p

qp 将查询参数添加到路由路径字符串中

vals = {'a':5, 'b':False, 'c':[1,2], 'd':'bar', 'e':None, 'ab':42}
res = qp('/foo', **vals)
test_eq(res, '/foo?a=5&b=&c=1&c=2&d=bar&e=&ab=42')

qp 检查每个参数应作为查询参数还是作为路由的一部分发送,并进行正确编码。

path = '/foo/{a}/{d}/{ab:int}'
res = qp(path, **vals)
test_eq(res, '/foo/5/bar/42?b=&c=1&c=2&e=')

源代码

def_hdrs (默认响应头)

 def_hdrs (htmx=True, surreal=True)

FastHTML 应用的默认响应头


源代码

FastHTML

 FastHTML (debug=False, routes=None, middleware=None, title:str='FastHTML
           page', exception_handlers=None, on_startup=None,
           on_shutdown=None, lifespan=None, hdrs=None, ftrs=None,
           exts=None, before=None, after=None, surreal=True, htmx=True,
           default_hdrs=True, sess_cls=<class
           'starlette.middleware.sessions.SessionMiddleware'>,
           secret_key=None, session_cookie='session_', max_age=31536000,
           sess_path='/', same_site='lax', sess_https_only=False,
           sess_domain=None, key_fname='.sesskey', body_wrap=<function
           noop_body>, htmlkw=None, nb_hdrs=False, canonical=True,
           **bodykw)

创建一个 Starlette 应用。


源代码

FastHTML.add_route (FastHTML.添加路由)

 FastHTML.add_route (route)

源代码

FastHTML.ws (FastHTML.ws)

 FastHTML.ws (path:str, conn=None, disconn=None, name=None,
              middleware=None)

path 处添加一个 websocket 路由


源代码

nested_name (嵌套名称)

 nested_name (f)

*获取函数 f 的名称,使用 '_' 连接嵌套函数名*

def f():
    def g(): ...
    return g
func = f()
nested_name(func)
'f_g'

源代码

FastHTML.route (FastHTML.路由)

 FastHTML.route (path:str=None, methods=None, name=None,
                 include_in_schema=True, body_wrap=None)

path 处添加一个路由

app = FastHTML()
@app.get
def foo(a:str, b:list[int]): ...

foo.to(a='bar', b=[1,2])
'/foo?a=bar&b=1&b=2'
@app.get('/foo/{a}')
def foo(a:str, b:list[int]): ...

foo.to(a='bar', b=[1,2])
'/foo/bar?b=1&b=2'

源代码

FastHTML.set_lifespan (FastHTML.设置生命周期)

 FastHTML.set_lifespan (value)

源代码

serve (启动服务)

 serve (appname=None, app='app', host='0.0.0.0', port=None, reload=True,
        reload_includes:list[str]|str|None=None,
        reload_excludes:list[str]|str|None=None)

在一个异步服务器中运行应用,默认启用实时重新加载。

类型 默认值 详情
appname NoneType None 模块的名称
app str app 要运行的应用实例
host str 0.0.0.0 如果 host 是 0.0.0.0,将转换为 localhost
port NoneType None 如果 port 为 None,将默认为 5001 或 PORT 环境变量的值
reload bool True 默认在代码更改时重新加载应用
reload_includes list[str] | str | None None 需要监视其更改的额外文件
reload_excludes list[str] | str | None None 要忽略其更改的文件

源代码

Client (客户端)

 Client (app, url='http://testserver')

一个简单的、不需要 async 的 httpx ASGI 客户端

app = FastHTML(routes=[Route('/', lambda _: Response('test'))])
cli = Client(app)

cli.get('/').text
'test'

请注意,您也可以使用 Starlette 的 TestClient 来代替 FastHTML 的 Client。它们在很大程度上应该是可以互换的。

FastHTML 测试

def get_cli(app): return app,TestClient(app),app.route
app,cli,rt = get_cli(FastHTML(secret_key='soopersecret'))
app,cli,rt = get_cli(FastHTML(title="My Custom Title"))
@app.get
def foo(): return Div("Hello World")

print(app.routes)

response = cli.get('/foo')
assert '<title>My Custom Title</title>' in response.text

foo.to(param='value')
[Route(path='/foo', name='foo', methods=['GET', 'HEAD'])]
'/foo?param=value'
app,cli,rt = get_cli(FastHTML())

@rt('/xt2')
def get(): return H1('bar')

txt = cli.get('/xt2').text
assert '<title>FastHTML page</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt
@rt("/hi")
def get(): return 'Hi there'

r = cli.get('/hi')
r.text
'Hi there'
@rt("/hi")
def post(): return 'Postal'

cli.post('/hi').text
'Postal'
@app.get("/hostie")
def show_host(req): return req.headers['host']

cli.get('/hostie').text
'testserver'
@app.get("/setsess")
def set_sess(session):
   session['foo'] = 'bar'
   return 'ok'

@app.ws("/ws")
def ws(self, msg:str, ws:WebSocket, session): return f"Message text was: {msg} with session {session.get('foo')}, from client: {ws.client}"

cli.get('/setsess')
with cli.websocket_connect('/ws') as ws:
    ws.send_text('{"msg":"Hi!"}')
    data = ws.receive_text()
assert 'Message text was: Hi! with session bar' in data
print(data)
Message text was: Hi! with session bar, from client: Address(host='testclient', port=50000)
@rt
def yoyo(): return 'a yoyo'

cli.post('/yoyo').text
'a yoyo'
@app.get
def autopost(): return Html(Div('Text.', hx_post=yoyo()))
print(cli.get('/autopost').text)
 <!doctype html>
 <html>
   <div hx-post="a yoyo">Text.</div>
 </html>
@app.get
def autopost2(): return Html(Body(Div('Text.', cls='px-2', hx_post=show_host.to(a='b'))))
print(cli.get('/autopost2').text)
 <!doctype html>
 <html>
   <body>
     <div class="px-2" hx-post="/hostie?a=b">Text.</div>
   </body>
 </html>
@app.get
def autoget2(): return Html(Div('Text.', hx_get=show_host))
print(cli.get('/autoget2').text)
 <!doctype html>
 <html>
   <div hx-get="/hostie">Text.</div>
 </html>
@rt('/user/{nm}', name='gday')
def get(nm:str=''): return f"Good day to you, {nm}!"
cli.get('/user/Alexis').text
'Good day to you, Alexis!'
@app.get
def autolink(): return Html(Div('Text.', link=uri('gday', nm='Alexis')))
print(cli.get('/autolink').text)
 <!doctype html>
 <html>
   <div href="/user/Alexis">Text.</div>
 </html>
@rt('/link')
def get(req): return f"{req.url_for('gday', nm='Alexis')}; {req.url_for('show_host')}"

cli.get('/link').text
'http://testserver/user/Alexis; http://testserver/hostie'
@app.get("/background")
async def background_task(request):
    async def long_running_task():
        await asyncio.sleep(0.1)
        print("Background task completed!")
    return P("Task started"), BackgroundTask(long_running_task)

response = cli.get("/background")
Background task completed!
test_eq(app.router.url_path_for('gday', nm='Jeremy'), '/user/Jeremy')
hxhdr = {'headers':{'hx-request':"1"}}

@rt('/ft')
def get(): return Title('Foo'),H1('bar')

txt = cli.get('/ft').text
assert '<title>Foo</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt

@rt('/xt2')
def get(): return H1('bar')

txt = cli.get('/xt2').text
assert '<title>FastHTML page</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt

assert cli.get('/xt2', **hxhdr).text.strip() == '<h1>bar</h1>'

@rt('/xt3')
def get(): return Html(Head(Title('hi')), Body(P('there')))

txt = cli.get('/xt3').text
assert '<title>FastHTML page</title>' not in txt and '<title>hi</title>' in txt and '<p>there</p>' in txt
@rt('/oops')
def get(nope): return nope
test_warns(lambda: cli.get('/oops?nope=1'))
def test_r(cli, path, exp, meth='get', hx=False, **kwargs):
    if hx: kwargs['headers'] = {'hx-request':"1"}
    test_eq(getattr(cli, meth)(path, **kwargs).text, exp)

ModelName = str_enum('ModelName', "alexnet", "resnet", "lenet")
fake_db = [{"name": "Foo"}, {"name": "Bar"}]
@rt('/html/{idx}')
async def get(idx:int): return Body(H4(f'Next is {idx+1}.'))
@rt("/models/{nm}")
def get(nm:ModelName): return nm

@rt("/files/{path}")
async def get(path: Path): return path.with_suffix('.txt')

@rt("/items/")
def get(idx:int|None = 0): return fake_db[idx]

@rt("/idxl/")
def get(idx:list[int]): return str(idx)
r = cli.get('/html/1', headers={'hx-request':"1"})
assert '<h4>Next is 2.</h4>' in r.text
test_r(cli, '/models/alexnet', 'alexnet')
test_r(cli, '/files/foo', 'foo.txt')
test_r(cli, '/items/?idx=1', '{"name":"Bar"}')
test_r(cli, '/items/', '{"name":"Foo"}')
assert cli.get('/items/?idx=g').text=='404 Not Found'
assert cli.get('/items/?idx=g').status_code == 404
test_r(cli, '/idxl/?idx=1&idx=2', '[1, 2]')
assert cli.get('/idxl/?idx=1&idx=g').status_code == 404
app = FastHTML()
rt = app.route
cli = TestClient(app)
@app.route(r'/static/{path:path}.jpg')
def index(path:str): return f'got {path}'
@app.route(r'/static/{path:path}')
def foo(path:str, a:int): return f'also got {path},{a}'
cli.get('/static/sub/a.b.jpg').text
'got sub/a.b'
cli.get('/static/sub/a.b?a=1').text
'also got sub/a.b,1'
app.chk = 'foo'
@app.get("/booly/")
def _(coming:bool=True): return 'Coming' if coming else 'Not coming'

@app.get("/datie/")
def _(d:parsed_date): return d

@app.get("/ua")
async def _(user_agent:str): return user_agent

@app.get("/hxtest")
def _(htmx): return htmx.request

@app.get("/hxtest2")
def _(foo:HtmxHeaders, req): return foo.request

@app.get("/app")
def _(app): return app.chk

@app.get("/app2")
def _(foo:FastHTML): return foo.chk,HttpHeader("mykey", "myval")

@app.get("/app3")
def _(foo:FastHTML): return HtmxResponseHeaders(location="http://example.org")

@app.get("/app4")
def _(foo:FastHTML): return Redirect("http://example.org")
test_r(cli, '/booly/?coming=true', 'Coming')
test_r(cli, '/booly/?coming=no', 'Not coming')
date_str = "17th of May, 2024, 2p"
test_r(cli, f'/datie/?d={date_str}', '2024-05-17 14:00:00')
test_r(cli, '/ua', 'FastHTML', headers={'User-Agent':'FastHTML'})
test_r(cli, '/hxtest' , '1', headers={'HX-Request':'1'})
test_r(cli, '/hxtest2', '1', headers={'HX-Request':'1'})
test_r(cli, '/app' , 'foo')
r = cli.get('/app2', **hxhdr)
test_eq(r.text, 'foo')
test_eq(r.headers['mykey'], 'myval')
r = cli.get('/app3')
test_eq(r.headers['HX-Location'], 'http://example.org')
r = cli.get('/app4', follow_redirects=False)
test_eq(r.status_code, 303)
r = cli.get('/app4', headers={'HX-Request':'1'})
test_eq(r.headers['HX-Redirect'], 'http://example.org')
@rt
def meta():
    return ((Title('hi'),H1('hi')),
        (Meta(property='image'), Meta(property='site_name'))
    )

t = cli.post('/meta').text
assert re.search(r'<body>\s*<h1>hi</h1>\s*</body>', t)
assert '<meta' in t
@app.post('/profile/me')
def profile_update(username: str): return username

test_r(cli, '/profile/me', 'Alexis', 'post', data={'username' : 'Alexis'})
test_r(cli, '/profile/me', 'Missing required field: username', 'post', data={})
# Example post request with parameter that has a default value
@app.post('/pet/dog')
def pet_dog(dogname: str = None): return dogname

# Working post request with optional parameter
test_r(cli, '/pet/dog', '', 'post', data={})
@dataclass
class Bodie: a:int;b:str

@rt("/bodie/{nm}")
def post(nm:str, data:Bodie):
    res = asdict(data)
    res['nm'] = nm
    return res

@app.post("/bodied/")
def bodied(data:dict): return data

nt = namedtuple('Bodient', ['a','b'])

@app.post("/bodient/")
def bodient(data:nt): return asdict(data)

class BodieTD(TypedDict): a:int;b:str='foo'

@app.post("/bodietd/")
def bodient(data:BodieTD): return data

class Bodie2:
    a:int|None; b:str
    def __init__(self, a, b='foo'): store_attr()

@rt("/bodie2/", methods=['get','post'])
def bodie(d:Bodie2): return f"a: {d.a}; b: {d.b}"
from fasthtml.xtend import Titled
d = dict(a=1, b='foo')

test_r(cli, '/bodie/me', '{"a":1,"b":"foo","nm":"me"}', 'post', data=dict(a=1, b='foo', nm='me'))
test_r(cli, '/bodied/', '{"a":"1","b":"foo"}', 'post', data=d)
test_r(cli, '/bodie2/', 'a: 1; b: foo', 'post', data={'a':1})
test_r(cli, '/bodie2/?a=1&b=foo&nm=me', 'a: 1; b: foo')
test_r(cli, '/bodient/', '{"a":"1","b":"foo"}', 'post', data=d)
test_r(cli, '/bodietd/', '{"a":1,"b":"foo"}', 'post', data=d)
# Testing POST with Content-Type: application/json
@app.post("/")
def index(it: Bodie): return Titled("It worked!", P(f"{it.a}, {it.b}"))

s = json.dumps({"b": "Lorem", "a": 15})
response = cli.post('/', headers={"Content-Type": "application/json"}, data=s).text
assert "<title>It worked!</title>" in response and "<p>15, Lorem</p>" in response
# Testing POST with Content-Type: application/json
@app.post("/bodytext")
def index(body): return body

response = cli.post('/bodytext', headers={"Content-Type": "application/json"}, data=s).text
test_eq(response, '{"b": "Lorem", "a": 15}')
files = [ ('files', ('file1.txt', b'content1')),
         ('files', ('file2.txt', b'content2')) ]
@rt("/uploads")
async def post(files:list[UploadFile]):
    return ','.join([(await file.read()).decode() for file in files])

res = cli.post('/uploads', files=files)
print(res.status_code)
print(res.text)
200
content1,content2
res = cli.post('/uploads', files=[files[0]])
print(res.status_code)
print(res.text)
200
content1
@rt("/setsess")
def get(sess, foo:str=''):
    now = datetime.now()
    sess['auth'] = str(now)
    return f'Set to {now}'

@rt("/getsess")
def get(sess): return f'Session time: {sess["auth"]}'

print(cli.get('/setsess').text)
time.sleep(0.01)

cli.get('/getsess').text
Set to 2025-05-29 08:31:48.235262
'Session time: 2025-05-29 08:31:48.235262'
@rt("/sess-first")
def post(sess, name: str):
    sess["name"] = name
    return str(sess)

cli.post('/sess-first', data={'name': 2})

@rt("/getsess-all")
def get(sess): return sess['name']

test_eq(cli.get('/getsess-all').text, '2')
@rt("/upload")
async def post(uf:UploadFile): return (await uf.read()).decode()

with open('../../CHANGELOG.md', 'rb') as f:
    print(cli.post('/upload', files={'uf':f}, data={'msg':'Hello'}).text[:15])
# Release notes
@rt("/form-submit/{list_id}")
def options(list_id: str):
    headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'POST',
        'Access-Control-Allow-Headers': '*',
    }
    return Response(status_code=200, headers=headers)
h = cli.options('/form-submit/2').headers
test_eq(h['Access-Control-Allow-Methods'], 'POST')
from fasthtml.authmw import user_pwd_auth
def _not_found(req, exc): return Div('nope')

app,cli,rt = get_cli(FastHTML(exception_handlers={404:_not_found}))

txt = cli.get('/').text
assert '<div>nope</div>' in txt
assert '<!doctype html>' in txt
app,cli,rt = get_cli(FastHTML())

@rt("/{name}/{age}")
def get(name: str, age: int):
    return Titled(f"Hello {name.title()}, age {age}")

assert '<title>Hello Uma, age 5</title>' in cli.get('/uma/5').text
assert '404 Not Found' in cli.get('/uma/five').text
auth = user_pwd_auth(testuser='spycraft')
app,cli,rt = get_cli(FastHTML(middleware=[auth]))

@rt("/locked")
def get(auth): return 'Hello, ' + auth

test_eq(cli.get('/locked').text, 'not authenticated')
test_eq(cli.get('/locked', auth=("testuser","spycraft")).text, 'Hello, testuser')
auth = user_pwd_auth(testuser='spycraft')
app,cli,rt = get_cli(FastHTML(middleware=[auth]))

@rt("/locked")
def get(auth): return 'Hello, ' + auth

test_eq(cli.get('/locked').text, 'not authenticated')
test_eq(cli.get('/locked', auth=("testuser","spycraft")).text, 'Hello, testuser')

APIRouter


源代码

RouteFuncs (路由函数)

 RouteFuncs ()

初始化 self。请使用 help(type(self)) 查看准确的签名。


源代码

APIRouter

 APIRouter (prefix:str|None=None, body_wrap=<function noop_body>)

向应用添加路由

ar = APIRouter()
@ar("/hi")
def get(): return 'Hi there'
@ar("/hi")
def post(): return 'Postal'
@ar
def ho(): return 'Ho ho'
@ar("/hostie")
def show_host(req): return req.headers['host']
@ar
def yoyo(): return 'a yoyo'
@ar
def index(): return "home page"

@ar.ws("/ws")
def ws(self, msg:str): return f"Message text was: {msg}"
app,cli,_ = get_cli(FastHTML())
ar.to_app(app)
assert str(yoyo) == '/yoyo'
# ensure route functions are properly discoverable on `APIRouter` and `APIRouter.rt_funcs`
assert ar.prefix == ''
assert str(ar.rt_funcs.index) == '/'
assert str(ar.index) == '/'
with ExceptionExpected(): ar.blah()
with ExceptionExpected(): ar.rt_funcs.blah()
# ensure any route functions named using an HTTPMethod are not discoverable via `rt_funcs`
assert "get" not in ar.rt_funcs._funcs.keys()
test_eq(cli.get('/hi').text, 'Hi there')
test_eq(cli.post('/hi').text, 'Postal')
test_eq(cli.get('/hostie').text, 'testserver')
test_eq(cli.post('/yoyo').text, 'a yoyo')

test_eq(cli.get('/ho').text, 'Ho ho')
test_eq(cli.post('/ho').text, 'Ho ho')
with cli.websocket_connect('/ws') as ws:
    ws.send_text('{"msg":"Hi!"}')
    data = ws.receive_text()
    assert data == 'Message text was: Hi!'
ar2 = APIRouter("/products")
@ar2("/hi")
def get(): return 'Hi there'
@ar2("/hi")
def post(): return 'Postal'
@ar2
def ho(): return 'Ho ho'
@ar2("/hostie")
def show_host(req): return req.headers['host']
@ar2
def yoyo(): return 'a yoyo'
@ar2
def index(): return "home page"

@ar2.ws("/ws")
def ws(self, msg:str): return f"Message text was: {msg}"
app,cli,_ = get_cli(FastHTML())
ar2.to_app(app)
assert str(yoyo) == '/products/yoyo'
assert ar2.prefix == '/products'
assert str(ar2.rt_funcs.index) == '/products/'
assert str(ar2.index) == '/products/'
assert str(ar.index) == '/'
with ExceptionExpected(): ar2.blah()
with ExceptionExpected(): ar2.rt_funcs.blah()
assert "get" not in ar2.rt_funcs._funcs.keys()
test_eq(cli.get('/products/hi').text, 'Hi there')
test_eq(cli.post('/products/hi').text, 'Postal')
test_eq(cli.get('/products/hostie').text, 'testserver')
test_eq(cli.post('/products/yoyo').text, 'a yoyo')

test_eq(cli.get('/products/ho').text, 'Ho ho')
test_eq(cli.post('/products/ho').text, 'Ho ho')
with cli.websocket_connect('/products/ws') as ws:
    ws.send_text('{"msg":"Hi!"}')
    data = ws.receive_text()
    assert data == 'Message text was: Hi!'
@ar.get
def hi2(): return 'Hi there'
@ar.get("/hi3")
def _(): return 'Hi there'
@ar.post("/post2")
def _(): return 'Postal'

@ar2.get
def hi2(): return 'Hi there'
@ar2.get("/hi3")
def _(): return 'Hi there'
@ar2.post("/post2")
def _(): return 'Postal'

附加功能

app,cli,rt = get_cli(FastHTML(secret_key='soopersecret'))

源代码

reg_re_param (注册正则参数)

 reg_re_param (m, s)

源代码

FastHTML.static_route_exts (FastHTML.静态路由扩展)

 FastHTML.static_route_exts (prefix='/', static_path='.', exts='static')

在 URL 路径 prefix 处添加一个静态路由,文件来自 static_path,扩展名由 reg_re_param() 定义

reg_re_param("imgext", "ico|gif|jpg|jpeg|webm|pdf")

@rt(r'/static/{path:path}{fn}.{ext:imgext}')
def get(fn:str, path:str, ext:str): return f"Getting {fn}.{ext} from /{path}"

test_r(cli, '/static/foo/jph.me.ico', 'Getting jph.me.ico from /foo/')
app.static_route_exts()
assert 'These are the source notebooks for FastHTML' in cli.get('/README.txt').text

源代码

FastHTML.static_route (FastHTML.静态路由)

 FastHTML.static_route (ext='', prefix='/', static_path='.')

在 URL 路径 prefix 处添加一个静态路由,文件来自 static_path,使用单个 ext(包括‘.’)

app.static_route('.md', static_path='../..')
assert 'THIS FILE WAS AUTOGENERATED' in cli.get('/README.md').text

源代码

MiddlewareBase (中间件基类)

 MiddlewareBase ()

初始化 self。请使用 help(type(self)) 查看准确的签名。


源代码

FtResponse (Ft 响应)

 FtResponse (content, status_code:int=200, headers=None, cls=<class
             'starlette.responses.HTMLResponse'>,
             media_type:str|None=None,
             background:starlette.background.BackgroundTask|None=None)

用任何 Starlette Response 包装一个 FT 响应

@rt('/ftr')
def get():
    cts = Title('Foo'),H1('bar')
    return FtResponse(cts, status_code=201, headers={'Location':'/foo/1'})

r = cli.get('/ftr')

test_eq(r.status_code, 201)
test_eq(r.headers['location'], '/foo/1')
txt = r.text
assert '<title>Foo</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt

测试单个后台任务

def my_slow_task():
    print('Starting slow task')    
    time.sleep(0.001)
    print('Finished slow task')        

@rt('/background')
def get():
    return P('BG Task'), BackgroundTask(my_slow_task)

r = cli.get('/background')

test_eq(r.status_code, 200)
Starting slow task
Finished slow task

测试多个后台任务

def increment(amount):
    amount = amount/1000
    print(f'Sleeping for {amount}s')    
    time.sleep(amount)
    print(f'Slept for {amount}s')
@rt
def backgrounds():
    tasks = BackgroundTasks()
    for i in range(3): tasks.add_task(increment, i)
    return P('BG Tasks'), tasks

r = cli.get('/backgrounds')
test_eq(r.status_code, 200)
Sleeping for 0.0s
Slept for 0.0s
Sleeping for 0.001s
Slept for 0.001s
Sleeping for 0.002s
Slept for 0.002s
@rt
def backgrounds2():
    tasks = [BackgroundTask(increment,i) for i in range(3)]
    return P('BG Tasks'), *tasks

r = cli.get('/backgrounds2')
test_eq(r.status_code, 200)
Sleeping for 0.0s
Slept for 0.0s
Sleeping for 0.001s
Slept for 0.001s
Sleeping for 0.002s
Slept for 0.002s
@rt
def backgrounds3():
    tasks = [BackgroundTask(increment,i) for i in range(3)]
    return {'status':'done'}, *tasks

r = cli.get('/backgrounds3')
test_eq(r.status_code, 200)
r.json()
Sleeping for 0.0s
Slept for 0.0s
Sleeping for 0.001s
Slept for 0.001s
Sleeping for 0.002s
Slept for 0.002s
{'status': 'done'}

源代码

unqid (唯一 ID)

 unqid (seeded=False)

源代码

FastHTML.setup_ws (FastHTML.设置 ws)

 FastHTML.setup_ws (app:__main__.FastHTML, f=<function noop>)

源代码

FastHTML.devtools_json (FastHTML.开发者工具 json)

 FastHTML.devtools_json (path=None, uuid=None)