Skip to content

Commit 6a94288

Browse files
author
Martin Vrachev
committed
ngclient TrustedMetadataSet: improve unit testing
The current situation with the TrustedMetadataSet testing is that we don't have a mnimimal amount of unit tests testing the different branches in the various API functionality in the class. This commit proposes simple unit tests covering almost all of the branches in the API functions and increasing the unit test coverage (as reported from the "coverage" tool) from 74 % to 97 %. The code could be complicated at places, because the different branches in the update_* functions depend on other metadata classes as well. Still, I hope we can find a way and simplify the code. Signed-off-by: Martin Vrachev <[email protected]>
1 parent 726af73 commit 6a94288

1 file changed

Lines changed: 209 additions & 35 deletions

File tree

tests/test_trusted_metadata_set.py

Lines changed: 209 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
1+
import copy
12
import json
23
import logging
34
import os
4-
import shutil
55
import sys
6-
import tempfile
76
import unittest
7+
from typing import Dict, Any
8+
from datetime import datetime
89

910
from tuf import exceptions
1011
from tuf.api.metadata import Metadata
11-
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
12+
from tuf.ngclient._internal.trusted_metadata_set import(
13+
TrustedMetadataSet
14+
)
15+
from securesystemslib.signer import SSlibSigner
16+
from securesystemslib.interface import(
17+
import_ed25519_privatekey_from_file,
18+
import_rsa_privatekey_from_file
19+
)
1220

1321
from tests import utils
1422

@@ -26,64 +34,92 @@ def setUpClass(cls):
2634
with open(os.path.join(cls.repo_dir, f"{md}.json"), "rb") as f:
2735
cls.metadata[md] = f.read()
2836

37+
keystore_dir = os.path.join(os.getcwd(), 'repository_data', 'keystore')
38+
cls.keystore = {}
39+
root_key_dict = import_rsa_privatekey_from_file(
40+
os.path.join(keystore_dir, "root" + '_key'),
41+
password="password"
42+
)
43+
cls.keystore["root"] = SSlibSigner(root_key_dict)
44+
for role in ["delegation", "snapshot", "targets", "timestamp"]:
45+
key_dict = import_ed25519_privatekey_from_file(
46+
os.path.join(keystore_dir, role + '_key'),
47+
password="password"
48+
)
49+
cls.keystore[role] = SSlibSigner(key_dict)
2950

30-
def test_update(self):
31-
trusted_set = TrustedMetadataSet(self.metadata["root"])
32-
trusted_set.root_update_finished()
51+
def setUp(self) -> None:
52+
self.trusted_set = TrustedMetadataSet(self.metadata["root"])
3353

34-
trusted_set.update_timestamp(self.metadata["timestamp"])
35-
trusted_set.update_snapshot(self.metadata["snapshot"])
36-
trusted_set.update_targets(self.metadata["targets"])
37-
trusted_set.update_delegated_targets(
54+
def _root_update_finished_and_update_timestamp(self):
55+
self.trusted_set.root_update_finished()
56+
self.trusted_set.update_timestamp(self.metadata["timestamp"])
57+
58+
def _update_all_besides_targets(self):
59+
self.trusted_set.root_update_finished()
60+
self.trusted_set.update_timestamp(self.metadata["timestamp"])
61+
self.trusted_set.update_snapshot(self.metadata["snapshot"])
62+
63+
def test_update(self):
64+
self.trusted_set.root_update_finished()
65+
self.trusted_set.update_timestamp(self.metadata["timestamp"])
66+
self.trusted_set.update_snapshot(self.metadata["snapshot"])
67+
self.trusted_set.update_targets(self.metadata["targets"])
68+
self.trusted_set.update_delegated_targets(
3869
self.metadata["role1"], "role1", "targets"
3970
)
40-
trusted_set.update_delegated_targets(
71+
self.trusted_set.update_delegated_targets(
4172
self.metadata["role2"], "role2", "role1"
4273
)
74+
# the 4 top level metadata objects + 2 additional delegated targets
75+
self.assertTrue(len(self.trusted_set), 6)
4376

4477
def test_out_of_order_ops(self):
45-
trusted_set = TrustedMetadataSet(self.metadata["root"])
46-
4778
# Update timestamp before root is finished
4879
with self.assertRaises(RuntimeError):
49-
trusted_set.update_timestamp(self.metadata["timestamp"])
80+
self.trusted_set.update_timestamp(self.metadata["timestamp"])
81+
82+
self.trusted_set.root_update_finished()
83+
with self.assertRaises(RuntimeError):
84+
self.trusted_set.root_update_finished()
5085

51-
trusted_set.root_update_finished()
86+
# Update root after a previous successful root update
5287
with self.assertRaises(RuntimeError):
53-
trusted_set.root_update_finished()
88+
self.trusted_set.update_root(self.metadata["root"])
5489

5590
# Update snapshot before timestamp
5691
with self.assertRaises(RuntimeError):
57-
trusted_set.update_snapshot(self.metadata["snapshot"])
92+
self.trusted_set.update_snapshot(self.metadata["snapshot"])
5893

59-
trusted_set.update_timestamp(self.metadata["timestamp"])
94+
self.trusted_set.update_timestamp(self.metadata["timestamp"])
6095

6196
# Update targets before snapshot
6297
with self.assertRaises(RuntimeError):
63-
trusted_set.update_targets(self.metadata["targets"])
98+
self.trusted_set.update_targets(self.metadata["targets"])
6499

65-
trusted_set.update_snapshot(self.metadata["snapshot"])
100+
self.trusted_set.update_snapshot(self.metadata["snapshot"])
66101

67102
# update timestamp after snapshot
68103
with self.assertRaises(RuntimeError):
69-
trusted_set.update_timestamp(self.metadata["timestamp"])
104+
self.trusted_set.update_timestamp(self.metadata["timestamp"])
70105

71106
# Update delegated targets before targets
72107
with self.assertRaises(RuntimeError):
73-
trusted_set.update_delegated_targets(
108+
self.trusted_set.update_delegated_targets(
74109
self.metadata["role1"], "role1", "targets"
75110
)
76111

77-
trusted_set.update_targets(self.metadata["targets"])
78-
trusted_set.update_delegated_targets(
79-
self.metadata["role1"], "role1", "targets"
80-
)
112+
self.trusted_set.update_targets(self.metadata["targets"])
81113

82-
trusted_set.update_targets(self.metadata["targets"])
83-
trusted_set.update_delegated_targets(
114+
# Update snapshot after sucessful targets update
115+
with self.assertRaises(RuntimeError):
116+
self.trusted_set.update_snapshot(self.metadata["snapshot"])
117+
118+
self.trusted_set.update_delegated_targets(
84119
self.metadata["role1"], "role1", "targets"
85120
)
86121

122+
87123
def test_update_with_invalid_json(self):
88124
# root.json not a json file at all
89125
with self.assertRaises(exceptions.RepositoryError):
@@ -94,20 +130,27 @@ def test_update_with_invalid_json(self):
94130
with self.assertRaises(exceptions.RepositoryError):
95131
TrustedMetadataSet(json.dumps(root.to_dict()).encode())
96132

97-
trusted_set = TrustedMetadataSet(self.metadata["root"])
98-
trusted_set.root_update_finished()
133+
# update_root called with the wrong metadata type
134+
with self.assertRaises(exceptions.RepositoryError):
135+
self.trusted_set.update_root(self.metadata["snapshot"])
136+
137+
self.trusted_set.root_update_finished()
99138

100139
top_level_md = [
101-
(self.metadata["timestamp"], trusted_set.update_timestamp),
102-
(self.metadata["snapshot"], trusted_set.update_snapshot),
103-
(self.metadata["targets"], trusted_set.update_targets),
140+
(self.metadata["timestamp"], self.trusted_set.update_timestamp),
141+
(self.metadata["snapshot"], self.trusted_set.update_snapshot),
142+
(self.metadata["targets"], self.trusted_set.update_targets),
104143
]
105144
for metadata, update_func in top_level_md:
145+
md = Metadata.from_bytes(metadata)
146+
if md.signed.type == "snapshot":
147+
# timestamp hashes and length intervene when testing snapshot
148+
self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None
149+
self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None
106150
# metadata is not json
107151
with self.assertRaises(exceptions.RepositoryError):
108152
update_func(b"")
109153
# metadata is invalid
110-
md = Metadata.from_bytes(metadata)
111154
md.signed.version += 1
112155
with self.assertRaises(exceptions.RepositoryError):
113156
update_func(json.dumps(md.to_dict()).encode())
@@ -119,8 +162,139 @@ def test_update_with_invalid_json(self):
119162
update_func(metadata)
120163

121164

165+
def test_update_root_new_root_cannot_be_verified_with_threshold(self):
166+
# new_root data with threshold which cannot be verified.
167+
modified_threshold_data = copy.deepcopy(
168+
json.loads(self.metadata["root"])
169+
)
170+
# change something in root so signature doesn't match the content.
171+
modified_threshold_data["signed"]["roles"]["root"]["version"] = 2
172+
modified_threshold_data = json.dumps(modified_threshold_data).encode()
173+
with self.assertRaises(exceptions.UnsignedMetadataError):
174+
self.trusted_set.update_root(modified_threshold_data)
175+
176+
def test_update_root_new_root_ver_same_as_trusted_root_ver(self):
177+
with self.assertRaises(exceptions.ReplayedMetadataError):
178+
self.trusted_set.update_root(self.metadata["root"])
179+
180+
181+
def test_root_update_finished_expired(self):
182+
root = Metadata.from_bytes(self.metadata["root"])
183+
root.signed.expires = datetime(1970, 1, 1)
184+
root.sign(self.keystore["root"])
185+
modified_root_data = json.dumps(root.to_dict()).encode()
186+
tmp_trusted_set = TrustedMetadataSet(modified_root_data)
187+
# call root_update_finished when trusted root has expired
188+
with self.assertRaises(exceptions.ExpiredMetadataError):
189+
tmp_trusted_set.root_update_finished()
190+
191+
192+
def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self):
193+
self._root_update_finished_and_update_timestamp()
194+
# new_timestamp.version < trusted_timestamp.version
195+
self.trusted_set.timestamp.signed.version = 2
196+
with self.assertRaises(exceptions.ReplayedMetadataError):
197+
self.trusted_set.update_timestamp(self.metadata["timestamp"])
198+
199+
def test_update_timestamp_snapshot_ver_below_trusted_snapshot_ver(self):
200+
self._root_update_finished_and_update_timestamp()
201+
# new_timestamp.snapshot.version < trusted_timestamp.snapshot.version
202+
self.trusted_set.timestamp.signed.meta["snapshot.json"].version = 2
203+
with self.assertRaises(exceptions.ReplayedMetadataError):
204+
self.trusted_set.update_timestamp(self.metadata["timestamp"])
205+
206+
def test_update_timestamp_expired(self):
207+
self._root_update_finished_and_update_timestamp()
208+
# new_timestamp has expired
209+
timestamp = Metadata.from_bytes(self.metadata["timestamp"])
210+
timestamp.signed.expires = datetime(1970, 1, 1)
211+
timestamp.sign(self.keystore["timestamp"])
212+
new_timestamp_byte_data = json.dumps(timestamp.to_dict()).encode()
213+
with self.assertRaises(exceptions.ExpiredMetadataError):
214+
self.trusted_set.update_timestamp(new_timestamp_byte_data)
215+
216+
217+
def test_update_snapshot_cannot_verify_snapshot_with_threshold(self):
218+
self._root_update_finished_and_update_timestamp()
219+
# remove keyids representing snapshot signatures from root data
220+
self.trusted_set.root.signed.roles["snapshot"].keyids = []
221+
with self.assertRaises(exceptions.UnsignedMetadataError):
222+
self.trusted_set.update_snapshot(self.metadata["snapshot"])
223+
224+
def test_update_snapshot_version_different_timestamp_snapshot_version(self):
225+
self._root_update_finished_and_update_timestamp()
226+
# new_snapshot.version != trusted timestamp.meta["snapshot"].version
227+
self.trusted_set.timestamp.signed.meta["snapshot.json"].version = 2
228+
with self.assertRaises(exceptions.BadVersionNumberError):
229+
self.trusted_set.update_snapshot(self.metadata["snapshot"])
230+
231+
def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self):
232+
self._update_all_besides_targets()
233+
# Test removing a meta_file in new_snapshot compared to the old snapshot
234+
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
235+
snapshot.signed.meta = {}
236+
snapshot.sign(self.keystore["snapshot"])
237+
self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None
238+
self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None
239+
modified_snapshot_data = json.dumps(snapshot.to_dict()).encode()
240+
with self.assertRaises(exceptions.RepositoryError):
241+
self.trusted_set.update_snapshot(modified_snapshot_data)
242+
243+
def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self):
244+
self._update_all_besides_targets()
245+
# snapshot.meta["project1"].version != new_snapshot.meta["project1"].version
246+
for metafile in self.trusted_set.snapshot.signed.meta.values():
247+
metafile.version += 1
248+
with self.assertRaises(exceptions.BadVersionNumberError):
249+
self.trusted_set.update_snapshot(self.metadata["snapshot"])
250+
251+
def test_update_snapshot_after_succesfull_expired_new_snapshot(self):
252+
self._update_all_besides_targets()
253+
# new_snapshot has expired
254+
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
255+
snapshot.signed.expires = datetime(1970, 1, 1)
256+
snapshot.sign(self.keystore["snapshot"])
257+
self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None
258+
self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None
259+
modified_snapshot_data = json.dumps(snapshot.to_dict()).encode()
260+
with self.assertRaises(exceptions.ExpiredMetadataError):
261+
self.trusted_set.update_snapshot(modified_snapshot_data)
262+
263+
264+
def test_update_targets_no_meta_in_snapshot(self):
265+
self._update_all_besides_targets()
266+
# remove meta information with information about targets from snapshot
267+
self.trusted_set.snapshot.signed.meta = {}
268+
with self.assertRaises(exceptions.RepositoryError):
269+
self.trusted_set.update_targets(self.metadata["targets"])
270+
271+
def test_update_targets_hash_different_than_snapshot_meta_hash(self):
272+
self._update_all_besides_targets()
273+
# observed_hash != stored hash in snapshot meta for targets
274+
for target_path in self.trusted_set.snapshot.signed.meta.keys():
275+
self.trusted_set.snapshot.signed.meta[target_path].hashes = {"sha256": "b"}
276+
with self.assertRaises(exceptions.RepositoryError):
277+
self.trusted_set.update_targets(self.metadata["targets"])
278+
279+
def test_update_targets_version_different_snapshot_meta_version(self):
280+
self._update_all_besides_targets()
281+
# new_delegate.signed.version != meta.version stored in snapshot
282+
for target_path in self.trusted_set.snapshot.signed.meta.keys():
283+
self.trusted_set.snapshot.signed.meta[target_path].version = 2
284+
with self.assertRaises(exceptions.BadVersionNumberError):
285+
self.trusted_set.update_targets(self.metadata["targets"])
286+
287+
def test_update_targets_expired_new_target(self):
288+
self._update_all_besides_targets()
289+
# new_delegated_target has expired
290+
targets = Metadata.from_bytes(self.metadata["targets"])
291+
targets.signed.expires = datetime(1970, 1, 1)
292+
targets.sign(self.keystore["targets"])
293+
modified_targets_data = json.dumps(targets.to_dict()).encode()
294+
with self.assertRaises(exceptions.ExpiredMetadataError):
295+
self.trusted_set.update_targets(modified_targets_data)
296+
122297
# TODO test updating over initial metadata (new keys, newer timestamp, etc)
123-
# TODO test the actual specification checks
124298

125299

126300
if __name__ == '__main__':

0 commit comments

Comments
 (0)