Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/room-io-json-transcription.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": minor
Comment thread
toubatbrian marked this conversation as resolved.
Outdated
---

feat(room-io): add `jsonFormat` option on `RoomOutputOptions` for timed transcription output. When enabled, each chunk published on the `lk.transcription` datastream topic is a newline-terminated JSON object containing `text`, plus `start_time`/`end_time`/`confidence`/`start_time_offset` when the chunk is a `TimedString` and the corresponding field is set. Ported from livekit/agents#5472.
1 change: 0 additions & 1 deletion agents/src/stt/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ export interface SpeechData {
*
* May contain multiple entries when a single utterance spans multiple source languages.
*/
// Ref: python livekit-agents/livekit/agents/stt/stt.py - 62-68 lines
Comment thread
toubatbrian marked this conversation as resolved.
Outdated
sourceLanguages?: LanguageCode[];
}

Expand Down
59 changes: 59 additions & 0 deletions agents/src/voice/room_io/_output.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Comment thread
toubatbrian marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,68 @@ abstract class BaseParticipantTranscriptionOutput extends TextOutput {
protected abstract handleFlush(): void;
}

/** Optional settings accepted by the `ParticipantTranscriptionOutput` constructor. */
export interface ParticipantTranscriptionOutputOptions {
/**
 * When true, each chunk sent on the `lk.transcription` datastream topic is serialized
 * as a JSON object with `text`, plus `start_time`/`end_time`/`confidence`/
 * `start_time_offset` when the captured value is a TimedString and the corresponding
 * field is defined. Each object is suffixed with a newline so subscribers can parse
 * the stream line-by-line.
 *
 * Treated as `false` when omitted.
 */
jsonFormat?: boolean;
}

export class ParticipantTranscriptionOutput extends BaseParticipantTranscriptionOutput {
Comment thread
toubatbrian marked this conversation as resolved.
// Text stream writer; starts out null (where it is assigned is not visible in this hunk).
private writer: TextStreamWriter | null = null;
// Flush task, if any; handleCaptureText checks whether it is still running.
private flushTask: Task<void> | null = null;
// Set once in the constructor from `options.jsonFormat`; selects JSON encoding in captureText.
private jsonFormat: boolean;

/**
 * Builds the output, passing `room`, `isDeltaStream` and `participant` straight through
 * to the base class.
 *
 * @param options - Optional settings; `jsonFormat` falls back to `false` when not provided.
 */
constructor(
  room: Room,
  isDeltaStream: boolean,
  participant: Participant | string | null,
  options: ParticipantTranscriptionOutputOptions = {},
) {
  super(room, isDeltaStream, participant);
  const { jsonFormat } = options;
  this.jsonFormat = jsonFormat ?? false;
}

/**
 * Captures one transcription chunk, records it as `latestText`, and hands it to
 * `handleCaptureText`.
 *
 * Returns immediately, without recording anything, when no participant identity is set.
 *
 * @param text - Plain string or TimedString chunk to publish.
 */
override async captureText(text: string | TimedString) {
  if (!this.participantIdentity) {
    return;
  }

  let payload: string;
  if (this.jsonFormat) {
    // JSON mode: serialize the chunk as a protobuf-compatible JSON dict. The Python
    // implementation uses `agent_pb.TimedString` +
    // `MessageToDict(preserving_proto_field_name=True)`; the same snake_case shape is
    // emitted here directly, with no protobuf runtime dependency on the JS side.
    payload = this.encodeJsonChunk(text);
  } else if (isTimedString(text)) {
    // Plain mode: only the text of a TimedString is published.
    payload = text.text;
  } else {
    payload = text;
  }

  // latestText must hold the encoded payload so the non-delta flush (FINAL=true)
  // republishes the same newline-delimited JSON format as the interim chunks.
  this.latestText = payload;
  await this.handleCaptureText(payload);
}

/**
 * Serializes one chunk as a single line of JSON terminated by `\n`.
 *
 * The object always carries `text`. For a TimedString it additionally carries
 * `start_time`, `end_time`, `confidence` and `start_time_offset`, in that order,
 * each only when the corresponding property is not `undefined`.
 *
 * @param text - Plain string or TimedString to encode.
 * @returns The JSON-encoded chunk followed by a newline.
 */
private encodeJsonChunk(text: string | TimedString): string {
  if (!isTimedString(text)) {
    return JSON.stringify({ text: String(text) }) + '\n';
  }

  const chunk: Record<string, unknown> = { text: text.text };
  // Listed in emission order; insertion order determines the key order in the output.
  const optionalFields: Array<[string, unknown]> = [
    ['start_time', text.startTime],
    ['end_time', text.endTime],
    ['confidence', text.confidence],
    ['start_time_offset', text.startTimeOffset],
  ];
  for (const [key, value] of optionalFields) {
    if (value !== undefined) {
      chunk[key] = value;
    }
  }
  return JSON.stringify(chunk) + '\n';
}

protected async handleCaptureText(text: string): Promise<void> {
if (this.flushTask && !this.flushTask.done) {
Expand Down
12 changes: 10 additions & 2 deletions agents/src/voice/room_io/room_io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ export interface RoomOutputOptions {
Defaults to the AudioSource internal default (1000ms).
*/
queueSizeMs?: number;
/** Send the transcription as a JSON dict for each chunk on the `lk.transcription`
datastream topic, including `start_time`/`end_time` timestamps if the chunk is a
TimedString. Each JSON object is suffixed with a newline so clients can parse the
stream line-by-line.
*/
jsonFormat: boolean;
}

const DEFAULT_ROOM_INPUT_OPTIONS: RoomInputOptions = {
Expand All @@ -120,6 +126,7 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = {
audioEnabled: true,
syncTranscription: true,
audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }),
jsonFormat: false,
};

export class RoomIO {
Expand Down Expand Up @@ -164,7 +171,6 @@ export class RoomIO {
this.room = room;
this.inputOptions = { ...DEFAULT_ROOM_INPUT_OPTIONS, ...inputOptions };
this.outputOptions = { ...DEFAULT_ROOM_OUTPUT_OPTIONS, ...outputOptions };

this.userTranscriptWriter = this.userTranscriptStream.writable.getWriter();

this.participantIdentity = participant
Expand Down Expand Up @@ -339,7 +345,9 @@ export class RoomIO {
options.isDeltaStream,
options.participant,
),
new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant),
new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant, {
jsonFormat: this.outputOptions.jsonFormat,
}),
]);
}

Expand Down
38 changes: 33 additions & 5 deletions agents/src/voice/transcription/synchronizer.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
type PlaybackFinishedEvent,
TextOutput,
type TimedString,
createTimedString,
isTimedString,
} from '../io.js';

Expand Down Expand Up @@ -143,8 +144,10 @@ class SegmentSynchronizerImpl {
private textData: TextData;
private audioData: AudioData;
private speed: number;
private outputStream: IdentityTransform<string>;
private outputStreamWriter: WritableStreamDefaultWriter<string>;
Comment thread
toubatbrian marked this conversation as resolved.
// Emit TimedString objects so downstream outputs (e.g. RoomIO's json_format) can
// attach `end_time` reflecting synchronized playback timing.
private outputStream: IdentityTransform<string | TimedString>;
private outputStreamWriter: WritableStreamDefaultWriter<string | TimedString>;
private captureTask: Promise<void>;
private startWallTime?: number;

Expand Down Expand Up @@ -200,7 +203,7 @@ class SegmentSynchronizerImpl {
return this.textData.pushedText.length > this.textData.forwardedText.length;
}

get readable(): ReadableStream<string> {
get readable(): ReadableStream<string | TimedString> {
return this.outputStream.readable;
}

Expand Down Expand Up @@ -353,7 +356,16 @@ class SegmentSynchronizerImpl {
pushedTextCursor = wordEnd;

if (this.playbackCompleted) {
this.outputStreamWriter.write(forwardedWord);
this.outputStreamWriter.write(
createTimedString({
text: forwardedWord,
endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined,
}),
);
const cleanWords = this.options.splitWords(word);
const cleanWord = cleanWords.length > 0 ? cleanWords[0]![0] : word;
this.textData.forwardedHyphens += this.options.hyphenateWord(cleanWord).length;
this.textData.forwardedText += forwardedWord;
continue;
}

Expand Down Expand Up @@ -390,12 +402,28 @@ class SegmentSynchronizerImpl {
}

await this.sleepIfNotClosed(delayTime / 2);
this.outputStreamWriter.write(forwardedWord);
this.outputStreamWriter.write(
createTimedString({
text: forwardedWord,
endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined,
}),
);
await this.sleepIfNotClosed(delayTime / 2);

this.textData.forwardedHyphens += wordHyphens;
this.textData.forwardedText += forwardedWord;
}

if (pushedTextCursor < this.textData.pushedText.length) {
const remaining = this.textData.pushedText.slice(pushedTextCursor);
this.outputStreamWriter.write(
createTimedString({
text: remaining,
endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined,
}),
);
this.textData.forwardedText += remaining;
}
}

private calcHyphens(text: string): string[] {
Expand Down
Loading