From f37b6cf1c5d24649913cedda77ab6ac0c686dd8d Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Tue, 12 May 2026 23:11:48 +0000 Subject: [PATCH 1/4] Add LUKS disk encryption support for Debian / Ubuntu OS morphing The bulk of the work lives in the new `LinuxLUKSMixin` class (`osmount/luks_mixin.py`), which is then included in `BaseLinuxOSMountTools`: - `mount_os()`: check `osmorphing_info["encrypted_disks_passphrase"]` for each block device. confirmed LUKS containers are opened via `cryptsetup luksOpen` and the resulting `/dev/mapper/` path is used in place of the raw device. `dismount_os()` closes them again after all filesystems have been unmounted. - `remove_encryption_artifacts`: after OS morphing, stale TPM2 LUKS tokens and their keyslots are killed and the corresponding `tpm2-*` options are stripped from `/etc/crypttab`. The source TPM does not exist on the destination, so leaving these in place would cause the initramfs to hang or fail on first boot. - `install_encryption_firstboot_setup`: a temporary migration keyfile is injected into the guest, `/etc/crypttab` is updated to reference it, the initramfs is rebuilt so the migrated VM can boot, GRUB is patched to use the crypttab mapper names instead of the osmount-time names, and a systemd one-shot service is installed to re-enroll TPM2 and remove the migration keyfile on the first boot of the destination VM. The firstboot shell script itself lives in `coriolis/osmorphing/osmount/resources/luks_firstboot_initramfs_tools.sh` and targets `update-initramfs`-based systems (Debian / Ubuntu). --- coriolis/constants.py | 2 + coriolis/osmorphing/manager.py | 6 +- coriolis/osmorphing/osmount/base.py | 16 +- coriolis/osmorphing/osmount/factory.py | 5 +- coriolis/osmorphing/osmount/luks_mixin.py | 587 ++++++++++++++++++ .../luks_firstboot_initramfs_tools.sh | 144 +++++ .../tests/osmorphing/osmount/test_base.py | 8 +- coriolis/tests/osmorphing/test_manager.py | 7 +- 8 files changed, 765 insertions(+), 10 deletions(-) create mode 100644 coriolis/osmorphing/osmount/luks_mixin.py create mode 100644 coriolis/osmorphing/osmount/resources/luks_firstboot_initramfs_tools.sh diff --git a/coriolis/constants.py b/coriolis/constants.py index d6312c8a..546d8932 100644 --- a/coriolis/constants.py +++ b/coriolis/constants.py @@ -389,3 +389,5 @@ PHASE_OSMORPHING_PRE_OS_MOUNT, PHASE_OSMORPHING_POST_OS_MOUNT, ] + +ENCRYPTED_DISKS_PASS = "encrypted_disks_passphrase" diff --git a/coriolis/osmorphing/manager.py b/coriolis/osmorphing/manager.py index 6cdfd2c4..6495f805 100644 --- a/coriolis/osmorphing/manager.py +++ b/coriolis/osmorphing/manager.py @@ -137,7 +137,8 @@ def morph_image(origin_provider, destination_provider, connection_info, # instantiate and run OSMount tools: os_mount_tools = osmount_factory.get_os_mount_tools( os_type, connection_info, event_manager, ignore_devices, - CONF.default_osmorphing_operation_timeout) + CONF.default_osmorphing_operation_timeout, + osmorphing_info=osmorphing_info) proxy_settings = _get_proxy_settings() os_mount_tools.set_proxy(proxy_settings) @@ -307,3 +308,6 @@ def _morph_image(origin_provider, destination_provider, connection_info, LOG.info("Post packages install") import_os_morphing_tools.post_packages_install(packages_add) + + os_mount_tools.remove_encryption_artifacts(os_root_dir) + os_mount_tools.install_encryption_firstboot_setup(os_root_dir) diff --git a/coriolis/osmorphing/osmount/base.py b/coriolis/osmorphing/osmount/base.py index fec59bde..36a2ce9b 100644 --- a/coriolis/osmorphing/osmount/base.py +++ b/coriolis/osmorphing/osmount/base.py @@ -14,6 +14,7 @@ from six import with_metaclass from coriolis import exception +from coriolis.osmorphing.osmount import luks_mixin from coriolis import utils LOG = logging.getLogger(__name__) @@ -24,12 +25,13 @@ class BaseOSMountTools(object, with_metaclass(abc.ABCMeta)): def __init__(self, connection_info, event_manager, ignore_devices, - operation_timeout): + operation_timeout, osmorphing_info=None): self._event_manager = event_manager self._ignore_devices = ignore_devices self._environment = {} self._connection_info = connection_info self._osmount_operation_timeout = operation_timeout + self._osmorphing_info = osmorphing_info or {} self._connect() @abc.abstractmethod @@ -58,6 +60,12 @@ def dismount_os(self, root_dir): def set_proxy(self, proxy_settings): pass + def remove_encryption_artifacts(self, os_root_dir): + pass + + def install_encryption_firstboot_setup(self, os_root_dir): + pass + def get_environment(self): return self._environment @@ -122,7 +130,7 @@ def get_connection(self): return self._ssh -class BaseLinuxOSMountTools(BaseSSHOSMountTools): +class BaseLinuxOSMountTools(luks_mixin.LinuxLUKSMixin, BaseSSHOSMountTools): def _get_pvs(self): out = self._exec_cmd("sudo pvdisplay -c").splitlines() LOG.debug("Output of 'pvdisplay -c' command: %s", out) @@ -560,6 +568,8 @@ def mount_os(self): "sudo ls -1 %s*" % volume_dev).splitlines() LOG.debug("All simple devices to scan: %s", dev_paths) + self._unlock_luks_devices(dev_paths) + lvm_dev_paths = [] self._check_vgs() vgs = self._get_vgs() @@ -659,6 +669,8 @@ def dismount_os(self, root_dir): self._exec_cmd( 'mountpoint -q %s && sudo umount %s' % (root_dir, root_dir)) + self._close_luks_devices() + def set_proxy(self, proxy_settings): url = proxy_settings.get('url') if not url: diff --git a/coriolis/osmorphing/osmount/factory.py b/coriolis/osmorphing/osmount/factory.py index 890eb771..f03c48bc 100644 --- a/coriolis/osmorphing/osmount/factory.py +++ b/coriolis/osmorphing/osmount/factory.py @@ -16,7 +16,8 @@ def get_os_mount_tools(os_type, connection_info, event_manager, - ignore_devices, operation_timeout): + ignore_devices, operation_timeout, + osmorphing_info=None): os_mount_tools = {constants.OS_TYPE_LINUX: [ubuntu.UbuntuOSMountTools, redhat.RedHatOSMountTools, suse.SUSEOSMountTools], @@ -28,7 +29,7 @@ def get_os_mount_tools(os_type, connection_info, event_manager, for cls in os_mount_tools.get(os_type, itertools.chain(*os_mount_tools.values())): tools = cls(connection_info, event_manager, ignore_devices, - operation_timeout) + operation_timeout, osmorphing_info=osmorphing_info) LOG.debug("Testing OS mount tools: %s", cls.__name__) if tools.check_os(): return tools diff --git a/coriolis/osmorphing/osmount/luks_mixin.py b/coriolis/osmorphing/osmount/luks_mixin.py new file mode 100644 index 00000000..5f681b14 --- /dev/null +++ b/coriolis/osmorphing/osmount/luks_mixin.py @@ -0,0 +1,587 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +import contextlib +import json +import os +import re + +from oslo_log import log as logging + +from coriolis import constants +from coriolis import exception +from coriolis import utils + +LOG = logging.getLogger(__name__) + +_LUKS_KEYFILE_DIR = "/etc/luks" + +_FIRSTBOOT_SCRIPT_PATH = "/usr/local/sbin/coriolis-luks-firstboot.sh" +_SYSTEMD_UNIT_PATH = "/etc/systemd/system/coriolis-luks-firstboot.service" + +_RESOURCES_DIR = os.path.join(os.path.dirname(__file__), "resources") + + +def _load_script(filename): + with open(os.path.join(_RESOURCES_DIR, filename)) as fh: + return fh.read() + + +_LUKS_FIRSTBOOT_SCRIPTS = { + "update-initramfs": _load_script("luks_firstboot_initramfs_tools.sh"), +} + +_SYSTEMD_UNIT = """\ +[Unit] +Description=Coriolis LUKS migration firstboot cleanup +After=local-fs.target +ConditionPathExists=/etc/luks + +[Service] +Type=oneshot +ExecStart=/usr/local/sbin/coriolis-luks-firstboot.sh +RemainAfterExit=yes +StandardOutput=journal+console +StandardError=journal+console + +[Install] +WantedBy=multi-user.target +""" + + +class LinuxLUKSMixin: + """Mixin providing LUKS-related methods for BaseLinuxOSMountTools. + + Expects the consuming class to provide ``self._ssh``, ``self._exec_cmd()``, + and ``self._osmorphing_info``. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._luks_opened = [] + + def _unlock_luks_devices(self, dev_paths): + """Open any LUKS-encrypted devices listed in dev_paths in-place. + + Reads osmorphing_info["encrypted_disks_passphrase"], a single + passphrase used for all LUKS-encrypted devices. Non-LUKS devices are + silently skipped. + + `dev_paths` entries are replaced in-place with the resulting + `/dev/mapper/` path for any device that is unlocked. + + :returns: list of (mapper_name, original_dev_path) pairs opened, + to be used for cleanup and further processing. + """ + passphrase = self._osmorphing_info.get(constants.ENCRYPTED_DISKS_PASS) + + try: + for i, dev_path in enumerate(dev_paths): + mapper_path = self._unlock_luks_device(dev_path, passphrase) + if mapper_path is None: + continue + + dev_paths[i] = mapper_path + self._luks_opened.append( + (os.path.basename(mapper_path), dev_path)) + except Exception: + self._close_luks_devices() + raise + + def _unlock_luks_device(self, dev_path, passphrase): + """Open a single LUKS-encrypted device. + + :raises: CoriolisException if the device/passphrase combination is + inconsistent (LUKS without passphrase, or passphrase for non-LUKS). + :returns: the device mapper name, or None if the device is not LUKS + and no passphrase was given. + """ + is_luks = self._is_luks(dev_path) + + if is_luks and passphrase is None: + raise exception.CoriolisException( + f"{dev_path} is LUKS-encrypted, but no passphrase is " + "provided.") + if not is_luks: + LOG.debug( + "Device '%s' is not a LUKS container; skipping.", dev_path) + return None + + mapper_name = "coriolis_%s" % os.path.basename(dev_path) + key_path = "/tmp/%s.key" % mapper_name + with self._auth_luks(passphrase, key_path): + self._exec_cmd( + "sudo cryptsetup luksOpen --disable-keyring " + "--key-file %s %s %s" % (key_path, dev_path, mapper_name)) + + mapper_path = "/dev/mapper/%s" % mapper_name + LOG.info("Unlocked LUKS device '%s' as '%s'", dev_path, mapper_path) + + return mapper_path + + def _is_luks(self, dev_path): + try: + self._exec_cmd("sudo cryptsetup isLuks %s" % dev_path) + return True + except exception.SSHCommandNotFoundException: + LOG.warn("cryptsetup missing from OS morpher; cannot check if " + "device is LUKS-encrypted.") + except Exception: + # if it's not LUKS, we'll get exit code 1. + # The exception is already logged in self._exec_cmd. + pass + + return False + + def _close_luks_devices(self): + """Close any LUKS mapper devices opened by _unlock_luks_devices.""" + for mapper_name, _ in self._luks_opened: + self._exec_cmd( + "sudo cryptsetup luksClose %s || true" % mapper_name) + + self._luks_opened = [] + + def _write_remote_file(self, dest_path, content, mode=None): + """Write content to dest_path on the remote host via a temp file.""" + tmp = self._exec_cmd("mktemp").strip() + utils.write_ssh_file(self._ssh, tmp, content.encode("utf-8")) + if mode is not None: + self._exec_cmd( + "sudo mv %s %s && sudo chmod %s %s" % ( + tmp, dest_path, mode, dest_path)) + else: + self._exec_cmd("sudo mv %s %s" % (tmp, dest_path)) + + @contextlib.contextmanager + def _auth_luks(self, passphrase, key_path): + """Write passphrase to key_path on the remote host; yield; delete.""" + self._write_remote_file(key_path, passphrase) + try: + yield key_path + finally: + self._exec_cmd("sudo rm -f %s" % key_path) + + def _get_tpm2_token_info(self, dev_path): + """Return list of (token_id, keyslot_id) pairs for systemd-tpm2 tokens. + + Returns an empty list for LUKS1 devices, which do not support tokens. + """ + try: + raw = self._exec_cmd( + "sudo cryptsetup luksDump --dump-json-metadata %s" % dev_path) + header = json.loads(raw) + except Exception: + LOG.warning( + "Could not dump LUKS header for '%s': %s", + dev_path, utils.get_exception_details()) + return [] + + results = [] + for token_id, token in header.get("tokens", {}).items(): + if token.get("type") != "systemd-tpm2": + continue + + for keyslot_id in token.get("keyslots", []): + results.append((token_id, keyslot_id)) + + return results + + def _remove_tpm2_tokens(self, dev_path, passphrase): + """Remove systemd-tpm2 tokens and kill their keyslots from dev_path. + + Token removal modifies only the LUKS2 header metadata. Keyslot + removal is authenticated via the migration passphrase. + """ + token_info = self._get_tpm2_token_info(dev_path) + if not token_info: + return + + for token_id, keyslot_id in token_info: + try: + self._exec_cmd( + "sudo cryptsetup token remove --token-id %s %s" % ( + token_id, dev_path)) + LOG.info( + "Removed systemd-tpm2 token %s from '%s'", + token_id, dev_path) + except Exception: + LOG.warning( + "Failed to remove TPM2 token %s from '%s': %s", + token_id, dev_path, utils.get_exception_details()) + continue + + key_path = "/tmp/coriolis_%s.key" % os.path.basename(dev_path) + try: + with self._auth_luks(passphrase, key_path): + self._exec_cmd( + "sudo cryptsetup luksKillSlot --key-file %s %s %s" % ( + key_path, dev_path, keyslot_id)) + LOG.info( + "Killed TPM2 keyslot %s from '%s'", keyslot_id, dev_path) + except Exception: + LOG.warning( + "Failed to kill TPM2 keyslot %s from '%s': %s", + keyslot_id, dev_path, utils.get_exception_details()) + + def _transform_crypttab(self, os_root_dir, transform): + """Apply transform to each non-comment entry in /etc/crypttab. + + transform(parts) receives the split fields [name, device, keyfile, + options] and returns a modified parts list to replace the line, or + None to leave it unchanged. + + Returns True if the file was written back, False if nothing changed or + the file does not exist. + """ + crypttab_path = os.path.join(os_root_dir, "etc/crypttab") + if not utils.test_ssh_path(self._ssh, crypttab_path): + return False + + content = utils.read_ssh_file(self._ssh, crypttab_path).decode("utf-8") + new_lines = [] + changed = False + + for line in content.splitlines(keepends=True): + stripped = line.strip() + if not stripped or stripped.startswith("#"): + new_lines.append(line) + continue + + parts = stripped.split(None, 3) + new_parts = transform(parts) + if new_parts is None: + new_lines.append(line) + else: + new_lines.append("\t".join(new_parts) + "\n") + changed = True + + if not changed: + return False + + self._write_remote_file(crypttab_path, "".join(new_lines)) + return True + + def _remove_tpm2_crypttab_options(self, os_root_dir): + """Strip tpm2-* options from /etc/crypttab in the mounted OS. + + Prevents the initramfs from attempting TPM2 unsealing on the target, + which would fail because the source TPM is not present there. + """ + def _strip_tpm2(parts): + if len(parts) < 4: + return None + + opts = [ + o for o in parts[3].split(",") if not o.startswith("tpm2-") + ] + new_opts = ",".join(opts) + if new_opts == parts[3]: + return None + + return [parts[0], parts[1], parts[2], new_opts] + + if self._transform_crypttab(os_root_dir, _strip_tpm2): + LOG.info( + "Removed TPM2 options from /etc/crypttab in '%s'", os_root_dir) + + def remove_encryption_artifacts(self, os_root_dir): + """Remove stale TPM2 tokens, kill their keyslots, and strip tpm2-* + options from /etc/crypttab. + + Called after OS morphing, before closing LUKS devices. The source + TPM does not exist on the destination, leaving its token and crypttab + options in place would cause the initramfs to hang or fail on first + boot when it tries and fails to unseal the key. + """ + if not self._luks_opened: + return + + self._event_manager.progress_update( + "Removing stale TPM2 LUKS artifacts") + + passphrase = self._osmorphing_info.get(constants.ENCRYPTED_DISKS_PASS) + for _, dev_path in self._luks_opened: + self._remove_tpm2_tokens(dev_path, passphrase) + + self._remove_tpm2_crypttab_options(os_root_dir) + + def _get_migration_keyfile_path(self, dev_path): + return "%s/coriolis_%s.key" % ( + _LUKS_KEYFILE_DIR, os.path.basename(dev_path)) + + def _get_luks_uuid(self, dev_path): + return self._exec_cmd( + "sudo cryptsetup luksUUID %s" % dev_path).strip() + + def _update_crypttab_keyfile(self, os_root_dir, uuid_to_keyfile): + """Update the keyfile column in crypttab for matching LUKS UUIDs.""" + def _set_keyfile(parts): + if len(parts) < 2: + return None + + m = (re.match(r"UUID=([0-9a-f-]+)", parts[1], re.IGNORECASE) or + re.match(r".*/by-uuid/([0-9a-f-]+)", parts[1], re.IGNORECASE)) + if not m: + return None + + keyfile = uuid_to_keyfile.get(m.group(1)) + if keyfile is None: + return None + + while len(parts) < 4: + parts.append("") + + parts[2] = keyfile + # cryptsetup-initramfs (Ubuntu 22.04+) only embeds crypttab + # entries in the initramfs when the device is verifiable at build + # time OR when the 'initramfs' option is present. Inside a chroot + # (no udev, no /dev/disk/by-uuid/), verification always fails, so + # we force-add 'initramfs' here. + opts_list = [o for o in parts[3].split(",") if o] + if "initramfs" not in opts_list: + opts_list.append("initramfs") + + parts[3] = ",".join(opts_list) + + return parts + + if not self._transform_crypttab(os_root_dir, _set_keyfile): + raise exception.CoriolisException( + "No /etc/crypttab entries matched LUKS UUIDs in '%s'; " + "cannot configure initramfs auto-unlock." % os_root_dir) + + LOG.info("Updated crypttab keyfile entries in '%s'", os_root_dir) + + def _write_migration_keyfiles(self, os_root_dir): + """Write migration keyfiles into the OS and update crypttab.""" + passphrase = self._osmorphing_info.get(constants.ENCRYPTED_DISKS_PASS) + if not passphrase or not self._luks_opened: + return + + keyfile_dir = os.path.join(os_root_dir, _LUKS_KEYFILE_DIR.lstrip("/")) + self._exec_cmd( + "sudo mkdir -p %s && sudo chmod 700 %s" % ( + keyfile_dir, keyfile_dir)) + + uuid_to_keyfile = {} + for _, dev_path in self._luks_opened: + luks_uuid = self._get_luks_uuid(dev_path) + + keyfile_path = self._get_migration_keyfile_path(dev_path) + abs_path = os.path.join(os_root_dir, keyfile_path.lstrip("/")) + + self._write_remote_file(abs_path, passphrase, mode="400") + uuid_to_keyfile[luks_uuid] = keyfile_path + LOG.info( + "Written migration keyfile for LUKS device '%s' (UUID %s)", + dev_path, luks_uuid) + + self._update_crypttab_keyfile(os_root_dir, uuid_to_keyfile) + + initramfs_tool = self._detect_initramfs_tool(os_root_dir) + if initramfs_tool == "update-initramfs": + self._configure_initramfs_tools_keyfiles(os_root_dir) + else: + raise exception.CoriolisException( + "No initramfs tool found in OS at '%s'; cannot configure " + "keyfile-based LUKS auto-unlock." % os_root_dir) + + def _configure_initramfs_tools_keyfiles(self, os_root_dir): + """Set KEYFILE_PATTERN in cryptsetup-initramfs conf-hook. + + cryptsetup-initramfs (Debian / Ubuntu) only embeds keyfiles whose paths + match KEYFILE_PATTERN. Set it so migration keyfiles are included in the + rebuilt initramfs. + """ + hook_dir = os.path.join(os_root_dir, "etc/cryptsetup-initramfs") + self._exec_cmd("sudo mkdir -p %s" % hook_dir) + hook_abs = os.path.join(hook_dir, "conf-hook") + pattern = "%s/coriolis_*.key" % _LUKS_KEYFILE_DIR + + existing = "" + if utils.test_ssh_path(self._ssh, hook_abs): + existing = utils.read_ssh_file(self._ssh, hook_abs).decode("utf-8") + + # Always append: the default conf-hook has #KEYFILE_PATTERN= + # (commented out). Appending sets the active value. The last assignment + # wins, overriding any earlier line. + new_content = existing + '\nKEYFILE_PATTERN="%s"\n' % pattern + self._write_remote_file(hook_abs, new_content) + self._exec_cmd( + "sudo chown root:root %s && sudo chmod 644 %s" % ( + hook_abs, hook_abs)) + LOG.info( + "Set KEYFILE_PATTERN in cryptsetup-initramfs conf-hook at '%s'", + hook_abs) + + def _detect_initramfs_tool(self, os_root_dir): + initramfs_bins = ["usr/sbin/update-initramfs", "sbin/update-initramfs"] + for initramfs_bin in initramfs_bins: + path = os.path.join(os_root_dir, initramfs_bin) + if utils.test_ssh_path(self._ssh, path): + return "update-initramfs" + + return None + + def _rebuild_initramfs(self, os_root_dir): + """Rebuild the initramfs inside the mounted OS chroot. + + /dev, /proc, /sys, /run are already bind-mounted by mount_os(). + """ + tool = self._detect_initramfs_tool(os_root_dir) + if tool == "update-initramfs": + self._exec_cmd( + "sudo chroot %s update-initramfs -u -k all" % os_root_dir) + else: + raise exception.CoriolisException( + "No initramfs tool found in OS at '%s'; cannot rebuild " + "initramfs for LUKS auto-unlock." % os_root_dir) + + def _detect_init_system(self, os_root_dir): + path = os.path.join(os_root_dir, "lib/systemd/systemd") + if utils.test_ssh_path(self._ssh, path): + return "systemd" + + path = os.path.join(os_root_dir, "sbin/openrc") + if utils.test_ssh_path(self._ssh, path): + return "openrc" + + path = os.path.join(os_root_dir, "sbin/initctl") + if utils.test_ssh_path(self._ssh, path): + return "upstart" + + return "sysvinit" + + def _register_firstboot_script_systemd(self, os_root_dir): + unit_abs = os.path.join(os_root_dir, _SYSTEMD_UNIT_PATH.lstrip("/")) + self._write_remote_file(unit_abs, _SYSTEMD_UNIT) + self._exec_cmd( + "sudo chown root:root %s && sudo chmod 644 %s" % ( + unit_abs, unit_abs)) + + wants_dir = os.path.join( + os_root_dir, "etc/systemd/system/multi-user.target.wants") + self._exec_cmd("sudo mkdir -p %s" % wants_dir) + self._exec_cmd( + "sudo ln -sf %s %s/coriolis-luks-firstboot.service" % ( + _SYSTEMD_UNIT_PATH, wants_dir)) + + def _install_luks_firstboot_script(self, os_root_dir): + """Write firstboot cleanup script and register with the init system.""" + initramfs_tool = self._detect_initramfs_tool(os_root_dir) + script_content = _LUKS_FIRSTBOOT_SCRIPTS.get(initramfs_tool) + if script_content is None: + raise exception.CoriolisException( + "No initramfs tool found in OS at '%s'; cannot install " + "LUKS firstboot cleanup script." % os_root_dir) + + script_abs = os.path.join( + os_root_dir, _FIRSTBOOT_SCRIPT_PATH.lstrip("/")) + self._exec_cmd("sudo mkdir -p %s" % os.path.dirname(script_abs)) + self._write_remote_file(script_abs, script_content) + self._exec_cmd( + "sudo chown root:root %s && sudo chmod 500 %s" % ( + script_abs, script_abs)) + + init_system = self._detect_init_system(os_root_dir) + LOG.info( + "Detected init system '%s'; installing LUKS firstboot script", + init_system) + + if init_system == "systemd": + self._register_firstboot_script_systemd(os_root_dir) + else: + raise exception.CoriolisException( + "For VMs with LUKS-encrypted devices, only systemd-based VMs " + "are supported.") + + def install_encryption_firstboot_setup(self, os_root_dir): + """Install a firstboot script to re-enroll TPM2.""" + if not self._luks_opened: + return + + self._event_manager.progress_update( + "Injecting migration keyfile and installing firstboot LUKS " + "cleanup") + + self._write_migration_keyfiles(os_root_dir) + self._fix_grub_luks_root(os_root_dir) + self._rebuild_initramfs(os_root_dir) + self._install_luks_firstboot_script(os_root_dir) + + def _fix_grub_luks_root(self, os_root_dir): + """Patch grub.cfg to use crypttab mapper names for LUKS root devices. + + update-grub names the root device after the mapper opened by osmount + (e.g.: ``/dev/mapper/coriolis_foo``). The initramfs opens LUKS device + using the name from crypttab (e.g.: ``/dev/mapper/luks-root``). They + must match, so replace the osmount names in every grub.cfg we find. + """ + if not self._luks_opened: + return + + crypttab_path = os.path.join(os_root_dir, "etc/crypttab") + if not utils.test_ssh_path(self._ssh, crypttab_path): + return + + crypttab = utils.read_ssh_file( + self._ssh, crypttab_path).decode("utf-8") + + # Build UUID -> crypttab-mapper-name mapping. + uuid_to_crypttab_name = {} + for line in crypttab.splitlines(): + stripped = line.strip() + if not stripped or stripped.startswith('#'): + continue + + parts = stripped.split(None, 3) + if len(parts) < 2: + continue + + mapper_name = parts[0] + m = (re.match(r'UUID=([0-9a-f-]+)', parts[1], re.IGNORECASE) or + re.match(r'.*/by-uuid/([0-9a-f-]+)', parts[1], re.IGNORECASE)) + if m: + uuid_to_crypttab_name[m.group(1).lower()] = mapper_name + + if not uuid_to_crypttab_name: + return + + # Map osmount mapper name -> crypttab mapper name for every opened + # LUKS device that appears in crypttab. + replacements = {} + for _, dev_path in self._luks_opened: + luks_uuid = self._get_luks_uuid(dev_path) + if not luks_uuid: + continue + + crypttab_name = uuid_to_crypttab_name.get(luks_uuid.lower()) + if not crypttab_name: + continue + + osmount_name = "coriolis_%s" % os.path.basename(dev_path) + if osmount_name != crypttab_name: + replacements["/dev/mapper/%s" % osmount_name] = ( + "/dev/mapper/%s" % crypttab_name + ) + + if not replacements: + return + + for rel_cfg in ["boot/grub/grub.cfg", "boot/grub2/grub.cfg"]: + cfg_path = os.path.join(os_root_dir, rel_cfg) + if not utils.test_ssh_path(self._ssh, cfg_path): + continue + + content = self._exec_cmd("sudo cat %s" % cfg_path) + modified = False + for old, new in replacements.items(): + if old not in content: + continue + + content = content.replace(old, new) + modified = True + LOG.info("grub.cfg: replaced '%s' -> '%s'", old, new) + + if modified: + self._write_remote_file(cfg_path, content) diff --git a/coriolis/osmorphing/osmount/resources/luks_firstboot_initramfs_tools.sh b/coriolis/osmorphing/osmount/resources/luks_firstboot_initramfs_tools.sh new file mode 100644 index 00000000..ae42cdfa --- /dev/null +++ b/coriolis/osmorphing/osmount/resources/luks_firstboot_initramfs_tools.sh @@ -0,0 +1,144 @@ +#!/bin/bash + +# Coriolis LUKS firstboot cleanup for initramfs-tools (Debian / Ubuntu). +# Runs once on first boot to re-enroll TPM2, remove migration keyslots, and +# rebuild the initramfs so the embedded keyfile is gone. + +set -e +set -x +KEYFILE_DIR=/etc/luks + +# helpers + +load_keyfile_map() { + [ -f /etc/crypttab ] || return 0 + + # Format: + while read -r _name dev_spec keyfile _; do + # handle only coriolis migration keys. + [[ "$keyfile" == "$KEYFILE_DIR/coriolis_"*.key ]] || continue + + local dev + if [[ "$dev_spec" == UUID=* ]]; then + dev=$(blkid -l -t "$dev_spec" -o device 2>/dev/null) + else + dev="$dev_spec" + fi + + # Add dev: keyfile mapping, if we have a dev. + [ -n "$dev" ] && dev_to_keyfile["$dev"]="$keyfile" + done < <(grep -Ev '^\s*(#|$)' /etc/crypttab) +} + +wait_for_tpm2() { + for dev in /dev/tpmrm0 /dev/tpm0; do + local deadline=$(( $(date +%s) + 10 )) + until [ -e "$dev" ] || [ "$(date +%s)" -ge "$deadline" ]; do + sleep 1 + done + + if [ -e "$dev" ]; then + echo "$dev" + return + fi + done +} + +enroll_clevis() { + local tpm2_dev + + tpm2_dev=$(wait_for_tpm2) + if [ -z "$tpm2_dev" ]; then + echo "ERROR: no TPM2 device found; aborting to avoid lockout." >&2 + return 1 + fi + + echo "TPM2 device detected ($tpm2_dev); enrolling via clevis." + + for dev in "${!dev_to_keyfile[@]}"; do + local keyfile="${dev_to_keyfile[$dev]}" + + if ! clevis luks bind -k "$keyfile" -d "$dev" tpm2 '{"pcr_ids":""}'; then + echo "ERROR: clevis luks bind failed for $dev; aborting to avoid lockout." >&2 + return 1 + fi + + if ! cryptsetup luksDump "$dev" 2>/dev/null | grep -q 'clevis'; then + echo "ERROR: clevis token not found in LUKS header for $dev; aborting to avoid lockout." >&2 + return 1 + fi + + echo "clevis TPM2 enrollment verified for $dev." + done +} + +remove_migration_keyslots() { + for dev in "${!dev_to_keyfile[@]}"; do + local keyfile="${dev_to_keyfile[$dev]}" + + echo "Removing migration keyslot from $dev using $keyfile." + if ! cryptsetup luksRemoveKey "$dev" "$keyfile"; then + echo "ERROR: failed to remove migration keyslot from $dev." >&2 + return 1 + fi + + echo "Migration keyslot removed from $dev." + + sed -i "s|$keyfile|none|g" /etc/crypttab + done +} + +deregister_service() { + # Only disable, do NOT delete the unit file, or daemon-reload while the + # service is still running. systemd would detect "Current command vanished" + # and kill this process immediately. + systemctl disable coriolis-luks-firstboot.service 2>/dev/null || true +} + +# main + +shopt -s nullglob +keyfiles=("$KEYFILE_DIR"/coriolis_*.key) +shopt -u nullglob + +if [ "${#keyfiles[@]}" -eq 0 ]; then + echo "ERROR: no migration keyfiles found in $KEYFILE_DIR; setup is broken." >&2 + exit 1 +fi + +echo "Found ${#keyfiles[@]} migration keyfile(s): ${keyfiles[*]}" + +declare -A dev_to_keyfile +load_keyfile_map + +if [ "${#dev_to_keyfile[@]}" -eq 0 ]; then + echo "ERROR: no coriolis migration entries found in /etc/crypttab; setup is broken." >&2 + exit 1 +fi + +echo "Found ${#dev_to_keyfile[@]} crypttab entry / entries to process." + +for _cmd in clevis clevis-encrypt-tpm2 clevis-luks-bind; do + if ! command -v "$_cmd" >/dev/null 2>&1; then + echo "ERROR: $_cmd not found; TPM2 enrollment requires clevis, clevis-tpm2, and clevis-luks packages." >&2 + exit 1 + fi +done + +enroll_clevis + +remove_migration_keyslots + +echo "Deleting migration keyfiles." +rm -f "${keyfiles[@]}" +rmdir "$KEYFILE_DIR" 2>/dev/null || true + +echo "Rebuilding initramfs." +# Suppress needrestart: it reboots the VM after initramfs rebuild, which breaks +# the firstboot SSH session before the service finishes. +NEEDRESTART_SUSPEND=1 DEBIAN_FRONTEND=noninteractive update-initramfs -u -k all + +deregister_service + +echo "Firstboot LUKS cleanup complete." +rm -f "$0" diff --git a/coriolis/tests/osmorphing/osmount/test_base.py b/coriolis/tests/osmorphing/osmount/test_base.py index 3c0afac9..e5094b9e 100644 --- a/coriolis/tests/osmorphing/osmount/test_base.py +++ b/coriolis/tests/osmorphing/osmount/test_base.py @@ -846,7 +846,9 @@ def test__find_and_mount_root_exec_cmd_exception( @mock.patch.object( base.BaseLinuxOSMountTools, '_check_mount_fstab_partitions' ) - def test_mount_os(self, mock_check_mount_fstab_partitions, + @mock.patch.object(base.luks_mixin.LinuxLUKSMixin, '_unlock_luks_devices') + def test_mount_os(self, mock_unlock_luks_devices, + mock_check_mount_fstab_partitions, mock_get_volume_block_devices, mock_find_dev_with_contents, mock_find_and_mount_root, mock_get_mounted_devices, mock_get_vgs, mock_exec_cmd, @@ -923,7 +925,9 @@ def test_mount_os(self, mock_check_mount_fstab_partitions, @mock.patch.object( base.BaseLinuxOSMountTools, '_check_mount_fstab_partitions' ) - def test_mount_os_run_xfs(self, mock_check_mount_fstab_partitions, + @mock.patch.object(base.luks_mixin.LinuxLUKSMixin, '_unlock_luks_devices') + def test_mount_os_run_xfs(self, mock_unlock_luks_devices, + mock_check_mount_fstab_partitions, mock_get_volume_block_devices, mock_find_dev_with_contents, mock_find_and_mount_root, diff --git a/coriolis/tests/osmorphing/test_manager.py b/coriolis/tests/osmorphing/test_manager.py index 6262a9e5..cdf87452 100644 --- a/coriolis/tests/osmorphing/test_manager.py +++ b/coriolis/tests/osmorphing/test_manager.py @@ -241,8 +241,8 @@ def test_morph_image( self.os_mount_tools.dismount_os.assert_called_once() mock_get_os_mount_tools.assert_called_once_with( - 'linux', mock.sentinel.connection_info, self.event_manager, [], 60) - mock_EventManager.assert_called_with(self.event_handler) + 'linux', mock.sentinel.connection_info, self.event_manager, [], 60, + osmorphing_info=self.osmorphing_info) self.os_mount_tools.dismount_os.assert_called_once() @@ -266,7 +266,8 @@ def test_morph_image_failed_os_mount_setup( self.event_handler) mock_get_os_mount_tools.assert_called_once_with( - 'linux', mock.sentinel.connection_info, self.event_manager, [], 60) + 'linux', mock.sentinel.connection_info, self.event_manager, [], 60, + osmorphing_info=self.osmorphing_info) mock_EventManager.assert_called_once_with(self.event_handler) mock_get_osmorphing_tools_class.assert_not_called() From baee507ca35462bdb51233234fb61407b6ad9907 Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Thu, 21 May 2026 19:58:10 +0000 Subject: [PATCH 2/4] Add LUKS disk encryption support for RHEL / dracut OS morphing Extends `LinuxLuksMixin` with dracut support for RHEL / Fedora / SUSE guests: - `_configure_dracut_keyfiles`: writes a `dracut.conf.d/99-coriolis-luks.conf` snippet that adds the migration keyfiles to `install_items`, ensuring dracut embeds them in the initramfs. It also probes for the `libcryptsetup-token-systemd-tpm2.so` plugin (checked against a list of known paths) and adds it explicitly, because cryptsetup loads TPM2 token plugins via `dlopen` and dracut's `ldd` analysis would otherwise miss it along with its `libtss2` dependencies. - `_build_dracut_include_args`: returns `--include` args that force-embed `/etc/crypttab` and all `coriolis_*.key` keyfiles into the initramfs image. Without an explicit crypttab embed, dracut names the mapper `luks-` rather than the crypttab name and cannot find the keyfile at boot. - `luks_firstboot_dracut.sh`: the firstboot shell script for dracut-based systems. Runs once on first boot to re-enroll TPM2, remove the migration keyslots, and rebuild the initramfs so the embedded keyfile no longer ships in future initramfs images. --- coriolis/osmorphing/osmount/luks_mixin.py | 90 +++++++++- .../resources/luks_firstboot_dracut.sh | 164 ++++++++++++++++++ 2 files changed, 253 insertions(+), 1 deletion(-) create mode 100644 coriolis/osmorphing/osmount/resources/luks_firstboot_dracut.sh diff --git a/coriolis/osmorphing/osmount/luks_mixin.py b/coriolis/osmorphing/osmount/luks_mixin.py index 5f681b14..84c52bce 100644 --- a/coriolis/osmorphing/osmount/luks_mixin.py +++ b/coriolis/osmorphing/osmount/luks_mixin.py @@ -15,6 +15,17 @@ LOG = logging.getLogger(__name__) _LUKS_KEYFILE_DIR = "/etc/luks" +_DRACUT_LUKS_CONF_PATH = "/etc/dracut.conf.d/99-coriolis-luks.conf" + +# cryptsetup loads TPM2 token plugins via dlopen, so dracut's ldd analysis +# misses them. List candidate paths in order of preference; the first one +# found in the guest OS will be added to install_items so dracut includes +# both the plugin and its libtss2 dependencies. +_CRYPTSETUP_TPM2_PLUGIN_PATHS = [ + "/usr/lib64/cryptsetup/libcryptsetup-token-systemd-tpm2.so", + "/usr/lib/cryptsetup/libcryptsetup-token-systemd-tpm2.so", + "/usr/lib/x86_64-linux-gnu/cryptsetup/libcryptsetup-token-systemd-tpm2.so", +] _FIRSTBOOT_SCRIPT_PATH = "/usr/local/sbin/coriolis-luks-firstboot.sh" _SYSTEMD_UNIT_PATH = "/etc/systemd/system/coriolis-luks-firstboot.service" @@ -29,6 +40,7 @@ def _load_script(filename): _LUKS_FIRSTBOOT_SCRIPTS = { "update-initramfs": _load_script("luks_firstboot_initramfs_tools.sh"), + "dracut": _load_script("luks_firstboot_dracut.sh"), } _SYSTEMD_UNIT = """\ @@ -379,13 +391,43 @@ def _write_migration_keyfiles(self, os_root_dir): self._update_crypttab_keyfile(os_root_dir, uuid_to_keyfile) initramfs_tool = self._detect_initramfs_tool(os_root_dir) - if initramfs_tool == "update-initramfs": + if initramfs_tool == "dracut": + self._configure_dracut_keyfiles(os_root_dir, uuid_to_keyfile) + elif initramfs_tool == "update-initramfs": self._configure_initramfs_tools_keyfiles(os_root_dir) else: raise exception.CoriolisException( "No initramfs tool found in OS at '%s'; cannot configure " "keyfile-based LUKS auto-unlock." % os_root_dir) + def _configure_dracut_keyfiles(self, os_root_dir, uuid_to_keyfile): + """Write a dracut.conf.d snippet to embed keyfiles in the initramfs.""" + install_items = list(uuid_to_keyfile.values()) + + # cryptsetup loads TPM2 token plugins via dlopen; add it + # explicitly so dracut includes it (and its libtss2 deps via + # ldd analysis of the .so) in the initramfs. + for plugin_path in _CRYPTSETUP_TPM2_PLUGIN_PATHS: + if utils.test_ssh_path( + self._ssh, + os.path.join(os_root_dir, plugin_path.lstrip("/"))): + install_items.append(plugin_path) + LOG.debug( + "Including cryptsetup TPM2 token plugin in " + "dracut install_items: %s", plugin_path) + break + + conf_abs = os.path.join( + os_root_dir, _DRACUT_LUKS_CONF_PATH.lstrip("/")) + conf_content = 'install_items+=" %s "\n' % " ".join(install_items) + self._write_remote_file(conf_abs, conf_content) + self._exec_cmd( + "sudo chown root:root %s && sudo chmod 644 %s" % ( + conf_abs, conf_abs)) + LOG.info( + "Written dracut LUKS keyfile config at '%s'", + _DRACUT_LUKS_CONF_PATH) + def _configure_initramfs_tools_keyfiles(self, os_root_dir): """Set KEYFILE_PATTERN in cryptsetup-initramfs conf-hook. @@ -421,8 +463,41 @@ def _detect_initramfs_tool(self, os_root_dir): if utils.test_ssh_path(self._ssh, path): return "update-initramfs" + for dracut_bin in ["usr/bin/dracut", "usr/sbin/dracut", "sbin/dracut"]: + path = os.path.join(os_root_dir, dracut_bin) + if utils.test_ssh_path(self._ssh, path): + return "dracut" + return None + def _build_dracut_include_args(self, os_root_dir): + """Return --include args that force-embed crypttab and LUKS keyfiles. + + dracut's install_items embeds the keyfile but does not guarantee that + /etc/crypttab lands in the initramfs image. Without crypttab, + systemd-cryptsetup-generator names the mapper luks- instead of + the crypttab mapper name and cannot find the keyfile. + """ + args = [] + crypttab = os.path.join(os_root_dir, "etc/crypttab") + if utils.test_ssh_path(self._ssh, crypttab): + args += ["--include", "/etc/crypttab", "/etc/crypttab"] + + luks_dir = os.path.join(os_root_dir, _LUKS_KEYFILE_DIR.lstrip("/")) + try: + keyfiles = self._exec_cmd( + "sudo find %s -name 'coriolis_*.key' -type f 2>/dev/null" + % luks_dir).strip().splitlines() + except Exception: + keyfiles = [] + + for kf in keyfiles: + # strip os_root_dir prefix. + rel = kf[len(os_root_dir):] + args += ["--include", rel, rel] + + return args + def _rebuild_initramfs(self, os_root_dir): """Rebuild the initramfs inside the mounted OS chroot. @@ -432,6 +507,19 @@ def _rebuild_initramfs(self, os_root_dir): if tool == "update-initramfs": self._exec_cmd( "sudo chroot %s update-initramfs -u -k all" % os_root_dir) + elif tool == "dracut": + # --regenerate-all scans the chroot's own /lib/modules/ for + # installed kernels instead of relying on uname -r + # + # Explicitly --include the crypttab and any LUKS migration keyfiles + # so that systemd-cryptsetup-generator finds them in the initramfs + # and uses the crypttab mapper name (luks-root) and keyfile for + # auto-unlock. install_items in dracut.conf.d embeds the keyfile + # but does NOT guarantee that crypttab ends up in the image. + include_args = self._build_dracut_include_args(os_root_dir) + self._exec_cmd( + "sudo chroot %s dracut --regenerate-all --force %s" + % (os_root_dir, " ".join(include_args))) else: raise exception.CoriolisException( "No initramfs tool found in OS at '%s'; cannot rebuild " diff --git a/coriolis/osmorphing/osmount/resources/luks_firstboot_dracut.sh b/coriolis/osmorphing/osmount/resources/luks_firstboot_dracut.sh new file mode 100644 index 00000000..b6cb0e89 --- /dev/null +++ b/coriolis/osmorphing/osmount/resources/luks_firstboot_dracut.sh @@ -0,0 +1,164 @@ +#!/bin/bash + +# Coriolis LUKS firstboot cleanup for dracut (RHEL / Fedora / SUSE). +# Runs once on first boot to re-enroll TPM2, remove migration keyslots, and +# rebuild the initramfs so the embedded keyfile is gone. + +set -e +set -x +KEYFILE_DIR=/etc/luks +DRACUT_CONF=/etc/dracut.conf.d/99-coriolis-luks.conf + +# helpers + +load_keyfile_map() { + [ -f /etc/crypttab ] || return 0 + + # Format: + while read -r _name dev_spec keyfile _; do + [[ "$keyfile" == "$KEYFILE_DIR/coriolis_"*.key ]] || continue + + local dev + if [[ "$dev_spec" == UUID=* ]]; then + dev=$(blkid -l -t "$dev_spec" -o device 2>/dev/null) + else + dev="$dev_spec" + fi + + [ -n "$dev" ] || continue + + # Add mappings. + dev_to_keyfile["$dev"]="$keyfile" + dev_to_spec["$dev"]="$dev_spec" + done < <(grep -Ev '^\s*(#|$)' /etc/crypttab) +} + +wait_for_tpm2() { + for dev in /dev/tpmrm0 /dev/tpm0; do + local deadline=$(( $(date +%s) + 10 )) + until [ -e "$dev" ] || [ "$(date +%s)" -ge "$deadline" ]; do + sleep 1 + done + + if [ -e "$dev" ]; then + echo "$dev" + return + fi + done +} + +add_tpm2_crypttab_option() { + local dev="$1" + local dev_spec="${dev_to_spec[$dev]}" + + # Append ",tpm2-device=auto" to the options field of the matching entry; + # all other lines are unchanged. + awk -v spec="$dev_spec" ' + NF >= 4 && $2 == spec && $4 !~ /tpm2-device/ { $4 = $4 ",tpm2-device=auto" } + { print } + ' OFS='\t' /etc/crypttab > /tmp/.coriolis.crypttab.new + mv /tmp/.coriolis.crypttab.new /etc/crypttab + + echo "Added tpm2-device=auto to crypttab for $dev." +} + +enroll_systemd_cryptenroll() { + local tpm2_dev + + tpm2_dev=$(wait_for_tpm2) + if [ -z "$tpm2_dev" ]; then + echo "ERROR: no TPM2 device found; aborting to avoid lockout." >&2 + return 1 + fi + + echo "TPM2 device detected ($tpm2_dev); enrolling via systemd-cryptenroll." + + for dev in "${!dev_to_keyfile[@]}"; do + local keyfile="${dev_to_keyfile[$dev]}" + + if ! systemd-cryptenroll --tpm2-device=auto --tpm2-pcrs= \ + --unlock-key-file="$keyfile" "$dev" 2>/dev/null; then + echo "ERROR: systemd-cryptenroll failed for $dev; aborting to avoid lockout." >&2 + return 1 + fi + + if ! cryptsetup luksDump "$dev" 2>/dev/null | grep -q 'systemd-tpm2'; then + echo "ERROR: systemd-tpm2 token not found in LUKS header for $dev; aborting to avoid lockout." >&2 + return 1 + fi + + echo "systemd-cryptenroll TPM2 enrollment verified for $dev." + done +} + +remove_migration_keyslots() { + for dev in "${!dev_to_keyfile[@]}"; do + local keyfile="${dev_to_keyfile[$dev]}" + + echo "Removing migration keyslot from $dev using $keyfile." + if ! cryptsetup luksRemoveKey "$dev" "$keyfile"; then + echo "ERROR: failed to remove migration keyslot from $dev." >&2 + return 1 + fi + + echo "Migration keyslot removed from $dev." + + sed -i "s|$keyfile|none|g" /etc/crypttab + [ "$tpm2_enrolled" = "1" ] && add_tpm2_crypttab_option "$dev" + done +} + +deregister_service() { + # Only disable, do NOT delete the unit file, or daemon-reload while the + # service is still running. systemd would detect "Current command vanished" + # and kill this process immediately. + systemctl disable coriolis-luks-firstboot.service 2>/dev/null || true +} + +# main + +shopt -s nullglob +keyfiles=("$KEYFILE_DIR"/coriolis_*.key) +shopt -u nullglob + +if [ "${#keyfiles[@]}" -eq 0 ]; then + echo "ERROR: no migration keyfiles found in $KEYFILE_DIR; setup is broken." >&2 + exit 1 +fi + +echo "Found ${#keyfiles[@]} migration keyfile(s): ${keyfiles[*]}" + +declare -A dev_to_keyfile dev_to_spec +load_keyfile_map + +if [ "${#dev_to_keyfile[@]}" -eq 0 ]; then + echo "ERROR: no coriolis migration entries found in /etc/crypttab; setup is broken." >&2 + exit 1 +fi + +echo "Found ${#dev_to_keyfile[@]} crypttab entry / entries to process." + +tpm2_enrolled=0 +if command -v systemd-cryptenroll >/dev/null 2>&1; then + enroll_systemd_cryptenroll + tpm2_enrolled=1 +fi + +remove_migration_keyslots + +echo "Deleting migration keyfiles." +rm -f "${keyfiles[@]}" +rmdir "$KEYFILE_DIR" 2>/dev/null || true + +echo "Rebuilding initramfs." +# Embed the updated crypttab, so systemd-cryptsetup-generator uses the crypttab +# mapper name and tpm2-device=auto for auto-unlock. The dracut.conf.d is deleted +# after this rebuild, so its install_items (TPM2 plugin + libtss2) are still +# picked up here. +dracut --force --include /etc/crypttab /etc/crypttab +rm -f "$DRACUT_CONF" + +deregister_service + +echo "Firstboot LUKS cleanup complete." +rm -f "$0" From 599b035cf59451522ca5a95105067176e8612d78 Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Tue, 12 May 2026 23:16:50 +0000 Subject: [PATCH 3/4] integration: Adds LUKS OS morphing test Adds `cryptsetup` to the `data-minion` Dockerfile. Required by osmount LUKS unlock / lock; `cryptsetup` `luksOpen` / `luksClose` are called over SSH on the morphing container. Adds `make_luks_device` to `test_utils.py`: formats the device with LUKS, opens it, writes a minimal Linux OS tree inside via make_os_device(), then closes the mapper. Adds integration test in which the source disk is LUKS-encrypted. The test runs a full transfer + deployment with skip_os_morphing=False, and asserts that it completed. --- coriolis/conductor/rpc/server.py | 2 +- coriolis/osmorphing/osmount/base.py | 20 ++- coriolis/osmorphing/redhat.py | 2 +- .../deployments/test_osmorphing.py | 122 +++++++++++++++++- .../dockerfiles/data-minion/Dockerfile | 2 + .../tests/integration/test_provider/imp.py | 31 ++++- .../test_provider/osmorphing/__init__.py | 7 + .../test_provider/osmorphing/rocky.py | 35 +++++ .../test_provider/osmorphing/ubuntu.py | 17 +++ coriolis/tests/integration/utils.py | 76 ++++++++++- .../tests/osmorphing/osmount/test_base.py | 25 +++- coriolis/utils.py | 4 +- 12 files changed, 322 insertions(+), 21 deletions(-) create mode 100644 coriolis/tests/integration/test_provider/osmorphing/rocky.py diff --git a/coriolis/conductor/rpc/server.py b/coriolis/conductor/rpc/server.py index e1795e7b..ce9c334c 100644 --- a/coriolis/conductor/rpc/server.py +++ b/coriolis/conductor/rpc/server.py @@ -1433,7 +1433,7 @@ def _check_transfer_running_executions(self, ctxt, transfer): def _check_valid_transfer_tasks_execution(transfer, force=False): sorted_executions = sorted( transfer.executions, key=lambda e: e.number, reverse=True) - if not sorted_executions: + if not sorted_executions and not force: raise exception.InvalidTransferState( "The Transfer has never been executed.") diff --git a/coriolis/osmorphing/osmount/base.py b/coriolis/osmorphing/osmount/base.py index 36a2ce9b..f38449e6 100644 --- a/coriolis/osmorphing/osmount/base.py +++ b/coriolis/osmorphing/osmount/base.py @@ -6,6 +6,7 @@ import base64 import collections import itertools +import json import os import re import uuid @@ -430,16 +431,27 @@ def _get_volume_block_devices(self): # where 'ln -s /dev/dm-N /dev//' # Querying for the kernel device name (KNAME) should ensure we get the # device names we desire both for physical and logical volumes. - volume_devs = self._exec_cmd("lsblk -lnao KNAME").splitlines() - LOG.debug("All block devices: %s", str(volume_devs)) + raw = self._exec_cmd("lsblk -lao KNAME,TYPE --json") + LOG.debug("All block devices: %s", raw) - volume_devs = ["/dev/%s" % d for d in volume_devs if - not re.match(r"^.*\d+$", d)] + # Exclude partitions. + blockdevices = json.loads(raw).get("blockdevices", []) + volume_devs = [ + "/dev/%s" % dev["kname"] + for dev in blockdevices + if dev.get("type") != "part" + ] LOG.debug("Ignoring block devices: %s", self._ignore_devices) volume_devs = [d for d in volume_devs if d not in self._ignore_devices] + # lsblk reads sysfs, which is shared with the host inside containers, + # so it may list devices that have no /dev node here. Drop any such + # phantom entries. + volume_devs = [d for d in volume_devs + if utils.test_ssh_path(self._ssh, d)] + LOG.info("Volume block devices: %s", volume_devs) return volume_devs diff --git a/coriolis/osmorphing/redhat.py b/coriolis/osmorphing/redhat.py index b7ac1449..3bbd7b59 100644 --- a/coriolis/osmorphing/redhat.py +++ b/coriolis/osmorphing/redhat.py @@ -122,7 +122,7 @@ def _set_dhcp_net_config(self, ifcfgs_ethernet): self._write_config_file(network_cfg_file, network_cfg) def _write_nic_configs(self, nics_info): - for idx, _ in enumerate(nics_info): + for idx, _ in enumerate(nics_info or []): dev_name = "eth%d" % idx cfg_path = "etc/sysconfig/network-scripts/ifcfg-%s" % dev_name if self._test_path(cfg_path): diff --git a/coriolis/tests/integration/deployments/test_osmorphing.py b/coriolis/tests/integration/deployments/test_osmorphing.py index ab49d341..5630e8a0 100644 --- a/coriolis/tests/integration/deployments/test_osmorphing.py +++ b/coriolis/tests/integration/deployments/test_osmorphing.py @@ -5,15 +5,33 @@ Exercises deployments with skip_os_morphing=False, OS detection, and package installation in the target OS. + +Exercices deployments with LUKS-encrypted disks. The source disk is formatted +with LUKS, and contains a minimal Linux OS inside. The transfer copies the +raw encrypted chunks to the destination device. During OS morphing, the +osmount layer detects the LUKS container, unlocks it with the supplied +passphrase, mounts the filesystem, and morphs it. OS families tested: + +- Ubuntu 24.04 (initramfs-based): initramfs is regenerated via + update-initramfs. +- Rocky Linux 9 (dracut-based): initramfs is regenerated via dracut. + +Must be run as root; requires the scsi_debug kernel module and cryptsetup. """ +import os +import tempfile import uuid +from coriolis import constants from coriolis.tests.integration import base as integration_base from coriolis.tests.integration import utils as test_utils +_LUKS_PASSPHRASE = "it-luks-encrypted" -class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase): + +class OsMorphingDeploymentTestBase( + integration_base.ReplicaIntegrationTestBase): # NOTE(claudiub): Size must be high enough to contain the tested OS and # any new packages to be added during OS morphing. @@ -21,6 +39,9 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase): def setUp(self): super().setUp() + self._prepare_src_device() + + def _prepare_src_device(self): test_utils.write_os_image_to_disk(self._src_device, "ubuntu:24.04") def _execute_transfer_and_deployment(self, deployment_kwargs=None): @@ -35,19 +56,30 @@ def _execute_transfer_and_deployment(self, deployment_kwargs=None): self.addCleanup(self._cleanup_deployment, deployment.id) self.assertDeploymentCompleted(deployment.id) + def _check_path_exists(self, device, path): + return test_utils.path_exists_on_device(device, path) + + +class BasicOsMorphingDeploymentTest(OsMorphingDeploymentTestBase): + def _assert_firstboot_setup(self): + pass + def test_deployment_with_os_morphing(self): self.assertFalse( - test_utils.path_exists_on_device(self._src_device, "usr/bin/jq"), + self._check_path_exists(self._src_device, "usr/bin/jq"), "jq was found on the source device before OS morphing", ) self._execute_transfer_and_deployment() self.assertTrue( - test_utils.path_exists_on_device(self._dst_device, "usr/bin/jq"), + self._check_path_exists(self._dst_device, "usr/bin/jq"), "jq was not found on the destination device after OS morphing", ) + self._assert_firstboot_setup() + +class OsMorphingDeploymentScriptsTests(OsMorphingDeploymentTestBase): def test_os_morphing_global_script_basic_format(self): expected_string = str(uuid.uuid4()) user_scripts = { @@ -135,3 +167,87 @@ def test_os_morphing_global_script_extended_format(self): # the replica OS disk was mounted. self.assertNotIn(self._dst_device, pre_mounts) self.assertIn(self._dst_device, post_mounts) + + +class LUKSOSMorphingDeploymentTest(BasicOsMorphingDeploymentTest): + + # Extra space for initramfs-tools and cryptsetup-initramfs packages that + # the LUKS morphing tools install on top of the base OS image. + _SCSI_DEBUG_SIZE_MB = 512 + + def setUp(self): + # prepare the LUKS key file. + with tempfile.NamedTemporaryFile( + mode="w", suffix=".key", delete=False) as fh: + self._key_file = fh.name + fh.write(_LUKS_PASSPHRASE) + self.addCleanup(os.unlink, self._key_file) + + super().setUp() + + def _prepare_src_device(self): + # Write a minimal Linux OS on the device, encrypted with LUKS. + test_utils.make_luks_device( + self._src_device, self._key_file, "ubuntu:24.04") + + # Update the tranfer information to include the LUKS passphrase. + dest_env = { + "devices": [self._dst_device], + constants.ENCRYPTED_DISKS_PASS: _LUKS_PASSPHRASE, + } + self._client.transfers.update( + self._transfer.id, + {"destination_environment": dest_env}, + ) + + def _check_path_exists(self, device, path): + with test_utils.luks_open(device, self._key_file) as mapper_path: + return test_utils.path_exists_on_device(mapper_path, path) + + def _assert_luks_common_firstboot_files(self): + dst_basename = os.path.basename(self._dst_device) + for path in [ + "usr/local/sbin/coriolis-luks-firstboot.sh", + "etc/systemd/system/coriolis-luks-firstboot.service", + "etc/systemd/system/multi-user.target.wants/" + "coriolis-luks-firstboot.service", + "etc/luks/coriolis_%s.key" % dst_basename, + ]: + self.assertTrue( + self._check_path_exists(self._dst_device, path), + "%s not found after LUKS OS morphing" % path, + ) + + def _assert_firstboot_setup(self): + self._assert_luks_common_firstboot_files() + self.assertTrue( + self._check_path_exists( + self._dst_device, "etc/cryptsetup-initramfs/conf-hook"), + "cryptsetup-initramfs conf-hook not found after LUKS OS morphing", + ) + + +class LUKSRockyLinuxOSMorphingDeploymentTest(LUKSOSMorphingDeploymentTest): + """LUKS + dracut OS morphing test using Rocky Linux 9.""" + + def _prepare_src_device(self): + test_utils.make_luks_device( + self._src_device, self._key_file, "rockylinux:9") + + dest_env = { + "devices": [self._dst_device], + constants.ENCRYPTED_DISKS_PASS: _LUKS_PASSPHRASE, + } + self._client.transfers.update( + self._transfer.id, + {"destination_environment": dest_env}, + ) + + def _assert_firstboot_setup(self): + self._assert_luks_common_firstboot_files() + self.assertTrue( + self._check_path_exists( + self._dst_device, + "etc/dracut.conf.d/99-coriolis-luks.conf"), + "dracut LUKS keyfile config not found after LUKS OS morphing", + ) diff --git a/coriolis/tests/integration/dockerfiles/data-minion/Dockerfile b/coriolis/tests/integration/dockerfiles/data-minion/Dockerfile index 63828d47..a77146e8 100644 --- a/coriolis/tests/integration/dockerfiles/data-minion/Dockerfile +++ b/coriolis/tests/integration/dockerfiles/data-minion/Dockerfile @@ -6,7 +6,9 @@ FROM ubuntu:24.04 # dbus is required for systemd to fully manage units; # sudo is used by replicator / writer setup. # kmod is required during OS morphing (modprobe is being called). +# cryptsetup is required to unlock / lock LUKS-encrypted devices during OS morphing. RUN apt-get update && apt-get install -y --no-install-recommends \ + cryptsetup \ dbus \ kmod \ openssh-server \ diff --git a/coriolis/tests/integration/test_provider/imp.py b/coriolis/tests/integration/test_provider/imp.py index b37d4c78..89dec186 100644 --- a/coriolis/tests/integration/test_provider/imp.py +++ b/coriolis/tests/integration/test_provider/imp.py @@ -15,6 +15,7 @@ from oslo_log import log as logging import paramiko +from coriolis import constants from coriolis.providers import backup_writers from coriolis.providers.base import BaseDestinationMinionPoolProvider from coriolis.providers.base import BaseEndpointDestinationOptionsProvider @@ -240,7 +241,13 @@ def restore_replica_disk_snapshots( def deploy_replica_instance( self, ctxt, connection_info, target_environment, instance_name, export_info, volumes_info, clone_disks): - return {"instance_deployment_info": {}} + info = {} + + passphrase = target_environment.get(constants.ENCRYPTED_DISKS_PASS) + if passphrase: + info[constants.ENCRYPTED_DISKS_PASS] = passphrase + + return {"instance_deployment_info": info} def finalize_replica_instance_deployment( self, ctxt, connection_info, target_environment, @@ -270,6 +277,8 @@ def cleanup_failed_replica_instance_deployment( # BaseInstanceProvider def get_os_morphing_tools(self, os_type, osmorphing_info): + if osmorphing_info.get(constants.ENCRYPTED_DISKS_PASS): + return osmorphing.LUKS_OS_MORPHERS return osmorphing.OS_MORPHERS # BaseImportInstanceProvider @@ -287,20 +296,38 @@ def deploy_os_morphing_resources( test_utils.get_host_disk_devices() - set(devices) ) + device_cgroup_rules = None + passphrase = instance_deployment_info.get( + constants.ENCRYPTED_DISKS_PASS) + if passphrase: + # luksOpen inside the container needs /dev/mapper/control. + # Docker only gives containers the device nodes passed at run time, + # so we must include it explicitly. + # + # After luksOpen, the kernel creates a new dm block device (dm-N). + # udevd inside the container tries to mknod it, but the device + # cgroup blocks access to device numbers not in the container's + # allowlist. "b *:* rwm" lifts that restriction for block devices, + # so the new mapper node becomes accessible. + devices = devices + ["/dev/mapper/control"] + device_cgroup_rules = ["b *:* rwm"] + # Mount the host's /lib/modules tree so that modprobe can # resolve built-in modules. volumes = ["/lib/modules:/lib/modules:ro"] + result = self._create_minion( "coriolis-osmorphing", connection_info, devices, volumes, setup_writer=False, + device_cgroup_rules=device_cgroup_rules, ) - return { "os_morphing_resources": {"container_id": result["container_id"]}, "osmorphing_connection_info": result["ssh_connection_info"], "osmorphing_info": { "os_type": instance_deployment_info.get("os_type", "linux"), "ignore_devices": ignore_devices, + constants.ENCRYPTED_DISKS_PASS: passphrase, }, } diff --git a/coriolis/tests/integration/test_provider/osmorphing/__init__.py b/coriolis/tests/integration/test_provider/osmorphing/__init__.py index 13ddcd8f..05d3d6de 100644 --- a/coriolis/tests/integration/test_provider/osmorphing/__init__.py +++ b/coriolis/tests/integration/test_provider/osmorphing/__init__.py @@ -2,9 +2,16 @@ # All Rights Reserved. from coriolis.osmorphing import base +from coriolis.tests.integration.test_provider.osmorphing import rocky from coriolis.tests.integration.test_provider.osmorphing import ubuntu OS_MORPHERS: list[base.BaseLinuxOSMorphingTools] = [ + rocky.TestRockyLinuxOSMorphingTools, ubuntu.TestUbuntuOSMorphingTools, ] + +LUKS_OS_MORPHERS: list[base.BaseLinuxOSMorphingTools] = [ + rocky.LUKSTestRockyLinuxOSMorphingTools, + ubuntu.LUKSTestUbuntuOSMorphingTools, +] diff --git a/coriolis/tests/integration/test_provider/osmorphing/rocky.py b/coriolis/tests/integration/test_provider/osmorphing/rocky.py new file mode 100644 index 00000000..b5488fd9 --- /dev/null +++ b/coriolis/tests/integration/test_provider/osmorphing/rocky.py @@ -0,0 +1,35 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +""" +Rocky Linux OS Morphing tools. +""" + +from coriolis.osmorphing import rocky + + +class TestRockyLinuxOSMorphingTools(rocky.BaseRockyLinuxMorphingTools): + """Rocky Linux OSMorphing tools for integration tests.""" + + # Package meant to be installed during OS morphing. + # jq is a small package not present in the base container image. + _packages = { + None: [("jq", True)], + } + + +class LUKSTestRockyLinuxOSMorphingTools(TestRockyLinuxOSMorphingTools): + """Rocky Linux morphing tools for LUKS integration tests. + + Extends the base test tools with dracut and cryptsetup, which provide + the initramfs rebuild tool and LUKS support. The base Rocky Linux Docker + image omits them; they must be installed so initramfs can be rebuilt. + """ + + _packages = { + None: [ + ("jq", True), + ("dracut", False), + ("cryptsetup", False), + ], + } diff --git a/coriolis/tests/integration/test_provider/osmorphing/ubuntu.py b/coriolis/tests/integration/test_provider/osmorphing/ubuntu.py index 2c96d471..97dd6033 100644 --- a/coriolis/tests/integration/test_provider/osmorphing/ubuntu.py +++ b/coriolis/tests/integration/test_provider/osmorphing/ubuntu.py @@ -16,3 +16,20 @@ class TestUbuntuOSMorphingTools(ubuntu.BaseUbuntuMorphingTools): _packages = { None: [("jq", True)], } + + +class LUKSTestUbuntuOSMorphingTools(TestUbuntuOSMorphingTools): + """Ubuntu morphing tools for LUKS integration tests. + + Extends the base test tools with initramfs-tools and cryptsetup-initramfs, + which provide update-initramfs and the LUKS hook. The base Ubuntu Docker + image omits them; they must be installed so initramfs can be rebuilt. + """ + + _packages = { + None: [ + ("jq", True), + ("initramfs-tools", False), + ("cryptsetup-initramfs", False), + ], + } diff --git a/coriolis/tests/integration/utils.py b/coriolis/tests/integration/utils.py index b5efc6d7..a0906b48 100644 --- a/coriolis/tests/integration/utils.py +++ b/coriolis/tests/integration/utils.py @@ -5,6 +5,7 @@ Integration test utils. """ +import contextlib import json import os import socket @@ -371,17 +372,84 @@ def write_os_image_to_disk(device_path, container_image): _run(["docker", "rm", "-f", container_id], check=False) +def _fixup_luks_inner_os(mapper_path, luks_uuid): + """Patch the OS image inside a LUKS mapper to work with OS morphing. + + Docker container images are not full OS installs, so a few things need + fixing before Coriolis can morph them: + + 1. /etc/crypttab is missing: the LUKS mixin needs a UUID= entry there to + configure initramfs auto-unlock. + 2. /boot may be absent (e.g. Rocky Linux 9 Docker image): the osmount + root-finder requires etc, bin, sbin, and boot to all be present. + """ + mapper_name = "luks-%s" % luks_uuid + crypttab_entry = "%s\tUUID=%s\tnone\tluks\n" % (mapper_name, luks_uuid) + + with tempfile.TemporaryDirectory() as mount_point: + _run(["mount", mapper_path, mount_point]) + + try: + etc_dir = os.path.join(mount_point, "etc") + os.makedirs(etc_dir, exist_ok=True) + crypttab_path = os.path.join(etc_dir, "crypttab") + + with open(crypttab_path, "w") as fh: + fh.write(crypttab_entry) + + os.makedirs(os.path.join(mount_point, "boot"), exist_ok=True) + finally: + _run(["umount", mount_point]) + + +def make_luks_device(device_path, key_file, container_image): + """Format *device_path* with LUKS and write a minimal Linux OS inside. + + The mapper device is opened only for the duration of the call. It is closed + before returning, leaving the raw device encrypted. + + Exports the filesystem the container image onto the given device, then + writes a /etc/crypttab entry so that the LUKS mixin can find the UUID + when configuring initramfs auto-unlock during OS morphing. + """ + _run([ + "cryptsetup", "luksFormat", "--batch-mode", "--key-file", key_file, + device_path, + ]) + + luks_uuid = _run( + ["cryptsetup", "luksUUID", device_path]).stdout.decode().strip() + + with luks_open(device_path, key_file) as mapper_path: + write_os_image_to_disk(mapper_path, container_image) + _fixup_luks_inner_os(mapper_path, luks_uuid) + + +@contextlib.contextmanager +def luks_open(device_path, key_file): + mapper_name = "coriolis_luks_setup_%s" % os.path.basename(device_path) + _run([ + "cryptsetup", "luksOpen", "--key-file", key_file, device_path, + mapper_name, + ]) + + try: + yield "/dev/mapper/%s" % mapper_name + finally: + _run(["cryptsetup", "luksClose", mapper_name]) + + def path_exists_on_device(device_path, rel_path): - """Checks if *path* exists on the filesystem of *device_path*. + """Checks if *rel_path* exists on the filesystem of *device_path*. - Mounts the device read-only into a temporary directory, checks for the - path, then unmounts. + Uses lexists so dangling symlinks (e.g. absolute targets only valid inside + the OS, not on the host) are still reported as present. """ with tempfile.TemporaryDirectory() as mount_point: _run(["mount", "-o", "ro", device_path, mount_point]) try: - return os.path.exists(os.path.join(mount_point, rel_path)) + return os.path.lexists(os.path.join(mount_point, rel_path)) finally: _run(["umount", mount_point]) diff --git a/coriolis/tests/osmorphing/osmount/test_base.py b/coriolis/tests/osmorphing/osmount/test_base.py index e5094b9e..ec3adb6d 100644 --- a/coriolis/tests/osmorphing/osmount/test_base.py +++ b/coriolis/tests/osmorphing/osmount/test_base.py @@ -1,6 +1,7 @@ # Copyright 2024 Cloudbase Solutions Srl # All Rights Reserved. +import json import logging from unittest import mock @@ -73,7 +74,11 @@ def setUp(self, mock_connect): @mock.patch('paramiko.SSHClient') @mock.patch.object(base.utils, 'wait_for_port_connectivity') - def test__connect(self, mock_wait_for_port_connectivity, mock_ssh_client): + @mock.patch.object(base.utils, 'deserialize_key') + def test__connect( + self, mock_deserialize_key, mock_wait_for_port_connectivity, + mock_ssh_client, + ): base_os_mount_tools = TestBaseSSHOSMountTools( self.conn_info, self.event_manager, mock.sentinel.ignore_devices, mock.sentinel.operation_timeout) @@ -86,6 +91,7 @@ def test__connect(self, mock_wait_for_port_connectivity, mock_ssh_client): level=logging.INFO): original_connect(base_os_mount_tools) + mock_deserialize_key.assert_called_with(self.conn_info['pkey']) mock_wait_for_port_connectivity.assert_has_calls([ mock.call(self.conn_info['ip'], 22), mock.call(self.conn_info['ip'], 22), @@ -95,7 +101,7 @@ def test__connect(self, mock_wait_for_port_connectivity, mock_ssh_client): self.ssh.connect.assert_called_once_with( hostname=self.conn_info['ip'], port=22, username=self.conn_info['username'], - pkey=self.conn_info['pkey'], + pkey=mock_deserialize_key.return_value, password=self.conn_info['password']) self.ssh.set_log_channel.assert_called_once_with( @@ -646,14 +652,23 @@ def test__get_mount_destinations(self, mock_exec_cmd): self.assertEqual(result, expected_result) + @mock.patch.object(base.utils, 'test_ssh_path', return_value=True) @mock.patch.object(base.BaseSSHOSMountTools, '_exec_cmd') - def test__get_volume_block_devices(self, mock_exec_cmd): - mock_exec_cmd.return_value = "sda\nsda1\nsda2\nsdb\nsdb1\nsdb2" + def test__get_volume_block_devices(self, mock_exec_cmd, _mock_test_path): + lsblk_output = json.dumps({"blockdevices": [ + {"kname": "sda", "type": "disk"}, + {"kname": "sda1", "type": "part"}, + {"kname": "sda2", "type": "part"}, + {"kname": "sdb", "type": "disk"}, + {"kname": "sdb1", "type": "part"}, + {"kname": "sdb2", "type": "part"}, + ]}) + mock_exec_cmd.return_value = lsblk_output self.base_os_mount_tools._ignore_devices = ["/dev/sda1"] result = self.base_os_mount_tools._get_volume_block_devices() - mock_exec_cmd.assert_called_once_with("lsblk -lnao KNAME") + mock_exec_cmd.assert_called_once_with("lsblk -lao KNAME,TYPE --json") expected_result = ["/dev/sda", "/dev/sdb"] self.assertEqual(result, expected_result) diff --git a/coriolis/utils.py b/coriolis/utils.py index 08d50171..82354471 100644 --- a/coriolis/utils.py +++ b/coriolis/utils.py @@ -553,12 +553,14 @@ def connect_ssh(hostname, port, username, pkey=None, password=None, connect_timeout=None, banner_timeout=None): """Open and return a connected paramiko SSHClient. - :param pkey: a paramiko.PKey instance or None. + :param pkey: a paramiko.PKey instance, a serialized PEM string, or None. :param password: plaintext password or None. :param connect_timeout: socket-level timeout in seconds (None = default). :param banner_timeout: banner timeout in seconds passed to paramiko. :raises: exception.CoriolisException on failure. """ + if isinstance(pkey, str): + pkey = deserialize_key(pkey) ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) kwargs = dict( From 55a9ceb8c8e1ef8a37d34e89c99b11ff12a65497 Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Wed, 27 May 2026 10:08:39 +0000 Subject: [PATCH 4/4] Adds unit tests for the LUKS mixin --- .../osmorphing/osmount/test_luks_mixin.py | 803 ++++++++++++++++++ 1 file changed, 803 insertions(+) create mode 100644 coriolis/tests/osmorphing/osmount/test_luks_mixin.py diff --git a/coriolis/tests/osmorphing/osmount/test_luks_mixin.py b/coriolis/tests/osmorphing/osmount/test_luks_mixin.py new file mode 100644 index 00000000..ac11bca6 --- /dev/null +++ b/coriolis/tests/osmorphing/osmount/test_luks_mixin.py @@ -0,0 +1,803 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +import json +import os +from unittest import mock + +from coriolis import constants +from coriolis import exception +from coriolis.osmorphing.osmount import base +from coriolis.osmorphing.osmount import luks_mixin +from coriolis.tests import test_base + + +class ConcreteLinuxLUKSMixin( + luks_mixin.LinuxLUKSMixin, base.BaseSSHOSMountTools +): + def check_os(self): + pass + + def mount_os(self): + pass + + def dismount_os(self): + pass + + def run_user_script(self, user_script): + pass + + +_CONN_INFO = {"ip": "127.0.0.1", "username": "foo", "password": "lish"} +_OS_ROOT_DIR = "/mnt/os" +_DEV = "/dev/sda" +_PASSPHRASE = "dont-dead-open-inside" +_UUID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" +_KEYFILE = "/etc/luks/coriolis_sda.key" + + +class LinuxLUKSMixinTestCase(test_base.CoriolisBaseTestCase): + @mock.patch.object(base.BaseSSHOSMountTools, "_connect") + def setUp(self, mock_connect): + super().setUp() + self.event_manager = mock.MagicMock() + self.osmorphing_info = {constants.ENCRYPTED_DISKS_PASS: _PASSPHRASE} + self.mixin = ConcreteLinuxLUKSMixin( + _CONN_INFO, + self.event_manager, + [], + 30, + osmorphing_info=self.osmorphing_info, + ) + self.mixin._ssh = mock.MagicMock() + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_unlock_luks_device") + def test__unlock_luks_devices(self, mock_unlock): + mock_unlock.return_value = "/dev/mapper/coriolis_sda" + dev_paths = [_DEV] + + self.mixin._unlock_luks_devices(dev_paths) + + self.assertEqual(dev_paths, ["/dev/mapper/coriolis_sda"]) + self.assertEqual(self.mixin._luks_opened, [("coriolis_sda", _DEV)]) + mock_unlock.assert_called_once_with(_DEV, _PASSPHRASE) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_close_luks_devices") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_unlock_luks_device") + def test__unlock_luks_devices_cleans_up_on_error( + self, mock_unlock, mock_close + ): + _DEV2 = "/dev/sdb" + mock_unlock.side_effect = [ + "/dev/mapper/coriolis_sda", + exception.CoriolisException("bad passphrase"), + ] + + self.assertRaises( + exception.CoriolisException, + self.mixin._unlock_luks_devices, + [_DEV, _DEV2], + ) + + mock_close.assert_called_once_with() + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_unlock_luks_device") + def test__unlock_luks_devices_skips_non_luks(self, mock_unlock): + mock_unlock.return_value = None + dev_paths = [_DEV] + + self.mixin._unlock_luks_devices(dev_paths) + + self.assertEqual(dev_paths, [_DEV]) + self.assertEqual(self.mixin._luks_opened, []) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_is_luks") + def test__unlock_luks_device(self, mock_is_luks): + # not LUKS, skipped regardless of passphrase. + mock_is_luks.return_value = False + self.assertIsNone(self.mixin._unlock_luks_device(_DEV, None)) + self.assertIsNone(self.mixin._unlock_luks_device(_DEV, _PASSPHRASE)) + + # is LUKS, no password. + mock_is_luks.return_value = True + self.assertRaises( + exception.CoriolisException, + self.mixin._unlock_luks_device, + _DEV, + None, + ) + + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_auth_luks") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_is_luks") + def test__unlock_luks_device_success( + self, mock_is_luks, mock_auth_luks, mock_exec_cmd + ): + mock_is_luks.return_value = True + + result = self.mixin._unlock_luks_device(_DEV, _PASSPHRASE) + + self.assertEqual(result, "/dev/mapper/coriolis_sda") + mock_auth_luks.assert_called_once_with( + _PASSPHRASE, "/tmp/coriolis_sda.key" + ) + mock_exec_cmd.assert_called_once_with( + "sudo cryptsetup luksOpen --disable-keyring " + "--key-file /tmp/coriolis_sda.key %s coriolis_sda" % _DEV + ) + + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + def test__is_luks(self, mock_exec_cmd): + # True. + mock_exec_cmd.return_value = "" + self.assertTrue(self.mixin._is_luks(_DEV)) + mock_exec_cmd.assert_called_once_with( + "sudo cryptsetup isLuks %s" % _DEV + ) + + # False. + mock_exec_cmd.side_effect = Exception("exit code 1") + self.assertFalse(self.mixin._is_luks(_DEV)) + + # SSHCommandNotFoundException, warning. + mock_exec_cmd.side_effect = exception.SSHCommandNotFoundException() + with self.assertLogs( + "coriolis.osmorphing.osmount.luks_mixin", level='WARNING' + ): + result = self.mixin._is_luks(_DEV) + self.assertFalse(result) + + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + def test__close_luks_devices(self, mock_exec_cmd): + self.mixin._luks_opened = [ + ("coriolis_sda", "/dev/sda"), + ("coriolis_sdb", "/dev/sdb"), + ] + self.mixin._close_luks_devices() + + mock_exec_cmd.assert_any_call( + "sudo cryptsetup luksClose coriolis_sda || true" + ) + mock_exec_cmd.assert_any_call( + "sudo cryptsetup luksClose coriolis_sdb || true" + ) + self.assertEqual(self.mixin._luks_opened, []) + + @mock.patch.object(luks_mixin.utils, "write_ssh_file") + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + def test__write_remote_file(self, mock_exec_cmd, mock_write): + mock_exec_cmd.return_value = "/foo/lish" + self.mixin._write_remote_file("/bar/tender", "content") + + mock_write.assert_called_once_with( + self.mixin._ssh, "/foo/lish", b"content" + ) + mock_exec_cmd.assert_any_call("sudo mv /foo/lish /bar/tender") + + # With mode. + self.mixin._write_remote_file("/ness/dante", "content", mode="600") + + mock_exec_cmd.assert_any_call( + "sudo mv /foo/lish /ness/dante && sudo chmod 600 /ness/dante" + ) + + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_write_remote_file") + def test__auth_luks(self, mock_write, mock_exec_cmd): + with self.assertRaises(Exception): + with self.mixin._auth_luks(_PASSPHRASE, "/tmp/ting") as key_path: + self.assertEqual(key_path, "/tmp/ting") + mock_write.assert_called_once_with("/tmp/ting", _PASSPHRASE) + + # raise an exception, cleanup should be performed. + raise Exception("al code only sometimes breaks.") + + mock_exec_cmd.assert_called_once_with("sudo rm -f /tmp/ting") + + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + def test__get_tpm2_token_info(self, mock_exec_cmd): + # Exception. + mock_exec_cmd.return_value = "" + + with self.assertLogs( + "coriolis.osmorphing.osmount.luks_mixin", level="WARNING" + ): + result = self.mixin._get_tpm2_token_info(_DEV) + + self.assertEqual(result, []) + + # Only the last entry is valid / has keyslots. + header = { + "tokens": { + "0": {"type": "systemd-tpm2", "keyslots": []}, + "1": {"type": "luks2-keyring", "keyslots": ["2"]}, + "2": {"type": "systemd-tpm2", "keyslots": ["1"]}, + } + } + mock_exec_cmd.return_value = json.dumps(header) + + result = self.mixin._get_tpm2_token_info(_DEV) + + self.assertEqual(result, [("2", "1")]) + + # Empty dump. + mock_exec_cmd.return_value = json.dumps({}) + + result = self.mixin._get_tpm2_token_info(_DEV) + self.assertEqual(result, []) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_auth_luks") + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_get_tpm2_token_info") + def test__remove_tpm2_tokens(self, mock_info, mock_exec_cmd, mock_auth): + # No tokens. + mock_info.return_value = [] + self.mixin._remove_tpm2_tokens(_DEV, _PASSPHRASE) + + # Remove token. + mock_info.return_value = [("0", "1")] + self.mixin._remove_tpm2_tokens(_DEV, _PASSPHRASE) + + mock_exec_cmd.assert_any_call( + "sudo cryptsetup token remove --token-id 0 %s" % _DEV + ) + mock_exec_cmd.assert_any_call( + "sudo cryptsetup luksKillSlot --key-file /tmp/coriolis_sda.key " + "%s 1" % _DEV + ) + mock_auth.assert_called_once() + + # Token removal failed. + mock_exec_cmd.side_effect = Exception("toe ken remove failed") + with self.assertLogs( + "coriolis.osmorphing.osmount.luks_mixin", level="WARNING" + ): + self.mixin._remove_tpm2_tokens(_DEV, _PASSPHRASE) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_write_remote_file") + @mock.patch.object(luks_mixin.utils, "read_ssh_file") + @mock.patch.object(luks_mixin.utils, "test_ssh_path") + def test__transform_crypttab(self, mock_test, mock_read, mock_write): + # No file. + mock_test.return_value = False + + result = self.mixin._transform_crypttab(_OS_ROOT_DIR, None) + self.assertFalse(result) + + # No changes. + mock_test.return_value = True + mock_read.return_value = b"# comment\nluks-root UUID=aaa none none\n" + result = self.mixin._transform_crypttab(_OS_ROOT_DIR, lambda p: None) + self.assertFalse(result) + mock_write.assert_not_called() + + # File changed. + mock_read.return_value = b"luks-root UUID=aaa none none\n" + + def _set_opts(parts): + parts[3] = 'new-opt' + return parts + + result = self.mixin._transform_crypttab(_OS_ROOT_DIR, _set_opts) + self.assertTrue(result) + mock_write.assert_called_once() + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_write_remote_file") + @mock.patch.object(luks_mixin.utils, "read_ssh_file") + @mock.patch.object(luks_mixin.utils, "test_ssh_path", return_value=True) + def test__remove_tpm2_crypttab_options( + self, mock_test_path, mock_read, mock_write + ): + line = ( + "luks-root UUID=%s none tpm2-device=auto,x-initrd.attach\n" % _UUID + ) + mock_read.return_value = line.encode("utf-8") + + with self.assertLogs( + "coriolis.osmorphing.osmount.luks_mixin", level="INFO" + ): + self.mixin._remove_tpm2_crypttab_options(_OS_ROOT_DIR) + + new_content = mock_write.call_args[0][1] + self.assertNotIn("tpm2-", new_content) + self.assertIn("x-initrd.attach", new_content) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_remove_tpm2_tokens") + @mock.patch.object( + luks_mixin.LinuxLUKSMixin, "_remove_tpm2_crypttab_options" + ) + def test_remove_encryption_artifacts(self, mock_opts, mock_tokens): + # No LUKS devices open. + self.mixin._luks_opened = [] + self.mixin.remove_encryption_artifacts(_OS_ROOT_DIR) + mock_tokens.assert_not_called() + mock_opts.assert_not_called() + + # Opened LUKS device. + self.mixin._luks_opened = [("coriolis_sda", _DEV)] + self.mixin.remove_encryption_artifacts(_OS_ROOT_DIR) + + mock_tokens.assert_called_once_with(_DEV, _PASSPHRASE) + mock_opts.assert_called_once_with(_OS_ROOT_DIR) + self.event_manager.progress_update.assert_called() + + def test__get_migration_keyfile_path(self): + result = self.mixin._get_migration_keyfile_path("/foo/lish") + + self.assertEqual( + os.path.join(luks_mixin._LUKS_KEYFILE_DIR, "coriolis_lish.key"), + result, + ) + + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + def test__get_luks_uuid(self, mock_exec_cmd): + mock_exec_cmd.return_value = " %s \n" % _UUID + + result = self.mixin._get_luks_uuid(_DEV) + + self.assertEqual(result, _UUID) + mock_exec_cmd.assert_called_once_with( + "sudo cryptsetup luksUUID %s" % _DEV + ) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_transform_crypttab") + def test__update_crypttab_keyfile(self, mock_transform): + mock_transform.return_value = True + self.mixin._update_crypttab_keyfile(_OS_ROOT_DIR, {_UUID: _KEYFILE}) + mock_transform.assert_called_once() + + # Extract and exercise the inner _set_keyfile transform. + transform = mock_transform.call_args[0][1] + + # too few parts. + self.assertIsNone(transform(["luks-root"])) + + # no UUID match. + parts = ["luks-root", "PARTUUID=abc", "none", "none"] + self.assertIsNone(transform(parts)) + + # UUID not in map. + parts = [ + "luks-root", + "UUID=00000000-0000-0000-0000-000000000000", + "none", + "none", + ] + self.assertIsNone(transform(parts)) + + # UUID= format match; 'initramfs' always appended. + result = transform(["luks-root", "UUID=%s" % _UUID, "none", "none"]) + self.assertEqual(result[2], _KEYFILE) + self.assertIn("initramfs", result[3].split(",")) + + # /by-uuid/ path also matches. + parts = ["luks-root", "/dev/disk/by-uuid/%s" % _UUID, "none", "none"] + result = transform(parts) + self.assertEqual(result[2], _KEYFILE) + + # transform returns False -> exception. + mock_transform.return_value = False + self.assertRaises( + exception.CoriolisException, + self.mixin._update_crypttab_keyfile, + _OS_ROOT_DIR, + {_UUID: _KEYFILE}, + ) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_detect_initramfs_tool") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_configure_dracut_keyfiles") + @mock.patch.object( + luks_mixin.LinuxLUKSMixin, "_configure_initramfs_tools_keyfiles" + ) + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_update_crypttab_keyfile") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_write_remote_file") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_get_luks_uuid") + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + def test__write_migration_keyfiles( + self, + mock_exec, + mock_uuid, + mock_write_file, + mock_update_ct, + mock_cfg_initramfs, + mock_cfg_dracut, + mock_detect_tool, + ): + # No passphrase / opened devices. + mock_uuid.return_value = _UUID + self.mixin._osmorphing_info = {} + self.mixin._luks_opened = [] + self.mixin._write_migration_keyfiles(_OS_ROOT_DIR) + mock_exec.assert_not_called() + + # dracut branch. + self.mixin._osmorphing_info = { + constants.ENCRYPTED_DISKS_PASS: _PASSPHRASE, + } + self.mixin._luks_opened = [("coriolis_sda", _DEV)] + mock_detect_tool.return_value = "dracut" + self.mixin._write_migration_keyfiles(_OS_ROOT_DIR) + + keyfile_path = os.path.join( + luks_mixin._LUKS_KEYFILE_DIR, + "coriolis_%s.key" % os.path.basename(_DEV), + ) + expected_abs_path = os.path.join( + _OS_ROOT_DIR, keyfile_path.lstrip("/") + ) + mock_write_file.assert_called_once_with( + expected_abs_path, _PASSPHRASE, mode="400" + ) + mock_update_ct.assert_called_once_with(_OS_ROOT_DIR, {_UUID: _KEYFILE}) + mock_cfg_dracut.assert_called_once_with( + _OS_ROOT_DIR, {_UUID: _KEYFILE} + ) + mock_cfg_initramfs.assert_not_called() + + # update-initramfs branch. + mock_cfg_dracut.reset_mock() + mock_update_ct.reset_mock() + mock_write_file.reset_mock() + mock_detect_tool.return_value = "update-initramfs" + self.mixin._write_migration_keyfiles(_OS_ROOT_DIR) + + mock_cfg_initramfs.assert_called_once_with(_OS_ROOT_DIR) + mock_cfg_dracut.assert_not_called() + + # No tool found: CoriolisException raised. + mock_detect_tool.return_value = None + self.assertRaises( + exception.CoriolisException, + self.mixin._write_migration_keyfiles, + _OS_ROOT_DIR, + ) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_write_remote_file") + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + @mock.patch.object(luks_mixin.utils, "test_ssh_path") + def test__configure_dracut_keyfiles( + self, mock_test_path, mock_exec, mock_write + ): + plugin_path = luks_mixin._CRYPTSETUP_TPM2_PLUGIN_PATHS[0] + plugin_abs = os.path.join(_OS_ROOT_DIR, plugin_path.lstrip("/")) + conf_abs = os.path.join( + _OS_ROOT_DIR, luks_mixin._DRACUT_LUKS_CONF_PATH.lstrip("/") + ) + + # no TPM2 plugin. + mock_test_path.return_value = False + self.mixin._configure_dracut_keyfiles(_OS_ROOT_DIR, {_UUID: _KEYFILE}) + + written = mock_write.call_args[0][1] + self.assertIn(_KEYFILE, written) + self.assertIn("install_items+=", written) + self.assertNotIn(plugin_path, written) + mock_exec.assert_called_once_with( + "sudo chown root:root %s && sudo chmod 644 %s" + % (conf_abs, conf_abs) + ) + + # with TPM2 plugin. + mock_exec.reset_mock() + mock_write.reset_mock() + mock_test_path.side_effect = lambda _ssh, path: path == plugin_abs + self.mixin._configure_dracut_keyfiles(_OS_ROOT_DIR, {_UUID: _KEYFILE}) + + written = mock_write.call_args[0][1] + self.assertIn(plugin_path, written) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_write_remote_file") + @mock.patch.object(luks_mixin.utils, "read_ssh_file") + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + @mock.patch.object(luks_mixin.utils, "test_ssh_path") + def test__configure_initramfs_tools_keyfiles( + self, mock_test, mock_exec, mock_read, mock_write + ): + hook_abs = os.path.join( + _OS_ROOT_DIR, "etc/cryptsetup-initramfs/conf-hook" + ) + + # existing conf-hook: content is preserved and KEYFILE_PATTERN + # appended. + mock_test.side_effect = lambda _ssh, path: path == hook_abs + mock_read.return_value = b"# existing\n" + + self.mixin._configure_initramfs_tools_keyfiles(_OS_ROOT_DIR) + + written_content = mock_write.call_args[0][1] + self.assertIn("# existing", written_content) + self.assertIn("KEYFILE_PATTERN=", written_content) + self.assertIn("/etc/luks/coriolis_*.key", written_content) + + # no existing conf-hook: KEYFILE_PATTERN still written. + mock_test.side_effect = None + mock_test.return_value = False + mock_write.reset_mock() + + self.mixin._configure_initramfs_tools_keyfiles(_OS_ROOT_DIR) + + written_content = mock_write.call_args[0][1] + self.assertIn("KEYFILE_PATTERN=", written_content) + + @mock.patch.object(luks_mixin.utils, "test_ssh_path") + def test__detect_initramfs_tool(self, mock_test): + # update-initramfs. + mock_test.side_effect = lambda _ssh, path: "update-initramfs" in path + result = self.mixin._detect_initramfs_tool(_OS_ROOT_DIR) + self.assertEqual(result, "update-initramfs") + + # dracut. + mock_test.side_effect = lambda _ssh, path: "dracut" in path + result = self.mixin._detect_initramfs_tool(_OS_ROOT_DIR) + self.assertEqual(result, "dracut") + + # None. + mock_test.side_effect = None + mock_test.return_value = False + result = self.mixin._detect_initramfs_tool(_OS_ROOT_DIR) + self.assertIsNone(result) + + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + @mock.patch.object(luks_mixin.utils, "test_ssh_path") + def test__build_dracut_include_args(self, mock_test, mock_exec): + # no crypttab. + mock_test.return_value = False + mock_exec.return_value = "" + result = self.mixin._build_dracut_include_args(_OS_ROOT_DIR) + self.assertEqual(result, []) + + # with crypttab and keyfile. + crypttab_path = os.path.join(_OS_ROOT_DIR, "etc/crypttab") + luks_dir = os.path.join(_OS_ROOT_DIR, "etc/luks") + keyfile_abs = os.path.join(luks_dir, "coriolis_sda.key") + + mock_test.side_effect = lambda _ssh, path: path == crypttab_path + mock_exec.return_value = keyfile_abs + result = self.mixin._build_dracut_include_args(_OS_ROOT_DIR) + + expected = [ + "--include", + "/etc/crypttab", + "/etc/crypttab", + "--include", + "/etc/luks/coriolis_sda.key", + "/etc/luks/coriolis_sda.key", + ] + self.assertEqual(result, expected) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_build_dracut_include_args") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_detect_initramfs_tool") + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + def test__rebuild_initramfs( + self, mock_exec, mock_detect, mock_include_args + ): + # update-initramfs. + mock_detect.return_value = "update-initramfs" + self.mixin._rebuild_initramfs(_OS_ROOT_DIR) + mock_exec.assert_called_once_with( + "sudo chroot %s update-initramfs -u -k all" % _OS_ROOT_DIR + ) + + # dracut: --regenerate-all --force with --include args. + mock_exec.reset_mock() + mock_detect.return_value = "dracut" + mock_include_args.return_value = [ + "--include", + "/etc/crypttab", + "/etc/crypttab", + ] + self.mixin._rebuild_initramfs(_OS_ROOT_DIR) + + mock_include_args.assert_called_once_with(_OS_ROOT_DIR) + mock_exec.assert_called_once_with( + "sudo chroot %s dracut --regenerate-all --force " + "--include /etc/crypttab /etc/crypttab" % _OS_ROOT_DIR + ) + + # no tool found. + mock_detect.return_value = None + self.assertRaises( + exception.CoriolisException, + self.mixin._rebuild_initramfs, + _OS_ROOT_DIR, + ) + + @mock.patch.object(luks_mixin.utils, 'test_ssh_path') + def test__detect_init_system(self, mock_test): + mock_test.side_effect = lambda _ssh, path: 'systemd/systemd' in path + self.assertEqual( + self.mixin._detect_init_system(_OS_ROOT_DIR), 'systemd' + ) + + mock_test.side_effect = lambda _ssh, path: path.endswith('openrc') + self.assertEqual( + self.mixin._detect_init_system(_OS_ROOT_DIR), 'openrc' + ) + + mock_test.side_effect = lambda _ssh, path: path.endswith('initctl') + self.assertEqual( + self.mixin._detect_init_system(_OS_ROOT_DIR), 'upstart' + ) + + # sysvinit fallback. + mock_test.side_effect = None + mock_test.return_value = False + self.assertEqual( + self.mixin._detect_init_system(_OS_ROOT_DIR), 'sysvinit' + ) + + @mock.patch.object(base.BaseSSHOSMountTools, "_exec_cmd") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_write_remote_file") + def test__register_firstboot_script_systemd(self, mock_write, mock_exec): + self.mixin._register_firstboot_script_systemd(_OS_ROOT_DIR) + + unit_abs = os.path.join( + _OS_ROOT_DIR, luks_mixin._SYSTEMD_UNIT_PATH.lstrip("/") + ) + unit = luks_mixin._SYSTEMD_UNIT + mock_write.assert_called_once_with(unit_abs, unit) + wants_dir = os.path.join( + _OS_ROOT_DIR, 'etc/systemd/system/multi-user.target.wants' + ) + mock_exec.assert_any_call('sudo mkdir -p %s' % wants_dir) + mock_exec.assert_has_calls( + [ + mock.call( + "sudo chown root:root %s && sudo chmod 644 %s" + % (unit_abs, unit_abs) + ), + mock.call('sudo mkdir -p %s' % wants_dir), + mock.call( + "sudo ln -sf %s %s/coriolis-luks-firstboot.service" + % (luks_mixin._SYSTEMD_UNIT_PATH, wants_dir) + ), + ] + ) + + @mock.patch.object( + luks_mixin.LinuxLUKSMixin, '_register_firstboot_script_systemd' + ) + @mock.patch.object( + luks_mixin.LinuxLUKSMixin, + '_detect_init_system', + return_value='systemd', + ) + @mock.patch.object(base.BaseSSHOSMountTools, '_exec_cmd') + @mock.patch.object(luks_mixin.LinuxLUKSMixin, "_write_remote_file") + @mock.patch.object(luks_mixin.LinuxLUKSMixin, '_detect_initramfs_tool') + def test__install_luks_firstboot_script( + self, + mock_detect_tool, + mock_write, + mock_exec, + mock_detect_init, + mock_reg_systemd, + ): + # no initramfs tool found. + mock_detect_tool.return_value = None + self.assertRaises( + exception.CoriolisException, + self.mixin._install_luks_firstboot_script, + _OS_ROOT_DIR, + ) + + # update-initramfs. + mock_detect_tool.return_value = 'update-initramfs' + mock_detect_init.return_value = "systemd" + + self.mixin._install_luks_firstboot_script(_OS_ROOT_DIR) + + script_abs = os.path.join( + _OS_ROOT_DIR, luks_mixin._FIRSTBOOT_SCRIPT_PATH.lstrip("/") + ) + mock_exec.assert_has_calls( + [ + mock.call("sudo mkdir -p %s" % os.path.dirname(script_abs)), + mock.call( + "sudo chown root:root %s && sudo chmod 500 %s" + % (script_abs, script_abs) + ), + ] + ) + mock_reg_systemd.assert_called_once_with(_OS_ROOT_DIR) + script = luks_mixin._LUKS_FIRSTBOOT_SCRIPTS['update-initramfs'] + mock_write.assert_called_once_with(script_abs, script) + + # dracut. + mock_detect_tool.return_value = 'dracut' + mock_write.reset_mock() + + self.mixin._install_luks_firstboot_script(_OS_ROOT_DIR) + + script = luks_mixin._LUKS_FIRSTBOOT_SCRIPTS['dracut'] + mock_write.assert_called_once_with(script_abs, script) + + @mock.patch.object( + luks_mixin.LinuxLUKSMixin, '_install_luks_firstboot_script' + ) + @mock.patch.object(luks_mixin.LinuxLUKSMixin, '_rebuild_initramfs') + @mock.patch.object(luks_mixin.LinuxLUKSMixin, '_fix_grub_luks_root') + @mock.patch.object(luks_mixin.LinuxLUKSMixin, '_write_migration_keyfiles') + def test_install_encryption_firstboot_setup( + self, mock_write_keyfiles, mock_grub, mock_rebuild, mock_install + ): + # No LUKS opened. + self.mixin._luks_opened = [] + self.mixin.install_encryption_firstboot_setup(_OS_ROOT_DIR) + mock_write_keyfiles.assert_not_called() + + # LUKS opened. + self.mixin._luks_opened = [("coriolis_sda", _DEV)] + + self.mixin.install_encryption_firstboot_setup(_OS_ROOT_DIR) + + mock_write_keyfiles.assert_called_once_with(_OS_ROOT_DIR) + mock_grub.assert_called_once_with(_OS_ROOT_DIR) + mock_rebuild.assert_called_once_with(_OS_ROOT_DIR) + mock_install.assert_called_once_with(_OS_ROOT_DIR) + + @mock.patch.object(luks_mixin.LinuxLUKSMixin, '_write_remote_file') + @mock.patch.object(luks_mixin.LinuxLUKSMixin, '_get_luks_uuid') + @mock.patch.object(base.BaseSSHOSMountTools, '_exec_cmd') + @mock.patch.object(luks_mixin.utils, 'read_ssh_file') + @mock.patch.object(luks_mixin.utils, 'test_ssh_path') + def test__fix_grub_luks_root_patches_grub( + self, + mock_test_path, + mock_read_file, + mock_exec, + mock_get_luks_uuid, + mock_write_file, + ): + # no opened LUKS devices. + self.mixin._luks_opened = [] + self.mixin._fix_grub_luks_root(_OS_ROOT_DIR) + mock_test_path.assert_not_called() + + # no crypttab. + self.mixin._luks_opened = [("coriolis_sda", _DEV)] + mock_test_path.return_value = False + + self.mixin._fix_grub_luks_root(_OS_ROOT_DIR) + + crypttab_path = os.path.join(_OS_ROOT_DIR, "etc/crypttab") + mock_test_path.assert_called_once_with(self.mixin._ssh, crypttab_path) + mock_read_file.assert_not_called() + + # no uuid_to_crypttab_name. + mock_test_path.return_value = True + mock_read_file.return_value = "".encode("utf-8") + + self.mixin._fix_grub_luks_root(_OS_ROOT_DIR) + + mock_read_file.assert_called_once_with(self.mixin._ssh, crypttab_path) + mock_get_luks_uuid.assert_not_called() + + # no replacements. + crypttab = 'luks-root UUID=%s none none\n' % _UUID.lower() + grub_content = ( + 'set root=/dev/mapper/coriolis_sda\n' + 'linux /vmlinuz root=/dev/mapper/coriolis_sda\n' + ) + mock_read_file.return_value = crypttab.encode('utf-8') + mock_get_luks_uuid.return_value = "" + + self.mixin._fix_grub_luks_root(_OS_ROOT_DIR) + + mock_get_luks_uuid.assert_called_once_with(_DEV) + mock_exec.assert_not_called() + + # with replacements. + mock_get_luks_uuid.return_value = _UUID + grub_path = os.path.join(_OS_ROOT_DIR, 'boot/grub/grub.cfg') + crypttab_path = os.path.join(_OS_ROOT_DIR, 'etc/crypttab') + mock_test_path.side_effect = lambda _ssh, path: ( + path in (crypttab_path, grub_path) + ) + mock_exec.return_value = grub_content + + self.mixin._fix_grub_luks_root(_OS_ROOT_DIR) + + written = mock_write_file.call_args[0][1] + self.assertIn('/dev/mapper/luks-root', written) + self.assertNotIn('/dev/mapper/coriolis_sda', written)