Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions apis/json-schema/ApplicationUpdatePayload.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
$schema: https://json-schema.org/draft/2020-12/schema
$id: ApplicationUpdatePayload.yaml
type: object
properties:
applicationId:
type: string
description: The application whose row changed
setupComplete:
type: boolean
description: Current setup_complete flag (after the transition)
status:
$ref: ApplicationStatus.yaml
description: Current application status (after the transition)
gitRemoteUrl:
type: string
description: Current git remote URL, if one is set
cloudDeploymentProvider:
$ref: CloudDeploymentProvider.yaml
description: Selected cloud deployment provider, if one is set
required:
- applicationId
- setupComplete
- status
description: Scoped payload for an ApplicationUpdated notification — carries only the fields the client patches in-place. Deltas for unchanged fields are omitted.
6 changes: 6 additions & 0 deletions apis/json-schema/NotificationEvent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ properties:
type: string
format: date-time
description: When the event occurred
applicationUpdate:
$ref: ApplicationUpdatePayload.yaml
description: Scoped payload for ApplicationUpdated events — present iff eventType === ApplicationUpdated
operationLogAppend:
$ref: OperationLogAppendPayload.yaml
description: Scoped payload for OperationLogAppended events — present iff eventType === OperationLogAppended
required:
- eventType
- agentRunId
Expand Down
8 changes: 8 additions & 0 deletions apis/json-schema/NotificationEventConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ properties:
type: boolean
default: true
description: Notify when cloud deployment status changes (spec 089)
applicationUpdated:
type: boolean
default: true
description: Notify when application row changes (setupComplete, status, gitRemoteUrl, cloudDeploymentProvider — spec 090)
operationLogAppended:
type: boolean
default: true
description: Notify when a new operation log entry is appended (spec 090)
required:
- agentStarted
- phaseCompleted
Expand Down
2 changes: 2 additions & 0 deletions apis/json-schema/NotificationEventType.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ enum:
- pr_blocked
- merge_review_ready
- cloud_deployment_updated
- application_updated
- operation_log_appended
description: Types of agent lifecycle notification events
10 changes: 10 additions & 0 deletions apis/json-schema/OperationLogAppendPayload.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
$schema: https://json-schema.org/draft/2020-12/schema
$id: OperationLogAppendPayload.yaml
type: object
properties:
entry:
$ref: OperationLogEntry.yaml
description: The newly-appended operation log entry
required:
- entry
description: Scoped payload for an OperationLogAppended notification — carries the newly-appended entry so clients can patch their log list in-place without a refetch.
1 change: 1 addition & 0 deletions apis/json-schema/OperationLogKind.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ enum:
- CloudDeploy
- GitRemoteCreate
- RepoSync
- ApplicationSetup
description: Kind of long-running operation an OperationLogEntry belongs to
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@
"minimatch": "^7.x"
}
},
"packageManager": "pnpm@10.30.0",
"packageManager": "pnpm@10.33.0",
"dependencies": {
"@ai-sdk/openai-compatible": "^2.0.0",
"@ai-sdk/provider": "^3.0.8",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,44 @@ export interface ScaffoldOptions {
* `package.json#name`, folder naming during intermediate steps, etc.
*/
readonly projectName: string;

/**
* ID of the Application this scaffold belongs to. Used as the
* `operationId` when the adapter appends progress/error entries to
* the shared operation log so the UI can stream the scaffold's
* stdout/stderr into the same drawer that shows deploy / publish /
* sync activity.
*/
readonly applicationId: string;

/**
* Progress callback. The adapter emits high-level phase events
* (`Starting shadcn init`, `bun add extras done`) and deduped CLI
* output lines from child processes as they arrive. Must never
* throw — errors inside the callback are swallowed by the adapter
* so a failing sink cannot abort a successful scaffold.
*
* Level mapping:
* - `Info` — phase boundaries + stdout from child processes
* - `Warn` — stderr from child processes (surfaced as warnings so
* the user can see progress chatter without the whole log
* turning red; real failures are emitted as `Error` by the
* adapter itself on non-zero exit)
* - `Error` — phase failure with the exception message
* - `Debug` — low-priority bookkeeping (reserved)
*/
readonly onLog?: ScaffoldLogCallback;
}

export type ScaffoldLogLevel = 'Debug' | 'Info' | 'Warn' | 'Error';

export type ScaffoldLogCallback = (entry: {
level: ScaffoldLogLevel;
message: string;
/** Optional multi-line block — typically captured stdout/stderr. */
detail?: string;
}) => void;

export interface ScaffoldResult {
/**
* Absolute path to the finished project root. Always equal to
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type { OperationLogEntry } from '../../../../domain/generated/output.js';

/**
* OperationLog Event Bus (port)
*
* In-process pub/sub for newly-appended operation_log_entries. The SSE
* route (StreamAgentEventsUseCase in spec 090) subscribes and re-emits
* each publish as a notification so the web client can render log
* lines in real time without polling.
*
* DB is source of truth — publishers MUST publish only after a
* successful INSERT. Subscribers MUST NOT assume they see every event
* (SW restart / reconnect may drop a window); the client should
* rehydrate from the DB on first open.
*/
export interface OperationLogEvent {
entry: OperationLogEntry;
}

export type OperationLogEventListener = (event: OperationLogEvent) => void;

export interface IOperationLogEventBus {
publish(event: OperationLogEvent): void;
subscribe(listener: OperationLogEventListener): () => void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,30 @@ import { inject, injectable } from 'tsyringe';

import type { IAgentRunRepository } from '../../ports/output/agents/agent-run-repository.interface.js';
import type { IPhaseTimingRepository } from '../../ports/output/agents/phase-timing-repository.interface.js';
import type { IApplicationRepository } from '../../ports/output/repositories/application-repository.interface.js';
import type { IInteractiveSessionRepository } from '../../ports/output/repositories/interactive-session-repository.interface.js';
import type { ICloudDeploymentEventBus } from '../../ports/output/services/cloud-deployment-event-bus.interface.js';
import type { ILogger } from '../../ports/output/services/logger.interface.js';
import type { IOperationLogEventBus } from '../../ports/output/services/operation-log-event-bus.interface.js';
import type { IProcessLivenessProbe } from '../../ports/output/services/process-liveness.interface.js';

import { ListFeaturesUseCase } from '../features/list-features.use-case.js';

import type { AgentRun, Feature } from '../../../domain/generated/output.js';
import { NotificationEventType, NotificationSeverity } from '../../../domain/generated/output.js';
import {
NotificationEventType,
NotificationSeverity,
OperationLogLevel,
} from '../../../domain/generated/output.js';

import { computeApplicationDeltas } from './stream-agent-events/compute-application-deltas.js';
import { computeFeatureDeltas } from './stream-agent-events/compute-feature-deltas.js';
import { computePhaseCompletionDeltas } from './stream-agent-events/compute-phase-completion-deltas.js';
import { computePrDeltas } from './stream-agent-events/compute-pr-deltas.js';
import { computeSessionDeltas } from './stream-agent-events/compute-session-deltas.js';
import { computeStatusDeltas } from './stream-agent-events/compute-status-deltas.js';
import type {
CachedApplicationState,
CachedFeatureState,
CachedSessionState,
StreamedAgentEvent,
Expand Down Expand Up @@ -83,6 +91,10 @@ export class StreamAgentEventsUseCase {
private readonly processLiveness: IProcessLivenessProbe,
@inject('ICloudDeploymentEventBus')
private readonly cloudEventBus: ICloudDeploymentEventBus,
@inject('IApplicationRepository')
private readonly applicationRepo: IApplicationRepository,
@inject('IOperationLogEventBus')
private readonly operationLogEventBus: IOperationLogEventBus,
@inject('ILogger')
private readonly logger: ILogger
) {}
Expand All @@ -102,6 +114,7 @@ export class StreamAgentEventsUseCase {

const featureCache = new Map<string, CachedFeatureState>();
const sessionCache = new Map<string, CachedSessionState>();
const applicationCache = new Map<string, CachedApplicationState>();

const queue: StreamedAgentEvent[] = [];
let notify: (() => void) | null = null;
Expand All @@ -115,13 +128,20 @@ export class StreamAgentEventsUseCase {
};

const unsubscribeCloudDeploy = this.subscribeCloudDeploy(enqueue);
const unsubscribeOperationLog = this.subscribeOperationLog(enqueue);

let pollErrorCount = 0;

try {
while (!signal?.aborted) {
try {
await this.pollOnce({ runIdFilter, featureCache, sessionCache, enqueue });
await this.pollOnce({
runIdFilter,
featureCache,
sessionCache,
applicationCache,
enqueue,
});
pollErrorCount = 0;
} catch (error) {
pollErrorCount++;
Expand Down Expand Up @@ -178,6 +198,11 @@ export class StreamAgentEventsUseCase {
} catch {
// Listener may already be detached.
}
try {
unsubscribeOperationLog();
} catch {
// Listener may already be detached.
}
}
}

Expand Down Expand Up @@ -218,6 +243,43 @@ export class StreamAgentEventsUseCase {
});
}

/**
* Subscribe to the in-process operation-log event bus and re-emit each
* publish as a `NotificationEventType.OperationLogAppended` notification.
* Returns the unsubscribe handle.
*/
private subscribeOperationLog(enqueue: (event: StreamedAgentEvent) => void): () => void {
return this.operationLogEventBus.subscribe(({ entry }) => {
const severity =
entry.level === OperationLogLevel.Error
? NotificationSeverity.Error
: entry.level === OperationLogLevel.Warn
? NotificationSeverity.Warning
: NotificationSeverity.Info;

const timestamp =
entry.createdAt instanceof Date
? entry.createdAt.toISOString()
: typeof entry.createdAt === 'string'
? entry.createdAt
: String(entry.createdAt);

enqueue({
kind: 'notification',
event: {
eventType: NotificationEventType.OperationLogAppended,
agentRunId: entry.operationId,
featureId: entry.operationId,
featureName: entry.operationKind,
message: entry.message,
severity,
timestamp,
operationLogAppend: { entry },
},
});
});
}

/**
* Single poll cycle: walk every feature's latest agent run, diff against
* the connection cache, and enqueue notification events for any observed
Expand All @@ -227,9 +289,10 @@ export class StreamAgentEventsUseCase {
runIdFilter?: string;
featureCache: Map<string, CachedFeatureState>;
sessionCache: Map<string, CachedSessionState>;
applicationCache: Map<string, CachedApplicationState>;
enqueue: (event: StreamedAgentEvent) => void;
}): Promise<void> {
const { runIdFilter, featureCache, sessionCache, enqueue } = args;
const { runIdFilter, featureCache, sessionCache, applicationCache, enqueue } = args;

const features = await this.listFeatures.execute();

Expand Down Expand Up @@ -294,6 +357,27 @@ export class StreamAgentEventsUseCase {
} catch {
// Ignore interactive session poll errors to not affect main polling.
}

// Application row polling — diff against per-connection cache and emit
// `ApplicationUpdated` on any watched-field change. Seed is silent.
try {
const applications = await this.applicationRepo.list();
for (const app of applications) {
if (runIdFilter && app.id !== runIdFilter) continue;
const prev = applicationCache.get(app.id);
for (const event of computeApplicationDeltas({ application: app, prev })) {
enqueue(event);
}
applicationCache.set(app.id, {
setupComplete: app.setupComplete,
status: app.status,
gitRemoteUrl: app.gitRemoteUrl,
cloudDeploymentProvider: app.cloudDeploymentProvider,
});
}
} catch {
// Ignore application-poll failures; same posture as session polling.
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Pure helper: compute ApplicationUpdated delta for a single application row.
*
* Diffs the current `Application` against the cached snapshot of a watched
* field set. Emits at most ONE notification event carrying the full updated
* payload. Seed case (no prev) returns zero events — the caller is expected
* to populate the cache on first sight.
*
* Pure: no I/O, no timers, no side effects. `prev` is NOT mutated here — the
* use-case owns cache lifecycle so the helper stays trivially testable.
*/

import type { Application } from '../../../../domain/generated/output.js';
import {
NotificationEventType,
NotificationSeverity,
} from '../../../../domain/generated/output.js';

import type { CachedApplicationState, StreamedAgentEvent } from './stream-agent-events.types.js';

const WATCHED_FIELDS = [
'setupComplete',
'status',
'gitRemoteUrl',
'cloudDeploymentProvider',
] as const;

export interface ComputeApplicationDeltasArgs {
application: Application;
prev: CachedApplicationState | undefined;
}

export function computeApplicationDeltas(args: ComputeApplicationDeltasArgs): StreamedAgentEvent[] {
const { application, prev } = args;

if (prev === undefined) return [];

let changed = false;
for (const field of WATCHED_FIELDS) {
if (prev[field] !== application[field]) {
changed = true;
break;
}
}
if (!changed) return [];

return [
{
kind: 'notification',
event: {
eventType: NotificationEventType.ApplicationUpdated,
agentRunId: '',
featureId: application.id,
featureName: application.name,
message: `Application "${application.name}" updated`,
severity: NotificationSeverity.Info,
timestamp: new Date().toISOString(),
applicationUpdate: {
applicationId: application.id,
setupComplete: application.setupComplete,
status: application.status,
gitRemoteUrl: application.gitRemoteUrl,
cloudDeploymentProvider: application.cloudDeploymentProvider,
},
},
},
];
}
Loading
Loading