Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/room-io-json-transcription.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": minor
Comment thread
toubatbrian marked this conversation as resolved.
Outdated
---

feat(room-io): add `jsonFormat` option on `RoomOutputOptions` for timed transcription output. When enabled, each chunk published on the `lk.transcription` datastream topic is a newline-terminated JSON object containing `text`, plus `start_time`, `end_time`, `confidence`, and `start_time_offset` (whichever are set) when the chunk is a `TimedString`. Ported from livekit/agents#5472.
58 changes: 58 additions & 0 deletions agents/src/voice/room_io/_output.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Comment thread
toubatbrian marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,67 @@ abstract class BaseParticipantTranscriptionOutput extends TextOutput {
protected abstract handleFlush(): void;
}

// Ref: python livekit-agents/livekit/agents/voice/room_io/_output.py - 363-383 lines
/** Optional settings accepted by the `ParticipantTranscriptionOutput` constructor. */
export interface ParticipantTranscriptionOutputOptions {
/**
 * When true, each chunk sent on the `lk.transcription` datastream topic is serialized
 * as a JSON object with `text`, and `start_time`/`end_time`/`confidence`/
 * `start_time_offset` when the captured value is a TimedString. Each object is
 * suffixed with a newline so subscribers can parse the stream line-by-line.
 *
 * Treated as `false` when omitted.
 */
jsonFormat?: boolean;
}

export class ParticipantTranscriptionOutput extends BaseParticipantTranscriptionOutput {
Comment thread
toubatbrian marked this conversation as resolved.
private writer: TextStreamWriter | null = null;
private flushTask: Task<void> | null = null;
private jsonFormat: boolean;

/**
 * Creates a transcription output, optionally switching it to JSON chunk encoding.
 *
 * @param room - Passed through unchanged to the base-class constructor.
 * @param isDeltaStream - Passed through unchanged to the base-class constructor.
 * @param participant - Passed through unchanged to the base-class constructor.
 * @param options - Optional settings; a missing `jsonFormat` is treated as `false`.
 */
constructor(
  room: Room,
  isDeltaStream: boolean,
  participant: Participant | string | null,
  options: ParticipantTranscriptionOutputOptions = {},
) {
  super(room, isDeltaStream, participant);

  const { jsonFormat } = options;
  this.jsonFormat = jsonFormat ?? false;
}

/**
 * Records the plain text of a chunk and hands the chunk to `handleCaptureText`.
 *
 * `latestText` always receives the plain text, regardless of `jsonFormat`. What is
 * forwarded to `handleCaptureText` is either that plain text or, when `jsonFormat`
 * is enabled, the JSON line produced by `encodeJsonChunk`.
 *
 * @param text - Plain string or TimedString chunk to capture.
 */
override async captureText(text: string | TimedString) {
  // Without a participant identity the chunk is dropped.
  if (!this.participantIdentity) {
    return;
  }

  const plainText = isTimedString(text) ? text.text : text;
  this.latestText = plainText;

  // Ref: python livekit-agents/livekit/agents/voice/room_io/_output.py - 450-462 lines
  // The Python implementation builds an `agent_pb.TimedString` and converts it with
  // `MessageToDict(preserving_proto_field_name=True)`. Here the same snake_case shape is
  // emitted directly, so no protobuf runtime is needed on the JS side.
  if (this.jsonFormat) {
    await this.handleCaptureText(this.encodeJsonChunk(text));
    return;
  }

  await this.handleCaptureText(plainText);
}

/**
 * Encodes one transcription chunk as a single newline-terminated JSON line.
 *
 * The object always contains `text`. For a TimedString chunk, `start_time`,
 * `end_time`, `confidence` and `start_time_offset` are added, in that order, for
 * each corresponding field that is not `undefined`.
 *
 * @param text - Plain string or TimedString chunk to encode.
 * @returns The JSON encoding of the chunk followed by `'\n'`.
 */
private encodeJsonChunk(text: string | TimedString): string {
  // Plain strings carry no timing metadata: only `text` is emitted.
  if (!isTimedString(text)) {
    return JSON.stringify({ text: String(text) }) + '\n';
  }

  const chunk: Record<string, unknown> = { text: text.text };

  // Snake_case wire names paired with their source values; array order fixes key order.
  const optionalFields: Array<[string, unknown]> = [
    ['start_time', text.startTime],
    ['end_time', text.endTime],
    ['confidence', text.confidence],
    ['start_time_offset', text.startTimeOffset],
  ];
  for (const [wireName, value] of optionalFields) {
    if (value !== undefined) {
      chunk[wireName] = value;
    }
  }

  return JSON.stringify(chunk) + '\n';
}

protected async handleCaptureText(text: string): Promise<void> {
if (this.flushTask && !this.flushTask.done) {
Expand Down
13 changes: 12 additions & 1 deletion agents/src/voice/room_io/room_io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ export interface RoomOutputOptions {
Defaults to the AudioSource internal default (1000ms).
*/
queueSizeMs?: number;
// Ref: python livekit-agents/livekit/agents/voice/room_io/types.py - 102-103 lines
/** Send the transcription as a JSON dict for each chunk on the `lk.transcription`
datastream topic, including `start_time`/`end_time` timestamps if the chunk is a
TimedString. Each JSON object is suffixed with a newline so clients can parse the
stream line-by-line.
*/
jsonFormat: boolean;
}

const DEFAULT_ROOM_INPUT_OPTIONS: RoomInputOptions = {
Expand All @@ -120,6 +127,7 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = {
audioEnabled: true,
syncTranscription: true,
audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }),
jsonFormat: false,
};

export class RoomIO {
Expand Down Expand Up @@ -339,7 +347,10 @@ export class RoomIO {
options.isDeltaStream,
options.participant,
),
new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant),
// Ref: python livekit-agents/livekit/agents/voice/room_io/room_io.py - 159 lines
new ParticipantTranscriptionOutput(this.room, options.isDeltaStream, options.participant, {
jsonFormat: this.outputOptions.jsonFormat,
}),
]);
}

Expand Down
27 changes: 22 additions & 5 deletions agents/src/voice/transcription/synchronizer.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
type PlaybackFinishedEvent,
TextOutput,
type TimedString,
createTimedString,
isTimedString,
} from '../io.js';

Expand Down Expand Up @@ -143,8 +144,11 @@ class SegmentSynchronizerImpl {
private textData: TextData;
private audioData: AudioData;
private speed: number;
private outputStream: IdentityTransform<string>;
private outputStreamWriter: WritableStreamDefaultWriter<string>;
Comment thread
toubatbrian marked this conversation as resolved.
// Ref: python livekit-agents/livekit/agents/voice/transcription/synchronizer.py - 151 lines
// Emit TimedString objects so downstream outputs (e.g. RoomIO's json_format) can
// attach `end_time` reflecting synchronized playback timing.
private outputStream: IdentityTransform<string | TimedString>;
private outputStreamWriter: WritableStreamDefaultWriter<string | TimedString>;
private captureTask: Promise<void>;
private startWallTime?: number;

Expand Down Expand Up @@ -200,7 +204,7 @@ class SegmentSynchronizerImpl {
return this.textData.pushedText.length > this.textData.forwardedText.length;
}

get readable(): ReadableStream<string> {
get readable(): ReadableStream<string | TimedString> {
return this.outputStream.readable;
}

Expand Down Expand Up @@ -342,7 +346,14 @@ class SegmentSynchronizerImpl {
}

if (this.playbackCompleted) {
this.outputStreamWriter.write(sentence.slice(textCursor, endPos));
// Ref: python livekit-agents/livekit/agents/voice/transcription/synchronizer.py - 331-333 lines
const playedWord = sentence.slice(textCursor, endPos);
this.outputStreamWriter.write(
createTimedString({
text: playedWord,
endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined,
}),
);
textCursor = endPos;
continue;
}
Expand Down Expand Up @@ -379,7 +390,13 @@ class SegmentSynchronizerImpl {

await this.sleepIfNotClosed(delayTime / 2);
const forwardedWord = sentence.slice(textCursor, endPos);
this.outputStreamWriter.write(forwardedWord);
// Ref: python livekit-agents/livekit/agents/voice/transcription/synchronizer.py - 363-368 lines
this.outputStreamWriter.write(
createTimedString({
text: forwardedWord,
endTime: this.startWallTime ? (Date.now() - this.startWallTime) / 1000 : undefined,
}),
);

await this.sleepIfNotClosed(delayTime / 2);

Expand Down
Loading