diff --git a/Makefile b/Makefile index a7d72ea29..c104e2136 100644 --- a/Makefile +++ b/Makefile @@ -980,7 +980,7 @@ test_parallel: hub chrome firefox edge chromium video cd ./tests || true ; \ echo TAG=$(TAG_VERSION) > .env ; \ echo VIDEO_TAG=$(FFMPEG_TAG_VERSION)-$(BUILD_DATE) >> .env ; \ - echo TEST_DELAY_AFTER_TEST=$(or $(TEST_DELAY_AFTER_TEST), 2) >> .env ; \ + echo TEST_DELAY_AFTER_TEST=$(or $(TEST_DELAY_AFTER_TEST), 0) >> .env ; \ echo TEST_DRAIN_AFTER_SESSION_COUNT=$(or $(TEST_DRAIN_AFTER_SESSION_COUNT), 2) >> .env ; \ echo TEST_PARALLEL_HARDENING=$(or $(TEST_PARALLEL_HARDENING), "true") >> .env ; \ echo TEST_PARALLEL_COUNT=$(or $(TEST_PARALLEL_COUNT), 5) >> .env ; \ @@ -1007,10 +1007,10 @@ test_parallel: hub chrome firefox edge chromium video make test_video_integrity test_video_standalone: standalone_chrome standalone_chromium standalone_firefox standalone_edge - DOCKER_COMPOSE_FILE=docker-compose-v3-test-standalone.yml TEST_DELAY_AFTER_TEST=2 HUB_CHECKS_INTERVAL=45 make test_video + DOCKER_COMPOSE_FILE=docker-compose-v3-test-standalone.yml TEST_DELAY_AFTER_TEST=0 HUB_CHECKS_INTERVAL=45 make test_video test_video_dynamic_name: - VIDEO_FILE_NAME=auto TEST_DELAY_AFTER_TEST=2 HUB_CHECKS_INTERVAL=45 TEST_ADD_CAPS_RECORD_VIDEO=false \ + VIDEO_FILE_NAME=auto TEST_DELAY_AFTER_TEST=0 HUB_CHECKS_INTERVAL=45 TEST_ADD_CAPS_RECORD_VIDEO=false \ make test_video # This should run on its own CI job. There is no need to combine it with the other tests. @@ -1039,7 +1039,7 @@ test_video: video hub chrome firefox edge chromium echo UID=$$(id -u) >> .env ; \ echo BINDING_VERSION=$(BINDING_VERSION) >> .env ; \ echo BASE_VERSION=$(BASE_VERSION) >> .env ; \ - echo TEST_DELAY_AFTER_TEST=$(or $(TEST_DELAY_AFTER_TEST), 2) >> .env ; \ + echo TEST_DELAY_AFTER_TEST=$(or $(TEST_DELAY_AFTER_TEST), 0) >> .env ; \ echo HUB_CHECKS_INTERVAL=$(or $(HUB_CHECKS_INTERVAL), 45) >> .env ; \ echo SELENIUM_ENABLE_MANAGED_DOWNLOADS=$(or $(SELENIUM_ENABLE_MANAGED_DOWNLOADS), "true") >> .env ; \ echo TEST_FIREFOX_INSTALL_LANG_PACKAGE=$${TEST_FIREFOX_INSTALL_LANG_PACKAGE} >> .env ; \ @@ -1161,7 +1161,7 @@ test_node_docker: hub standalone_docker standalone_chrome standalone_firefox sta echo LOG_LEVEL=$(or $(LOG_LEVEL), "INFO") >> .env ; \ echo REQUEST_TIMEOUT=$(or $(REQUEST_TIMEOUT), 300) >> .env ; \ echo SELENIUM_ENABLE_MANAGED_DOWNLOADS=$(or $(SELENIUM_ENABLE_MANAGED_DOWNLOADS), "false") >> .env ; \ - echo TEST_DELAY_AFTER_TEST=$(or $(TEST_DELAY_AFTER_TEST), 2) >> .env ; \ + echo TEST_DELAY_AFTER_TEST=$(or $(TEST_DELAY_AFTER_TEST), 0) >> .env ; \ echo RECORD_STANDALONE=$(or $(RECORD_STANDALONE), "true") >> .env ; \ echo GRID_URL=$(or $(GRID_URL), "") >> .env ; \ echo HUB_CHECKS_INTERVAL=$(or $(HUB_CHECKS_INTERVAL), 20) >> .env ; \ @@ -1170,6 +1170,7 @@ test_node_docker: hub standalone_docker standalone_chrome standalone_firefox sta echo UID=$$(id -u) >> .env ; \ echo BINDING_VERSION=$(BINDING_VERSION) >> .env ; \ echo BASE_VERSION=$(BASE_VERSION) >> .env ; \ + echo VIDEO_EVENT_DRIVEN=$(or $(VIDEO_EVENT_DRIVEN), "true") >> .env ; \ if [ "$$(uname)" != "Darwin" ]; then \ echo HOST_IP=$$(hostname -I | awk '{print $$1}') >> .env ; \ else \ @@ -1238,7 +1239,7 @@ test_video_integrity: fi; \ for file in $$list_files; do \ echo "Checking video file: $$file"; \ - docker run -u $$(id -u) -v $$(pwd):$$(pwd) -w $$(pwd) --entrypoint="" $(NAME)/video:$(FFMPEG_TAG_VERSION)-$(BUILD_DATE) ffmpeg -v error -i "$$file" -f null - ; \ + docker run --rm -u $$(id -u) -v $$(pwd):$$(pwd) -w $$(pwd) --entrypoint="" $(NAME)/video:$(FFMPEG_TAG_VERSION)-$(BUILD_DATE) ffmpeg -v error -i "$$file" -f null - ; \ if [ $$? -ne 0 ]; then \ echo "Video file $$file is corrupted"; \ number_corrupted_files=$$((number_corrupted_files+1)); \ @@ -1275,7 +1276,7 @@ chart_test_autoscaling_deployment: PLATFORMS=$(PLATFORMS) TEST_EXISTING_KEDA=true RELEASE_NAME=selenium CHART_ENABLE_TRACING=true TEST_PATCHED_KEDA=$(TEST_PATCHED_KEDA) AUTOSCALING_COOLDOWN_PERIOD=30 \ TRACING_EXPORTER_ENDPOINT=$(TRACING_EXPORTER_ENDPOINT) TEST_CUSTOM_SPECIFIC_NAME=true \ SECURE_CONNECTION_SERVER=true SECURE_USE_EXTERNAL_CERT=true SERVICE_TYPE_NODEPORT=true SELENIUM_GRID_PROTOCOL=https SELENIUM_GRID_HOST=$$(hostname -I | cut -d' ' -f1) SELENIUM_GRID_PORT=31444 \ - SELENIUM_GRID_AUTOSCALING_MIN_REPLICA=1 SET_MAX_REPLICAS=3 TEST_DELAY_AFTER_TEST=2 TEST_NODE_DRAIN_AFTER_SESSION_COUNT=3 SELENIUM_GRID_MONITORING=false \ + SELENIUM_GRID_AUTOSCALING_MIN_REPLICA=1 SET_MAX_REPLICAS=3 TEST_DELAY_AFTER_TEST=0 TEST_NODE_DRAIN_AFTER_SESSION_COUNT=3 SELENIUM_GRID_MONITORING=false \ VERSION=$(TAG_VERSION) VIDEO_TAG=$(FFMPEG_TAG_VERSION)-$(BUILD_DATE) KEDA_BASED_NAME=$(KEDA_BASED_NAME) KEDA_BASED_TAG=$(KEDA_BASED_TAG) NAMESPACE=$(NAMESPACE) BINDING_VERSION=$(BINDING_VERSION) BASE_VERSION=$(BASE_VERSION) \ TEMPLATE_OUTPUT_FILENAME="k8s_prefixSelenium_enableTracing_secureServer_externalCerts_nodePort_autoScaling_scaledObject_existingKEDA_subPath.yaml" \ ./tests/charts/make/chart_test.sh DeploymentAutoscaling diff --git a/StandaloneDocker/Dockerfile b/StandaloneDocker/Dockerfile index b5d2b596a..4b1a5f454 100644 --- a/StandaloneDocker/Dockerfile +++ b/StandaloneDocker/Dockerfile @@ -21,4 +21,6 @@ ENV SE_SESSION_REQUEST_TIMEOUT="300" \ # Boolean value, maps "--relax-checks" SE_RELAX_CHECKS="true" \ SE_OTEL_SERVICE_NAME="selenium-standalone-docker" \ - SE_NODE_ENABLE_MANAGED_DOWNLOADS="true" + SE_NODE_ENABLE_MANAGED_DOWNLOADS="true" \ + SE_BIND_BUS="true" \ + SE_EVENT_BUS_IMPLEMENTATION="" diff --git a/StandaloneDocker/start-selenium-grid-docker.sh b/StandaloneDocker/start-selenium-grid-docker.sh index dcd5c08c2..4376dbd1b 100755 --- a/StandaloneDocker/start-selenium-grid-docker.sh +++ b/StandaloneDocker/start-selenium-grid-docker.sh @@ -108,6 +108,19 @@ if [ ! -z "${SE_EVENT_BUS_HEARTBEAT_PERIOD}" ]; then append_se_opts "--eventbus-heartbeat-period" "${SE_EVENT_BUS_HEARTBEAT_PERIOD}" fi +if [ ! -z "${SE_EVENT_BUS_IMPLEMENTATION}" ]; then + append_se_opts "--events-implementation" "${SE_EVENT_BUS_IMPLEMENTATION}" +fi + +if [ "${SE_BIND_BUS}" = "true" ]; then + append_se_opts "--bind-bus" "${SE_BIND_BUS}" + append_se_opts "--publish-events" "tcp://*:${SE_EVENT_BUS_PUBLISH_PORT}" + append_se_opts "--subscribe-events" "tcp://*:${SE_EVENT_BUS_SUBSCRIBE_PORT}" + if [ -z "${SE_EVENT_BUS_IMPLEMENTATION}" ]; then + append_se_opts "--events-implementation" "org.openqa.selenium.events.zeromq.ZeroMqEventBus" + fi +fi + if [ "${SE_ENABLE_TLS}" = "true" ]; then # Configure truststore for the server if [ ! -z "$SE_JAVA_SSL_TRUST_STORE" ]; then diff --git a/Video/recorder.conf b/Video/recorder.conf index e13c68755..ef0edac83 100755 --- a/Video/recorder.conf +++ b/Video/recorder.conf @@ -6,7 +6,7 @@ autostart=%(ENV_SE_RECORD_VIDEO)s startsecs=0 autorestart=%(ENV_SE_RECORD_VIDEO)s stopsignal=TERM -stopwaitsecs=60 +stopwaitsecs=30 ;Logs (all activity redirected to stdout so it can be seen through "docker logs" redirect_stderr=true @@ -20,7 +20,8 @@ killasgroup=true autostart=%(ENV_SE_RECORD_VIDEO)s startsecs=0 autorestart=%(ENV_SE_RECORD_VIDEO)s -stopsignal=KILL +stopsignal=TERM +stopwaitsecs=5 ;Logs (all activity redirected to stdout so it can be seen through "docker logs" redirect_stderr=true diff --git a/Video/video.sh b/Video/video.sh index e96d1fb09..0a6c10dcc 100755 --- a/Video/video.sh +++ b/Video/video.sh @@ -212,9 +212,15 @@ function graceful_exit() { wait_util_uploader_shutdown } +_graceful_exit_done=false function graceful_exit_force() { + if [[ "$_graceful_exit_done" = "true" ]]; then + return + fi + _graceful_exit_done=true graceful_exit - kill -SIGTERM "$(cat ${SE_SUPERVISORD_PID_FILE})" 2>/dev/null + # Supervisord signaling is delegated to the Python controller (video_recorder.py) + # which handles it uniformly for both shell and event-driven modes. echo "$(date -u +"${ts_format}") [${process_name}] - Ready to shutdown the recorder" exit 0 } @@ -234,7 +240,7 @@ if [[ "${VIDEO_UPLOAD_ENABLED}" != "true" ]] && [[ "${VIDEO_FILE_NAME}" != "auto -probesize 32M -analyzeduration 0 -y -f x11grab -video_size ${VIDEO_SIZE} -r ${FRAME_RATE} \ -i ${DISPLAY} ${SE_AUDIO_SOURCE} -codec:v ${CODEC} ${PRESET:-"-preset veryfast"} \ -tune zerolatency -crf ${SE_VIDEO_CRF:-28} -maxrate ${SE_VIDEO_MAXRATE:-1000k} -bufsize ${SE_VIDEO_BUFSIZE:-2000k} \ - -pix_fmt yuv420p -movflags +faststart "$video_file" & + -pix_fmt yuv420p -movflags frag_keyframe+empty_moov+default_base_moof "$video_file" & FFMPEG_PID=$! if ps -p $FFMPEG_PID >/dev/null; then wait $FFMPEG_PID @@ -270,7 +276,7 @@ else -probesize 32M -analyzeduration 0 -y -f x11grab -video_size ${VIDEO_SIZE} -r ${FRAME_RATE} \ -i ${DISPLAY} ${SE_AUDIO_SOURCE} -codec:v ${CODEC} ${PRESET:-"-preset veryfast"} \ -tune zerolatency -crf ${SE_VIDEO_CRF:-28} -maxrate ${SE_VIDEO_MAXRATE:-1000k} -bufsize ${SE_VIDEO_BUFSIZE:-2000k} \ - -pix_fmt yuv420p -movflags +faststart "$video_file" & + -pix_fmt yuv420p -movflags frag_keyframe+empty_moov+default_base_moof "$video_file" & FFMPEG_PID=$! if ps -p $FFMPEG_PID >/dev/null; then recording_started="true" diff --git a/Video/video_ready.py b/Video/video_ready.py index 8787d321d..00f13700a 100755 --- a/Video/video_ready.py +++ b/Video/video_ready.py @@ -28,7 +28,10 @@ def do_GET(self): def graceful_shutdown(signum, frame): print("Trapped SIGTERM/SIGINT/x so shutting down video-ready...") - httpd.shutdown() + # httpd.shutdown() must be called from a different thread than serve_forever() + # or it deadlocks. video-ready has no state to drain, so close the socket + # and exit directly — supervisord will see the clean exit immediately. + httpd.server_close() sys.exit(0) diff --git a/Video/video_recorder.py b/Video/video_recorder.py index 0903d98d1..84fb1f54e 100755 --- a/Video/video_recorder.py +++ b/Video/video_recorder.py @@ -6,6 +6,12 @@ When event-driven mode is enabled, this launches a single unified service that handles both recording and uploading with shared state management. + +After the video service exits for any reason (normal drain, session end, or +supervisord-initiated shutdown), this controller signals supervisord so the +container shuts down. Centralising this here means both shell and event-driven +modes have identical container-lifecycle behaviour without video.sh needing to +know about supervisord. """ import os @@ -14,12 +20,47 @@ import sys +def _signal_supervisord() -> None: + """Signal supervisord to initiate a container-wide shutdown. + + Safe to call even when supervisord is already shutting down — it will + simply ignore a repeated SIGTERM if it is already in SHUTDOWN state. + """ + pid_file = os.environ.get("SE_SUPERVISORD_PID_FILE", "") + if not pid_file: + return + try: + with open(pid_file) as f: + pid = int(f.read().strip()) + os.kill(pid, signal.SIGTERM) + print("[video.recorder] - Signaled supervisord to shut down") + except (OSError, ValueError, FileNotFoundError): + pass + + def main(): event_driven = os.environ.get("SE_VIDEO_EVENT_DRIVEN", "false").lower() == "true" if event_driven: print("Starting unified event-driven video service...") print("This service handles both recording and uploading with shared state.") + + # Capture whether shutdown was externally initiated (SIGTERM/SIGINT) + # before asyncio.run() replaces the signal handlers via add_signal_handler. + _external_shutdown = [False] + + def _mark_external_shutdown(signum, frame): + _external_shutdown[0] = True + # This handler is only reachable before asyncio.run() installs its + # own handlers via loop.add_signal_handler(). Setting the flag and + # returning would swallow the signal — nothing would act on it and + # the process would hang inside asyncio.run() indefinitely. + # Exit immediately so supervisord sees a clean stop. + sys.exit(0) + + signal.signal(signal.SIGTERM, _mark_external_shutdown) + signal.signal(signal.SIGINT, _mark_external_shutdown) + try: import asyncio @@ -31,6 +72,11 @@ def main(): print("Ensure pyzmq is installed: pip install pyzmq") print("Falling back to shell-based recording...") _run_shell_recorder() + return + + # Only trigger container shutdown for self-initiated exits (drain). + if not _external_shutdown[0]: + _signal_supervisord() else: print("Starting shell-based video recording...") _run_shell_recorder() @@ -38,17 +84,34 @@ def main(): def _run_shell_recorder(): proc = subprocess.Popen(["/opt/bin/video.sh"]) + _external_shutdown = False # True when supervisord (or user) told us to stop def forward_signal(signum, frame): - try: - proc.send_signal(signum) - except ProcessLookupError: - pass # Process already exited before signal was forwarded + nonlocal _external_shutdown + # Forward the signal to video.sh at most once. supervisord uses + # killasgroup=true so video.sh already received the signal directly; + # re-forwarding on every re-entrant call amplifies the SIGTERM + # ping-pong and can keep the process alive for 60 s. + if not _external_shutdown: + _external_shutdown = True + try: + proc.send_signal(signum) + except ProcessLookupError: + pass # Process already exited before signal was forwarded proc.wait() signal.signal(signal.SIGTERM, forward_signal) signal.signal(signal.SIGINT, forward_signal) rc = proc.wait() + + # Signal supervisord only for self-initiated exits (drain, node gone). + # If the shutdown came FROM supervisord (_external_shutdown=True) it is + # already in SHUTDOWN state — signalling it again is a no-op at best and + # confusing at worst. If the recorder crashed (rc != 0) we must not bring + # down the Selenium process alongside it. + if not _external_shutdown and rc == 0: + _signal_supervisord() + if rc != 0: sys.exit(rc) diff --git a/Video/video_service.py b/Video/video_service.py index 8c0de78f9..ecdbad9b5 100755 --- a/Video/video_service.py +++ b/Video/video_service.py @@ -355,42 +355,51 @@ async def wait_for_node_ready(self) -> None: f"Waiting for Node /status endpoint: {node_status_url} " f"(verify_ssl={self.node_status_verify_ssl})" ) - while not self.shutdown_event.is_set(): + def _fetch_status() -> Optional[dict]: + """Blocking HTTP fetch run in a thread to avoid blocking the event loop.""" + req = Request(node_status_url, headers=headers) try: - req = Request(node_status_url, headers=headers) if ssl_context is not None: resp_ctx = urlopen(req, timeout=5, context=ssl_context) else: resp_ctx = urlopen(req, timeout=5) - with resp_ctx as resp: if resp.status == 200: - body = json.loads(resp.read().decode("utf-8")) - - if self.record_standalone: - nodes = body.get("value", {}).get("nodes", []) - if nodes: - node_info = nodes[0] - self.node_id = node_info.get("id") - self.node_external_uri = node_info.get("externalUri") - else: - # Fallback: sidecar connected directly to a node - # (e.g. dynamic grid where /status returns singular "node") - node_info = body.get("value", {}).get("node", {}) - self.node_id = node_info.get("nodeId") or node_info.get("id") - self.node_external_uri = node_info.get("externalUri") + return json.loads(resp.read().decode("utf-8")) + except (URLError, OSError, json.JSONDecodeError, ValueError): + pass + return None + + while not self.shutdown_event.is_set(): + try: + # Run blocking urlopen in a thread so SIGTERM can be processed + # immediately by the event loop without waiting up to 5s. + body = await asyncio.to_thread(_fetch_status) + if body is not None: + if self.record_standalone: + nodes = body.get("value", {}).get("nodes", []) + if nodes: + node_info = nodes[0] + self.node_id = node_info.get("id") + self.node_external_uri = node_info.get("externalUri") else: + # Fallback: sidecar connected directly to a node + # (e.g. dynamic grid where /status returns singular "node") node_info = body.get("value", {}).get("node", {}) - self.node_id = node_info.get("nodeId") + self.node_id = node_info.get("nodeId") or node_info.get("id") self.node_external_uri = node_info.get("externalUri") - - if self.node_id: - logger.info(f"Node is ready. ID: {self.node_id}, URI: {self.node_external_uri}") - return - else: - logger.warning("Node /status responded but nodeId is missing, retrying...") - except (URLError, OSError, json.JSONDecodeError, ValueError) as e: - logger.debug(f"Node not ready yet: {e}") + else: + node_info = body.get("value", {}).get("node", {}) + self.node_id = node_info.get("nodeId") + self.node_external_uri = node_info.get("externalUri") + + if self.node_id: + logger.info(f"Node is ready. ID: {self.node_id}, URI: {self.node_external_uri}") + return + else: + logger.warning("Node /status responded but nodeId is missing, retrying...") + else: + logger.debug(f"Node not ready yet: {node_status_url}") except Exception as e: logger.warning(f"Unexpected error polling Node /status: {e}") @@ -454,7 +463,7 @@ async def start_recording(self, session: SessionState) -> bool: "-pix_fmt", "yuv420p", "-movflags", - "+faststart", + "frag_keyframe+empty_moov+default_base_moof", video_path, ] ) @@ -488,35 +497,47 @@ async def start_recording(self, session: SessionState) -> bool: async def stop_recording(self, session: SessionState) -> bool: """Stop ffmpeg recording for a session.""" - if session.ffmpeg_process is None: - logger.warning(f"No recording in progress for session {session.session_id}") + # Claim the process atomically before the first await. Asyncio is + # cooperative: no other coroutine can run between the check and the + # assignment, so a concurrent caller (e.g. cleanup() racing with + # handle_session_closed()) will see None here and return early, + # preventing double-terminate and double-upload. + proc = session.ffmpeg_process + if proc is None: return False - - session.status = SessionStatus.STOPPING + session.ffmpeg_process = None + + # Only move to STOPPING if we are still in the RECORDING state. + # handle_session_closed() sets status to CLOSED before calling us; + # overwriting that with STOPPING would prevent _cleanup_session_delayed + # from ever cleaning up the session (it checks status == CLOSED). + if session.status == SessionStatus.RECORDING: + session.status = SessionStatus.STOPPING session.end_time = datetime.now() try: - session.ffmpeg_process.terminate() + proc.terminate() try: - _, stderr_bytes = await asyncio.wait_for(session.ffmpeg_process.communicate(), timeout=10.0) + _, stderr_bytes = await asyncio.wait_for(proc.communicate(), timeout=10.0) except asyncio.TimeoutError: logger.warning(f"ffmpeg did not stop gracefully for {session.session_id}, killing") - session.ffmpeg_process.kill() - _, stderr_bytes = await session.ffmpeg_process.communicate() + proc.kill() + _, stderr_bytes = await proc.communicate() - rc = session.ffmpeg_process.returncode + rc = proc.returncode if stderr_bytes: stderr_text = stderr_bytes.decode(errors="replace").strip() if stderr_text: logger.warning(f"ffmpeg stderr for {session.session_id}: {stderr_text}") - session.ffmpeg_process = None # 255 is ffmpeg's own graceful-stop exit code (exit_program(255) in its SIGTERM handler). if rc not in (0, 255, -signal.SIGTERM, -signal.SIGKILL): logger.error(f"ffmpeg exited with unexpected code {rc} for {session.session_id}") + session.status = SessionStatus.CLOSED return False self.recorded_count += 1 + session.status = SessionStatus.CLOSED duration = session.duration_seconds logger.info( f"Stopped recording: session={session.session_id}, " f"duration={duration:.1f}s" @@ -526,6 +547,7 @@ async def stop_recording(self, session: SessionState) -> bool: return True except Exception as e: logger.error(f"Failed to stop recording for {session.session_id}: {e}") + session.status = SessionStatus.CLOSED return False # ==================== Upload Functions ==================== @@ -597,7 +619,12 @@ async def process_upload(self, task: UploadTask) -> None: except asyncio.TimeoutError: logger.warning(f"Upload timed out after {self.upload_timeout}s: {task.video_file}, killing process") proc.kill() - await proc.communicate() + _, stderr_bytes = await proc.communicate() + if stderr_bytes: + logger.debug( + f"Upload stderr at timeout for {task.video_file}: " + f"{stderr_bytes.decode(errors='replace').strip()}" + ) return finally: try: @@ -627,6 +654,10 @@ async def upload_worker(self) -> None: logger.warning("Upload worker cancelled, pending uploads may be lost") for t in active_tasks: t.cancel() + # Await cancelled tasks so they are not left as orphaned + # asyncio tasks (which causes "Task destroyed but pending" warnings + # and makes the active_uploads kill loop in run() the sole cleanup). + await asyncio.gather(*active_tasks, return_exceptions=True) raise # None is the sentinel pushed by cleanup() to signal no more uploads @@ -720,8 +751,11 @@ async def handle_session_closed(self, data: dict) -> None: # Stop recording if in progress if session.ffmpeg_process is not None: - await self.stop_recording(session) - await self.queue_upload(session) + stopped = await self.stop_recording(session) + if stopped: + await self.queue_upload(session) + else: + logger.warning(f"Recording stop failed for {session_id}, skipping upload") # Clean up session after a delay (keep for potential late events). # Tracked so cleanup() can cancel these on shutdown instead of waiting 60s. @@ -924,8 +958,11 @@ async def cleanup(self) -> None: for session in active_sessions: logger.info(f"Stopping recording: {session.session_id}") - await self.stop_recording(session) - await self.queue_upload(session) + stopped = await self.stop_recording(session) + if stopped: + await self.queue_upload(session) + else: + logger.warning(f"Recording stop failed for {session.session_id}, skipping upload") # Push sentinel so the upload worker exits after draining the queue. # run() is responsible for awaiting the upload task with a timeout. diff --git a/tests/docker-compose-v3-test-node-docker.yaml b/tests/docker-compose-v3-test-node-docker.yaml index e0d9d5999..df9df86f7 100644 --- a/tests/docker-compose-v3-test-node-docker.yaml +++ b/tests/docker-compose-v3-test-node-docker.yaml @@ -21,6 +21,7 @@ services: - SE_OPTS=--enable-managed-downloads ${SELENIUM_ENABLE_MANAGED_DOWNLOADS} - SE_BROWSER_ARGS_DISABLE_DSHM=--disable-dev-shm-usage - SE_LOG_LEVEL=${LOG_LEVEL} + - SE_VIDEO_EVENT_DRIVEN=${VIDEO_EVENT_DRIVEN} - SE_VIDEO_RECORD_STANDALONE=true - SE_VIDEO_FILE_NAME=${VIDEO_FILE_NAME} - SE_VIDEO_FILE_NAME_SUFFIX=${VIDEO_FILE_NAME_SUFFIX} diff --git a/tests/docker-compose-v3-test-standalone-docker.yaml b/tests/docker-compose-v3-test-standalone-docker.yaml index 91b6c0771..c3add7798 100644 --- a/tests/docker-compose-v3-test-standalone-docker.yaml +++ b/tests/docker-compose-v3-test-standalone-docker.yaml @@ -14,6 +14,8 @@ services: - SE_NODE_ENABLE_MANAGED_DOWNLOADS=${SELENIUM_ENABLE_MANAGED_DOWNLOADS} - SE_NODE_GRID_URL=${GRID_URL} - SE_OPTS=--log-level ${LOG_LEVEL} --enable-managed-downloads ${SELENIUM_ENABLE_MANAGED_DOWNLOADS} + - SE_EVENT_BUS_HOST=selenium-hub + - SE_VIDEO_EVENT_DRIVEN=${VIDEO_EVENT_DRIVEN} - SE_VIDEO_RECORD_STANDALONE=${RECORD_STANDALONE} - SE_VIDEO_FILE_NAME=${VIDEO_FILE_NAME} - SE_VIDEO_FILE_NAME_SUFFIX=${VIDEO_FILE_NAME_SUFFIX}