diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index e5ba468..004f9ee 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -15,6 +15,15 @@ jobs: - os: ubuntu-latest python: '3.11' toxenv: py + - os: ubuntu-latest + python: '3.12' + toxenv: py + - os: ubuntu-latest + python: '3.13' + toxenv: py + - os: ubuntu-latest + python: '3.14' + toxenv: py # windows - os: windows-latest python: "3.10" diff --git a/CHANGES.md b/CHANGES.md index 0e786d7..3450c6a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -18,3 +18,9 @@ - add support for control server - add shared sample and window transform modules - add virtual channel runtime and vadd command + +## 1.0.1 (23/03/2026) + +- improve pdevinfo output +- new triggering logic +- add version command diff --git a/README.md b/README.md index 7e16eb1..1d71995 100644 --- a/README.md +++ b/README.md @@ -40,11 +40,11 @@ Plugins are automatically detected by Nxscli. Available plugins: * [nxscli-mpl](https://github.com/railab/nxscli-mpl) - Matplotlib extension +* [nxscli-pqg](https://github.com/railab/nxscli-pqg) - PyQtGraph extension ## Plugins Planned * Stream data as audio (inspired by audio knock detection systems) -* PyQtGraph support ## Installation diff --git a/docs/triggers.rst b/docs/triggers.rst new file mode 100644 index 0000000..72d2764 --- /dev/null +++ b/docs/triggers.rst @@ -0,0 +1,133 @@ +======== +Triggers +======== + +Overview +======== + +Nxscli supports software triggers configured globally with ``trig`` or +per-plugin with ``--trig``. + +Current trigger types: + +* ``on`` - always on +* ``off`` - always off +* ``er`` - edge rising +* ``ef`` - edge falling +* ``we`` - window enter +* ``wx`` - window exit + +Current capture modes: + +* ``mode=start_after`` - default behavior, emit data only after trigger +* ``mode=stop_after`` - stream data until trigger, then stop after ``post`` + samples + +Syntax +====== + +Format: + +.. code-block:: text + + [channel]:[trigger][#source][@vector],[legacy_hoffset],[level_or_low][,high][,name=value...] + +Supported named options: + +* ``mode=start_after|stop_after`` +* ``pre=`` - explicit pre-trigger samples for start-after capture +* ``post=`` - post-trigger samples for stop-after capture +* ``holdoff=`` - reserved for upcoming repeated/scope modes +* ``rearm=true|false`` - reserved for upcoming repeated/scope modes + +Notes: + +* The positional ``hoffset`` field is preserved for backward compatibility and + currently acts as legacy pre-trigger history. +* Advanced trigger capture modes currently target NumPy block streams. +* Trigger source can be cross-channel via ``#chan`` and vector index via + ``@idx``. +* Window triggers use positional parameters ``hoffset,low,high``. + +Trigger Semantics +================= + +All edge and window triggers are evaluated on consecutive sample pairs. +That means the trigger boundary is defined between sample ``n`` and +sample ``n+1``, not "on" one single sample value. + +Edge triggers: + +* ``er`` fires when sample ``n`` is at or below ``level`` and sample + ``n+1`` is above ``level``. +* ``ef`` fires when sample ``n`` is at or above ``level`` and sample + ``n+1`` is below ``level``. + +Window triggers: + +* ``we`` fires when sample ``n`` is outside ``[low, high]`` and sample + ``n+1`` is inside ``[low, high]``. +* ``wx`` fires when sample ``n`` is inside ``[low, high]`` and sample + ``n+1`` is outside ``[low, high]``. + +Trigger Boundary +================ + +For ``er``, ``ef``, ``we``, and ``wx``, the trigger point belongs to the +transition between sample ``n`` and sample ``n+1``. + +That means these triggers are boundary-based, not whole-sample events. +Any downstream consumer should treat the trigger position as the crossing +between the two samples that satisfied the trigger condition. + +Downstream Delivery +=================== + +Trigger events are delivered together with normal stream payload handling. +Plugins first read triggered data from their queue, then read the matching +trigger event metadata for that payload batch. + +The trigger event metadata currently includes: + +* trigger position within the emitted payload +* channel id associated with the emitted event +* capture mode + +Examples +======== + +Always on: + +.. code-block:: bash + + python -m nxscli dummy trig "g:on" chan 17 pprinter 8 + +Start after trigger with pre-trigger history: + +.. code-block:: bash + + python -m nxscli dummy trig "g:er#17,0,0.5,pre=16" chan 17 pnpsave 64 /tmp/nxs_trigger_pre + +Stop after trigger with post-trigger tail: + +.. code-block:: bash + + python -m nxscli dummy trig "g:er#17,0,0.5,mode=stop_after,post=32" chan 17 pnpsave 256 /tmp/nxs_trigger_post + +Vector trigger on deterministic mixed vector channel: + +.. code-block:: bash + + python -m nxscli dummy trig "g:er#25@1,0,0.5,pre=8" chan 25 pnpsave 64 /tmp/nxs_trigger_vec + +Window enter trigger on deterministic sine source: + +.. code-block:: bash + + python -m nxscli dummy trig "g:we#22,0,-0.25,0.25,pre=16" chan 22 pnpsave 64 /tmp/nxs_trigger_window_enter + +Window exit trigger on deterministic sine source: + +.. code-block:: bash + + python -m nxscli dummy trig "g:wx#22,0,-0.25,0.25,pre=16" chan 22 pnpsave 64 /tmp/nxs_trigger_window_exit diff --git a/docs/usage.rst b/docs/usage.rst index c775dd2..9e38b7e 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -42,6 +42,7 @@ with various channel configurations (based on ``pcap`` from ``nxscli-mpl``): Library integration guide: * :doc:`library` +* :doc:`triggers` Interface Commands ------------------ @@ -69,10 +70,15 @@ Supported interface commands: - 14: hist_bimodal - vdim = 1, deterministic bi-modal - 15: xy_lissajous - vdim = 2, correlated XY signal - 16: polar_theta_radius - vdim = 2, (theta, radius) signal - - 17: step_up_once - vdim = 1, one rising step - - 18: step_down_once - vdim = 1, one falling step - - 19: pulse_square_20p - vdim = 1, periodic square pulse (20% duty) - - 20: pulse_single_sparse - vdim = 1, one-sample pulse every 250 samples + - 17: step_low_to_high - vdim = 1, one low-to-high step + - 18: step_high_to_low - vdim = 1, one high-to-low step + - 19: square_wave_20p - vdim = 1, periodic square wave (20% duty) + - 20: impulse_sparse - vdim = 1, one-sample impulse every 250 samples + - 21: square_wave_50p - vdim = 1, periodic square wave (50% duty) + - 22: sine_slow - vdim = 1, slow sine wave + - 23: impulse_clustered - vdim = 1, clustered impulses + - 24: impulse_once_ref - vdim = 1, one-shot impulse reference + - 25: vec3_mixed_steps - vdim = 3, mixed vector step source * ``serial`` - select serial port NxScope interface @@ -95,6 +101,7 @@ Available configuration commands: Optional, at default all channels are always-on. Triggers can be configured per channel with the option ``--trig``. + For syntax, modes, and validation commands see :doc:`triggers`. * ``vadd`` - add virtual channel in `nxscli` virtual runtime. diff --git a/pyproject.toml b/pyproject.toml index 887c5f5..f19660d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,23 +4,26 @@ build-backend = 'setuptools.build_meta' [project] name = "nxscli" -version = "1.0.0" +version = "1.0.1" authors = [{name = "raiden00", email = "raiden00@railab.me"}] description = "Nxscope CLI client" license = {text = "Apache-2.0"} readme = "README.md" requires-python = ">=3.10" dependencies = [ - "nxslib>=1.0.0", + "nxslib>=1.0.1", "click>=8.1", - "numpy" + "numpy", + "rich>=13.0" ] classifiers = [ - "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "Topic :: Software Development :: Embedded Systems", "Operating System :: OS Independent", ] diff --git a/src/nxscli/cli/main.py b/src/nxscli/cli/main.py index 3259e52..cb470d0 100644 --- a/src/nxscli/cli/main.py +++ b/src/nxscli/cli/main.py @@ -43,17 +43,23 @@ def main( control_endpoint: str, ) -> bool: """Nxscli - Command-line clinet to the NxScope.""" + click_ctx = click.get_current_context() + ctx.debug = debug if debug: # pragma: no cover logger.setLevel("DEBUG") else: logger.setLevel("INFO") - ctx.phandler = PluginHandler(plugins_list) parse = Parser() ctx.parser = parse ctx.triggers = {} ctx.nxscope_plugins = [] + + if click_ctx.invoked_subcommand == "version": + return True + + ctx.phandler = PluginHandler(plugins_list) if control_server: try: ctx.nxscope_plugins.append(ControlServerPlugin(control_endpoint)) @@ -61,7 +67,7 @@ def main( ctx.phandler.cleanup() raise - click.get_current_context().call_on_close(cli_on_close) + click_ctx.call_on_close(cli_on_close) return True @@ -212,6 +218,11 @@ def cli_on_close(ctx: Environment) -> bool: def click_final_init() -> None: """Handle final Click initialization.""" + # add standalone commands + for cmd in commands_list: + if cmd.name == "version": + main.add_command(cmd) + # add interfaces for intf in interfaces_list: main.add_command(intf) @@ -219,6 +230,8 @@ def click_final_init() -> None: # add commands to interfaces for group in interfaces_list: for cmd in commands_list: + if cmd.name == "version": + continue group.add_command(cmd) diff --git a/src/nxscli/cli/types.py b/src/nxscli/cli/types.py index ae66e86..58a5fee 100644 --- a/src/nxscli/cli/types.py +++ b/src/nxscli/cli/types.py @@ -118,6 +118,56 @@ class Trigger(click.ParamType): req_global = "g" req_vect = "@" + @staticmethod + def _parse_bool(value: str) -> bool: + """Parse common boolean tokens.""" + norm = value.strip().lower() + if norm in ("1", "true", "yes", "on"): + return True + if norm in ("0", "false", "no", "off"): + return False + raise click.BadParameter(f"invalid trigger boolean value: {value}") + + @staticmethod + def _parse_channel_ref(value: str) -> int | ChannelRef: + """Parse physical or virtual trigger source token.""" + if value.startswith("v"): + virt = value[1:] + if not virt.isnumeric(): + raise click.BadParameter( + "virtual trigger source must be like v0" + ) + return ChannelRef.virtual(int(virt)) + return int(value) + + def _parse_named_args( + self, cfg: list[str] + ) -> tuple[list[str], dict[str, Any]]: + """Split positional and named trigger arguments.""" + positional: list[str] = [] + named: dict[str, Any] = {} + + for item in cfg: + if "=" not in item: + positional.append(item) + continue + + key, raw = item.split("=", 1) + if key == "mode": + named["mode"] = raw + elif key == "pre": + named["pre_samples"] = int(raw) + elif key == "post": + named["post_samples"] = int(raw) + elif key == "holdoff": + named["holdoff"] = int(raw) + elif key == "rearm": + named["rearm"] = self._parse_bool(raw) + else: + raise click.BadParameter(f"unsupported trigger option: {key}") + + return positional, named + def convert( self, value: Any, param: Any, ctx: Any ) -> dict[int, DTriggerConfigReq]: @@ -135,15 +185,17 @@ def convert( if vect_idx != -1 and cross_idx != -1: if vect_idx > cross_idx: trg = tmp[0][:cross_idx] - cross = int(tmp[0][cross_idx + 1 : vect_idx]) + cross = self._parse_channel_ref( + tmp[0][cross_idx + 1 : vect_idx] + ) vect = int(tmp[0][vect_idx + 1 :]) else: trg = tmp[0][:vect_idx] vect = int(tmp[0][vect_idx + 1 : cross_idx]) - cross = int(tmp[0][cross_idx + 1 :]) + cross = self._parse_channel_ref(tmp[0][cross_idx + 1 :]) elif vect_idx == -1 and cross_idx != -1: trg, cross_s = tmp[0].split(self.req_cross) - cross = int(cross_s) + cross = self._parse_channel_ref(cross_s) vect = 0 elif vect_idx != -1 and cross_idx == -1: trg, vect_s = tmp[0].split(self.req_vect) @@ -154,7 +206,7 @@ def convert( vect = 0 cross = None - cfg = tmp[1:] + cfg, named = self._parse_named_args(tmp[1:]) # special case for global configuration if schan == self.req_global: chan = -1 @@ -165,7 +217,7 @@ def convert( if cross == chan: cross = None - req = DTriggerConfigReq(trg, cross, vect, cfg) + req = DTriggerConfigReq(trg, cross, vect, cfg, **named) ret[chan] = req return ret diff --git a/src/nxscli/commands/cmd_version.py b/src/nxscli/commands/cmd_version.py new file mode 100644 index 0000000..ea388e6 --- /dev/null +++ b/src/nxscli/commands/cmd_version.py @@ -0,0 +1,58 @@ +"""Version command.""" + +from dataclasses import dataclass +from importlib.metadata import PackageNotFoundError, distributions, version + +import click + + +@dataclass(frozen=True) +class PluginPackageVersion: + """Installed plugin package version details.""" + + name: str + version: str + + +def _get_package_version(name: str) -> str: + """Return installed package version or a readable fallback.""" + try: + return version(name) + except PackageNotFoundError: + return "not installed" + + +def get_plugin_package_versions() -> list[PluginPackageVersion]: + """Get versions for external nxscli plugin packages.""" + packages: dict[str, PluginPackageVersion] = {} + + for dist in distributions(): + entry_points = [ + entry + for entry in dist.entry_points + if entry.group == "nxscli.extensions" + ] + if not entry_points: + continue + + name = dist.metadata["Name"] + if name == "nxscli": + continue + + packages[name] = PluginPackageVersion( + name=name, + version=dist.version, + ) + + return sorted(packages.values(), key=lambda item: item.name) + + +@click.command(name="version") +def cmd_version() -> None: + """Print installed versions for nxscli, nxslib and plugin packages.""" + click.echo(f"nxscli: {_get_package_version('nxscli')}") + click.echo(f"nxslib: {_get_package_version('nxslib')}") + click.echo("plugins:") + + for package in get_plugin_package_versions(): + click.echo(f"- {package.name}: {package.version}") diff --git a/src/nxscli/commands/config/cmd_trig.py b/src/nxscli/commands/config/cmd_trig.py index ef85c84..09a211f 100644 --- a/src/nxscli/commands/config/cmd_trig.py +++ b/src/nxscli/commands/config/cmd_trig.py @@ -44,10 +44,19 @@ def cmd_trig( off - always off er - edge rising, parameters: [hoffset, level] ef - edge falling: [hoffset, level] + we - window enter, parameters: [hoffset, low, high] + wx - window exit, parameters: [hoffset, low, high] + + Optional named trigger options: + mode=start_after|stop_after + pre= + post= + holdoff= + rearm=true|false \b where: - hoffset - horizontal offset for triggered data + hoffset - legacy pre-trigger sample count level - trigger level Default: all channels on ('g:on'). @@ -67,6 +76,14 @@ def cmd_trig( chan 4 always off, white spaces ignored: '2 : er#2@1, 0, 1; 3 : on; 4 : off' + - all chans stream until edge, then keep 64 post-trigger samples: + 'g:er,0,0.5,mode=stop_after,post=64' + - all chans capture 32 samples before rising edge: + 'g:er,0,0.5,pre=32' + - all chans capture 32 samples before entering window [-0.5, 0.5]: + 'g:we,0,-0.5,0.5,pre=32' + - all chans capture after exiting window [-0.25, 0.25]: + 'g:wx,0,-0.25,0.25' """ # noqa: D301 assert ctx.phandler ctx.triggers = triggers diff --git a/src/nxscli/commands/interface/cmd_dummy.py b/src/nxscli/commands/interface/cmd_dummy.py index a39198f..fee8bfc 100644 --- a/src/nxscli/commands/interface/cmd_dummy.py +++ b/src/nxscli/commands/interface/cmd_dummy.py @@ -50,10 +50,15 @@ def cmd_dummy( 14: hist_bimodal - vdim = 1, deterministic bi-modal 15: xy_lissajous - vdim = 2, correlated XY signal 16: polar_theta_radius - vdim = 2, (theta, radius) signal - 17: step_up_once - vdim = 1, one rising step - 18: step_down_once - vdim = 1, one falling step - 19: pulse_square_20p - vdim = 1, periodic square pulse (20% duty) - 20: pulse_single_sparse - vdim = 1, one-sample pulse every 250 samples + 17: step_low_to_high - vdim = 1, one low-to-high step + 18: step_high_to_low - vdim = 1, one high-to-low step + 19: square_wave_20p - vdim = 1, periodic square wave (20% duty) + 20: impulse_sparse - vdim = 1, one-sample impulse every 250 samples + 21: square_wave_50p - vdim = 1, periodic square wave (50% duty) + 22: sine_slow - vdim = 1, slow sine wave + 23: impulse_clustered - vdim = 1, clustered impulses + 24: impulse_once_ref - vdim = 1, one-shot impulse reference + 25: vec3_mixed_steps - vdim = 3, mixed vector step source """ # noqa: D301 intf = DummyDev( rxpadding=writepadding, diff --git a/src/nxscli/ext_commands.py b/src/nxscli/ext_commands.py index f6904a5..e287d76 100644 --- a/src/nxscli/ext_commands.py +++ b/src/nxscli/ext_commands.py @@ -9,6 +9,7 @@ from nxscli.commands.cmd_npsave import cmd_pnpsave from nxscli.commands.cmd_printer import cmd_printer from nxscli.commands.cmd_udp import cmd_pudp +from nxscli.commands.cmd_version import cmd_version from nxscli.commands.config.cmd_chan import cmd_chan from nxscli.commands.config.cmd_trig import cmd_trig from nxscli.commands.config.cmd_vadd import cmd_vadd @@ -27,4 +28,5 @@ cmd_pnone, cmd_printer, cmd_pudp, + cmd_version, ] diff --git a/src/nxscli/idata.py b/src/nxscli/idata.py index 25eac45..a060ffd 100644 --- a/src/nxscli/idata.py +++ b/src/nxscli/idata.py @@ -12,7 +12,7 @@ from nxslib.dev import DeviceChannel from nxscli.channelref import ChannelRef - from nxscli.trigger import TriggerHandler + from nxscli.trigger import DTriggerEvent, TriggerHandler ############################################################################### # Class: PluginDataCb @@ -40,6 +40,7 @@ def __init__( que: queue.Queue[Any], channel: "DeviceChannel", trig: "TriggerHandler", + aux_drain: "Callable[[], None] | None" = None, ): """Initialize a queue data handler. @@ -50,6 +51,8 @@ def __init__( self._queue = que self._channel = channel self._trigger = trig + self._aux_drain = aux_drain + self._last_trigger_event: "DTriggerEvent | None" = None def __str__(self) -> str: """Format string representation.""" @@ -98,13 +101,23 @@ def queue_get(self, block: bool, timeout: float = 1.0) -> Any: :param timeout: get data timeout """ ret = [] + if self._aux_drain is not None: + self._aux_drain() try: # get data from queue ret = self._queue.get(block=block, timeout=timeout) except queue.Empty: pass - return self._trigger.data_triggered(ret) + payload = self._trigger.data_triggered(ret) + self._last_trigger_event = self._trigger.pop_trigger_event() + return payload + + def pop_trigger_event(self) -> "DTriggerEvent | None": + """Return and clear the last trigger event metadata.""" + event = self._last_trigger_event + self._last_trigger_event = None + return event ############################################################################### @@ -134,6 +147,7 @@ def __init__( self._cb = cb # queue handlers + self._aux_qd: list[tuple[queue.Queue[Any], "TriggerHandler"]] = [] self._qdlist = self._qdlist_init() def __del__(self) -> None: @@ -147,6 +161,8 @@ def _qdlist_init(self) -> list[PluginQueueData]: from nxscli.channelref import ChannelRef ret = [] + visible_ids = {chan.data.chan for chan in self._chanlist} + aux_source_ids: set[int] = set() for i, channel in enumerate(self._chanlist): # get queue with data if channel.data.chan >= 0: @@ -162,16 +178,57 @@ def _qdlist_init(self) -> list[PluginQueueData]: ) que = self._cb.stream_sub(cref) # initialize queue handler - pdata = PluginQueueData(que, channel, self._trig[i]) + pdata = PluginQueueData( + que, channel, self._trig[i], aux_drain=self._aux_drain + ) # add hanler to a list ret.append(pdata) + + srcchan = self._trig[i].config.srcchan + srcref = self._trig[i].config.source_ref + if ( + srcchan is None + or srcref is None + or srcchan in visible_ids + or srcchan in aux_source_ids + ): + continue + src_trig = self._find_trigger(srcchan) + if src_trig is None: + raise AssertionError( + f"missing source trigger handler for channel {srcchan}" + ) + aux_q = self._cb.stream_sub(srcref) + self._aux_qd.append((aux_q, src_trig)) + aux_source_ids.add(srcchan) return ret + def _find_trigger(self, chan: int) -> "TriggerHandler | None": + for trig in self._trig: + if trig.chan == chan: + return trig + from nxscli.trigger import TriggerHandler + + return TriggerHandler.find_by_channel(chan) + + def _aux_drain(self) -> None: + for aux_q, trig in self._aux_qd: + while True: + try: + data = aux_q.get(block=False) + except queue.Empty: + break + trig.data_triggered(data) + trig.pop_trigger_event() + def _queue_deinit(self) -> None: """Deinitialize queue.""" for pdata in self._qdlist: self._cb.stream_unsub(pdata.queue) self._qdlist.clear() + for aux_q, _ in self._aux_qd: + self._cb.stream_unsub(aux_q) + self._aux_qd.clear() # clean up triggers # TODO: revisit where this beleong, here or in plugins ? diff --git a/src/nxscli/phandler.py b/src/nxscli/phandler.py index 2659d01..ea806c6 100644 --- a/src/nxscli/phandler.py +++ b/src/nxscli/phandler.py @@ -6,7 +6,13 @@ from nxscli.idata import PluginDataCb from nxscli.logger import logger from nxscli.stream_hub import SharedStreamProvider -from nxscli.trigger import DTriggerConfigReq, TriggerHandler, trigger_from_req +from nxscli.trigger import ( + DTriggerConfig, + DTriggerConfigReq, + ETriggerType, + TriggerHandler, + trigger_from_req, +) if TYPE_CHECKING: import queue @@ -687,12 +693,59 @@ def triggers_plugin( tcfg = self.trigger_get(chan.data.chan) trgs.append((chan.data.chan, tcfg)) - ret = [] + visible_ids = {chan.data.chan for chan in chanlist} + resolved: list[tuple[int, DTriggerConfig]] = [] + hidden_handlers: list[TriggerHandler] = [] for item in trgs: - # get trigger configuration dtc = trigger_from_req(item[1]) - # get trigger - trig = TriggerHandler(item[0], dtc) - ret.append(trig) + source_ref = self._trigger_source_ref(item[1]) + if source_ref is not None: + src_channel = self._trigger_source_channel(source_ref) + if src_channel.data.chan == item[0]: + dtc.source_ref = None + dtc.srcchan = None + else: + dtc.source_ref = source_ref + dtc.srcchan = src_channel.data.chan + if dtc.srcchan not in visible_ids: + hidden = self._hidden_trigger_source( + dtc.srcchan, source_ref + ) + hidden_handlers.append(hidden) + + resolved.append((item[0], dtc)) + ret = [] + for chan_id, dtc in resolved: + ret.append(TriggerHandler(chan_id, dtc)) return ret + + def _trigger_source_ref(self, req: DTriggerConfigReq) -> ChannelRef | None: + source = req.srcchan + if isinstance(source, ChannelRef): + return source + if isinstance(source, int): + return ChannelRef.physical(source) + return None + + def _trigger_source_channel( + self, source_ref: ChannelRef + ) -> "DeviceChannel": + src_channel = self.channel_get(source_ref) + if src_channel is None or not src_channel.data.is_valid: + raise ValueError(f"invalid trigger source channel: {source_ref}") + return src_channel + + def _hidden_trigger_source( + self, chan: int, source_ref: ChannelRef + ) -> TriggerHandler: + hidden = TriggerHandler.find_by_channel(chan) + if hidden is None: + hidden = TriggerHandler( + chan, + DTriggerConfig( + ETriggerType.ALWAYS_OFF, + source_ref=source_ref, + ), + ) + return hidden diff --git a/src/nxscli/plugins/devinfo.py b/src/nxscli/plugins/devinfo.py index 69fcdce..21dcbef 100644 --- a/src/nxscli/plugins/devinfo.py +++ b/src/nxscli/plugins/devinfo.py @@ -1,8 +1,12 @@ """Module containing devinfo plugin.""" -import pprint from typing import Any +from nxslib.dev import EDeviceChannelType +from rich import box +from rich.console import Console +from rich.table import Table + from nxscli.iplugin import IPluginText ############################################################################### @@ -13,6 +17,90 @@ class PluginDevinfo(IPluginText): """Plugin that shows device information.""" + @staticmethod + def _get_bool(data: dict[str, Any], key: str) -> bool: + """Read a boolean-like field with a False default.""" + return bool(data.get(key, False)) + + @staticmethod + def _get_int(data: dict[str, Any], key: str) -> int: + """Read an integer-like field with a zero default.""" + return int(data.get(key, 0)) + + @staticmethod + def _get_float(data: dict[str, Any], key: str) -> float: + """Read a float-like field with a zero default.""" + return float(data.get(key, 0.0)) + + @staticmethod + def _format_bool(value: bool) -> str: + """Return a short human-readable boolean.""" + return "yes" if value else "no" + + @staticmethod + def _format_enabled(enabled: tuple[int, ...]) -> str: + """Format enabled channel IDs as compact ranges.""" + if not enabled: + return "none" + + ranges: list[str] = [] + start = enabled[0] + end = enabled[0] + + for item in enabled[1:]: + if item == end + 1: + end = item + continue + + if start == end: + ranges.append(str(start)) + else: + ranges.append(f"{start}-{end}") + + start = item + end = item + + if start == end: + ranges.append(str(start)) + else: + ranges.append(f"{start}-{end}") + + return ", ".join(ranges) + + @staticmethod + def _render_channels_table(channels: list[dict[str, Any]]) -> str: + """Render channels as a readable text table.""" + table = Table(box=box.ASCII_DOUBLE_HEAD, expand=False) + + table.add_column("ID", justify="right", no_wrap=True) + table.add_column("Name", overflow="fold") + table.add_column("Type", no_wrap=True) + table.add_column("Dim", justify="right", no_wrap=True) + table.add_column("Valid", no_wrap=True) + table.add_column("En", no_wrap=True) + table.add_column("Div", justify="right", no_wrap=True) + + for chan in channels: + table.add_row( + str(chan["chan"]), + chan["name"] if chan["name"] else "-", + chan["dtype_text"], + str(chan["vdim"]), + PluginDevinfo._format_bool(chan["valid"]), + PluginDevinfo._format_bool(chan["enabled"]), + str(chan["divider"]), + ) + + console = Console( + force_terminal=False, + color_system=None, + width=100, + ) + with console.capture() as capture: + console.print(table) + + return capture.get() + def __init__(self) -> None: """Initialize devinfo plugin.""" super().__init__() @@ -55,9 +143,12 @@ def start(self, _: Any) -> bool: chan: Any = {} chan["chan"] = chinfo.data.chan chan["type"] = chinfo.data._type + chan["dtype"] = chinfo.data.dtype + chan["dtype_text"] = EDeviceChannelType.to_text(chinfo.data.dtype) chan["vdim"] = chinfo.data.vdim chan["name"] = chinfo.data.name chan["enabled"] = chinfo.data.en + chan["valid"] = chinfo.data.is_valid chan["divider"] = self._phandler.get_channel_divider(chid) tmp.append(chan) @@ -71,15 +162,46 @@ def start(self, _: Any) -> bool: def result(self) -> str: """Get devinfo plugin result.""" assert self._return - s = "\nDevice common:\n" - s += pprint.pformat(self._return["cmn"]) - s += "\nStream stats:\n" - s += pprint.pformat(self._return["stream"]) - s += "\nChannels state (applied):\n" - s += pprint.pformat(self._return["channels_state_applied"]) - s += "\nChannels state (buffered):\n" - s += pprint.pformat(self._return["channels_state_buffered"]) - s += "\nDevice channels:\n" - s += pprint.pformat(self._return["channels"]) - s += "\n" - return s + cmn = self._return["cmn"] + stream = self._return["stream"] + applied = self._return["channels_state_applied"] + buffered = self._return["channels_state_buffered"] + channels = self._return["channels"] + + div_supported = self._get_bool(cmn, "div_supported") + ack_supported = self._get_bool(cmn, "ack_supported") + flags = self._get_int(cmn, "flags") + rxpadding = self._get_int(cmn, "rxpadding") + connected = self._get_bool(stream, "connected") + stream_started = self._get_bool(stream, "stream_started") + overflow_count = self._get_int(stream, "overflow_count") + bitrate = self._get_float(stream, "bitrate") + + lines = [ + "", + "Device Summary", + f" Channels: {cmn['chmax']}", + f" Divider support: {self._format_bool(div_supported)}", + f" Ack support: {self._format_bool(ack_supported)}", + f" Flags: 0x{flags:02x}", + f" RX padding: {rxpadding}", + "", + "Stream", + f" Connected: {self._format_bool(connected)}", + f" Started: {self._format_bool(stream_started)}", + f" Overflow count: {overflow_count}", + f" Bitrate: {bitrate:.1f} B/s", + "", + "Channel State", + " Applied enabled: " + f"{self._format_enabled(applied['enabled_channels'])}", + " Buffered enabled: " + f"{self._format_enabled(buffered['enabled_channels'])}", + "", + "Channels", + ] + + lines.append(self._render_channels_table(channels).rstrip("\n")) + + lines.append("") + return "\n".join(lines) diff --git a/src/nxscli/trigger/__init__.py b/src/nxscli/trigger/__init__.py new file mode 100644 index 0000000..bd8d36d --- /dev/null +++ b/src/nxscli/trigger/__init__.py @@ -0,0 +1,23 @@ +"""Public trigger API.""" + +from nxscli.trigger.core import ( + DTriggerConfig, + DTriggerConfigReq, + DTriggerEvent, + DTriggerState, + ETriggerCaptureMode, + ETriggerType, + TriggerHandler, + trigger_from_req, +) + +__all__ = [ + "DTriggerConfig", + "DTriggerConfigReq", + "DTriggerEvent", + "DTriggerState", + "ETriggerCaptureMode", + "ETriggerType", + "TriggerHandler", + "trigger_from_req", +] diff --git a/src/nxscli/trigger.py b/src/nxscli/trigger/core.py similarity index 57% rename from src/nxscli/trigger.py rename to src/nxscli/trigger/core.py index 5e52b64..2228586 100644 --- a/src/nxscli/trigger.py +++ b/src/nxscli/trigger/core.py @@ -5,13 +5,16 @@ from dataclasses import dataclass from enum import Enum from threading import Lock -from typing import Any +from typing import TYPE_CHECKING, Any import numpy as np from nxslib.nxscope import DNxscopeStreamBlock from nxscli.logger import logger +if TYPE_CHECKING: + from nxscli.channelref import ChannelRef + ############################################################################### # Enum: ETriggerType ############################################################################### @@ -24,6 +27,15 @@ class ETriggerType(Enum): ALWAYS_ON = 1 EDGE_RISING = 2 EDGE_FALLING = 3 + WINDOW_ENTER = 4 + WINDOW_EXIT = 5 + + +class ETriggerCaptureMode(Enum): + """Trigger capture behavior.""" + + START_AFTER = "start_after" + STOP_AFTER = "stop_after" ############################################################################### @@ -39,6 +51,15 @@ class DTriggerState: idx: int +@dataclass +class DTriggerEvent: + """Metadata about one emitted trigger event.""" + + sample_index: float + channel: int + capture_mode: ETriggerCaptureMode + + ############################################################################### # Class: DDTriggerConfigReq ############################################################################### @@ -49,9 +70,14 @@ class DTriggerConfigReq: """The class representing trigger configuration request.""" ttype: str - srcchan: int | None + srcchan: "int | ChannelRef | None" vect: int = 0 params: list[Any] | None = None + mode: str | None = None + pre_samples: int | None = None + post_samples: int | None = None + holdoff: int = 0 + rearm: bool = False ############################################################################### @@ -64,12 +90,20 @@ def trigger_from_req(req: DTriggerConfigReq) -> "DTriggerConfig": :param req: trigger configuration request """ + capture_mode = ETriggerCaptureMode.START_AFTER + if req.mode is not None: + capture_mode = ETriggerCaptureMode(req.mode) + + srcchan = req.srcchan if isinstance(req.srcchan, int) else None + if req.ttype == "off": # no arguments - dtc = DTriggerConfig(ETriggerType.ALWAYS_OFF) + dtc = DTriggerConfig( + ETriggerType.ALWAYS_OFF, capture_mode=capture_mode + ) elif req.ttype == "on": # no arguments - dtc = DTriggerConfig(ETriggerType.ALWAYS_ON) + dtc = DTriggerConfig(ETriggerType.ALWAYS_ON, capture_mode=capture_mode) elif req.ttype == "er": # argument 1 horisontal offset # argument 2 trigger level @@ -77,7 +111,12 @@ def trigger_from_req(req: DTriggerConfigReq) -> "DTriggerConfig": hoffset = int(req.params[0]) level = float(req.params[1]) dtc = DTriggerConfig( - ETriggerType.EDGE_RISING, req.srcchan, req.vect, hoffset, level + ETriggerType.EDGE_RISING, + srcchan, + req.vect, + hoffset, + level, + capture_mode=capture_mode, ) elif req.ttype == "ef": # argument 1 horisontal offset @@ -86,10 +125,50 @@ def trigger_from_req(req: DTriggerConfigReq) -> "DTriggerConfig": hoffset = int(req.params[0]) level = float(req.params[1]) dtc = DTriggerConfig( - ETriggerType.EDGE_FALLING, req.srcchan, req.vect, hoffset, level + ETriggerType.EDGE_FALLING, + srcchan, + req.vect, + hoffset, + level, + capture_mode=capture_mode, + ) + elif req.ttype == "we": + assert req.params + hoffset = int(req.params[0]) + low = float(req.params[1]) + high = float(req.params[2]) + dtc = DTriggerConfig( + ETriggerType.WINDOW_ENTER, + srcchan, + req.vect, + hoffset, + window_low=low, + window_high=high, + capture_mode=capture_mode, + ) + elif req.ttype == "wx": + assert req.params + hoffset = int(req.params[0]) + low = float(req.params[1]) + high = float(req.params[2]) + dtc = DTriggerConfig( + ETriggerType.WINDOW_EXIT, + srcchan, + req.vect, + hoffset, + window_low=low, + window_high=high, + capture_mode=capture_mode, ) else: raise AssertionError + + if req.pre_samples is not None: + dtc.pre_samples = req.pre_samples + if req.post_samples is not None: + dtc.post_samples = req.post_samples + dtc.holdoff = req.holdoff + dtc.rearm = req.rearm return dtc @@ -107,6 +186,21 @@ class DTriggerConfig: vect: int = 0 hoffset: int = 0 level: float | None = None + source_ref: "ChannelRef | None" = None + window_low: float | None = None + window_high: float | None = None + capture_mode: ETriggerCaptureMode = ETriggerCaptureMode.START_AFTER + pre_samples: int = 0 + post_samples: int = 0 + holdoff: int = 0 + rearm: bool = False + + @property + def effective_pre_samples(self) -> int: + """Return explicit pre-samples or fall back to legacy hoffset.""" + if self.pre_samples > 0: + return self.pre_samples + return self.hoffset ############################################################################### @@ -135,6 +229,8 @@ def __init__(self, chan: int, config: DTriggerConfig) -> None: self._chan: int = chan self._trigger: DTriggerState = DTriggerState(False, 0) self._triger_done = False + self._post_remaining = 0 + self._last_event: DTriggerEvent | None = None # trigger source channel reference self._src: "TriggerHandler" | None = None # noqa: TC010 @@ -242,7 +338,7 @@ def _edgerising( return DTriggerState(False, 0) hits = np.flatnonzero((vec[:-1] <= level) & (vec[1:] > level)) if hits.size > 0: - return DTriggerState(True, int(hits[0])) + return DTriggerState(True, int(hits[0] + 1)) return DTriggerState(False, 0) def _edgefalling( @@ -253,7 +349,31 @@ def _edgefalling( return DTriggerState(False, 0) hits = np.flatnonzero((vec[:-1] >= level) & (vec[1:] < level)) if hits.size > 0: - return DTriggerState(True, int(hits[0])) + return DTriggerState(True, int(hits[0] + 1)) + return DTriggerState(False, 0) + + def _windowenter( + self, combined: list[Any], vect: int, low: float, high: float + ) -> DTriggerState: + vec = self._combined_vector_np(combined, vect) + if vec.size < 2: + return DTriggerState(False, 0) + inside = (vec >= low) & (vec <= high) + hits = np.flatnonzero((~inside[:-1]) & inside[1:]) + if hits.size > 0: + return DTriggerState(True, int(hits[0] + 1)) + return DTriggerState(False, 0) + + def _windowexit( + self, combined: list[Any], vect: int, low: float, high: float + ) -> DTriggerState: + vec = self._combined_vector_np(combined, vect) + if vec.size < 2: + return DTriggerState(False, 0) + inside = (vec >= low) & (vec <= high) + hits = np.flatnonzero(inside[:-1] & (~inside[1:])) + if hits.size > 0: + return DTriggerState(True, int(hits[0] + 1)) return DTriggerState(False, 0) def _is_self_trigger( @@ -269,6 +389,24 @@ def _is_self_trigger( elif config.ttype is ETriggerType.EDGE_FALLING: assert config.level is not None return self._edgefalling(combined, config.vect, config.level) + elif config.ttype is ETriggerType.WINDOW_ENTER: + assert config.window_low is not None + assert config.window_high is not None + return self._windowenter( + combined, + config.vect, + config.window_low, + config.window_high, + ) + elif config.ttype is ETriggerType.WINDOW_EXIT: + assert config.window_low is not None + assert config.window_high is not None + return self._windowexit( + combined, + config.vect, + config.window_low, + config.window_high, + ) else: raise AssertionError @@ -323,6 +461,20 @@ def config(self) -> DTriggerConfig: """Get trigger configuration.""" return self._config + def pop_trigger_event(self) -> DTriggerEvent | None: + """Return and clear the last emitted trigger event.""" + event = self._last_event + self._last_event = None + return event + + @classmethod + def find_by_channel(cls, chan: int) -> "TriggerHandler | None": + """Return registered trigger handler for one channel if present.""" + for inst in cls._instances: + if inst.chan == chan: + return inst + return None + @classmethod def cls_cleanup(cls: type["TriggerHandler"]) -> None: """Clean up all instances.""" @@ -402,44 +554,157 @@ def _cache_tail(self, combined: list[Any], hoffset: int) -> list[Any]: start = max(total - hoffset, 0) return self._slice_from(combined, start) - def data_triggered(self, data: list[Any]) -> list[Any]: - """Get triggered data. + def _sample_count(self, combined: list[Any]) -> int: + """Return number of samples in combined payload.""" + if not combined: + return 0 + if not self._is_block_payload(combined): + return len(combined) + return sum(int(block.data.shape[0]) for block in combined) + + def _should_emit_event(self) -> bool: + """Return ``True`` only for real trigger detections.""" + return self._config.ttype not in ( + ETriggerType.ALWAYS_ON, + ETriggerType.ALWAYS_OFF, + ) - :param data: stream data - """ + def _slice_range( + self, combined: list[Any], start: int, stop: int + ) -> list[Any]: + """Slice combined payload by sample range.""" + if stop <= start: + return [] + if not combined: + return [] + if not self._is_block_payload(combined): + return combined[start:stop] + + ret: list[Any] = [] + offset = 0 + for block in combined: + rows = int(block.data.shape[0]) + block_start = max(start - offset, 0) + block_stop = min(stop - offset, rows) + if block_stop > block_start: + data = block.data[block_start:block_stop, :] + meta = ( + None + if block.meta is None + else block.meta[block_start:block_stop, :] + ) + ret.append(DNxscopeStreamBlock(data=data, meta=meta)) + offset += rows + if offset >= stop: + break + return ret + + def _tail_samples(self, combined: list[Any], count: int) -> list[Any]: + """Keep only the last N samples for boundary detection.""" + if count <= 0: + return [] + total = self._sample_count(combined) + start = max(total - count, 0) + return self._slice_range(combined, start, total) + + def _data_triggered_start_after(self, data: list[Any]) -> list[Any]: + """Legacy start-after-trigger behavior with pre-trigger history.""" combined = self._cache + data + self._last_event = None self._trigger = self._is_triggered(combined) # check all cross-channel triggers self._cross_channel_handle(combined) + pre_samples = self._config.effective_pre_samples + if not self._trigger.state: - # not triggered yet ret = [] - # update cache if self._is_block_payload(combined): - if self._config.hoffset <= 0: - # keep only current block batch when no history is needed + if pre_samples <= 0: self._cache = data else: - self._cache = self._cache_tail( - combined, self._config.hoffset - ) + self._cache = self._cache_tail(combined, pre_samples) else: clen = len(self._cache) - self._cache = combined[clen - self._config.hoffset :] + self._cache = combined[clen - pre_samples :] else: - # one time hoffset for trigger + first_trigger = not self._triger_done if not self._triger_done: - hoffset = self._config.hoffset + hoffset = pre_samples self._triger_done = True else: hoffset = 0 - # return data with a configured horisontal offset - ret = self._slice_from(combined, self._trigger.idx - hoffset) - # reset cache + start_idx = self._trigger.idx - hoffset + ret = self._slice_from(combined, start_idx) + trigger_sample = self._trigger.idx - max(start_idx, 0) + if first_trigger and self._should_emit_event(): + self._last_event = DTriggerEvent( + sample_index=float(trigger_sample), + channel=self._chan, + capture_mode=self._config.capture_mode, + ) self._cache = [] return ret + + def _data_triggered_stop_after(self, data: list[Any]) -> list[Any]: + """Stream immediately, then stop after trigger plus tail samples.""" + self._last_event = None + if not self._should_emit_event(): + return data + if not data and self._post_remaining <= 0: + return [] + + if not self._is_block_payload(data): + raise NotImplementedError( + "stop_after capture mode requires NumPy block stream payloads" + ) + + if self._trigger.state: + total = self._sample_count(data) + if self._post_remaining <= 0: + return [] + returned = self._slice_range( + data, 0, min(total, self._post_remaining) + ) + self._post_remaining -= min(total, self._post_remaining) + return returned + + combined = self._cache + data + cache_rows = self._sample_count(self._cache) + + self._trigger = self._is_triggered(combined) + self._cross_channel_handle(combined) + + if not self._trigger.state: + self._cache = self._tail_samples(combined, 1) + return data + + total = self._sample_count(combined) + stop_at = self._trigger.idx + self._config.post_samples + 1 + returned = self._slice_range(combined, cache_rows, min(total, stop_at)) + self._last_event = DTriggerEvent( + sample_index=max( + self._trigger.idx - cache_rows, + 0.0, + ), + channel=self._chan, + capture_mode=self._config.capture_mode, + ) + self._post_remaining = max(stop_at - total, 0) + self._cache = [] + return returned + + def data_triggered(self, data: list[Any]) -> list[Any]: + """Get triggered data. + + :param data: stream data + """ + if self._config.capture_mode is ETriggerCaptureMode.START_AFTER: + return self._data_triggered_start_after(data) + if self._config.capture_mode is ETriggerCaptureMode.STOP_AFTER: + return self._data_triggered_stop_after(data) + raise AssertionError diff --git a/tests/cli/test_main.py b/tests/cli/test_main.py index 501d3e8..f87b605 100644 --- a/tests/cli/test_main.py +++ b/tests/cli/test_main.py @@ -1,9 +1,11 @@ import socket +from importlib.metadata import PackageNotFoundError import pytest # type: ignore from click.testing import CliRunner import nxscli +import nxscli.commands.cmd_version from nxscli.cli.main import main from tests.fake_nxscope import FakeNxscope @@ -84,6 +86,86 @@ def cleanup(self): assert cleanup_called["flag"] is True +def test_main_version_builtin(runner): + result = runner.invoke(main, ["version"]) + assert result.exit_code == 0 + assert "nxscli:" in result.output + assert "nxslib:" in result.output + assert "plugins:" in result.output + assert "- nxscli:" not in result.output + + +def test_main_version_external_plugins(runner, monkeypatch): + class FakeEntryPoint: + def __init__(self, group): + self.group = group + + class FakeDist: + def __init__(self, name, version, entry_points): + self.metadata = {"Name": name} + self.version = version + self.entry_points = entry_points + + monkeypatch.setattr( + nxscli.commands.cmd_version, + "distributions", + lambda: [ + FakeDist( + "nxscli-mpl", "2.0.0", [FakeEntryPoint("nxscli.extensions")] + ), + FakeDist( + "irrelevant", + "9.9.9", + [FakeEntryPoint("other.group")], + ), + FakeDist( + "nxscli-pqg", "3.1.4", [FakeEntryPoint("nxscli.extensions")] + ), + ], + ) + + result = runner.invoke(main, ["version"]) + assert result.exit_code == 0 + assert "- nxscli-mpl: 2.0.0" in result.output + assert "- nxscli-pqg: 3.1.4" in result.output + assert "- irrelevant:" not in result.output + + +def test_main_version_missing_package_and_skip_builtin(runner, monkeypatch): + class FakeEntryPoint: + def __init__(self, group): + self.group = group + + class FakeDist: + def __init__(self, name, version, entry_points): + self.metadata = {"Name": name} + self.version = version + self.entry_points = entry_points + + def fake_version(name): + if name == "nxslib": + raise PackageNotFoundError + return "1.0.0" + + monkeypatch.setattr(nxscli.commands.cmd_version, "version", fake_version) + monkeypatch.setattr( + nxscli.commands.cmd_version, + "distributions", + lambda: [ + FakeDist("nxscli", "1.0.0", [FakeEntryPoint("nxscli.extensions")]), + FakeDist( + "nxscli-mpl", "2.0.0", [FakeEntryPoint("nxscli.extensions")] + ), + ], + ) + + result = runner.invoke(main, ["version"]) + assert result.exit_code == 0 + assert "nxslib: not installed" in result.output + assert "- nxscli:" not in result.output + assert "- nxscli-mpl: 2.0.0" in result.output + + def test_main_pdevinfo(runner): args = ["dummy", "pdevinfo"] result = runner.invoke(main, args) diff --git a/tests/cli/test_types.py b/tests/cli/test_types.py index 6dd4240..b77e271 100644 --- a/tests/cli/test_types.py +++ b/tests/cli/test_types.py @@ -63,6 +63,48 @@ def test_trigger(): assert t.convert("0:off;1:er@0#1,1,2", None, None) assert t.convert("0:off;1:er#1@1,1,2", None, None) + parsed = t.convert( + "g:er#17,0,0.5,mode=stop_after,post=64,holdoff=8,rearm=true", + None, + None, + ) + req = parsed[-1] + assert req.srcchan == 17 + assert req.mode == "stop_after" + assert req.post_samples == 64 + assert req.holdoff == 8 + assert req.rearm is True + + parsed = t.convert("1:er@1,0,0.5,pre=32", None, None) + req = parsed[1] + assert req.vect == 1 + assert req.pre_samples == 32 + + parsed = t.convert("1:we,0,-0.25,0.25", None, None) + assert parsed[1].ttype == "we" + assert parsed[1].params == ["0", "-0.25", "0.25"] + + parsed = t.convert("1:wx#v0,0,-0.5,0.5", None, None) + assert parsed[1].ttype == "wx" + assert parsed[1].srcchan is not None + assert parsed[1].srcchan.virtual_name() == "v0" + + parsed = t.convert("1:er#v0,0,0.5", None, None) + assert parsed[1].srcchan is not None + assert parsed[1].srcchan.virtual_name() == "v0" + + parsed = t.convert("1:er,0,0.5,rearm=false", None, None) + assert parsed[1].rearm is False + + with pytest.raises(click.BadParameter): + t.convert("1:er,0,0.5,rearm=maybe", None, None) + + with pytest.raises(click.BadParameter): + t.convert("1:er,0,0.5,unknown=1", None, None) + + with pytest.raises(click.BadParameter): + t.convert("1:er#vbad,0,0.5", None, None) + def test_stringlist(): s = StringList() diff --git a/tests/plugins/test_devinfo.py b/tests/plugins/test_devinfo.py index 9bf7988..8b17ff2 100644 --- a/tests/plugins/test_devinfo.py +++ b/tests/plugins/test_devinfo.py @@ -11,8 +11,46 @@ def test_plugindevinfo_init(): plugin = PluginDevinfo() assert plugin.stream is False + assert plugin.data_wait() is True - # TODO: + +def test_plugindevinfo_helpers(): + assert PluginDevinfo._get_bool({}, "missing") is False + assert PluginDevinfo._get_bool({"flag": 1}, "flag") is True + + assert PluginDevinfo._get_int({}, "missing") == 0 + assert PluginDevinfo._get_int({"value": "7"}, "value") == 7 + + assert PluginDevinfo._get_float({}, "missing") == 0.0 + assert PluginDevinfo._get_float({"value": "1.5"}, "value") == 1.5 + + assert PluginDevinfo._format_bool(True) == "yes" + assert PluginDevinfo._format_bool(False) == "no" + + assert PluginDevinfo._format_enabled(()) == "none" + assert PluginDevinfo._format_enabled((3,)) == "3" + assert PluginDevinfo._format_enabled((1, 2, 3, 5, 7, 8)) == "1-3, 5, 7-8" + + +def test_plugindevinfo_table_render(): + out = PluginDevinfo._render_channels_table( + [ + { + "chan": 10, + "name": "", + "dtype_text": "UNDEF", + "vdim": 0, + "valid": False, + "enabled": False, + "divider": 0, + } + ] + ) + + assert "Valid" in out + assert "| 10 |" in out + assert "UNDEF" in out + assert "| no" in out def test_plugindevinfo_content(): @@ -30,8 +68,11 @@ def test_plugindevinfo_content(): assert plugin.start({}) is True out = plugin.result() - assert "Device common" in out - assert "Channels state (applied)" in out - assert "Channels state (buffered)" in out - assert "stream_started" in out - assert "bitrate" in out + assert "Device Summary" in out + assert "Channel State" in out + assert "Applied enabled: none" in out + assert "Bitrate: 0.0 B/s" in out + assert "noise_uniform_scalar" in out + assert "FLOAT" in out + assert "| 10 |" in out + assert "UNDEF" in out diff --git a/tests/test_idata.py b/tests/test_idata.py index 65df0f3..2732817 100644 --- a/tests/test_idata.py +++ b/tests/test_idata.py @@ -1,16 +1,14 @@ import queue -from typing import TYPE_CHECKING import numpy as np import pytest # type: ignore from nxslib.dev import DeviceChannel +from nxslib.nxscope import DNxscopeStreamBlock +from nxscli.channelref import ChannelRef from nxscli.idata import PluginData, PluginDataCb, PluginQueueData from nxscli.trigger import DTriggerConfig, ETriggerType, TriggerHandler -if TYPE_CHECKING: - from nxscli.channelref import ChannelRef - g_queue: queue.Queue[list] = queue.Queue() @@ -84,6 +82,34 @@ def __init__(self) -> None: TriggerHandler.cls_cleanup() +def test_pluginqueuedata_exposes_trigger_event_metadata() -> None: + q = queue.Queue() + q.put( + [ + DNxscopeStreamBlock( + data=np.asarray([0.0, 0.0, 1.0, 2.0], dtype=float).reshape( + -1, 1 + ), + meta=None, + ) + ] + ) + chan = DeviceChannel(7, 2, 1, "chan7") + dtc = DTriggerConfig(ETriggerType.EDGE_RISING, hoffset=1, level=0.5) + trig = TriggerHandler(7, dtc) + qdata = PluginQueueData(q, chan, trig) + + ret = qdata.queue_get(block=False) + event = qdata.pop_trigger_event() + + assert len(ret) == 1 + assert event is not None + assert event.channel == 7 + assert event.sample_index == 1.0 + assert qdata.pop_trigger_event() is None + TriggerHandler.cls_cleanup() + + def test_nxsclipdata_init(): channels = [DeviceChannel(0, 1, 2, "chan0")] dtc = DTriggerConfig(ETriggerType.ALWAYS_OFF) @@ -165,3 +191,105 @@ def stream_sub(channel: "ChannelRef"): # noqa: ANN001 pdata._queue_deinit() TriggerHandler.cls_cleanup() + + +def test_nxsclipdata_drains_hidden_virtual_trigger_source() -> None: + queues: dict[str, queue.Queue[list]] = { + "0": queue.Queue(), + "v0": queue.Queue(), + } + + def stream_sub(channel: "ChannelRef"): # noqa: ANN001 + if channel.is_virtual: + return queues[channel.virtual_name()] + return queues[str(channel.physical_id())] + + def stream_unsub(_q): # noqa: ANN001 + return + + target = DeviceChannel(0, 2, 1, "chan0") + _ = TriggerHandler( + -2, + DTriggerConfig( + ETriggerType.ALWAYS_OFF, + source_ref=ChannelRef.virtual(0), + ), + ) + trig = TriggerHandler( + 0, + DTriggerConfig( + ETriggerType.EDGE_RISING, + srcchan=-2, + level=0.5, + source_ref=ChannelRef.virtual(0), + ), + ) + cb = PluginDataCb(stream_sub, stream_unsub) + pdata = PluginData([target], [trig], cb) + + queues["v0"].put( + [ + DNxscopeStreamBlock( + data=np.asarray([0.0, 1.0, 1.0], dtype=float).reshape(-1, 1), + meta=None, + ) + ] + ) + queues["0"].put( + [ + DNxscopeStreamBlock( + data=np.asarray([10.0, 11.0, 12.0], dtype=float).reshape( + -1, 1 + ), + meta=None, + ) + ] + ) + + ret = pdata.qdlist[0].queue_get(block=False) + + assert len(pdata._aux_qd) == 1 + assert len(ret) == 1 + assert ret[0].data.shape[0] > 0 + pdata._queue_deinit() + TriggerHandler.cls_cleanup() + + +def test_nxsclipdata_missing_hidden_trigger_raises() -> None: + def stream_sub(channel: "ChannelRef"): # noqa: ANN001 + return queue.Queue() + + def stream_unsub(_q): # noqa: ANN001 + return + + target = DeviceChannel(0, 2, 1, "chan0") + trig = TriggerHandler( + 0, + DTriggerConfig( + ETriggerType.EDGE_RISING, + srcchan=-2, + level=0.5, + source_ref=ChannelRef.virtual(0), + ), + ) + cb = PluginDataCb(stream_sub, stream_unsub) + + with pytest.raises(AssertionError): + _ = PluginData([target], [trig], cb) + + stream_unsub(queue.Queue()) + TriggerHandler.cls_cleanup() + + +def test_nxsclipdata_find_trigger_prefers_visible_list() -> None: + q = queue.Queue() + chan = DeviceChannel(0, 2, 1, "chan0") + trig = TriggerHandler(0, DTriggerConfig(ETriggerType.ALWAYS_ON)) + qdata = PluginData( + [chan], [trig], PluginDataCb(lambda _ch: q, lambda _q: None) + ) + + assert qdata._find_trigger(0) is trig + + qdata._queue_deinit() + TriggerHandler.cls_cleanup() diff --git a/tests/test_phandler.py b/tests/test_phandler.py index 3997305..7054d54 100644 --- a/tests/test_phandler.py +++ b/tests/test_phandler.py @@ -17,7 +17,7 @@ ) from nxscli.phandler import PluginHandler from nxscli.plugins.none import PluginNone -from nxscli.trigger import DTriggerConfigReq +from nxscli.trigger import DTriggerConfigReq, TriggerHandler from tests.fake_nxscope import FakeNxscope @@ -818,6 +818,99 @@ def test_phandler_chanlist_plugin_virtual_multi_refs(nxscope) -> None: assert any(ch.data.chan == -2 for ch in chanlist) +def test_phandler_triggers_plugin_resolves_virtual_source(nxscope) -> None: + with PluginHandler() as p: + p.nxscope_connect(nxscope) + + provider = _MockProvider() + provider.channels["v0"] = DeviceChannel(-2, 10, 1, "v0") + p.stream_provider_add(provider) + + chan = p.channel_get(ChannelRef.physical(0)) + assert chan is not None + + trig = p.triggers_plugin( + [chan], + { + 0: DTriggerConfigReq( + "er", ChannelRef.virtual(0), 0, ["0", "0.5"] + ) + }, + ) + + assert trig[0].config.srcchan == -2 + assert trig[0].config.source_ref is not None + assert trig[0].config.source_ref.virtual_name() == "v0" + assert TriggerHandler.find_by_channel(-2) is not None + TriggerHandler.cls_cleanup() + + +def test_phandler_triggers_plugin_keeps_visible_source_inline( + nxscope, +) -> None: + with PluginHandler() as p: + p.nxscope_connect(nxscope) + + chans = [ + p.channel_get(ChannelRef.physical(0)), + p.channel_get(ChannelRef.physical(1)), + ] + assert chans[0] is not None + assert chans[1] is not None + + trig = p.triggers_plugin( + chans, {1: DTriggerConfigReq("er", 0, 0, ["0", "0.5"])} + ) + + assert trig[1].config.srcchan == 0 + assert trig[1].config.source_ref is not None + assert trig[1].config.source_ref.physical_id() == 0 + TriggerHandler.cls_cleanup() + + +def test_phandler_triggers_plugin_ignores_same_channel_explicit_source( + nxscope, +) -> None: + with PluginHandler() as p: + p.nxscope_connect(nxscope) + + chan = p.channel_get(ChannelRef.physical(0)) + assert chan is not None + + trig = p.triggers_plugin( + [chan], + {0: DTriggerConfigReq("er", 0, 0, ["0", "0.5"])}, + ) + + assert trig[0].config.srcchan is None + assert trig[0].config.source_ref is None + TriggerHandler.cls_cleanup() + + +def test_phandler_triggers_plugin_rejects_invalid_source(nxscope) -> None: + with PluginHandler() as p: + p.nxscope_connect(nxscope) + chan = p.channel_get(ChannelRef.physical(0)) + assert chan is not None + + with pytest.raises(ValueError): + p.triggers_plugin( + [chan], {0: DTriggerConfigReq("er", 99, 0, ["0", "0.5"])} + ) + + TriggerHandler.cls_cleanup() + + +def test_phandler_hidden_trigger_source_reuses_existing(nxscope) -> None: + with PluginHandler() as p: + p.nxscope_connect(nxscope) + hidden = p._hidden_trigger_source(9, ChannelRef.physical(0)) + reused = p._hidden_trigger_source(9, ChannelRef.physical(0)) + + assert reused is hidden + TriggerHandler.cls_cleanup() + + def test_mock_provider_non_virtual_paths() -> None: provider = _MockProvider() assert provider.channel_get(ChannelRef.physical(0)) is None diff --git a/tests/test_trigger.py b/tests/test_trigger.py index 838342c..895c8bf 100644 --- a/tests/test_trigger.py +++ b/tests/test_trigger.py @@ -7,6 +7,7 @@ from nxscli.trigger import ( DTriggerConfig, DTriggerConfigReq, + ETriggerCaptureMode, ETriggerType, TriggerHandler, trigger_from_req, @@ -51,6 +52,38 @@ def test_triggerfromstr(): assert x.hoffset == 200 assert x.level == 20 + req = DTriggerConfigReq("we", None, params=["12", "-0.5", "0.5"]) + x = trigger_from_req(req) + assert x.ttype == ETriggerType.WINDOW_ENTER + assert x.hoffset == 12 + assert x.window_low == -0.5 + assert x.window_high == 0.5 + + req = DTriggerConfigReq("wx", None, params=["7", "-1.0", "1.0"]) + x = trigger_from_req(req) + assert x.ttype == ETriggerType.WINDOW_EXIT + assert x.hoffset == 7 + assert x.window_low == -1.0 + assert x.window_high == 1.0 + + req = DTriggerConfigReq( + "er", + None, + params=["5", "1.5"], + mode="stop_after", + pre_samples=3, + post_samples=7, + holdoff=2, + rearm=True, + ) + x = trigger_from_req(req) + assert x.capture_mode is ETriggerCaptureMode.STOP_AFTER + assert x.pre_samples == 3 + assert x.post_samples == 7 + assert x.holdoff == 2 + assert x.rearm is True + assert x.effective_pre_samples == 3 + # initialization logic def test_triggerhandle_init(): @@ -241,6 +274,24 @@ def test_triggerhandle_alwaysoff(): TriggerHandler.cls_cleanup() +def test_triggerhandle_alwayson_does_not_emit_event(): + with global_lock: + trig = TriggerHandler(0, DTriggerConfig(ETriggerType.ALWAYS_ON)) + data = [ + DNxscopeStreamBlock( + data=np.array([[1.0], [2.0], [3.0]]), + meta=np.array([[0], [0], [0]]), + ) + ] + + ret = trig.data_triggered(data) + + assert ret == data + assert trig.pop_trigger_event() is None + + TriggerHandler.cls_cleanup() + + def test_triggerhandle_alwayson(): with global_lock: assert len(TriggerHandler._instances) == 0 @@ -322,7 +373,6 @@ def test_triggerhandle_edgerising1(): ] dout = th.data_triggered(din) assert dout == [ - DNxscopeStream((0,), ()), DNxscopeStream((1,), ()), DNxscopeStream((2,), ()), ] @@ -431,7 +481,6 @@ def test_triggerhandle_edgerising2(): ] dout = th.data_triggered(din) assert dout == [ - DNxscopeStream((5,), ()), DNxscopeStream((6,), ()), DNxscopeStream((7,), ()), ] @@ -523,7 +572,7 @@ def test_triggerhandle_edgefalling1(): DNxscopeStream((-1,), ()), ] dout = th.data_triggered(din) - assert dout == [DNxscopeStream((0,), ()), DNxscopeStream((-1,), ())] + assert dout == [DNxscopeStream((-1,), ())] din = [ DNxscopeStream((-1,), ()), @@ -651,7 +700,7 @@ def test_triggerhandle_edgefalling2(): DNxscopeStream((-6,), ()), ] dout = th.data_triggered(din) - assert dout == [DNxscopeStream((-5,), ()), DNxscopeStream((-6,), ())] + assert dout == [DNxscopeStream((-6,), ())] din = [ DNxscopeStream((2,), ()), @@ -705,6 +754,89 @@ def test_triggerhandle_edgefalling2(): TriggerHandler.cls_cleanup() +def test_triggerhandle_windowenter() -> None: + with global_lock: + dtc = DTriggerConfig( + ETriggerType.WINDOW_ENTER, + hoffset=0, + window_low=-0.5, + window_high=0.5, + ) + th = TriggerHandler(0, dtc) + + din = [ + DNxscopeStream((-1.0,), ()), + DNxscopeStream((-0.8,), ()), + DNxscopeStream((-0.6,), ()), + ] + assert th.data_triggered(din) == [] + + din = [ + DNxscopeStream((-0.4,), ()), + DNxscopeStream((0.0,), ()), + DNxscopeStream((0.4,), ()), + ] + dout = th.data_triggered(din) + assert dout == din + + TriggerHandler.cls_cleanup() + + +def test_triggerhandle_windowexit() -> None: + with global_lock: + dtc = DTriggerConfig( + ETriggerType.WINDOW_EXIT, + hoffset=0, + window_low=-0.5, + window_high=0.5, + ) + th = TriggerHandler(0, dtc) + + din = [ + DNxscopeStream((-0.2,), ()), + DNxscopeStream((0.0,), ()), + DNxscopeStream((0.2,), ()), + ] + assert th.data_triggered(din) == [] + + din = [ + DNxscopeStream((0.4,), ()), + DNxscopeStream((0.6,), ()), + DNxscopeStream((0.8,), ()), + ] + dout = th.data_triggered(din) + assert dout == [ + DNxscopeStream((0.6,), ()), + DNxscopeStream((0.8,), ()), + ] + + TriggerHandler.cls_cleanup() + + +def test_triggerhandle_window_helpers_short_input() -> None: + with global_lock: + dtc = DTriggerConfig( + ETriggerType.WINDOW_ENTER, + hoffset=0, + window_low=-0.5, + window_high=0.5, + ) + th = TriggerHandler(0, dtc) + + assert th._windowenter([], 0, -0.5, 0.5).state is False + assert ( + th._windowenter([DNxscopeStream((0.0,), ())], 0, -0.5, 0.5).state + is False + ) + assert th._windowexit([], 0, -0.5, 0.5).state is False + assert ( + th._windowexit([DNxscopeStream((0.0,), ())], 0, -0.5, 0.5).state + is False + ) + + TriggerHandler.cls_cleanup() + + def test_triggerhandle_chanxtochany_nohoffset(): with global_lock: assert len(TriggerHandler._instances) == 0 @@ -991,6 +1123,13 @@ def test_triggerhandler_block_helpers_and_cache_paths() -> None: assert th._edgerising([], 0, 0.0).state is False assert th._edgefalling([], 0, 0.0).state is False assert th._slice_from([], 1) == [] + assert th._sample_count([DNxscopeStream((1,), ())]) == 1 + assert th._slice_range([], 0, 0) == [] + assert th._slice_range([], 0, 1) == [] + assert th._slice_range([DNxscopeStream((1,), ())], 0, 1) == [ + DNxscopeStream((1,), ()) + ] + assert th._tail_samples([DNxscopeStream((1,), ())], 0) == [] block = DNxscopeStreamBlock(data=np.array([[0.0], [2.0]]), meta=None) out = th.data_triggered([block]) @@ -1027,6 +1166,8 @@ def test_triggerhandler_block_helpers_and_cache_paths() -> None: concat0 = DNxscopeStreamBlock(data=np.array([[0.0]]), meta=None) concat1 = DNxscopeStreamBlock(data=np.array([[1.0]]), meta=None) assert th._combined_vector([concat0, concat1], 0) == [0.0, 1.0] + sliced_range = th._slice_range([concat0, concat1], 0, 5) + assert len(sliced_range) == 2 TriggerHandler.cls_cleanup() @@ -1045,6 +1186,237 @@ def test_triggerhandler_block_cache_hoffset_zero_keeps_current_batch() -> None: TriggerHandler.cls_cleanup() +def test_triggerhandle_stop_after_block_current_batch() -> None: + with global_lock: + trig = TriggerHandler( + 0, + DTriggerConfig( + ETriggerType.EDGE_RISING, + level=0.5, + capture_mode=ETriggerCaptureMode.STOP_AFTER, + post_samples=1, + ), + ) + + block = DNxscopeStreamBlock( + data=np.array([[0.0], [0.0], [1.0], [2.0]]), + meta=np.array([[0], [1], [2], [3]]), + ) + out = trig.data_triggered([block]) + + assert len(out) == 1 + assert out[0].data.tolist() == [[0.0], [0.0], [1.0], [2.0]] + assert out[0].meta.tolist() == [[0], [1], [2], [3]] + + TriggerHandler.cls_cleanup() + + +def test_triggerhandle_stop_after_block_boundary() -> None: + with global_lock: + trig = TriggerHandler( + 0, + DTriggerConfig( + ETriggerType.EDGE_RISING, + level=0.5, + capture_mode=ETriggerCaptureMode.STOP_AFTER, + post_samples=2, + ), + ) + + first = DNxscopeStreamBlock( + data=np.array([[0.0], [0.0]]), + meta=np.array([[0], [1]]), + ) + second = DNxscopeStreamBlock( + data=np.array([[1.0], [2.0], [3.0]]), + meta=np.array([[2], [3], [4]]), + ) + third = DNxscopeStreamBlock( + data=np.array([[4.0], [5.0]]), + meta=np.array([[5], [6]]), + ) + + out1 = trig.data_triggered([first]) + out2 = trig.data_triggered([second]) + out3 = trig.data_triggered([third]) + + assert out1[0].data.tolist() == [[0.0], [0.0]] + assert out2[0].data.tolist() == [[1.0], [2.0], [3.0]] + assert out3 == [] + + TriggerHandler.cls_cleanup() + + +def test_triggerhandle_stop_after_block_carries_post_tail() -> None: + with global_lock: + trig = TriggerHandler( + 0, + DTriggerConfig( + ETriggerType.EDGE_RISING, + level=0.5, + capture_mode=ETriggerCaptureMode.STOP_AFTER, + post_samples=3, + ), + ) + + assert trig.data_triggered([]) == [] + + first = DNxscopeStreamBlock( + data=np.array([[0.0]]), + meta=np.array([[0]]), + ) + second = DNxscopeStreamBlock( + data=np.array([[1.0]]), + meta=np.array([[1]]), + ) + third = DNxscopeStreamBlock( + data=np.array([[2.0], [3.0], [4.0]]), + meta=np.array([[2], [3], [4]]), + ) + + out1 = trig.data_triggered([first]) + out2 = trig.data_triggered([second]) + out3 = trig.data_triggered([third]) + out4 = trig.data_triggered([third]) + + assert out1[0].data.tolist() == [[0.0]] + assert out2[0].data.tolist() == [[1.0]] + assert out3[0].data.tolist() == [[2.0], [3.0], [4.0]] + assert out4 == [] + + TriggerHandler.cls_cleanup() + + +def test_triggerhandle_stop_after_block_boundary_zero_post() -> None: + with global_lock: + trig = TriggerHandler( + 0, + DTriggerConfig( + ETriggerType.EDGE_RISING, + level=0.5, + capture_mode=ETriggerCaptureMode.STOP_AFTER, + post_samples=0, + ), + ) + + first = DNxscopeStreamBlock( + data=np.array([[0.0]]), + meta=np.array([[0]]), + ) + second = DNxscopeStreamBlock( + data=np.array([[1.0]]), + meta=np.array([[1]]), + ) + + out1 = trig.data_triggered([first]) + out2 = trig.data_triggered([second]) + + assert out1[0].data.tolist() == [[0.0]] + assert out2[0].data.tolist() == [[1.0]] + + TriggerHandler.cls_cleanup() + + +def test_triggerhandle_stop_after_legacy_payload_unsupported() -> None: + with global_lock: + trig = TriggerHandler( + 0, + DTriggerConfig( + ETriggerType.EDGE_RISING, + level=0.5, + capture_mode=ETriggerCaptureMode.STOP_AFTER, + post_samples=1, + ), + ) + + with pytest.raises(NotImplementedError): + trig.data_triggered( + [ + DNxscopeStream((0.0,), ()), + DNxscopeStream((1.0,), ()), + ] + ) + + TriggerHandler.cls_cleanup() + + +def test_triggerhandle_stop_after_alwayson_does_not_emit_event() -> None: + with global_lock: + trig = TriggerHandler( + 0, + DTriggerConfig( + ETriggerType.ALWAYS_ON, + capture_mode=ETriggerCaptureMode.STOP_AFTER, + post_samples=2, + ), + ) + data = [ + DNxscopeStreamBlock( + data=np.array([[1.0], [2.0], [3.0]]), + meta=np.array([[0], [0], [0]]), + ) + ] + + ret = trig.data_triggered(data) + + assert len(ret) == 1 + assert np.array_equal(ret[0].data, np.array([[1.0], [2.0], [3.0]])) + assert trig.pop_trigger_event() is None + + TriggerHandler.cls_cleanup() + + +def test_triggerhandle_emits_event_metadata() -> None: + with global_lock: + trig = TriggerHandler( + 7, + DTriggerConfig(ETriggerType.EDGE_RISING, hoffset=2, level=0.5), + ) + block = DNxscopeStreamBlock( + data=np.array([[0.0], [0.0], [1.0], [2.0]]), + meta=None, + ) + + out = trig.data_triggered([block]) + event = trig.pop_trigger_event() + + assert len(out) == 1 + assert event is not None + assert event.channel == 7 + assert event.sample_index == 2.0 + assert trig.pop_trigger_event() is None + + TriggerHandler.cls_cleanup() + + +def test_triggerhandle_start_after_emits_event_only_once() -> None: + with global_lock: + trig = TriggerHandler( + 7, + DTriggerConfig(ETriggerType.EDGE_RISING, hoffset=2, level=0.5), + ) + first = DNxscopeStreamBlock( + data=np.array([[0.0], [0.0], [1.0], [2.0]]), + meta=None, + ) + second = DNxscopeStreamBlock( + data=np.array([[3.0], [4.0], [5.0]]), + meta=None, + ) + + trig.data_triggered([first]) + event1 = trig.pop_trigger_event() + + trig.data_triggered([second]) + event2 = trig.pop_trigger_event() + + assert event1 is not None + assert event1.sample_index == 2.0 + assert event2 is None + + TriggerHandler.cls_cleanup() + + def test_triggerhandle_edgerising_hoffset(): # TODO pass