Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions ros2action/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,78 @@ def test_get_action_clients_and_servers():
def test_get_action_names():
with DirectNode(None) as node:
get_action_names(node=node)


def test_action_type_completer():
from unittest.mock import Mock, patch
from ros2action.api import ActionTypeCompleter

# Create a mock parsed_args object
parsed_args = Mock()

# Test 1: When action_name_key is None, should return all action types
completer = ActionTypeCompleter()
with patch('ros2action.api.action_type_completer') as mock_action_type_completer:
mock_action_type_completer.return_value = [
'example_interfaces/Fibonacci',
'action_tutorials_interfaces/Fibonacci'
]
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['example_interfaces/Fibonacci', 'action_tutorials_interfaces/Fibonacci']
mock_action_type_completer.assert_called_once()

# Test 2: When action_name_key is provided and action exists
parsed_args.action_name = '/test_action'
completer = ActionTypeCompleter(action_name_key='action_name')

with patch('ros2action.api.NodeStrategy'):
with patch('ros2action.api.get_action_names_and_types') as mock_get_names_types:
mock_get_names_types.return_value = [
('/test_action', ['example_interfaces/Fibonacci']),
('/other_action', ['action_tutorials_interfaces/Fibonacci'])
]
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['example_interfaces/Fibonacci']

# Test 3: When action_name_key is provided but action not found
parsed_args.action_name = '/nonexistent_action'
completer = ActionTypeCompleter(action_name_key='action_name')

with patch('ros2action.api.NodeStrategy'):
with patch('ros2action.api.get_action_names_and_types') as mock_get_names_types:
mock_get_names_types.return_value = [
('/test_action', ['example_interfaces/Fibonacci']),
('/other_action', ['action_tutorials_interfaces/Fibonacci'])
]
result = completer(prefix='', parsed_args=parsed_args)
assert result == []


def test_action_goal_prototype_completer():
from unittest.mock import Mock, patch
from ros2action.api import ActionGoalPrototypeCompleter

# Create a mock parsed_args object
parsed_args = Mock()
parsed_args.action_type = 'example_interfaces/Fibonacci'

completer = ActionGoalPrototypeCompleter(action_type_key='action_type')

# Mock the action class and its Goal
mock_action_class = Mock()
mock_goal_instance = Mock()
mock_action_class.Goal.return_value = mock_goal_instance

with patch('ros2action.api.get_action') as mock_get_action:
with patch('ros2action.api.message_to_yaml') as mock_message_to_yaml:
mock_get_action.return_value = mock_action_class
mock_message_to_yaml.return_value = 'order: 5'

result = completer(prefix='', parsed_args=parsed_args)

# Verify get_action was called with the correct action type
mock_get_action.assert_called_once_with('example_interfaces/Fibonacci')
# Verify message_to_yaml was called with the goal instance
mock_message_to_yaml.assert_called_once_with(mock_goal_instance)
# Verify the result is a list containing the YAML string
assert result == ['order: 5']
30 changes: 30 additions & 0 deletions ros2component/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,33 @@ def test_find_container_node_names():
def test_get_package_component_types():
"""Test get_package_component_types() API function."""
assert len(get_package_component_types(package_name='ros2component')) == 0


def test_component_type_name_completer():
from unittest.mock import Mock, patch
from ros2component.api import ComponentTypeNameCompleter

parsed_args = Mock()
parsed_args.package_name = 'my_package'
completer = ComponentTypeNameCompleter(package_name_key='package_name')

# Test 1: package has component types
with patch('ros2component.api.get_package_component_types') as mock_get_types:
mock_get_types.return_value = ['my_package::MyComponent', 'my_package::OtherComponent']
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['my_package::MyComponent', 'my_package::OtherComponent']
mock_get_types.assert_called_once_with(package_name='my_package')

# Test 2: package has no component types
with patch('ros2component.api.get_package_component_types') as mock_get_types:
mock_get_types.return_value = []
result = completer(prefix='', parsed_args=parsed_args)
assert result == []

# Test 3: different package name
parsed_args.package_name = 'other_package'
with patch('ros2component.api.get_package_component_types') as mock_get_types:
mock_get_types.return_value = ['other_package::Comp']
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['other_package::Comp']
mock_get_types.assert_called_once_with(package_name='other_package')
65 changes: 65 additions & 0 deletions ros2node/test/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2026 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def test_node_name_completer():
from unittest.mock import Mock, patch
from ros2node.api import NodeNameCompleter, NodeName

parsed_args = Mock()

# Test 1: no include_hidden_nodes_key, defaults to False
completer = NodeNameCompleter()
with patch('ros2node.api.NodeStrategy') as mock_node_strategy:
mock_node = Mock()
mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node)
mock_node_strategy.return_value.__exit__ = Mock(return_value=False)
with patch('ros2node.api.get_node_names') as mock_get_names:
mock_get_names.return_value = [
NodeName('node1', '/', '/node1'),
NodeName('node2', '/ns', '/ns/node2'),
]
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['/node1', '/ns/node2']
mock_get_names.assert_called_once_with(
node=mock_node, include_hidden_nodes=False)

# Test 2: include_hidden_nodes_key provided and True
parsed_args.include_hidden_nodes = True
completer = NodeNameCompleter(include_hidden_nodes_key='include_hidden_nodes')
with patch('ros2node.api.NodeStrategy') as mock_node_strategy:
mock_node = Mock()
mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node)
mock_node_strategy.return_value.__exit__ = Mock(return_value=False)
with patch('ros2node.api.get_node_names') as mock_get_names:
mock_get_names.return_value = [
NodeName('node1', '/', '/node1'),
NodeName('_hidden', '/', '/_hidden'),
]
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['/node1', '/_hidden']
mock_get_names.assert_called_once_with(
node=mock_node, include_hidden_nodes=True)

# Test 3: no nodes available
parsed_args.include_hidden_nodes = False
completer = NodeNameCompleter(include_hidden_nodes_key='include_hidden_nodes')
with patch('ros2node.api.NodeStrategy') as mock_node_strategy:
mock_node = Mock()
mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node)
mock_node_strategy.return_value.__exit__ = Mock(return_value=False)
with patch('ros2node.api.get_node_names') as mock_get_names:
mock_get_names.return_value = []
result = completer(prefix='', parsed_args=parsed_args)
assert result == []
45 changes: 45 additions & 0 deletions ros2run/test/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2026 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def test_executable_name_completer():
from unittest.mock import Mock, patch
from ros2pkg.api import PackageNotFound
from ros2run.api import ExecutableNameCompleter

parsed_args = Mock()
parsed_args.package_name = 'my_package'
completer = ExecutableNameCompleter(package_name_key='package_name')

# Test 1: package has executables
with patch('ros2run.api.get_executable_paths') as mock_get_paths:
mock_get_paths.return_value = [
'/opt/ros/humble/lib/my_package/my_exec',
'/opt/ros/humble/lib/my_package/other_exec',
]
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['my_exec', 'other_exec']
mock_get_paths.assert_called_once_with(package_name='my_package')

# Test 2: package not found raises PackageNotFound, returns empty list
with patch('ros2run.api.get_executable_paths') as mock_get_paths:
mock_get_paths.side_effect = PackageNotFound('my_package')
result = completer(prefix='', parsed_args=parsed_args)
assert result == []

# Test 3: package has no executables
with patch('ros2run.api.get_executable_paths') as mock_get_paths:
mock_get_paths.return_value = []
result = completer(prefix='', parsed_args=parsed_args)
assert result == []
128 changes: 128 additions & 0 deletions ros2service/test/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright 2026 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def test_service_name_completer():
from unittest.mock import Mock, patch
from ros2service.api import ServiceNameCompleter

parsed_args = Mock()
parsed_args.include_hidden_services = False
completer = ServiceNameCompleter(
include_hidden_services_key='include_hidden_services')

# Test 1: returns service names
with patch('ros2service.api.NodeStrategy') as mock_node_strategy:
mock_node = Mock()
mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node)
mock_node_strategy.return_value.__exit__ = Mock(return_value=False)
with patch('ros2service.api.get_service_names') as mock_get_names:
mock_get_names.return_value = ['/service1', '/service2']
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['/service1', '/service2']
mock_get_names.assert_called_once_with(
node=mock_node, include_hidden_services=False)

# Test 2: no services available
with patch('ros2service.api.NodeStrategy') as mock_node_strategy:
mock_node = Mock()
mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node)
mock_node_strategy.return_value.__exit__ = Mock(return_value=False)
with patch('ros2service.api.get_service_names') as mock_get_names:
mock_get_names.return_value = []
result = completer(prefix='', parsed_args=parsed_args)
assert result == []

# Test 3: include hidden services
parsed_args.include_hidden_services = True
with patch('ros2service.api.NodeStrategy') as mock_node_strategy:
mock_node = Mock()
mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node)
mock_node_strategy.return_value.__exit__ = Mock(return_value=False)
with patch('ros2service.api.get_service_names') as mock_get_names:
mock_get_names.return_value = ['/service1', '/_hidden_service']
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['/service1', '/_hidden_service']
mock_get_names.assert_called_once_with(
node=mock_node, include_hidden_services=True)


def test_service_type_completer():
from unittest.mock import Mock, patch
from ros2service.api import ServiceTypeCompleter

parsed_args = Mock()

# Test 1: no service_name_key, returns all service types
completer = ServiceTypeCompleter()
with patch('ros2service.api.service_type_completer') as mock_all_types:
mock_all_types.return_value = ['std_srvs/Empty', 'std_srvs/SetBool']
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['std_srvs/Empty', 'std_srvs/SetBool']
mock_all_types.assert_called_once()

# Test 2: service_name_key provided and service found
parsed_args.service_name = '/my_service'
completer = ServiceTypeCompleter(service_name_key='service_name')
with patch('ros2service.api.NodeStrategy') as mock_node_strategy:
mock_node = Mock()
mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node)
mock_node_strategy.return_value.__exit__ = Mock(return_value=False)
with patch('ros2service.api.get_service_names_and_types') as mock_get_names_types:
mock_get_names_types.return_value = [
('/my_service', ['std_srvs/Empty']),
('/other_service', ['std_srvs/SetBool']),
]
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['std_srvs/Empty']

# Test 3: service_name_key provided but service not found, falls back to all types
parsed_args.service_name = '/nonexistent_service'
completer = ServiceTypeCompleter(service_name_key='service_name')
with patch('ros2service.api.NodeStrategy') as mock_node_strategy:
mock_node = Mock()
mock_node_strategy.return_value.__enter__ = Mock(return_value=mock_node)
mock_node_strategy.return_value.__exit__ = Mock(return_value=False)
with patch('ros2service.api.get_service_names_and_types') as mock_get_names_types:
mock_get_names_types.return_value = [
('/my_service', ['std_srvs/Empty']),
]
with patch('ros2service.api.service_type_completer') as mock_all_types:
mock_all_types.return_value = ['std_srvs/Empty', 'std_srvs/SetBool']
result = completer(prefix='', parsed_args=parsed_args)
assert result == ['std_srvs/Empty', 'std_srvs/SetBool']


def test_service_prototype_completer():
from unittest.mock import Mock, patch
from ros2service.api import ServicePrototypeCompleter

parsed_args = Mock()
parsed_args.service_type = 'std_srvs/SetBool'
completer = ServicePrototypeCompleter(service_type_key='service_type')

mock_service_class = Mock()
mock_request_instance = Mock()
mock_service_class.Request.return_value = mock_request_instance

with patch('ros2service.api.get_service') as mock_get_service:
with patch('ros2service.api.message_to_yaml') as mock_message_to_yaml:
mock_get_service.return_value = mock_service_class
mock_message_to_yaml.return_value = 'data: false'

result = completer(prefix='', parsed_args=parsed_args)

mock_get_service.assert_called_once_with('std_srvs/SetBool')
mock_message_to_yaml.assert_called_once_with(mock_request_instance)
assert result == ['data: false']
Loading