-
Notifications
You must be signed in to change notification settings - Fork 3.6k
feat: add typed service locator to SimulationContext #5672
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| Added | ||
| ^^^^^ | ||
|
|
||
| * Added :class:`~isaaclab.sim.ServiceLocator` and exposed it as | ||
| :attr:`~isaaclab.sim.SimulationContext.services`. | ||
|
|
||
| Backend-specific caches can be registered and retrieved using subscript | ||
| syntax (``services[cls] = instance``, ``services[cls]``). Services with | ||
| a ``close()`` method are automatically closed on ``clear_instance()``. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| # Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). | ||
| # All rights reserved. | ||
| # | ||
| # SPDX-License-Identifier: BSD-3-Clause | ||
|
|
||
| """Typed service locator for lifecycle-managed singletons.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from typing import TypeVar | ||
|
|
||
| _T = TypeVar("_T") | ||
|
|
||
|
|
||
| def _try_close(service: object) -> None: | ||
| """Call close() on *service* if it exists and is callable.""" | ||
| close = getattr(service, "close", None) | ||
| if callable(close): | ||
| close() | ||
|
|
||
|
|
||
| class ServiceLocator: | ||
| """A typed service registry keyed by class, interface, or abstract base class. | ||
|
|
||
| Services are registered and retrieved using subscript syntax:: | ||
|
|
||
| locator[FabricStageCache] = FabricStageCache(stage) | ||
| cache = locator[FabricStageCache] | ||
|
|
||
| Deleting a service calls ``close()`` on it if available:: | ||
|
|
||
| del locator[FabricStageCache] | ||
|
|
||
| All registered services are closed and cleared via :meth:`close_all`. | ||
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| self._services: dict[type, object] = {} | ||
|
|
||
| def __getitem__(self, cls: type[_T]) -> _T | None: | ||
| """Retrieve a service by its key class, or ``None`` if not registered.""" | ||
| return self._services.get(cls) # type: ignore[return-value] | ||
|
|
||
| def __setitem__(self, cls: type[_T], instance: _T) -> None: | ||
| """Register a service under the given key. | ||
|
|
||
| The key can be the concrete class of *instance*, a parent class, | ||
| or an abstract base class / protocol — allowing retrieval by | ||
| interface rather than implementation. | ||
|
|
||
| Does *not* close a previously registered service — the caller is | ||
| responsible for closing the old instance before replacing it. | ||
| Use ``del locator[cls]`` to close and remove, or :meth:`pop` to | ||
| remove without closing. | ||
| """ | ||
| self._services[cls] = instance | ||
|
|
||
| def __delitem__(self, cls: type) -> None: | ||
| """Close and remove a service. | ||
|
|
||
| Calls ``close()`` on the instance if it has one, then removes it. | ||
|
|
||
| Raises: | ||
| KeyError: If no service is registered under *cls*. | ||
| """ | ||
| instance = self._services.pop(cls) | ||
| _try_close(instance) | ||
|
|
||
| def __contains__(self, cls: type) -> bool: | ||
| """Check if a service is registered under *cls*.""" | ||
| return cls in self._services | ||
|
|
||
| def pop(self, cls: type[_T]) -> _T | None: | ||
| """Remove and return a service without closing it. | ||
|
|
||
| Returns: | ||
| The previously registered instance, or ``None`` if not registered. | ||
| """ | ||
| return self._services.pop(cls, None) # type: ignore[return-value] | ||
|
|
||
| def close_all(self, caught_exceptions: list[Exception]) -> None: | ||
| """Close all registered services and clear the registry. | ||
|
|
||
| Calls ``close()`` on each service that has one. Exceptions are | ||
| always collected into *caught_exceptions* — closing continues for | ||
| all remaining services regardless of failures. | ||
|
|
||
| Args: | ||
| caught_exceptions: A list to which any exceptions raised by | ||
| service ``close()`` calls are appended. | ||
| """ | ||
| services = list(self._services.values()) | ||
| self._services.clear() | ||
| for service in services: | ||
| try: | ||
| _try_close(service) | ||
| except Exception as e: | ||
| caught_exceptions.append(e) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| # Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). | ||
| # All rights reserved. | ||
| # | ||
| # SPDX-License-Identifier: BSD-3-Clause | ||
|
|
||
| """Tests for ServiceLocator.""" | ||
|
|
||
| import pytest | ||
|
|
||
| from isaaclab.sim.service_locator import ServiceLocator | ||
|
|
||
| # -- Dummy service helpers -- | ||
|
|
||
|
|
||
| class _DummyServiceWithClose: | ||
| """Service with a callable close() method.""" | ||
|
|
||
| def __init__(self): | ||
| self.closed = False | ||
|
|
||
| def close(self): | ||
| self.closed = True | ||
|
|
||
|
|
||
| class _DummyServiceWithoutClose: | ||
| """Service without a close() method.""" | ||
|
|
||
| pass | ||
|
|
||
|
|
||
| class _DummyServiceWithCloseProperty: | ||
| """Service where 'close' exists but is not callable.""" | ||
|
|
||
| close = 42 # attribute, not a method | ||
|
|
||
|
|
||
| class _DummyServiceThatThrows: | ||
| """Service whose close() raises an exception.""" | ||
|
|
||
| def close(self): | ||
| raise RuntimeError("close failed") | ||
|
|
||
|
|
||
| # -- Fixtures -- | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def locator(): | ||
| """Provide a fresh ServiceLocator for each test.""" | ||
| return ServiceLocator() | ||
|
|
||
|
|
||
| # -- Tests -- | ||
|
|
||
|
|
||
| def test_get_returns_none_when_unregistered(locator): | ||
| assert locator[_DummyServiceWithClose] is None | ||
|
|
||
|
|
||
| def test_set_and_get(locator): | ||
| svc = _DummyServiceWithClose() | ||
| locator[_DummyServiceWithClose] = svc | ||
| assert locator[_DummyServiceWithClose] is svc | ||
|
|
||
|
|
||
| def test_contains(locator): | ||
| assert _DummyServiceWithClose not in locator | ||
| locator[_DummyServiceWithClose] = _DummyServiceWithClose() | ||
| assert _DummyServiceWithClose in locator | ||
|
|
||
|
|
||
| def test_del_closes_service(locator): | ||
| svc = _DummyServiceWithClose() | ||
| locator[_DummyServiceWithClose] = svc | ||
| del locator[_DummyServiceWithClose] | ||
| assert svc.closed | ||
| assert locator[_DummyServiceWithClose] is None | ||
|
|
||
|
|
||
| def test_del_without_close_method(locator): | ||
| """del on a service without close() should not raise.""" | ||
| locator[_DummyServiceWithoutClose] = _DummyServiceWithoutClose() | ||
| del locator[_DummyServiceWithoutClose] | ||
| assert locator[_DummyServiceWithoutClose] is None | ||
|
|
||
|
|
||
| def test_del_with_non_callable_close_property(locator): | ||
| """del on a service where close is a property (not callable) should not raise.""" | ||
| svc = _DummyServiceWithCloseProperty() | ||
| locator[_DummyServiceWithCloseProperty] = svc | ||
| del locator[_DummyServiceWithCloseProperty] | ||
| assert locator[_DummyServiceWithCloseProperty] is None | ||
|
|
||
|
|
||
| def test_del_missing_raises_key_error(locator): | ||
| with pytest.raises(KeyError): | ||
| del locator[_DummyServiceWithClose] | ||
|
|
||
|
|
||
| def test_pop_returns_without_closing(locator): | ||
| svc = _DummyServiceWithClose() | ||
| locator[_DummyServiceWithClose] = svc | ||
| popped = locator.pop(_DummyServiceWithClose) | ||
| assert popped is svc | ||
| assert not svc.closed | ||
| assert locator[_DummyServiceWithClose] is None | ||
|
|
||
|
|
||
| def test_pop_missing_returns_none(locator): | ||
| assert locator.pop(_DummyServiceWithClose) is None | ||
|
|
||
|
|
||
| def test_close_all(locator): | ||
| svc1 = _DummyServiceWithClose() | ||
| svc2 = _DummyServiceWithoutClose() | ||
| locator[_DummyServiceWithClose] = svc1 | ||
| locator[_DummyServiceWithoutClose] = svc2 | ||
| errors: list[Exception] = [] | ||
| locator.close_all(caught_exceptions=errors) | ||
| assert svc1.closed | ||
| assert not errors | ||
| assert locator[_DummyServiceWithClose] is None | ||
| assert locator[_DummyServiceWithoutClose] is None | ||
|
|
||
|
|
||
| def test_close_all_skips_non_callable_close(locator): | ||
| """close_all does not crash on services with non-callable close attribute.""" | ||
| locator[_DummyServiceWithCloseProperty] = _DummyServiceWithCloseProperty() | ||
| errors: list[Exception] = [] | ||
| locator.close_all(caught_exceptions=errors) | ||
| assert not errors | ||
| assert locator[_DummyServiceWithCloseProperty] is None | ||
|
|
||
|
|
||
| def test_close_all_collects_exceptions(locator): | ||
| """Exceptions are collected and all services still get closed.""" | ||
| svc_ok = _DummyServiceWithClose() | ||
| locator[_DummyServiceWithClose] = svc_ok | ||
| locator[_DummyServiceThatThrows] = _DummyServiceThatThrows() | ||
| errors: list[Exception] = [] | ||
| locator.close_all(caught_exceptions=errors) | ||
| assert svc_ok.closed | ||
| assert len(errors) == 1 | ||
| assert isinstance(errors[0], RuntimeError) | ||
| assert locator[_DummyServiceWithClose] is None | ||
| assert locator[_DummyServiceThatThrows] is None | ||
|
|
||
|
|
||
| def test_multiple_service_types(locator): | ||
| svc1 = _DummyServiceWithClose() | ||
| svc2 = _DummyServiceWithoutClose() | ||
| locator[_DummyServiceWithClose] = svc1 | ||
| locator[_DummyServiceWithoutClose] = svc2 | ||
| assert locator[_DummyServiceWithClose] is svc1 | ||
| assert locator[_DummyServiceWithoutClose] is svc2 | ||
|
|
||
|
|
||
| def test_base_class_key(locator): | ||
| """Can register under a base class and retrieve by it.""" | ||
|
|
||
| class Base: | ||
| pass | ||
|
|
||
| class Impl(Base): | ||
| pass | ||
|
|
||
| impl = Impl() | ||
| locator[Base] = impl | ||
| assert locator[Base] is impl |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.