Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/_newsfragments/2337.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Static routes now support ``HEAD`` without opening file streams, and reject unsupported methods with a proper ``405 Method Not Allowed`` response.
130 changes: 87 additions & 43 deletions falcon/routing/static.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pathlib import Path
import re
from re import Pattern
import stat
from typing import Any, ClassVar, IO, TYPE_CHECKING

import falcon
Expand Down Expand Up @@ -39,57 +40,81 @@ def _open_file(file_path: str | Path) -> tuple[io.BufferedReader, os.stat_result
return fh, st


def _set_range(
fh: io.BufferedReader, st: os.stat_result, req_range: tuple[int, int] | None
) -> tuple[ReadableIO, int, tuple[int, int, int] | None]:
"""Process file handle for a ranged request.
def _stat_file(file_path: str | Path) -> os.stat_result:
"""Read file stat for a static file request without opening it."""

Args:
fh (io.BufferedReader): file handle of the file.
st (os.stat_result): fs stat result of the file.
req_range (Optional[Tuple[int, int]]): Request.range value.
Returns:
tuple: Three-member tuple of (stream, content-length, content-range).
If req_range is ``None`` or ignored, content-range will be
``None``; otherwise, the stream will be appropriately seeked and
possibly bounded, and the content-range will be a tuple of
(start, end, size).
"""
try:
st = os.stat(file_path)
if not stat.S_ISREG(st.st_mode):
raise OSError
except OSError:
raise falcon.HTTPNotFound()

return st


def _get_range(
st: os.stat_result, req_range: tuple[int, int] | None
) -> tuple[int, tuple[int, int, int] | None]:
size = st.st_size
if req_range is None:
return fh, size, None
return size, None

start, end = req_range
if size == 0:
# NOTE(tipabu): Ignore Range headers for zero-byte files; just serve
# the empty body since Content-Range can't be used to express a
# zero-byte body.
return fh, 0, None
return 0, None

if start < 0 and end == -1:
# NOTE(tipabu): Special case: only want the last N bytes.
start = max(start, -size)
fh.seek(start, os.SEEK_END)
# NOTE(vytas): Wrap in order to prevent sendfile from being used, as
# its implementation was found to be buggy in many popular WSGI
# servers for open files with a non-zero offset.
return _BoundedFile(fh, -start), -start, (size + start, size - 1, size)
return -start, (size + start, size - 1, size)

if start >= size:
fh.close()
raise falcon.HTTPRangeNotSatisfiable(size)

fh.seek(start)
if end == -1:
# NOTE(vytas): Wrap in order to prevent sendfile from being used, as
# its implementation was found to be buggy in many popular WSGI
# servers for open files with a non-zero offset.
length = size - start
return _BoundedFile(fh, length), length, (start, size - 1, size)
return length, (start, size - 1, size)

end = min(end, size - 1)
length = end - start + 1
return _BoundedFile(fh, length), length, (start, end, size)
return length, (start, end, size)


def _set_range(
fh: io.BufferedReader, st: os.stat_result, req_range: tuple[int, int] | None
) -> tuple[ReadableIO, int, tuple[int, int, int] | None]:
"""Process file handle for a ranged request.

Args:
fh (io.BufferedReader): file handle of the file.
st (os.stat_result): fs stat result of the file.
req_range (Optional[Tuple[int, int]]): Request.range value.
Returns:
tuple: Three-member tuple of (stream, content-length, content-range).
If req_range is ``None`` or ignored, content-range will be
``None``; otherwise, the stream will be appropriately seeked and
possibly bounded, and the content-range will be a tuple of
(start, end, size).
"""
try:
length, content_range = _get_range(st, req_range)
except falcon.HTTPRangeNotSatisfiable:
fh.close()
raise

if content_range is None:
return fh, length, None

fh.seek(content_range[0])

# NOTE(vytas): Wrap in order to prevent sendfile from being used, as
# its implementation was found to be buggy in many popular WSGI
# servers for open files with a non-zero offset.
return _BoundedFile(fh, length), length, content_range


def _is_not_modified(
Expand Down Expand Up @@ -228,10 +253,13 @@ def __call__(self, req: Request, resp: Response, **kw: Any) -> None:
assert not kw
if req.method == 'OPTIONS':
# it's likely a CORS request. Set the allow header to the appropriate value.
resp.set_header('Allow', 'GET')
resp.set_header('Allow', 'GET, HEAD')
resp.set_header('Content-Length', '0')
return

if req.method not in ('GET', 'HEAD'):
raise falcon.HTTPMethodNotAllowed(['GET', 'HEAD'])

without_prefix = req.path[len(self._prefix) :]

# NOTE(kgriffs): Check surrounding whitespace and strip trailing
Expand Down Expand Up @@ -260,14 +288,24 @@ def __call__(self, req: Request, resp: Response, **kw: Any) -> None:
if '..' in file_path or not file_path.startswith(self._directory):
raise falcon.HTTPNotFound()

if self._fallback_filename is None:
fh, st = _open_file(file_path)
if req.method == 'HEAD':
if self._fallback_filename is None:
st = _stat_file(file_path)
else:
try:
st = _stat_file(file_path)
except falcon.HTTPNotFound:
st = _stat_file(self._fallback_filename)
file_path = self._fallback_filename
else:
try:
if self._fallback_filename is None:
fh, st = _open_file(file_path)
except falcon.HTTPNotFound:
fh, st = _open_file(self._fallback_filename)
file_path = self._fallback_filename
else:
try:
fh, st = _open_file(file_path)
except falcon.HTTPNotFound:
fh, st = _open_file(self._fallback_filename)
file_path = self._fallback_filename

etag = f'{int(st.st_mtime):x}-{st.st_size:x}'
resp.etag = etag
Expand All @@ -280,18 +318,24 @@ def __call__(self, req: Request, resp: Response, **kw: Any) -> None:
resp.last_modified = last_modified

if _is_not_modified(req, etag, last_modified):
fh.close()
if req.method != 'HEAD':
fh.close()
resp.status = falcon.HTTP_304
return

req_range = req.range if req.range_unit == 'bytes' else None
try:
stream, length, content_range = _set_range(fh, st, req_range)
except OSError:
fh.close()
raise falcon.HTTPNotFound()
if req.method == 'HEAD':
length, content_range = _get_range(st, req_range)
resp.content_length = length
else:
try:
stream, length, content_range = _set_range(fh, st, req_range)
except OSError:
fh.close()
raise falcon.HTTPNotFound()

resp.set_stream(stream, length)

resp.set_stream(stream, length)
suffix = os.path.splitext(file_path)[1]
resp.content_type = resp.options.static_media_types.get(
suffix, 'application/octet-stream'
Expand Down
58 changes: 57 additions & 1 deletion tests/test_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,62 @@ def test_file_closed(client, patch_open):
assert patch_open.current_file.closed


def test_head_request_does_not_open_file(client, monkeypatch, tmp_path):
file_path = tmp_path / 'main.css'
file_content = b'body { color: black; }'
file_path.write_bytes(file_content)

open_mock = mock.Mock(side_effect=AssertionError('HEAD must not open file streams'))
monkeypatch.setattr(io, 'open', open_mock)

client.app.add_static_route('/assets/', tmp_path)

resp = client.simulate_head(path='/assets/main.css')

assert resp.status == falcon.HTTP_200
assert resp.text == ''
assert resp.headers['Content-Type'] == 'text/css'
assert int(resp.headers['Content-Length']) == len(file_content)
assert resp.headers['Accept-Ranges'] == 'bytes'
open_mock.assert_not_called()


def test_head_request_honors_range(client, monkeypatch, tmp_path):
file_path = tmp_path / 'main.css'
file_path.write_bytes(b'0123456789')

open_mock = mock.Mock(side_effect=AssertionError('HEAD must not open file streams'))
monkeypatch.setattr(io, 'open', open_mock)

client.app.add_static_route('/assets/', tmp_path)

resp = client.simulate_head(
path='/assets/main.css',
headers={'Range': 'bytes=2-5'},
)

assert resp.status == falcon.HTTP_206
assert resp.text == ''
assert int(resp.headers['Content-Length']) == 4
assert resp.headers['Content-Range'] == 'bytes 2-5/10'
open_mock.assert_not_called()


def test_static_route_method_not_allowed(client, monkeypatch, tmp_path):
(tmp_path / 'main.css').write_bytes(b'body { color: black; }')

open_mock = mock.Mock(side_effect=AssertionError('unsupported methods must not open files'))
monkeypatch.setattr(io, 'open', open_mock)

client.app.add_static_route('/assets/', tmp_path)

resp = client.simulate_post(path='/assets/main.css')

assert resp.status == falcon.HTTP_405
assert resp.headers['Allow'] == 'GET, HEAD'
open_mock.assert_not_called()


def test_options_request(client, patch_open):
patch_open()

Expand All @@ -634,7 +690,7 @@ def test_options_request(client, patch_open):
assert resp.status_code == 200
assert resp.text == ''
assert int(resp.headers['Content-Length']) == 0
assert resp.headers['Access-Control-Allow-Methods'] == 'GET'
assert resp.headers['Access-Control-Allow-Methods'] == 'GET, HEAD'


def test_last_modified(client, patch_open):
Expand Down
Loading