Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions src/python_testing/TC_ICDB_2_1_2_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@

class TC_ICDB_2_1_2_2(ICDBaseTest):

# DUT can take more than one cycle to detect a dropped subscriber and resume check-ins, default_timeout
# is raised to accommodate that (can be overridden by a --timeout on the command line)
@property
def default_timeout(self) -> int:
return 5 * 60

@async_test_body
async def setup_class(self):
super().setup_class()
Expand Down Expand Up @@ -239,9 +245,9 @@ def steps_TC_ICDB_2_2(self) -> list[TestStep]:
"Verify ICDCounter is unchanged from icd_counter_at_subscription. An unchanged counter confirms no check-in was sent."),
TestStep(7, "Wait for a full active-to-idle-to-active ICD transition cycle (IdleModeDuration + ActiveModeDuration). TH writes a new value to the NodeLabel attribute. Verify the existing subscription reports the change.",
"The subscription receives a report for the NodeLabel attribute within MaxInterval."),
TestStep(8, "TH shuts down the subscription. Wait for a full active-to-idle-to-active ICD transition cycle (IdleModeDuration + ActiveModeDuration).", """
DUT resumed sending check-in messages after the subscription was torn down.
Verify ICDCounter is greater than icd_counter_at_subscription, indicating a check-in was sent."""),
TestStep(8, "TH shuts down the subscription, then waits for the DUT to detect the dropped subscriber and resume check-ins.", """
DUT resumes sending check-in messages once it detects the subscription is gone.
Verify ICDCounter increments beyond icd_counter_at_subscription, indicating a check-in was sent."""),
]

def pics_TC_ICDB_2_2(self) -> list[str]:
Expand Down Expand Up @@ -364,27 +370,25 @@ def on_report_end(transaction):
except TimeoutError:
asserts.fail(f"Subscription did not receive report for NodeLabel within MaxInterval ({max_interval_s}s)")

subscription.Shutdown()

# *** STEP 8 ***
# TH shuts down the subscription. Wait for a full active-to-idle-to-active ICD transition cycle (IdleModeDuration + ActiveModeDuration).
# - DUT resumed sending check-in messages. Confirmed by ICDCounter increment.
# TH shuts down the subscription, then waits for the DUT to detect the dropped subscriber and resume check-ins
# - DUT resumes sending check-in messages once it detects the subscription is gone.
self.step(8)
subscription.Shutdown()
log.info("Subscription shut down.")
await self.wait_for_transition(ICDTransition.FullCycle,
active_mode_duration_ms=active_mode_duration_ms,
idle_mode_duration_s=idle_mode_duration_s)

# Verify ICDCounter is greater than icd_counter_at_subscription, indicating a check-in was sent
wait_s = self.checkin_resume_wait_s(max_interval_s=max_interval_s,
active_mode_duration_ms=active_mode_duration_ms,
idle_mode_duration_s=idle_mode_duration_s)
await asyncio.sleep(wait_s)

# Verify ICDCounter incremented beyond icd_counter_at_subscription (check-in resumed)
icd_counter_after_shutdown = await self.read_icdm_attribute_expect_success(attributes.ICDCounter)
asserts.assert_greater(icd_counter_after_shutdown, icd_counter_at_subscription,
f"ICDCounter must increment after subscription shutdown (check-in resumed). "
f"Before: {icd_counter_at_subscription}, After: {icd_counter_after_shutdown}")

# Cleanup: unregister TH so the RegisteredClients list is clean after the test
await self.send_single_icdm_command(commands.UnregisterClient(checkInNodeID=TH.nodeId))
log.info("UnregisterClient cleanup SUCCESS for checkInNodeID=%s", TH.nodeId)
await self.unregister_all_clients()


if __name__ == "__main__":
Expand Down
113 changes: 45 additions & 68 deletions src/python_testing/TC_ICDB_2_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@

import asyncio
import logging
import os

from mobly import asserts
from support_modules.icd_support import ICDBaseTest, ICDTransition, assert_subscription_heartbeat_received
from support_modules.icd_support import ICDBaseTest, assert_subscription_heartbeat_received

import matter.clusters as Clusters
from matter import ChipDeviceCtrl
Expand Down Expand Up @@ -82,22 +81,26 @@
cluster = Clusters.Objects.IcdManagement
attributes = cluster.Attributes
commands = cluster.Commands
ClientTypeEnum = cluster.Enums.ClientTypeEnum

ONE_HOUR_S = 3600


class TC_ICDB_2_4(ICDBaseTest):

# DUT can take more than one cycle to detect a dropped subscriber and resume check-ins, default_timeout
# is raised to accommodate that (can be overridden by a --timeout on the command line)
@property
def default_timeout(self) -> int:
Comment thread
raul-marquez-csa marked this conversation as resolved.
return 5 * 60

@async_test_body
async def setup_class(self):
# *** PRECONDITION ***
# Commission DUT to TH1 with ICD registration
# Commission DUT to TH2 with ICD registration
super().setup_class()

# TH1 commissions DUT (ICD registration done explicitly via RegisterClient in the test body)
# TH1 commissions DUT with ICD registration (registers as a kPermanent ICD client during
# commissioning, which also stores the symmetric key client-side so received check-ins decrypt).
self.th1 = self.default_controller
self.th1.EnableICDRegistration(self.th1.GenerateICDRegistrationParameters())
Comment thread
raul-marquez-csa marked this conversation as resolved.
setup_payload_info = get_setup_payload_info_config(self.matter_test_config)
info = setup_payload_info[0]
commissioning_info = CommissioningInfo(
Expand All @@ -112,8 +115,8 @@ async def setup_class(self):
status = await commission_device(self.th1, self.dut_node_id, info, commissioning_info)
asserts.assert_true(status, f"Failed to commission DUT to TH1's fabric: {status}")

# TH2 commissions DUT on a separate fabric (ICD registration done explicitly via RegisterClient in the test body)
self.th2 = self.create_new_controller()
# TH2 commissions DUT on a separate fabric with ICD registration (same client receive-stack arming as TH1)
self.th2 = self.create_new_controller(enable_icd_registration=True)

# TH2 commissions DUT
ecw = await self.open_commissioning_window(dev_ctrl=self.th1, node_id=self.dut_node_id, timeout=600)
Expand All @@ -136,11 +139,11 @@ def desc_TC_ICDB_2_4(self) -> str:
def steps_TC_ICDB_2_4(self) -> list[TestStep]:
return [
TestStep("precondition", "Commission DUT to TH1 and TH2."),
TestStep(1, "TH1 sends the RegisterClient command to the DUT with TH1's node ID as checkInNodeID and monitoredSubject. TH1 reads the RegisteredClients attribute.", """
TestStep(1, "TH1 reads the RegisteredClients attribute (TH1 was registered as an ICD client during commissioning).", """
TH1 is registered as an ICD client on the DUT.
Verify exactly one RegisteredClients entry is present.
Verify that the RegisteredClients entry's checkInNodeID and monitoredSubject match TH1's node ID."""),
TestStep(2, "TH2 sends the RegisterClient command to the DUT with TH2's node ID as checkInNodeID and monitoredSubject. TH2 reads the RegisteredClients attribute.", """
TestStep(2, "TH2 reads the RegisteredClients attribute (TH2 was registered as an ICD client during commissioning).", """
TH2 is registered as an ICD client on the DUT.
Verify exactly one RegisteredClients entry is present.
Verify that the RegisteredClients entry's checkInNodeID and monitoredSubject match TH2's node ID."""),
Expand All @@ -152,14 +155,14 @@ def steps_TC_ICDB_2_4(self) -> list[TestStep]:
No check-in message is sent to TH1 or TH2 while subscriptions are active.
Verify TH1 and TH2 each receive a subscription report within MaxInterval.
Verify ICDCounter is unchanged, confirming no check-in messages were sent."""),
TestStep(6, "Deactivate the subscription between DUT and TH1, and wait for 1 full active-idle-active cycle.", """
DUT starts sending check-in messages to TH1
Verify ICDCounter has incremented for TH1 after subscription shutdown and waiting for 1 full active-idle-active cycle.
TestStep(6, "Deactivate the subscription between DUT and TH1, then wait for the DUT to detect the dropped subscriber and resume check-ins to TH1.", """
DUT starts sending check-in messages to TH1.
Verify TH1 receives a check-in from the DUT within the detection-aware timeout.
Verify TH2 receives a subscription report within MaxInterval."""),
TestStep(7, "Deactivate subscriptions between DUT and TH2, and wait for 1 full active-idle-active cycle.", """
DUT starts sending check-in messages both to TH1 and TH2
ICDCounter increments once per check-in per client, so expecting at least 2 increments
Verify ICDCounter has incremented by at least 2 after TH1 and TH2 subscriptions are shutdown and waiting for 1 full active-idle-active cycle."""),
TestStep(7, "Deactivate the subscription between DUT and TH2, then wait for the DUT to resume check-ins on both fabrics.", """
DUT sends check-in messages to both TH1 and TH2 once no subscription covers them.
Verify TH1 receives a check-in from the DUT within the detection-aware timeout.
Verify TH2 receives a check-in from the DUT within the detection-aware timeout."""),
]

def pics_TC_ICDB_2_4(self) -> list[str]:
Expand All @@ -176,17 +179,9 @@ async def test_TC_ICDB_2_4(self):
self.step("precondition")

# *** STEP 1 ***
# TH1 sends the RegisterClient command to the DUT with TH1's node ID as checkInNodeID and monitoredSubject.
# TH1 reads the RegisteredClients attribute (TH1 was registered as an ICD client during commissioning)
self.step(1)
th1_checkin_key = os.urandom(16)
th1_check_in_node_id = self.th1.nodeId
await self.send_single_icdm_command(commands.RegisterClient(
checkInNodeID=th1_check_in_node_id,
monitoredSubject=th1_check_in_node_id,
key=th1_checkin_key,
clientType=ClientTypeEnum.kPermanent,
))
log.info("TH1 RegisterClient SUCCESS for checkInNodeID=%s", th1_check_in_node_id)

# TH1 reads RegisteredClients to verify registration
th1_registered_clients = await self.read_icdm_attribute_expect_success(attributes.RegisteredClients)
Expand All @@ -203,17 +198,9 @@ async def test_TC_ICDB_2_4(self):
f"monitoredSubject ({rc_th1.monitoredSubject}) must match TH1's node ID ({th1_check_in_node_id})")

# *** STEP 2 ***
# TH2 sends the RegisterClient command to the DUT with TH2's node ID as checkInNodeID and monitoredSubject.
# TH2 reads the RegisteredClients attribute (TH2 was registered as an ICD client during commissioning)
self.step(2)
th2_checkin_key = os.urandom(16)
th2_check_in_node_id = self.th2.nodeId
await self.send_single_icdm_command(commands.RegisterClient(
checkInNodeID=th2_check_in_node_id,
monitoredSubject=th2_check_in_node_id,
key=th2_checkin_key,
clientType=ClientTypeEnum.kPermanent,
), controller=self.th2, node_id=self.th2_dut_node_id)
log.info("TH2 RegisterClient SUCCESS for checkInNodeID=%s", th2_check_in_node_id)

# TH2 reads RegisteredClients to verify registration
th2_registered_clients = await self.read_icdm_attribute_expect_success(
Expand All @@ -232,7 +219,7 @@ async def test_TC_ICDB_2_4(self):
f"monitoredSubject ({rc_th2.monitoredSubject}) must match TH2's node ID ({th2_check_in_node_id})")

# *** STEP 3 ***
# TH1 reads from the DUT the IdleModeDuration, ActiveModeDuration, and ActiveModeThreshold attributes
# TH1 reads from the DUT the IdleModeDuration, ActiveModeDuration, ActiveModeThreshold, and MaximumCheckInBackoff attributes
Comment thread
raul-marquez-csa marked this conversation as resolved.
self.step(3)
idle_mode_duration_s = await self.read_icdm_attribute_expect_success(attributes.IdleModeDuration)
active_mode_duration_ms = await self.read_icdm_attribute_expect_success(attributes.ActiveModeDuration)
Expand Down Expand Up @@ -304,47 +291,37 @@ async def test_TC_ICDB_2_4(self):
)

# *** STEP 6 ***
# Deactivate the subscription between DUT and TH1, and wait for 1 full active-idle-active cycle
# Deactivate the subscription between DUT and TH1, then wait for the DUT to detect the dropped
# subscriber and resume check-ins to TH1
# - DUT starts sending check-in messages to TH1
self.step(6)

# Verify ICDCounter has incremented for TH1 after subscription shutdown
# and waiting for 1 full active-idle-active cycle
icd_counter_before_th1_shutdown = await self.read_icdm_attribute_expect_success(attributes.ICDCounter)
th1_subscription.Shutdown()
await self.wait_for_transition(
ICDTransition.FullCycle,
active_mode_duration_ms=active_mode_duration_ms,
idle_mode_duration_s=idle_mode_duration_s)
icd_counter_after_th1_shutdown = await self.read_icdm_attribute_expect_success(attributes.ICDCounter)
asserts.assert_greater(icd_counter_after_th1_shutdown, icd_counter_before_th1_shutdown,
f"ICDCounter should have incremented after TH1 subscription shutdown. "
f"Before: {icd_counter_before_th1_shutdown}, After: {icd_counter_after_th1_shutdown}")
th1_checkin_timeout_s = self.checkin_resume_wait_s(
max_interval_s=th1_max_interval_s,
active_mode_duration_ms=active_mode_duration_ms, idle_mode_duration_s=idle_mode_duration_s)

# Verify TH1 receives a check-in from the DUT once its subscription is dropped
await self.assert_checkin_received(self.th1, self.dut_node_id, th1_checkin_timeout_s)

# Verify TH2 subscription is still active
await assert_subscription_heartbeat_received(th2_subscription, th2_max_interval_s)

# *** STEP 7 ***
# Deactivate subscriptions between DUT and TH2, and wait for 1 full active-idle-active cycle
# - DUT starts sending check-in messages both to TH1 and TH2
# - ICDCounter increments once per check-in per client, so expecting at least 2 increments
# Deactivate the subscription between DUT and TH2, then wait for the DUT to resume check-ins on both fabrics
# - DUT sends check-in messages to both TH1 and TH2 once no subscription covers them
self.step(7)

# Verify ICDCounter has incremented by at least 2 after TH1 and TH2 subscriptions are shutdown
# and waiting for 1 full active-idle-active cycle
icd_counter_before_th2_shutdown = await self.read_icdm_attribute_expect_success(attributes.ICDCounter)
th2_subscription.Shutdown()
await self.wait_for_transition(ICDTransition.FullCycle,
active_mode_duration_ms=active_mode_duration_ms,
idle_mode_duration_s=idle_mode_duration_s)
icd_counter_after_th2_shutdown = await self.read_icdm_attribute_expect_success(attributes.ICDCounter)
icd_counter_increment = icd_counter_after_th2_shutdown - icd_counter_before_th2_shutdown
asserts.assert_greater_equal(icd_counter_increment, 2,
f"ICDCounter should have incremented by at least 2 (one check-in per client: TH1 and TH2)."
f"Before: {icd_counter_before_th2_shutdown}, After: {icd_counter_after_th2_shutdown}, "
f"Increment: {icd_counter_increment}")

log.info("TH1 and TH2 subscriptions have been shut down; ICDCounter checks above verify transition back to check-in state.")
th2_checkin_timeout_s = self.checkin_resume_wait_s(
max_interval_s=th2_max_interval_s,
active_mode_duration_ms=active_mode_duration_ms, idle_mode_duration_s=idle_mode_duration_s)

# Verify TH1 and TH2 each receive a check-in from the DUT once no subscription covers them
await asyncio.gather(
self.assert_checkin_received(self.th1, self.dut_node_id, th1_checkin_timeout_s),
self.assert_checkin_received(self.th2, self.th2_dut_node_id, th2_checkin_timeout_s),
)

log.info("TH1 and TH2 each received a check-in; DUT has resumed check-in state on both fabrics.")


if __name__ == "__main__":
Expand Down
32 changes: 32 additions & 0 deletions src/python_testing/support_modules/icd_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from mobly import asserts

import matter.clusters as Clusters
from matter.ChipDeviceCtrl import ScopedNodeId, WaitForCheckIn
from matter.interaction_model import InteractionModelError
from matter.testing.matter_testing import MatterBaseTest

Expand Down Expand Up @@ -261,6 +262,37 @@
log.info("Waiting %ss for %s...", wait_s, transition.name)
await asyncio.sleep(wait_s)

def checkin_resume_wait_s(self, *, max_interval_s: float,
active_mode_duration_ms: int, idle_mode_duration_s: int) -> float:
"""Seconds to allow for the DUT to resume check-ins after a subscriber drops locally.

A TH subscription Shutdown() sends nothing to the DUT: it only notices the subscriber is
gone when its next periodic report (up to one MaxInterval away) fails to deliver and the
subscription is torn down, then it checks in on the following idle-to-active transition.
Allow one full cycle for that detection plus one full cycle for the check-in to fire. The
Comment thread
raul-marquez-csa marked this conversation as resolved.
Outdated
wait must stay silent (no polling), so the DUT can actually go idle and cycle. Derived from
the DUT's own timing values, so there are no hard-coded durations.
"""
full_cycle_s = self.compute_wait_time(ICDTransition.FullCycle,
active_mode_duration_ms=active_mode_duration_ms,
idle_mode_duration_s=idle_mode_duration_s)
wait_s = max_interval_s + 2 * full_cycle_s
Comment thread
raul-marquez-csa marked this conversation as resolved.
Outdated
log.info(f"Check-in resume wait: MaxInterval={max_interval_s}s + 2 x FullCycle={full_cycle_s:.1f}s = {wait_s:.1f}s")

Check failure on line 280 in src/python_testing/support_modules/icd_support.py

View workflow job for this annotation

GitHub Actions / code-lints

ruff (G004)

src/python_testing/support_modules/icd_support.py:280:18: G004 Logging statement uses f-string help: Convert to lazy `%` formatting
return wait_s

async def assert_checkin_received(self, dev_ctrl, node_id: int, timeout_s: float) -> None:
"""Assert the DUT delivers a check-in to TH's fabric within timeout_s.

Requires TH to have registered as an ICD client during commissioning
(EnableICDRegistration), so the received check-in can be decrypted.
"""
scoped_node_id = ScopedNodeId(node_id, dev_ctrl.GetFabricIndexInternal())
try:
await WaitForCheckIn(scoped_node_id, timeoutSeconds=timeout_s)
Comment thread
raul-marquez-csa marked this conversation as resolved.
except TimeoutError:
asserts.fail(f"DUT did not send a check-in to node 0x{node_id:016X} within {timeout_s:.1f}s")
log.info(f"Check-in received from DUT for node 0x{node_id:016X}")

Check failure on line 294 in src/python_testing/support_modules/icd_support.py

View workflow job for this annotation

GitHub Actions / code-lints

ruff (G004)

src/python_testing/support_modules/icd_support.py:294:18: G004 Logging statement uses f-string help: Convert to lazy `%` formatting

def create_new_controller(
self,
*,
Expand Down
Loading