diff --git a/ros2action/test/test_api.py b/ros2action/test/test_api.py index 0751d9194..838a5f87d 100644 --- a/ros2action/test/test_api.py +++ b/ros2action/test/test_api.py @@ -30,3 +30,78 @@ def test_get_action_clients_and_servers(): def test_get_action_names(): with DirectNode(None) as node: get_action_names(node=node) + + +def test_action_type_completer(): + from unittest.mock import Mock, patch + from ros2action.api import ActionTypeCompleter + + # Create a mock parsed_args object + parsed_args = Mock() + + # Test 1: When action_name_key is None, should return all action types + completer = ActionTypeCompleter() + with patch('ros2action.api.action_type_completer') as mock_action_type_completer: + mock_action_type_completer.return_value = [ + 'example_interfaces/Fibonacci', + 'action_tutorials_interfaces/Fibonacci' + ] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['example_interfaces/Fibonacci', 'action_tutorials_interfaces/Fibonacci'] + mock_action_type_completer.assert_called_once() + + # Test 2: When action_name_key is provided and action exists + parsed_args.action_name = '/test_action' + completer = ActionTypeCompleter(action_name_key='action_name') + + with patch('ros2action.api.NodeStrategy'): + with patch('ros2action.api.get_action_names_and_types') as mock_get_names_types: + mock_get_names_types.return_value = [ + ('/test_action', ['example_interfaces/Fibonacci']), + ('/other_action', ['action_tutorials_interfaces/Fibonacci']) + ] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['example_interfaces/Fibonacci'] + + # Test 3: When action_name_key is provided but action not found + parsed_args.action_name = '/nonexistent_action' + completer = ActionTypeCompleter(action_name_key='action_name') + + with patch('ros2action.api.NodeStrategy'): + with patch('ros2action.api.get_action_names_and_types') as mock_get_names_types: + mock_get_names_types.return_value = [ + ('/test_action', ['example_interfaces/Fibonacci']), + ('/other_action', ['action_tutorials_interfaces/Fibonacci']) + ] + result = completer(prefix='', parsed_args=parsed_args) + assert result == [] + + +def test_action_goal_prototype_completer(): + from unittest.mock import Mock, patch + from ros2action.api import ActionGoalPrototypeCompleter + + # Create a mock parsed_args object + parsed_args = Mock() + parsed_args.action_type = 'example_interfaces/Fibonacci' + + completer = ActionGoalPrototypeCompleter(action_type_key='action_type') + + # Mock the action class and its Goal + mock_action_class = Mock() + mock_goal_instance = Mock() + mock_action_class.Goal.return_value = mock_goal_instance + + with patch('ros2action.api.get_action') as mock_get_action: + with patch('ros2action.api.message_to_yaml') as mock_message_to_yaml: + mock_get_action.return_value = mock_action_class + mock_message_to_yaml.return_value = 'order: 5' + + result = completer(prefix='', parsed_args=parsed_args) + + # Verify get_action was called with the correct action type + mock_get_action.assert_called_once_with('example_interfaces/Fibonacci') + # Verify message_to_yaml was called with the goal instance + mock_message_to_yaml.assert_called_once_with(mock_goal_instance) + # Verify the result is a list containing the YAML string + assert result == ['order: 5'] diff --git a/ros2component/test/test_api.py b/ros2component/test/test_api.py index baa14d50f..f2987b9c6 100644 --- a/ros2component/test/test_api.py +++ b/ros2component/test/test_api.py @@ -39,3 +39,33 @@ def test_find_container_node_names(): def test_get_package_component_types(): """Test get_package_component_types() API function.""" assert len(get_package_component_types(package_name='ros2component')) == 0 + + +def test_component_type_name_completer(): + from unittest.mock import Mock, patch + from ros2component.api import ComponentTypeNameCompleter + + parsed_args = Mock() + parsed_args.package_name = 'my_package' + completer = ComponentTypeNameCompleter(package_name_key='package_name') + + # Test 1: package has component types + with patch('ros2component.api.get_package_component_types') as mock_get_types: + mock_get_types.return_value = ['my_package::MyComponent', 'my_package::OtherComponent'] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['my_package::MyComponent', 'my_package::OtherComponent'] + mock_get_types.assert_called_once_with(package_name='my_package') + + # Test 2: package has no component types + with patch('ros2component.api.get_package_component_types') as mock_get_types: + mock_get_types.return_value = [] + result = completer(prefix='', parsed_args=parsed_args) + assert result == [] + + # Test 3: different package name + parsed_args.package_name = 'other_package' + with patch('ros2component.api.get_package_component_types') as mock_get_types: + mock_get_types.return_value = ['other_package::Comp'] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['other_package::Comp'] + mock_get_types.assert_called_once_with(package_name='other_package') diff --git a/ros2node/test/test_api.py b/ros2node/test/test_api.py new file mode 100644 index 000000000..7ed60bd9c --- /dev/null +++ b/ros2node/test/test_api.py @@ -0,0 +1,65 @@ +# Copyright 2026 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def test_node_name_completer(): + from unittest.mock import Mock, patch + from ros2node.api import NodeNameCompleter, NodeName + + parsed_args = Mock() + + # Test 1: no include_hidden_nodes_key, defaults to False + completer = NodeNameCompleter() + with patch('ros2node.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2node.api.get_node_names') as mock_get_names: + mock_get_names.return_value = [ + NodeName('node1', '/', '/node1'), + NodeName('node2', '/ns', '/ns/node2'), + ] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['/node1', '/ns/node2'] + mock_get_names.assert_called_once_with( + node=mock_node, include_hidden_nodes=False) + + # Test 2: include_hidden_nodes_key provided and True + parsed_args.include_hidden_nodes = True + completer = NodeNameCompleter(include_hidden_nodes_key='include_hidden_nodes') + with patch('ros2node.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2node.api.get_node_names') as mock_get_names: + mock_get_names.return_value = [ + NodeName('node1', '/', '/node1'), + NodeName('_hidden', '/', '/_hidden'), + ] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['/node1', '/_hidden'] + mock_get_names.assert_called_once_with( + node=mock_node, include_hidden_nodes=True) + + # Test 3: no nodes available + parsed_args.include_hidden_nodes = False + completer = NodeNameCompleter(include_hidden_nodes_key='include_hidden_nodes') + with patch('ros2node.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2node.api.get_node_names') as mock_get_names: + mock_get_names.return_value = [] + result = completer(prefix='', parsed_args=parsed_args) + assert result == [] diff --git a/ros2run/test/test_api.py b/ros2run/test/test_api.py new file mode 100644 index 000000000..6f71c8327 --- /dev/null +++ b/ros2run/test/test_api.py @@ -0,0 +1,45 @@ +# Copyright 2026 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def test_executable_name_completer(): + from unittest.mock import Mock, patch + from ros2pkg.api import PackageNotFound + from ros2run.api import ExecutableNameCompleter + + parsed_args = Mock() + parsed_args.package_name = 'my_package' + completer = ExecutableNameCompleter(package_name_key='package_name') + + # Test 1: package has executables + with patch('ros2run.api.get_executable_paths') as mock_get_paths: + mock_get_paths.return_value = [ + '/opt/ros/humble/lib/my_package/my_exec', + '/opt/ros/humble/lib/my_package/other_exec', + ] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['my_exec', 'other_exec'] + mock_get_paths.assert_called_once_with(package_name='my_package') + + # Test 2: package not found raises PackageNotFound, returns empty list + with patch('ros2run.api.get_executable_paths') as mock_get_paths: + mock_get_paths.side_effect = PackageNotFound('my_package') + result = completer(prefix='', parsed_args=parsed_args) + assert result == [] + + # Test 3: package has no executables + with patch('ros2run.api.get_executable_paths') as mock_get_paths: + mock_get_paths.return_value = [] + result = completer(prefix='', parsed_args=parsed_args) + assert result == [] diff --git a/ros2service/test/test_api.py b/ros2service/test/test_api.py new file mode 100644 index 000000000..6207705f1 --- /dev/null +++ b/ros2service/test/test_api.py @@ -0,0 +1,128 @@ +# Copyright 2026 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def test_service_name_completer(): + from unittest.mock import Mock, patch + from ros2service.api import ServiceNameCompleter + + parsed_args = Mock() + parsed_args.include_hidden_services = False + completer = ServiceNameCompleter( + include_hidden_services_key='include_hidden_services') + + # Test 1: returns service names + with patch('ros2service.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2service.api.get_service_names') as mock_get_names: + mock_get_names.return_value = ['/service1', '/service2'] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['/service1', '/service2'] + mock_get_names.assert_called_once_with( + node=mock_node, include_hidden_services=False) + + # Test 2: no services available + with patch('ros2service.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2service.api.get_service_names') as mock_get_names: + mock_get_names.return_value = [] + result = completer(prefix='', parsed_args=parsed_args) + assert result == [] + + # Test 3: include hidden services + parsed_args.include_hidden_services = True + with patch('ros2service.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2service.api.get_service_names') as mock_get_names: + mock_get_names.return_value = ['/service1', '/_hidden_service'] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['/service1', '/_hidden_service'] + mock_get_names.assert_called_once_with( + node=mock_node, include_hidden_services=True) + + +def test_service_type_completer(): + from unittest.mock import Mock, patch + from ros2service.api import ServiceTypeCompleter + + parsed_args = Mock() + + # Test 1: no service_name_key, returns all service types + completer = ServiceTypeCompleter() + with patch('ros2service.api.service_type_completer') as mock_all_types: + mock_all_types.return_value = ['std_srvs/Empty', 'std_srvs/SetBool'] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['std_srvs/Empty', 'std_srvs/SetBool'] + mock_all_types.assert_called_once() + + # Test 2: service_name_key provided and service found + parsed_args.service_name = '/my_service' + completer = ServiceTypeCompleter(service_name_key='service_name') + with patch('ros2service.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2service.api.get_service_names_and_types') as mock_get_names_types: + mock_get_names_types.return_value = [ + ('/my_service', ['std_srvs/Empty']), + ('/other_service', ['std_srvs/SetBool']), + ] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['std_srvs/Empty'] + + # Test 3: service_name_key provided but service not found, falls back to all types + parsed_args.service_name = '/nonexistent_service' + completer = ServiceTypeCompleter(service_name_key='service_name') + with patch('ros2service.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2service.api.get_service_names_and_types') as mock_get_names_types: + mock_get_names_types.return_value = [ + ('/my_service', ['std_srvs/Empty']), + ] + with patch('ros2service.api.service_type_completer') as mock_all_types: + mock_all_types.return_value = ['std_srvs/Empty', 'std_srvs/SetBool'] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['std_srvs/Empty', 'std_srvs/SetBool'] + + +def test_service_prototype_completer(): + from unittest.mock import Mock, patch + from ros2service.api import ServicePrototypeCompleter + + parsed_args = Mock() + parsed_args.service_type = 'std_srvs/SetBool' + completer = ServicePrototypeCompleter(service_type_key='service_type') + + mock_service_class = Mock() + mock_request_instance = Mock() + mock_service_class.Request.return_value = mock_request_instance + + with patch('ros2service.api.get_service') as mock_get_service: + with patch('ros2service.api.message_to_yaml') as mock_message_to_yaml: + mock_get_service.return_value = mock_service_class + mock_message_to_yaml.return_value = 'data: false' + + result = completer(prefix='', parsed_args=parsed_args) + + mock_get_service.assert_called_once_with('std_srvs/SetBool') + mock_message_to_yaml.assert_called_once_with(mock_request_instance) + assert result == ['data: false'] diff --git a/ros2topic/test/test_api.py b/ros2topic/test/test_api.py new file mode 100644 index 000000000..94e0a391c --- /dev/null +++ b/ros2topic/test/test_api.py @@ -0,0 +1,127 @@ +# Copyright 2026 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def test_topic_name_completer(): + from unittest.mock import Mock, patch + from ros2topic.api import TopicNameCompleter + + parsed_args = Mock() + parsed_args.include_hidden_topics = False + completer = TopicNameCompleter(include_hidden_topics_key='include_hidden_topics') + + # Test 1: returns topic names + with patch('ros2topic.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2topic.api.get_topic_names') as mock_get_names: + mock_get_names.return_value = ['/topic1', '/topic2'] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['/topic1', '/topic2'] + mock_get_names.assert_called_once_with( + node=mock_node, include_hidden_topics=False) + + # Test 2: no topics available + with patch('ros2topic.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2topic.api.get_topic_names') as mock_get_names: + mock_get_names.return_value = [] + result = completer(prefix='', parsed_args=parsed_args) + assert result == [] + + # Test 3: include hidden topics + parsed_args.include_hidden_topics = True + with patch('ros2topic.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2topic.api.get_topic_names') as mock_get_names: + mock_get_names.return_value = ['/topic1', '/_hidden_topic'] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['/topic1', '/_hidden_topic'] + mock_get_names.assert_called_once_with( + node=mock_node, include_hidden_topics=True) + + +def test_topic_type_completer(): + from unittest.mock import Mock, patch + from ros2topic.api import TopicTypeCompleter + + parsed_args = Mock() + + # Test 1: no topic_name_key, returns all message types + completer = TopicTypeCompleter() + with patch('ros2topic.api.message_type_completer') as mock_all_types: + mock_all_types.return_value = ['std_msgs/String', 'std_msgs/Int32'] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['std_msgs/String', 'std_msgs/Int32'] + mock_all_types.assert_called_once() + + # Test 2: topic_name_key provided and topic found + parsed_args.topic_name = '/my_topic' + completer = TopicTypeCompleter(topic_name_key='topic_name') + with patch('ros2topic.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2topic.api.get_topic_names_and_types') as mock_get_names_types: + mock_get_names_types.return_value = [ + ('/my_topic', ['std_msgs/String']), + ('/other_topic', ['std_msgs/Int32']), + ] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['std_msgs/String'] + + # Test 3: topic_name_key provided but topic not found, falls back to all types + parsed_args.topic_name = '/nonexistent_topic' + completer = TopicTypeCompleter(topic_name_key='topic_name') + with patch('ros2topic.api.NodeStrategy') as mock_node_strategy: + mock_node = Mock() + mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node) + mock_node_strategy.return_value.__exit__ = Mock(return_value=False) + with patch('ros2topic.api.get_topic_names_and_types') as mock_get_names_types: + mock_get_names_types.return_value = [ + ('/my_topic', ['std_msgs/String']), + ] + with patch('ros2topic.api.message_type_completer') as mock_all_types: + mock_all_types.return_value = ['std_msgs/String', 'std_msgs/Int32'] + result = completer(prefix='', parsed_args=parsed_args) + assert result == ['std_msgs/String', 'std_msgs/Int32'] + + +def test_topic_message_prototype_completer(): + from unittest.mock import Mock, patch + from ros2topic.api import TopicMessagePrototypeCompleter + + parsed_args = Mock() + parsed_args.message_type = 'std_msgs/String' + completer = TopicMessagePrototypeCompleter(topic_type_key='message_type') + + mock_message_class = Mock() + mock_message_instance = Mock() + mock_message_class.return_value = mock_message_instance + + with patch('ros2topic.api.get_message') as mock_get_message: + with patch('ros2topic.api.message_to_yaml') as mock_message_to_yaml: + mock_get_message.return_value = mock_message_class + mock_message_to_yaml.return_value = 'data: ""' + + result = completer(prefix='', parsed_args=parsed_args) + + mock_get_message.assert_called_once_with('std_msgs/String') + mock_message_to_yaml.assert_called_once_with(mock_message_instance) + assert result == ["'data: \"\"'"]