diff --git a/.changeset/room-io-json-transcription.md b/.changeset/room-io-json-transcription.md new file mode 100644 index 000000000..81faec92f --- /dev/null +++ b/.changeset/room-io-json-transcription.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +feat(room-io): add `jsonFormat` option on `RoomOutputOptions` for timed transcription output. When enabled, each chunk published on the `lk.transcription` datastream topic is a JSON object with `text`, and `start_time`/`end_time` when the chunk is a `TimedString`. Ported from livekit/agents#5472. diff --git a/agents/package.json b/agents/package.json index c1111739c..0bac15227 100644 --- a/agents/package.json +++ b/agents/package.json @@ -50,7 +50,7 @@ "@bufbuild/protobuf": "^1.10.0", "@ffmpeg-installer/ffmpeg": "^1.1.0", "@livekit/mutex": "^1.1.1", - "@livekit/protocol": "^1.45.3", + "@livekit/protocol": "^1.45.6", "@livekit/typed-emitter": "^3.0.0", "@livekit/throws-transformer": "0.1.8", "@opentelemetry/api": "^1.9.0", diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index eee18f40e..deb7c92de 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -1,6 +1,7 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import { AgentSession as pb } from '@livekit/protocol'; import type { RemoteParticipant } from '@livekit/rtc-node'; import { type AudioFrame, @@ -130,9 +131,56 @@ abstract class BaseParticipantTranscriptionOutput extends TextOutput { protected abstract handleFlush(): void; } +export interface ParticipantTranscriptionOutputOptions { + /** When true, each chunk sent on the `lk.transcription` datastream topic is serialized + * as a JSON object with `text`, and `start_time`/`end_time`/`confidence`/ + * `start_time_offset` when the captured value is a TimedString. Each object is + * suffixed with a newline so subscribers can parse the stream line-by-line. 
*/ + jsonFormat?: boolean; +} + export class ParticipantTranscriptionOutput extends BaseParticipantTranscriptionOutput { private writer: TextStreamWriter | null = null; private flushTask: Task | null = null; + private jsonFormat: boolean; + + constructor( + room: Room, + isDeltaStream: boolean, + participant: Participant | string | null, + options: ParticipantTranscriptionOutputOptions = {}, + ) { + super(room, isDeltaStream, participant); + this.jsonFormat = options.jsonFormat ?? false; + } + + override async captureText(text: string | TimedString) { + if (!this.participantIdentity) { + return; + } + + // latestText must hold the encoded payload so non-delta flush (FINAL=true) republishes the + // same newline-delimited JSON format as the interim chunks. + const payload = this.jsonFormat + ? this.encodeJsonChunk(text) + : isTimedString(text) + ? text.text + : text; + this.latestText = payload; + await this.handleCaptureText(payload); + } + + private encodeJsonChunk(text: string | TimedString): string { + const isTimed = isTimedString(text); + const message = new pb.TimedString({ + text: isTimed ? text.text : text, + startTime: isTimed ? text.startTime : undefined, + endTime: isTimed ? text.endTime : undefined, + confidence: isTimed ? text.confidence : undefined, + startTimeOffset: isTimed ? text.startTimeOffset : undefined, + }); + return message.toJsonString({ useProtoFieldName: true }) + '\n'; + } protected async handleCaptureText(text: string): Promise { if (this.flushTask && !this.flushTask.done) { diff --git a/agents/src/voice/room_io/room_io.ts b/agents/src/voice/room_io/room_io.ts index 312de4eba..939506244 100644 --- a/agents/src/voice/room_io/room_io.ts +++ b/agents/src/voice/room_io/room_io.ts @@ -101,6 +101,12 @@ export interface RoomOutputOptions { Defaults to the AudioSource internal default (1000ms). 
*/ queueSizeMs?: number; + /** Send the transcription as a JSON dict for each chunk on the `lk.transcription` + datastream topic, including `start_time`/`end_time` timestamps if the chunk is a + TimedString. Each JSON object is suffixed with a newline so clients can parse the + stream line-by-line. + */ + jsonFormat: boolean; } const DEFAULT_ROOM_INPUT_OPTIONS: RoomInputOptions = { @@ -120,6 +126,7 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = { audioEnabled: true, syncTranscription: true, audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }), + jsonFormat: false, }; export class RoomIO { @@ -164,7 +171,6 @@ export class RoomIO { this.room = room; this.inputOptions = { ...DEFAULT_ROOM_INPUT_OPTIONS, ...inputOptions }; this.outputOptions = { ...DEFAULT_ROOM_OUTPUT_OPTIONS, ...outputOptions }; - this.userTranscriptWriter = this.userTranscriptStream.writable.getWriter(); this.participantIdentity = participant @@ -339,7 +345,9 @@ export class RoomIO { options.isDeltaStream, options.participant, ), - new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant), + new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant, { + jsonFormat: this.outputOptions.jsonFormat, + }), ]); } diff --git a/agents/src/voice/transcription/synchronizer.ts b/agents/src/voice/transcription/synchronizer.ts index 24d9c4f1a..386b38b21 100644 --- a/agents/src/voice/transcription/synchronizer.ts +++ b/agents/src/voice/transcription/synchronizer.ts @@ -13,6 +13,7 @@ import { type PlaybackFinishedEvent, TextOutput, type TimedString, + createTimedString, isTimedString, } from '../io.js'; @@ -143,8 +144,10 @@ class SegmentSynchronizerImpl { private textData: TextData; private audioData: AudioData; private speed: number; - private outputStream: IdentityTransform; - private outputStreamWriter: WritableStreamDefaultWriter; + // Emit TimedString objects so downstream outputs (e.g. 
RoomIO's `jsonFormat`) can + // attach `end_time` reflecting synchronized playback timing. + private outputStream: IdentityTransform; + private outputStreamWriter: WritableStreamDefaultWriter; private captureTask: Promise; private startWallTime?: number; @@ -211,7 +214,7 @@ class SegmentSynchronizerImpl { return this.textData.pushedText.length > this.textData.forwardedText.length; } - get readable(): ReadableStream { + get readable(): ReadableStream { return this.outputStream.readable; } @@ -384,7 +387,16 @@ class SegmentSynchronizerImpl { pushedTextCursor = wordEnd; if (this.playbackCompleted) { - this.outputStreamWriter.write(forwardedWord); + this.outputStreamWriter.write( + createTimedString({ + text: forwardedWord, + endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined, + }), + ); + const cleanWords = this.options.splitWords(word); + const cleanWord = cleanWords.length > 0 ? cleanWords[0]![0] : word; + this.textData.forwardedHyphens += this.options.hyphenateWord(cleanWord).length; + this.textData.forwardedText += forwardedWord; continue; } @@ -421,12 +433,28 @@ class SegmentSynchronizerImpl { } await this.sleepIfNotClosed(delayTime / 2); - this.outputStreamWriter.write(forwardedWord); + this.outputStreamWriter.write( + createTimedString({ + text: forwardedWord, + endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined, + }), + ); await this.sleepIfNotClosed(delayTime / 2); this.textData.forwardedHyphens += wordHyphens; this.textData.forwardedText += forwardedWord; } + + if (pushedTextCursor < this.textData.pushedText.length) { + const remaining = this.textData.pushedText.slice(pushedTextCursor); + this.outputStreamWriter.write( + createTimedString({ + text: remaining, + endTime: this.startWallTime ? 
(Date.now() - this.startWallTime) / 1000 : undefined, + }), + ); + this.textData.forwardedText += remaining; + } } private calcHyphens(text: string): string[] { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8c8812b49..b7793fb77 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -116,8 +116,8 @@ importers: specifier: ^1.1.1 version: 1.1.1 '@livekit/protocol': - specifier: ^1.45.3 - version: 1.45.3 + specifier: ^1.45.6 + version: 1.45.6 '@livekit/throws-transformer': specifier: 0.1.8 version: 0.1.8(typescript@5.9.3) @@ -2094,8 +2094,8 @@ packages: cpu: [x64] os: [win32] - '@livekit/protocol@1.45.3': - resolution: {integrity: sha512-WmMxBTsy4dRBqcrswFwUUlgq3Z0nnhOqKR6tX749Rb/PcB1yBMUtrHxZvcsS6qi3/5+86zHeVG+exmu1sZqfJg==} + '@livekit/protocol@1.45.6': + resolution: {integrity: sha512-YPDmrUiVe1EY/q/2bD+Fp+69DWq6LZgeH+G/KEbz07OIVf8hgAYzfb1FgiOdWLRpSj06+SuTmrOY604fWNuD3w==} '@livekit/rtc-ffi-bindings-darwin-arm64@0.12.52-patch.0': resolution: {integrity: sha512-IKUir6goV8yVRR7E2qrAP0JtH7gUyMkO0TG8G+dopO/fkXAsPpSealgI9fLcBJl0zhKK+eGCr741r6xR+xxsVw==} @@ -6128,7 +6128,7 @@ snapshots: '@livekit/noise-cancellation-win32-x64@0.1.9': optional: true - '@livekit/protocol@1.45.3': + '@livekit/protocol@1.45.6': dependencies: '@bufbuild/protobuf': 1.10.1 @@ -8389,7 +8389,7 @@ snapshots: livekit-server-sdk@2.14.1: dependencies: '@bufbuild/protobuf': 1.10.1 - '@livekit/protocol': 1.45.3 + '@livekit/protocol': 1.45.6 camelcase-keys: 9.1.3 jose: 5.2.4