-
Notifications
You must be signed in to change notification settings - Fork 295
Expand file tree
/
Copy pathurllib3_fetcher.py
More file actions
178 lines (146 loc) · 5.7 KB
/
urllib3_fetcher.py
File metadata and controls
178 lines (146 loc) · 5.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# Copyright 2021, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Provides an implementation of ``FetcherInterface`` using the urllib3 HTTP
library.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
# Imports
import urllib3
from urllib3.util.retry import Retry
import tuf
from tuf.api import exceptions
from tuf.ngclient._internal.proxy import ProxyEnvironment
from tuf.ngclient.fetcher import FetcherInterface
if TYPE_CHECKING:
from collections.abc import Iterator
# Globals
logger = logging.getLogger(__name__)
# Classes
class Urllib3Fetcher(FetcherInterface):
    """An implementation of ``FetcherInterface`` based on the urllib3 library.

    Attributes:
        socket_timeout: Timeout in seconds, used for both initial connection
            delay and the maximum delay between bytes received.
        chunk_size: Chunk size in bytes used when downloading.
    """

    def __init__(
        self,
        socket_timeout: int = 30,
        chunk_size: int = 400000,
        app_user_agent: str | None = None,
    ) -> None:
        # Default settings
        self.socket_timeout: int = socket_timeout  # seconds
        self.chunk_size: int = chunk_size  # bytes

        # Create User-Agent: the application identifier (if any) is placed
        # before the python-tuf identifier.
        ua = f"python-tuf/{tuf.__version__}"
        if app_user_agent is not None:
            ua = f"{app_user_agent} {ua}"

        # Configure retry strategy: retry on read timeouts and connection
        # errors. This enables retries for streaming failures, not just the
        # initial connection. raise_on_status=False so HTTP error statuses
        # are returned as normal responses and handled in _fetch().
        retry_strategy = Retry(
            total=3,
            read=3,
            connect=3,
            status_forcelist=[500, 502, 503, 504],
            raise_on_status=False,
        )
        self._proxy_env = ProxyEnvironment(
            headers={"User-Agent": ua}, retries=retry_strategy
        )

    def _fetch(self, url: str) -> Iterator[bytes]:
        """Fetch the contents of HTTP/HTTPS url from a remote server.

        Args:
            url: URL string that represents a file location.

        Raises:
            exceptions.SlowRetrievalError: Timeout occurs while receiving
                data.
            exceptions.DownloadHTTPError: HTTP error code is received.

        Returns:
            Bytes iterator
        """
        # Defer downloading the response body with preload_content=False.
        # Always set the timeout. This timeout value is interpreted by
        # urllib3 as:
        #  - connect timeout (max delay before first byte is received)
        #  - read (gap) timeout (max delay between bytes received)
        try:
            response = self._proxy_env.request(
                "GET",
                url,
                preload_content=False,
                timeout=urllib3.Timeout(self.socket_timeout),
            )
        except urllib3.exceptions.MaxRetryError as e:
            # Map exhausted-timeout retries to the TUF exception callers
            # expect; anything else propagates unchanged.
            if isinstance(e.reason, urllib3.exceptions.TimeoutError):
                raise exceptions.SlowRetrievalError from e
            raise

        if response.status >= 400:
            # Release the connection before signalling the HTTP failure.
            response.close()
            raise exceptions.DownloadHTTPError(
                f"HTTP error occurred with status {response.status}",
                response.status,
            )

        return self._chunks(response)

    def _chunks(
        self, response: urllib3.response.BaseHTTPResponse
    ) -> Iterator[bytes]:
        """A generator function to be returned by fetch.

        This way the caller of fetch can differentiate between connection
        and actual data download.
        """
        try:
            yield from response.stream(self.chunk_size)
        except urllib3.exceptions.MaxRetryError as e:
            if isinstance(e.reason, urllib3.exceptions.TimeoutError):
                raise exceptions.SlowRetrievalError from e
            raise
        except (
            urllib3.exceptions.ReadTimeoutError,
            urllib3.exceptions.ProtocolError,
        ) as e:
            # Mid-stream timeouts / truncated transfers are surfaced as
            # slow-retrieval so download_bytes() can retry them.
            raise exceptions.SlowRetrievalError from e
        finally:
            # Always return the connection to the pool, even on error.
            response.release_conn()

    def download_bytes(self, url: str, max_length: int) -> bytes:
        """Download bytes from given ``url`` with retry on streaming failures.

        This override adds retry logic for mid-stream timeout and connection
        errors that are not automatically retried by urllib3.

        Args:
            url: URL string that represents the location of the file.
            max_length: Upper bound of data size in bytes.

        Raises:
            exceptions.DownloadError: An error occurred during download.
            exceptions.DownloadLengthMismatchError: Downloaded bytes exceed
                ``max_length``.
            exceptions.DownloadHTTPError: An HTTP error code was received.

        Returns:
            Content of the file in bytes.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                return super().download_bytes(url, max_length)
            except exceptions.SlowRetrievalError:
                # Only streaming/timeout failures are retried; other
                # exceptions (HTTP errors, length mismatches) propagate
                # unchanged. Re-raise once the retry budget is spent.
                if attempt >= max_retries - 1:
                    raise
                logger.debug(
                    "Retrying download after streaming error "
                    "(attempt %d/%d): %s",
                    attempt + 1,
                    max_retries,
                    url,
                )
        # Unreachable while max_retries >= 1 (every iteration returns,
        # retries, or raises); kept as a defensive guard.
        raise exceptions.DownloadError(f"Failed to download {url}")