Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/vs/workbench/api/common/extHostTerminalShellIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { IExtHostRpcService } from './extHostRpcService.js';
import { IExtHostTerminalService } from './extHostTerminalService.js';
import { Emitter, type Event } from '../../../base/common/event.js';
import { URI } from '../../../base/common/uri.js';
import { AsyncIterableObject, Barrier, type AsyncIterableEmitter } from '../../../base/common/async.js';
import { AsyncIterableProducer, Barrier, DeferredPromise, type AsyncIterableEmitter } from '../../../base/common/async.js';

export interface IExtHostTerminalShellIntegration extends ExtHostTerminalShellIntegrationShape {
readonly _serviceBrand: undefined;
Expand Down Expand Up @@ -400,7 +400,7 @@ class InternalTerminalShellExecution {
private _createDataStream(): AsyncIterable<string> {
if (!this._dataStream) {
if (this._isEnded) {
return AsyncIterableObject.EMPTY;
return AsyncIterableProducer.EMPTY;
}
this._dataStream = new ShellExecutionDataStream();
}
Expand Down Expand Up @@ -432,19 +432,21 @@ class InternalTerminalShellExecution {

class ShellExecutionDataStream extends Disposable {
private _barrier: Barrier | undefined;
private _iterables: AsyncIterableObject<string>[] = [];
private _completionPromises: DeferredPromise<void>[] = [];
private _emitters: AsyncIterableEmitter<string>[] = [];

createIterable(): AsyncIterable<string> {
if (!this._barrier) {
this._barrier = new Barrier();
}
const barrier = this._barrier;
const iterable = new AsyncIterableObject<string>(async emitter => {
const completionPromise = new DeferredPromise<void>();
this._completionPromises.push(completionPromise);
const iterable = new AsyncIterableProducer<string>(async emitter => {
this._emitters.push(emitter);
await barrier.wait();
completionPromise.complete(undefined);
});
this._iterables.push(iterable);
return iterable;
}

Expand All @@ -459,7 +461,7 @@ class ShellExecutionDataStream extends Disposable {
}

async flush(): Promise<void> {
await Promise.all(this._iterables.map(e => e.toPromise()));
await Promise.all(this._completionPromises.map(p => p.p));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ suite('InternalTerminalShellIntegration', () => {
}

async function emitData(data: string): Promise<void> {
// AsyncIterableObjects are initialized in a microtask, this doesn't matter in practice
// AsyncIterableProducers are initialized in a microtask, this doesn't matter in practice
// since the events will always come through in different events.
await new Promise<void>(r => queueMicrotask(r));
si.emitData(data);
Expand Down