Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/room-io-json-transcription.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

feat(room-io): add `jsonFormat` option on `RoomOutputOptions` for timed transcription output. When enabled, each chunk published on the `lk.transcription` datastream topic is a JSON object with `text`, and `start_time`/`end_time` when the chunk is a `TimedString`. Ported from livekit/agents#5472.
2 changes: 1 addition & 1 deletion agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"@bufbuild/protobuf": "^1.10.0",
"@ffmpeg-installer/ffmpeg": "^1.1.0",
"@livekit/mutex": "^1.1.1",
"@livekit/protocol": "^1.45.3",
"@livekit/protocol": "^1.45.6",
"@livekit/typed-emitter": "^3.0.0",
"@livekit/throws-transformer": "0.1.8",
"@opentelemetry/api": "^1.9.0",
Expand Down
48 changes: 48 additions & 0 deletions agents/src/voice/room_io/_output.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Comment thread
toubatbrian marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { AgentSession as pb } from '@livekit/protocol';
import type { RemoteParticipant } from '@livekit/rtc-node';
import {
type AudioFrame,
Expand Down Expand Up @@ -130,9 +131,56 @@ abstract class BaseParticipantTranscriptionOutput extends TextOutput {
protected abstract handleFlush(): void;
}

export interface ParticipantTranscriptionOutputOptions {
/** When true, each chunk sent on the `lk.transcription` datastream topic is serialized
* as a JSON object with `text`, and `start_time`/`end_time`/`confidence`/
* `start_time_offset` when the captured value is a TimedString. Each object is
* suffixed with a newline so subscribers can parse the stream line-by-line. */
jsonFormat?: boolean;
}

export class ParticipantTranscriptionOutput extends BaseParticipantTranscriptionOutput {
Comment thread
toubatbrian marked this conversation as resolved.
private writer: TextStreamWriter | null = null;
private flushTask: Task<void> | null = null;
private jsonFormat: boolean;

constructor(
room: Room,
isDeltaStream: boolean,
participant: Participant | string | null,
options: ParticipantTranscriptionOutputOptions = {},
) {
super(room, isDeltaStream, participant);
this.jsonFormat = options.jsonFormat ?? false;
}

override async captureText(text: string | TimedString) {
if (!this.participantIdentity) {
return;
}

// latestText must hold the encoded payload so non-delta flush (FINAL=true) republishes the
// same newline-delimited JSON format as the interim chunks.
const payload = this.jsonFormat
? this.encodeJsonChunk(text)
: isTimedString(text)
? text.text
: text;
this.latestText = payload;
await this.handleCaptureText(payload);
}

private encodeJsonChunk(text: string | TimedString): string {
const isTimed = isTimedString(text);
const message = new pb.TimedString({
text: isTimed ? text.text : text,
startTime: isTimed ? text.startTime : undefined,
endTime: isTimed ? text.endTime : undefined,
confidence: isTimed ? text.confidence : undefined,
startTimeOffset: isTimed ? text.startTimeOffset : undefined,
});
return message.toJsonString({ useProtoFieldName: true }) + '\n';
}

protected async handleCaptureText(text: string): Promise<void> {
if (this.flushTask && !this.flushTask.done) {
Expand Down
12 changes: 10 additions & 2 deletions agents/src/voice/room_io/room_io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ export interface RoomOutputOptions {
Defaults to the AudioSource internal default (1000ms).
*/
queueSizeMs?: number;
/** Send the transcription as a JSON dict for each chunk on the `lk.transcription`
datastream topic, including `start_time`/`end_time` timestamps if the chunk is a
TimedString. Each JSON object is suffixed with a newline so clients can parse the
stream line-by-line.
*/
jsonFormat: boolean;
}

const DEFAULT_ROOM_INPUT_OPTIONS: RoomInputOptions = {
Expand All @@ -120,6 +126,7 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = {
audioEnabled: true,
syncTranscription: true,
audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }),
jsonFormat: false,
};

export class RoomIO {
Expand Down Expand Up @@ -164,7 +171,6 @@ export class RoomIO {
this.room = room;
this.inputOptions = { ...DEFAULT_ROOM_INPUT_OPTIONS, ...inputOptions };
this.outputOptions = { ...DEFAULT_ROOM_OUTPUT_OPTIONS, ...outputOptions };

this.userTranscriptWriter = this.userTranscriptStream.writable.getWriter();

this.participantIdentity = participant
Expand Down Expand Up @@ -339,7 +345,9 @@ export class RoomIO {
options.isDeltaStream,
options.participant,
),
new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant),
new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant, {
jsonFormat: this.outputOptions.jsonFormat,
}),
]);
}

Expand Down
38 changes: 33 additions & 5 deletions agents/src/voice/transcription/synchronizer.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
type PlaybackFinishedEvent,
TextOutput,
type TimedString,
createTimedString,
isTimedString,
} from '../io.js';

Expand Down Expand Up @@ -143,8 +144,10 @@ class SegmentSynchronizerImpl {
private textData: TextData;
private audioData: AudioData;
private speed: number;
private outputStream: IdentityTransform<string>;
private outputStreamWriter: WritableStreamDefaultWriter<string>;
Comment thread
toubatbrian marked this conversation as resolved.
// Emit TimedString objects so downstream outputs (e.g. RoomIO's json_format) can
// attach `end_time` reflecting synchronized playback timing.
private outputStream: IdentityTransform<string | TimedString>;
private outputStreamWriter: WritableStreamDefaultWriter<string | TimedString>;
private captureTask: Promise<void>;
private startWallTime?: number;

Expand Down Expand Up @@ -211,7 +214,7 @@ class SegmentSynchronizerImpl {
return this.textData.pushedText.length > this.textData.forwardedText.length;
}

get readable(): ReadableStream<string> {
get readable(): ReadableStream<string | TimedString> {
return this.outputStream.readable;
}

Expand Down Expand Up @@ -384,7 +387,16 @@ class SegmentSynchronizerImpl {
pushedTextCursor = wordEnd;

if (this.playbackCompleted) {
this.outputStreamWriter.write(forwardedWord);
this.outputStreamWriter.write(
createTimedString({
text: forwardedWord,
endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined,
}),
);
const cleanWords = this.options.splitWords(word);
const cleanWord = cleanWords.length > 0 ? cleanWords[0]![0] : word;
this.textData.forwardedHyphens += this.options.hyphenateWord(cleanWord).length;
this.textData.forwardedText += forwardedWord;
continue;
}

Expand Down Expand Up @@ -421,12 +433,28 @@ class SegmentSynchronizerImpl {
}

await this.sleepIfNotClosed(delayTime / 2);
this.outputStreamWriter.write(forwardedWord);
this.outputStreamWriter.write(
createTimedString({
text: forwardedWord,
endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined,
}),
);
await this.sleepIfNotClosed(delayTime / 2);

this.textData.forwardedHyphens += wordHyphens;
this.textData.forwardedText += forwardedWord;
}

if (pushedTextCursor < this.textData.pushedText.length) {
const remaining = this.textData.pushedText.slice(pushedTextCursor);
this.outputStreamWriter.write(
createTimedString({
text: remaining,
endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined,
}),
);
this.textData.forwardedText += remaining;
}
}

private calcHyphens(text: string): string[] {
Expand Down
12 changes: 6 additions & 6 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading