Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions openthread_border_router/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__/
.DS_Store
242 changes: 185 additions & 57 deletions openthread_border_router/rootfs/usr/local/bin/migrate_otbr_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import zigpy.serial
from pathlib import Path
from serialx import PinState
import logging

from enum import Enum
from universal_silabs_flasher.spinel import (
Expand All @@ -14,9 +15,12 @@
ResetReason,
)

_LOGGER = logging.getLogger(__name__)

CONNECT_TIMEOUT = 10
AFTER_DISCONNECT_DELAY = 1
SETTINGS_FILE_PATTERN = re.compile(r"^\d+_[0-9a-f]+\.data$")
MIN_ACTIVE_DATASET_SIZE = 32


class OtbrSettingsKey(Enum):
Expand All @@ -35,28 +39,46 @@ class OtbrSettingsKey(Enum):
BORDER_AGENT_ID = 0x0011


MIGRATED_KEYS = frozenset(
{
OtbrSettingsKey.ACTIVE_DATASET,
OtbrSettingsKey.PENDING_DATASET,
OtbrSettingsKey.CHILD_INFO,
}
)


def parse_otbr_settings(data: bytes) -> list[tuple[OtbrSettingsKey, bytes]]:
"""Parses an OTBR binary settings file."""
orig_data = data
settings = []

while data:
key_bytes = data[:2]
if not key_bytes:
break

assert len(key_bytes) == 2
if len(key_bytes) != 2:
raise ValueError("Key is not 2 bytes long")
key = int.from_bytes(key_bytes, "little")

length_bytes = data[2:4]
assert len(length_bytes) == 2
if len(length_bytes) != 2:
raise ValueError("Length is not 2 bytes long")

length = int.from_bytes(length_bytes, "little")
value = data[4 : 4 + length]
assert len(value) == length
if len(value) != length:
raise ValueError(
f"Value is truncated, expected {length} bytes, got {len(value)} bytes"
)

settings.append((OtbrSettingsKey(key), value))
data = data[4 + length :]

if serialize_otbr_settings(settings) != orig_data:
raise ValueError("Parsed settings do not match original data")

return settings


Expand All @@ -74,7 +96,86 @@ def serialize_otbr_settings(settings: list[tuple[OtbrSettingsKey, bytes]]) -> by

def is_valid_otbr_settings_file(settings: list[tuple[OtbrSettingsKey, bytes]]) -> bool:
"""Check if parsed settings represent a valid OTBR settings file."""
return {OtbrSettingsKey.ACTIVE_DATASET} <= {key for key, _ in settings}
active_dataset = get_active_dataset(settings)
return active_dataset is not None and len(active_dataset) >= MIN_ACTIVE_DATASET_SIZE
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess instead of going by length we could just test for some usually available tags 🤔

I'd go for these five, they are pretty much required:

  1. Network Key
  2. Network Name
  3. Extended PAN ID
  4. PAN ID
  5. Channel



def get_active_dataset(settings: list[tuple[OtbrSettingsKey, bytes]]) -> bytes | None:
"""Get the active dataset value from parsed settings."""
for key, value in settings:
if key == OtbrSettingsKey.ACTIVE_DATASET:
return value
return None


def filter_settings_for_migration(
settings: list[tuple[OtbrSettingsKey, bytes]],
) -> list[tuple[OtbrSettingsKey, bytes]]:
"""Keep only the keys that should be preserved during migration."""
return [(key, value) for key, value in settings if key in MIGRATED_KEYS]


def find_valid_settings_files(
data_dir: Path,
) -> list[tuple[float, Path, list[tuple[OtbrSettingsKey, bytes]]]]:
"""Scan data directory for valid OTBR settings files, sorted newest first."""
all_settings = []

for settings_path in data_dir.glob("*.data"):
if not SETTINGS_FILE_PATTERN.match(settings_path.name):
continue

mod_time = settings_path.stat().st_mtime

try:
otbr_settings = parse_otbr_settings(settings_path.read_bytes())
except ValueError:
_LOGGER.debug(
f"{settings_path} is not a valid TLV file, skipping", exc_info=True
)
continue

if not is_valid_otbr_settings_file(otbr_settings):
_LOGGER.info(
f"TLV file {settings_path} is not a valid OTBR settings file, skipping"
)
continue

all_settings.append((mod_time, settings_path, otbr_settings))

all_settings.sort(reverse=True)
return all_settings


def find_best_backup_settings(
settings_path: Path,
) -> list[tuple[OtbrSettingsKey, bytes]] | None:
"""Find the most recent backup with a valid active dataset, filtered for migration."""
backups = []

for backup_path in settings_path.parent.glob(settings_path.name + ".backup-*"):
try:
otbr_settings = parse_otbr_settings(backup_path.read_bytes())
except ValueError:
_LOGGER.debug(
f"{backup_path} is not a valid TLV file, skipping", exc_info=True
)
continue

active_dataset = get_active_dataset(otbr_settings)
if active_dataset is None or len(active_dataset) < MIN_ACTIVE_DATASET_SIZE:
continue

mod_time = backup_path.stat().st_mtime
backups.append((mod_time, backup_path, otbr_settings))

if not backups:
return None

_, backup_path, backup_settings = sorted(backups, reverse=True)[0]
_LOGGER.info(f"Found valid backup: {backup_path}")

return filter_settings_for_migration(backup_settings)


async def get_adapter_hardware_addr(
Expand Down Expand Up @@ -127,6 +228,71 @@ def backup_file(path: Path) -> Path:
return backup_path


def try_recover_corrupted_settings(
expected_settings_path: Path,
) -> list[tuple[OtbrSettingsKey, bytes]] | None:
"""If the expected settings file exists but has a corrupted active dataset,
attempt recovery from a backup file.

Returns recovered settings, or None if no recovery was needed or possible.
Archives the corrupted file as a side effect when recovery succeeds.
"""
if not expected_settings_path.exists():
return None

current_settings = parse_otbr_settings(expected_settings_path.read_bytes())

if is_valid_otbr_settings_file(current_settings):
return None

# Active dataset is missing or too small, likely corrupted by erroneous
# tmp file migration. Try to recover from a backup.
active_dataset = get_active_dataset(current_settings)
_LOGGER.info(
f"Active dataset in {expected_settings_path} is only"
f" {len(active_dataset) if active_dataset else 0} bytes,"
f" attempting recovery from backup"
)

recovered = find_best_backup_settings(expected_settings_path)

if recovered is None:
_LOGGER.info("No valid backup found, cannot recover")
return None

backup_file(expected_settings_path)
return recovered


def resolve_migration_settings(
expected_settings_path: Path,
all_settings: list[tuple[float, Path, list[tuple[OtbrSettingsKey, bytes]]]],
) -> list[tuple[OtbrSettingsKey, bytes]] | None:
"""Determine what settings should be written for the current adapter.

Returns the settings to write, or None if no migration is needed.
Archives stale settings files as a side effect.
"""
_, most_recent_path, _ = all_settings[0]

if most_recent_path == expected_settings_path:
_LOGGER.info(
f"Adapter settings file {expected_settings_path} is the most"
f" recently used, skipping"
)
return None

# Adapter either has a stale settings file or no settings file at all.
if expected_settings_path.exists():
_LOGGER.info(
f"Settings file for adapter already exists at"
f" {expected_settings_path} but appears to be old, archiving"
)
backup_file(expected_settings_path)

return filter_settings_for_migration(all_settings[0][2])


async def main() -> None:
parser = argparse.ArgumentParser(description="Migrate OTBR settings to new adapter")
parser.add_argument(
Expand All @@ -147,76 +313,38 @@ async def main() -> None:

args = parser.parse_args()

flow_control = args.flow_control

if flow_control == "none":
flow_control = None
flow_control = args.flow_control if args.flow_control != "none" else None

# First, read the hardware address of the new adapter
hwaddr = await get_adapter_hardware_addr(
port=args.adapter,
baudrate=args.baudrate,
flow_control=flow_control,
)

# Then, look at existing settings
all_settings = []

for settings_path in args.data_dir.glob("*.data"):
if not SETTINGS_FILE_PATTERN.match(settings_path.name):
continue

mod_time = settings_path.stat().st_mtime
otbr_settings = parse_otbr_settings(settings_path.read_bytes())
expected_settings_path = args.data_dir / hwaddr_to_filename(hwaddr)

# Ensure our parsing is valid
assert serialize_otbr_settings(otbr_settings) == settings_path.read_bytes()
# If the current adapter's settings file has a corrupted dataset (e.g. from
# an erroneous tmp file migration), try to restore it from a backup first.
recovered = try_recover_corrupted_settings(expected_settings_path)

if not is_valid_otbr_settings_file(otbr_settings):
print(
f"Settings file {settings_path} is not a valid OTBR settings file, skipping"
)
continue
if recovered is not None:
expected_settings_path.write_bytes(serialize_otbr_settings(recovered))
_LOGGER.info(f"Recovered settings written to {expected_settings_path}")
return

all_settings.append((mod_time, settings_path, otbr_settings))
all_settings = find_valid_settings_files(args.data_dir)

if not all_settings:
print("No existing settings files found, skipping")
_LOGGER.info("No existing settings files found, skipping")
return

most_recent_settings_info = sorted(all_settings, reverse=True)[0]
most_recent_settings_path = most_recent_settings_info[1]
most_recent_settings = most_recent_settings_info[2]
new_settings = resolve_migration_settings(expected_settings_path, all_settings)

expected_settings_path = args.data_dir / hwaddr_to_filename(hwaddr)

if expected_settings_path.exists():
if most_recent_settings_path == expected_settings_path:
print(
f"Adapter settings file {expected_settings_path} is the most recently used, skipping"
)
return

# If the settings file is old, we should "delete" it
print(
f"Settings file for adapter {hwaddr} already exists at {expected_settings_path} but appears to be old, archiving"
)
backup_file(expected_settings_path)

# Write back a new settings file that keeps only a few keys
new_settings = [
(key, value)
for key, value in most_recent_settings
if key
in (
OtbrSettingsKey.ACTIVE_DATASET,
OtbrSettingsKey.PENDING_DATASET,
OtbrSettingsKey.CHILD_INFO,
)
]
if new_settings is None:
return

expected_settings_path.write_bytes(serialize_otbr_settings(new_settings))
print(f"Wrote new settings file to {expected_settings_path}")
_LOGGER.info(f"Wrote new settings file to {expected_settings_path}")

await asyncio.sleep(AFTER_DISCONNECT_DELAY)

Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
63 changes: 63 additions & 0 deletions openthread_border_router/tests/test_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import sys
from pathlib import Path

sys.path.append(str(Path(__file__).parent.parent / "rootfs/usr/local/bin"))

import pytest
import shutil
from unittest.mock import patch
from migrate_otbr_settings import main, parse_otbr_settings, is_valid_otbr_settings_file


Comment on lines +3 to +11
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sys.path manipulation on line 4 modifies the global Python path for the entire test session. This can cause issues if multiple tests run in the same session and have conflicting module names. Consider using a more isolated approach such as importlib or pytest fixtures that handle the path manipulation within the test scope only.

Suggested change
sys.path.append(str(Path(__file__).parent.parent / "rootfs/usr/local/bin"))
import pytest
import shutil
from unittest.mock import patch
from migrate_otbr_settings import main, parse_otbr_settings, is_valid_otbr_settings_file
import importlib.util
import pytest
import shutil
from unittest.mock import patch
_MIGRATE_OTBR_SETTINGS_PATH = (
Path(__file__).parent.parent / "rootfs/usr/local/bin" / "migrate_otbr_settings.py"
)
_spec = importlib.util.spec_from_file_location(
"migrate_otbr_settings", _MIGRATE_OTBR_SETTINGS_PATH
)
if _spec is None or _spec.loader is None:
raise ImportError(f"Cannot load migrate_otbr_settings from {_MIGRATE_OTBR_SETTINGS_PATH}")
_migrate_otbr_settings = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_migrate_otbr_settings) # type: ignore[union-attr]
main = _migrate_otbr_settings.main
parse_otbr_settings = _migrate_otbr_settings.parse_otbr_settings
is_valid_otbr_settings_file = _migrate_otbr_settings.is_valid_otbr_settings_file

Copilot uses AI. Check for mistakes.
def copy_otbr_test_data(name: str, dst: Path) -> None:
"""Copy test data, preserving modification times and permissions."""
path = Path(__file__).parent / "data" / name
shutil.copytree(path, dst, dirs_exist_ok=True)


@pytest.mark.asyncio
async def test_migration_complex(tmp_path: Path, caplog) -> None:
copy_otbr_test_data("otbr_settings_complex_running", tmp_path)

with (
patch(
"migrate_otbr_settings.get_adapter_hardware_addr",
return_value="f074bffffeaad34f",
autospec=True,
),
patch(
"sys.argv",
[sys.argv[0], "--data-dir", str(tmp_path), "--adapter", "/dev/null"],
),
caplog.at_level("INFO", logger="migrate_otbr_settings"),
):
await main()

assert "0_f074bffffeaad34f.data is the most recently used, skipping" in caplog.text


@pytest.mark.asyncio
async def test_migration_broken(tmp_path: Path, caplog) -> None:
copy_otbr_test_data("otbr_settings_broken", tmp_path)

with (
patch(
"migrate_otbr_settings.get_adapter_hardware_addr",
return_value="f074bffffeaad34f",
autospec=True,
),
patch(
"sys.argv",
[sys.argv[0], "--data-dir", str(tmp_path), "--adapter", "/dev/null"],
),
caplog.at_level("DEBUG", logger="migrate_otbr_settings"),
):
await main()

assert "is only 8 bytes, attempting recovery from backup" in caplog.text
assert "Found valid backup:" in caplog.text
assert "Recovered settings written to" in caplog.text

# Verify the recovered file has a real active dataset
recovered = parse_otbr_settings((tmp_path / "0_f074bffffeaad34f.data").read_bytes())
assert is_valid_otbr_settings_file(recovered)
Loading