Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rosbridge_library/src/rosbridge_library/rosbridge_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
class RosbridgeProtocol(Protocol):
"""Adds the handlers for the rosbridge opcodes."""

PROTOCOL_VERSION: tuple[int, int, int] = (2, 0, 0)

rosbridge_capabilities: tuple[type[Capability], ...] = (
Advertise,
Publish,
Expand Down
3 changes: 3 additions & 0 deletions rosbridge_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ if(BUILD_TESTING)
)
endforeach()

# Doesn't depend on the executor type, so just run once
add_launch_test(test/websocket/version_handler.test.py)

find_package(ament_cmake_mypy REQUIRED)
ament_mypy()
endif()
10 changes: 7 additions & 3 deletions rosbridge_server/scripts/rosbridge_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@
from tornado.netutil import bind_sockets
from tornado.web import Application

from rosbridge_server import ClientManager, RosbridgeWebSocket
from rosbridge_server import ClientManager, RosbridgeVersionHandler, RosbridgeWebSocket

if TYPE_CHECKING:
from tornado.routing import _RuleList
from tornado.web import RequestHandler


SERVER_PARAMETERS = (
Expand Down Expand Up @@ -196,9 +197,12 @@ def _handle_parameters(self) -> None:
)

def _start_server(self) -> None:
handlers = [(r"/", RosbridgeWebSocket), (r"", RosbridgeWebSocket)]
handlers: list[tuple[str, type[RequestHandler]]] = []
if self.url_path != "/":
handlers = [(rf"{self.url_path}", RosbridgeWebSocket)]
handlers.append((rf"{self.url_path}", RosbridgeWebSocket))
else:
handlers.extend([(r"/", RosbridgeWebSocket), (r"", RosbridgeWebSocket)])
handlers.append((r"/version", RosbridgeVersionHandler))

application = Application(
handlers=cast("_RuleList", handlers),
Expand Down
1 change: 1 addition & 0 deletions rosbridge_server/src/rosbridge_server/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .client_manager import ClientManager # noqa: F401
from .version_handler import RosbridgeVersionHandler # noqa: F401
from .websocket_handler import RosbridgeWebSocket # noqa: F401
55 changes: 55 additions & 0 deletions rosbridge_server/src/rosbridge_server/version_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2026, Fictionlab sp. z o.o.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of copyright holder nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from __future__ import annotations

import json
from importlib.metadata import version

from rosbridge_library.rosbridge_protocol import RosbridgeProtocol
from tornado.web import RequestHandler


class RosbridgeVersionHandler(RequestHandler):
"""HTTP GET handler that returns the rosbridge protocol and server versions as JSON."""

def get(self) -> None:
major, minor, patch = RosbridgeProtocol.PROTOCOL_VERSION
self.set_header("Content-Type", "application/json")
self.write(
json.dumps(
{
"protocol": {"major": major, "minor": minor, "patch": patch},
"server": version("rosbridge_server"),
}
)
)
54 changes: 54 additions & 0 deletions rosbridge_server/test/websocket/version_handler.test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from __future__ import annotations

import json
import sys
import unittest
import urllib.request
from importlib.metadata import version
from pathlib import Path

import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from twisted.python import log

sys.path.append(str(Path(__file__).parent)) # enable importing from common.py in this directory

import common
from common import get_server_port

log.startLogging(sys.stderr)

generate_test_description = common.generate_test_description


class TestVersionHandler(unittest.TestCase):
def test_version_endpoint(self) -> None:
context = rclpy.Context()
rclpy.init(context=context)
executor = SingleThreadedExecutor(context=context)
node = Node("test_version_handler", context=context)
executor.add_node(node)
try:
port_future = executor.create_task(get_server_port, node)
executor.spin_until_future_complete(port_future, timeout_sec=10.0)
port = port_future.result()
finally:
executor.remove_node(node)
node.destroy_node()
rclpy.shutdown(context=context)

with urllib.request.urlopen(f"http://127.0.0.1:{port}/version") as response:
assert response.status == 200
assert response.headers.get_content_type() == "application/json"
data = json.loads(response.read())

assert "protocol" in data
assert "server" in data
assert "major" in data["protocol"]
assert "minor" in data["protocol"]
assert "patch" in data["protocol"]
assert isinstance(data["protocol"]["major"], int)
assert isinstance(data["protocol"]["minor"], int)
assert isinstance(data["protocol"]["patch"], int)
assert data["server"] == version("rosbridge_server")
Loading