From 6ec50cd3a6ab7d9287506b0e28b2a41449d625db Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 23 Apr 2026 06:51:40 +0000 Subject: [PATCH 1/5] feat(room-io): add jsonFormat option for timed transcription output Port of livekit/agents#5472. Adds `jsonFormat` to `RoomOutputOptions`; when enabled, chunks published on the `lk.transcription` datastream topic are serialized as newline-delimited JSON objects with `text` and `start_time`/`end_time` fields when the chunk is a `TimedString`. The `TranscriptionSynchronizer` now emits `TimedString` items with `end_time` reflecting synchronized playback timing so subscribers can align chunks against playback without extra bookkeeping. --- .changeset/room-io-json-transcription.md | 5 ++ agents/src/voice/room_io/_output.ts | 58 +++++++++++++++++++ agents/src/voice/room_io/room_io.ts | 13 ++++- .../src/voice/transcription/synchronizer.ts | 27 +++++++-- 4 files changed, 97 insertions(+), 6 deletions(-) create mode 100644 .changeset/room-io-json-transcription.md diff --git a/.changeset/room-io-json-transcription.md b/.changeset/room-io-json-transcription.md new file mode 100644 index 000000000..86de9544c --- /dev/null +++ b/.changeset/room-io-json-transcription.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": minor +--- + +feat(room-io): add `jsonFormat` option on `RoomOutputOptions` for timed transcription output. When enabled, each chunk published on the `lk.transcription` datastream topic is a JSON object with `text`, and `start_time`/`end_time` when the chunk is a `TimedString`. Ported from livekit/agents#5472. diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 01dd31625..34609fea5 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -125,9 +125,67 @@ abstract class BaseParticipantTranscriptionOutput extends TextOutput { protected abstract handleFlush(): void; } +// Ref: python livekit-agents/livekit/agents/voice/room_io/_output.py - 363-383 lines +export interface ParticipantTranscriptionOutputOptions { + /** When true, each chunk sent on the `lk.transcription` datastream topic is serialized + * as a JSON object with `text`, and `start_time`/`end_time`/`confidence`/ + * `start_time_offset` when the captured value is a TimedString. Each object is + * suffixed with a newline so subscribers can parse the stream line-by-line. */ + jsonFormat?: boolean; +} + export class ParticipantTranscriptionOutput extends BaseParticipantTranscriptionOutput { private writer: TextStreamWriter | null = null; private flushTask: Task | null = null; + private jsonFormat: boolean; + + constructor( + room: Room, + isDeltaStream: boolean, + participant: Participant | string | null, + options: ParticipantTranscriptionOutputOptions = {}, + ) { + super(room, isDeltaStream, participant); + this.jsonFormat = options.jsonFormat ?? false; + } + + override async captureText(text: string | TimedString) { + if (!this.participantIdentity) { + return; + } + + const textStr = isTimedString(text) ? text.text : text; + this.latestText = textStr; + + // Ref: python livekit-agents/livekit/agents/voice/room_io/_output.py - 450-462 lines + // When json_format is enabled, serialize each chunk as a protobuf-compatible JSON dict. + // The Python implementation uses `agent_pb.TimedString` + `MessageToDict(preserving_proto_field_name=True)`. + // We emit the same snake_case shape directly (no protobuf runtime dependency on the JS side). + const payload = this.jsonFormat ? this.encodeJsonChunk(text) : textStr; + + await this.handleCaptureText(payload); + } + + private encodeJsonChunk(text: string | TimedString): string { + const obj: Record = { + text: isTimedString(text) ? text.text : String(text), + }; + if (isTimedString(text)) { + if (text.startTime !== undefined) { + obj.start_time = text.startTime; + } + if (text.endTime !== undefined) { + obj.end_time = text.endTime; + } + if (text.confidence !== undefined) { + obj.confidence = text.confidence; + } + if (text.startTimeOffset !== undefined) { + obj.start_time_offset = text.startTimeOffset; + } + } + return JSON.stringify(obj) + '\n'; + } protected async handleCaptureText(text: string): Promise { if (this.flushTask && !this.flushTask.done) { diff --git a/agents/src/voice/room_io/room_io.ts b/agents/src/voice/room_io/room_io.ts index 312de4eba..c6869c861 100644 --- a/agents/src/voice/room_io/room_io.ts +++ b/agents/src/voice/room_io/room_io.ts @@ -101,6 +101,13 @@ export interface RoomOutputOptions { Defaults to the AudioSource internal default (1000ms). */ queueSizeMs?: number; + // Ref: python livekit-agents/livekit/agents/voice/room_io/types.py - 102-103 lines + /** Send the transcription as a JSON dict for each chunk on the `lk.transcription` + datastream topic, including `start_time`/`end_time` timestamps if the chunk is a + TimedString. Each JSON object is suffixed with a newline so clients can parse the + stream line-by-line. + */ + jsonFormat: boolean; } const DEFAULT_ROOM_INPUT_OPTIONS: RoomInputOptions = { @@ -120,6 +127,7 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = { audioEnabled: true, syncTranscription: true, audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }), + jsonFormat: false, }; export class RoomIO { @@ -339,7 +347,10 @@ export class RoomIO { options.isDeltaStream, options.participant, ), - new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant), + // Ref: python livekit-agents/livekit/agents/voice/room_io/room_io.py - 159 lines + new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant, { + jsonFormat: this.outputOptions.jsonFormat, + }), ]); } diff --git a/agents/src/voice/transcription/synchronizer.ts b/agents/src/voice/transcription/synchronizer.ts index 75b3de4ba..f47e7ddbc 100644 --- a/agents/src/voice/transcription/synchronizer.ts +++ b/agents/src/voice/transcription/synchronizer.ts @@ -13,6 +13,7 @@ import { type PlaybackFinishedEvent, TextOutput, type TimedString, + createTimedString, isTimedString, } from '../io.js'; @@ -143,8 +144,11 @@ class SegmentSynchronizerImpl { private textData: TextData; private audioData: AudioData; private speed: number; - private outputStream: IdentityTransform; - private outputStreamWriter: WritableStreamDefaultWriter; + // Ref: python livekit-agents/livekit/agents/voice/transcription/synchronizer.py - 151 lines + // Emit TimedString objects so downstream outputs (e.g. RoomIO's json_format) can + // attach `end_time` reflecting synchronized playback timing. + private outputStream: IdentityTransform; + private outputStreamWriter: WritableStreamDefaultWriter; private captureTask: Promise; private startWallTime?: number; @@ -200,7 +204,7 @@ class SegmentSynchronizerImpl { return this.textData.pushedText.length > this.textData.forwardedText.length; } - get readable(): ReadableStream { + get readable(): ReadableStream { return this.outputStream.readable; } @@ -342,7 +346,14 @@ class SegmentSynchronizerImpl { } if (this.playbackCompleted) { - this.outputStreamWriter.write(sentence.slice(textCursor, endPos)); + // Ref: python livekit-agents/livekit/agents/voice/transcription/synchronizer.py - 331-333 lines + const playedWord = sentence.slice(textCursor, endPos); + this.outputStreamWriter.write( + createTimedString({ + text: playedWord, + endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined, + }), + ); textCursor = endPos; continue; } @@ -379,7 +390,13 @@ class SegmentSynchronizerImpl { await this.sleepIfNotClosed(delayTime / 2); const forwardedWord = sentence.slice(textCursor, endPos); - this.outputStreamWriter.write(forwardedWord); + // Ref: python livekit-agents/livekit/agents/voice/transcription/synchronizer.py - 363-368 lines + this.outputStreamWriter.write( + createTimedString({ + text: forwardedWord, + endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined, + }), + ); await this.sleepIfNotClosed(delayTime / 2); From 41b1143de394a434dafcab029845da0fdb89c37f Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 23 Apr 2026 11:36:00 +0000 Subject: [PATCH 2/5] fix(room-io): preserve jsonFormat payload on non-delta flush; emit TimedString for trailing sentence fragment - `ParticipantTranscriptionOutput` now stores the JSON-encoded payload (not the raw text) as `latestText` when `jsonFormat` is enabled, so the non-delta `FINAL=true` flush publishes the same newline-delimited JSON shape as interim chunks. Without this, `userTranscriptOutput` (which uses `isDeltaStream: false`) broke line-by-line JSON parsers on the terminal message. Mirrors the Python behavior in `_output.py` where `_latest_text` is reassigned to the encoded payload before `_latest_text = text`. - `SegmentSynchronizerImpl` now wraps the trailing sentence fragment (anything after the last word) as a `TimedString` with `endTime`, matching every other emission on the same output stream. --- agents/src/voice/room_io/_output.ts | 14 +++++++++----- agents/src/voice/transcription/synchronizer.ts | 10 +++++++++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 34609fea5..74be79dbf 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -154,14 +154,18 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription return; } - const textStr = isTimedString(text) ? text.text : text; - this.latestText = textStr; - - // Ref: python livekit-agents/livekit/agents/voice/room_io/_output.py - 450-462 lines + // Ref: python livekit-agents/livekit/agents/voice/room_io/_output.py - 447-464 lines // When json_format is enabled, serialize each chunk as a protobuf-compatible JSON dict. // The Python implementation uses `agent_pb.TimedString` + `MessageToDict(preserving_proto_field_name=True)`. // We emit the same snake_case shape directly (no protobuf runtime dependency on the JS side). - const payload = this.jsonFormat ? this.encodeJsonChunk(text) : textStr; + // latestText must hold the encoded payload so non-delta flush (FINAL=true) republishes the + // same newline-delimited JSON format as the interim chunks. + const payload = this.jsonFormat + ? this.encodeJsonChunk(text) + : isTimedString(text) + ? text.text + : text; + this.latestText = payload; await this.handleCaptureText(payload); } diff --git a/agents/src/voice/transcription/synchronizer.ts b/agents/src/voice/transcription/synchronizer.ts index f47e7ddbc..c2325c24f 100644 --- a/agents/src/voice/transcription/synchronizer.ts +++ b/agents/src/voice/transcription/synchronizer.ts @@ -407,7 +407,15 @@ class SegmentSynchronizerImpl { if (textCursor < sentence.length) { const remaining = sentence.slice(textCursor); - this.outputStreamWriter.write(remaining); + // Keep the trailing fragment (whitespace/punctuation after the last word) shaped like + // the word emissions above so downstream consumers (e.g. jsonFormat) see a uniform + // stream of TimedString chunks. + this.outputStreamWriter.write( + createTimedString({ + text: remaining, + endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined, + }), + ); } } } From 7003f6f3b3421a76fe023fbbc9bf38d058ef3b4a Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Mon, 27 Apr 2026 15:06:06 +0800 Subject: [PATCH 3/5] changes verified --- agents/src/stt/stt.ts | 1 - agents/src/voice/room_io/_output.ts | 3 --- agents/src/voice/room_io/room_io.ts | 3 --- agents/src/voice/transcription/synchronizer.ts | 8 ++++++-- 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts index 48ceff5ba..2e378d94a 100644 --- a/agents/src/stt/stt.ts +++ b/agents/src/stt/stt.ts @@ -74,7 +74,6 @@ export interface SpeechData { * * May contain multiple entries when a single utterance spans multiple source languages. */ - // Ref: python livekit-agents/livekit/agents/stt/stt.py - 62-68 lines sourceLanguages?: LanguageCode[]; } diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 74be79dbf..ef8c9ff8a 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -125,7 +125,6 @@ abstract class BaseParticipantTranscriptionOutput extends TextOutput { protected abstract handleFlush(): void; } -// Ref: python livekit-agents/livekit/agents/voice/room_io/_output.py - 363-383 lines export interface ParticipantTranscriptionOutputOptions { /** When true, each chunk sent on the `lk.transcription` datastream topic is serialized * as a JSON object with `text`, and `start_time`/`end_time`/`confidence`/ @@ -154,7 +153,6 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription return; } - // Ref: python livekit-agents/livekit/agents/voice/room_io/_output.py - 447-464 lines // When json_format is enabled, serialize each chunk as a protobuf-compatible JSON dict. // The Python implementation uses `agent_pb.TimedString` + `MessageToDict(preserving_proto_field_name=True)`. // We emit the same snake_case shape directly (no protobuf runtime dependency on the JS side). @@ -166,7 +164,6 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription ? text.text : text; this.latestText = payload; - await this.handleCaptureText(payload); } diff --git a/agents/src/voice/room_io/room_io.ts b/agents/src/voice/room_io/room_io.ts index c6869c861..939506244 100644 --- a/agents/src/voice/room_io/room_io.ts +++ b/agents/src/voice/room_io/room_io.ts @@ -101,7 +101,6 @@ export interface RoomOutputOptions { Defaults to the AudioSource internal default (1000ms). */ queueSizeMs?: number; - // Ref: python livekit-agents/livekit/agents/voice/room_io/types.py - 102-103 lines /** Send the transcription as a JSON dict for each chunk on the `lk.transcription` datastream topic, including `start_time`/`end_time` timestamps if the chunk is a TimedString. Each JSON object is suffixed with a newline so clients can parse the @@ -172,7 +171,6 @@ export class RoomIO { this.room = room; this.inputOptions = { ...DEFAULT_ROOM_INPUT_OPTIONS, ...inputOptions }; this.outputOptions = { ...DEFAULT_ROOM_OUTPUT_OPTIONS, ...outputOptions }; - this.userTranscriptWriter = this.userTranscriptStream.writable.getWriter(); this.participantIdentity = participant @@ -347,7 +345,6 @@ export class RoomIO { options.isDeltaStream, options.participant, ), - // Ref: python livekit-agents/livekit/agents/voice/room_io/room_io.py - 159 lines new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant, { jsonFormat: this.outputOptions.jsonFormat, }), diff --git a/agents/src/voice/transcription/synchronizer.ts b/agents/src/voice/transcription/synchronizer.ts index 1df3f6cba..d35322152 100644 --- a/agents/src/voice/transcription/synchronizer.ts +++ b/agents/src/voice/transcription/synchronizer.ts @@ -144,7 +144,6 @@ class SegmentSynchronizerImpl { private textData: TextData; private audioData: AudioData; private speed: number; - // Ref: python livekit-agents/livekit/agents/voice/transcription/synchronizer.py - 151 lines // Emit TimedString objects so downstream outputs (e.g. RoomIO's json_format) can // attach `end_time` reflecting synchronized playback timing. private outputStream: IdentityTransform; @@ -403,7 +402,12 @@ class SegmentSynchronizerImpl { } await this.sleepIfNotClosed(delayTime / 2); - this.outputStreamWriter.write(forwardedWord); + this.outputStreamWriter.write( + createTimedString({ + text: forwardedWord, + endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined, + }), + ); await this.sleepIfNotClosed(delayTime / 2); this.textData.forwardedHyphens += wordHyphens; From 979bafa5d8c3c32cb23bbff5cd2e21632087c14f Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 05:24:43 +0000 Subject: [PATCH 4/5] chore(changeset): bump @livekit/agents as patch instead of minor --- .changeset/room-io-json-transcription.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/room-io-json-transcription.md b/.changeset/room-io-json-transcription.md index 86de9544c..81faec92f 100644 --- a/.changeset/room-io-json-transcription.md +++ b/.changeset/room-io-json-transcription.md @@ -1,5 +1,5 @@ --- -"@livekit/agents": minor +"@livekit/agents": patch --- feat(room-io): add `jsonFormat` option on `RoomOutputOptions` for timed transcription output. When enabled, each chunk published on the `lk.transcription` datastream topic is a JSON object with `text`, and `start_time`/`end_time` when the chunk is a `TimedString`. Ported from livekit/agents#5472. From 698bc7f0f78c4bb24afff42d6f0040fc6f763431 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Thu, 30 Apr 2026 10:56:59 +0800 Subject: [PATCH 5/5] refactor(room-io): use protocol TimedString for jsonFormat output (#1348) --- agents/package.json | 2 +- agents/src/voice/room_io/_output.ts | 31 ++++++++++------------------- pnpm-lock.yaml | 14 ++++++------- 3 files changed, 18 insertions(+), 29 deletions(-) diff --git a/agents/package.json b/agents/package.json index c1111739c..0bac15227 100644 --- a/agents/package.json +++ b/agents/package.json @@ -50,7 +50,7 @@ "@bufbuild/protobuf": "^1.10.0", "@ffmpeg-installer/ffmpeg": "^1.1.0", "@livekit/mutex": "^1.1.1", - "@livekit/protocol": "^1.45.3", + "@livekit/protocol": "^1.45.6", "@livekit/typed-emitter": "^3.0.0", "@livekit/throws-transformer": "0.1.8", "@opentelemetry/api": "^1.9.0", diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index ef8c9ff8a..6d68fa035 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { AgentSession as pb } from '@livekit/protocol'; import type { RemoteParticipant } from '@livekit/rtc-node'; import { type AudioFrame, @@ -153,9 +154,6 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription return; } - // When json_format is enabled, serialize each chunk as a protobuf-compatible JSON dict. - // The Python implementation uses `agent_pb.TimedString` + `MessageToDict(preserving_proto_field_name=True)`. - // We emit the same snake_case shape directly (no protobuf runtime dependency on the JS side). // latestText must hold the encoded payload so non-delta flush (FINAL=true) republishes the // same newline-delimited JSON format as the interim chunks. const payload = this.jsonFormat @@ -168,24 +166,15 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription } private encodeJsonChunk(text: string | TimedString): string { - const obj: Record = { - text: isTimedString(text) ? text.text : String(text), - }; - if (isTimedString(text)) { - if (text.startTime !== undefined) { - obj.start_time = text.startTime; - } - if (text.endTime !== undefined) { - obj.end_time = text.endTime; - } - if (text.confidence !== undefined) { - obj.confidence = text.confidence; - } - if (text.startTimeOffset !== undefined) { - obj.start_time_offset = text.startTimeOffset; - } - } - return JSON.stringify(obj) + '\n'; + const isTimed = isTimedString(text); + const message = new pb.TimedString({ + text: isTimed ? text.text : text, + startTime: isTimed ? text.startTime : undefined, + endTime: isTimed ? text.endTime : undefined, + confidence: isTimed ? text.confidence : undefined, + startTimeOffset: isTimed ? text.startTimeOffset : undefined, + }); + return message.toJsonString({ useProtoFieldName: true }) + '\n'; } protected async handleCaptureText(text: string): Promise { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3f41f05ea..5061514cd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -116,8 +116,8 @@ importers: specifier: ^1.1.1 version: 1.1.1 '@livekit/protocol': - specifier: ^1.45.3 - version: 1.45.3 + specifier: ^1.45.6 + version: 1.45.6 '@livekit/throws-transformer': specifier: 0.1.8 version: 0.1.8(typescript@5.4.5) @@ -2190,8 +2190,8 @@ packages: cpu: [x64] os: [win32] - '@livekit/protocol@1.45.3': - resolution: {integrity: sha512-WmMxBTsy4dRBqcrswFwUUlgq3Z0nnhOqKR6tX749Rb/PcB1yBMUtrHxZvcsS6qi3/5+86zHeVG+exmu1sZqfJg==} + '@livekit/protocol@1.45.6': + resolution: {integrity: sha512-YPDmrUiVe1EY/q/2bD+Fp+69DWq6LZgeH+G/KEbz07OIVf8hgAYzfb1FgiOdWLRpSj06+SuTmrOY604fWNuD3w==} '@livekit/rtc-ffi-bindings-darwin-arm64@0.12.52-patch.0': resolution: {integrity: sha512-IKUir6goV8yVRR7E2qrAP0JtH7gUyMkO0TG8G+dopO/fkXAsPpSealgI9fLcBJl0zhKK+eGCr741r6xR+xxsVw==} @@ -6463,7 +6463,7 @@ snapshots: '@livekit/noise-cancellation-win32-x64@0.1.9': optional: true - '@livekit/protocol@1.45.3': + '@livekit/protocol@1.45.6': dependencies: '@bufbuild/protobuf': 1.10.1 @@ -8866,14 +8866,14 @@ snapshots: livekit-server-sdk@2.13.3: dependencies: '@bufbuild/protobuf': 1.10.1 - '@livekit/protocol': 1.45.3 + '@livekit/protocol': 1.45.6 camelcase-keys: 9.1.3 jose: 5.2.4 livekit-server-sdk@2.14.1: dependencies: '@bufbuild/protobuf': 1.10.1 - '@livekit/protocol': 1.45.3 + '@livekit/protocol': 1.45.6 camelcase-keys: 9.1.3 jose: 5.2.4