diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs index a6a4533318..3217629525 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs @@ -535,6 +535,46 @@ impl ActorContext { self.config().max_queue_size } + /// Removes all messages from the queue and resets metadata. + pub async fn reset(&self) -> Result<()> { + self.ensure_initialized().await?; + + // List and delete all message keys + let entries = self.list_message_entries().await?; + if !entries.is_empty() { + let keys: Vec> = entries.iter().map(|(k, _)| k.clone()).collect(); + let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect(); + self.0 + .kv + .batch_delete(&key_refs) + .await + .context("delete all queue messages")?; + } + + // Reset metadata + let metadata = QueueMetadata { + next_id: 0, + size: 0, + }; + let encoded_metadata = + encode_queue_metadata(&metadata).context("encode reset queue metadata")?; + self.0 + .kv + .put(&QUEUE_METADATA_KEY, &encoded_metadata) + .await + .context("persist reset queue metadata")?; + *self.0.queue_metadata.lock().await = metadata; + + // Drop all completion waiters + self.0.queue_completion_waiters.clear_async().await; + + // Update metrics and notify inspector + self.0.metrics.set_queue_depth(0); + self.notify_inspector_update(0); + + Ok(()) + } + pub(crate) fn configure_queue(&self, config: ActorConfig) { *self.0.queue_config.lock() = config; } diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs index 1c2929df00..8a783e9499 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs @@ -127,6 +127,30 @@ impl RegistryDispatcher { }; json_http_response(StatusCode::OK, &payload) } + (http::Method::DELETE, "/inspector/queue") => { + match instance.ctx.queue().reset().await { + Ok(_) => json_http_response(StatusCode::OK, &json!({})), + Err(error) => Err(error).context("reset inspector queue"), + } + } + (http::Method::POST, "/inspector/queue") => { + let body: InspectorEnqueueBody = match parse_json_body(request) { + Ok(body) => body, + Err(response) => return Ok(Some(response)), + }; + let cbor_body = encode_json_as_cbor(&body.body.unwrap_or(serde_json::Value::Null))?; + match instance.ctx.queue().send(&body.name, &cbor_body).await { + Ok(message) => json_http_response( + StatusCode::OK, + &json!({ + "id": message.id, + "name": message.name, + "createdAtMs": message.created_at, + }), + ), + Err(error) => Err(error).context("enqueue inspector queue message"), + } + } (http::Method::GET, "/inspector/workflow-history") => self .inspector_workflow_history(instance) .await diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index 47aca378d1..89507be479 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -241,6 +241,13 @@ struct InspectorWorkflowReplayBody { entry_id: Option, } +#[derive(Debug, Default, Deserialize)] +#[serde(default)] +struct InspectorEnqueueBody { + name: String, + body: Option, +} + #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] struct InspectorQueueMessageJson { diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs b/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs index f75927f0a7..2d53c83e9f 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/queue.rs @@ -216,6 +216,11 @@ impl Queue { self.inner.max_size() } + #[napi] + pub async fn reset(&self) -> napi::Result<()> { + self.inner.reset().await.map_err(napi_anyhow_error) + } + #[napi] pub async fn inspect_messages(&self) -> napi::Result> { self.inner diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts index f3bb283a52..c12ff8b331 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts @@ -723,6 +723,10 @@ export class NapiCoreRuntime implements CoreRuntime { return await asNativeActorContext(ctx).queue().inspectMessages(); } + async actorQueueReset(ctx: ActorContextHandle): Promise { + await asNativeActorContext(ctx).queue().reset(); + } + actorScheduleAfter( ctx: ActorContextHandle, durationMs: number, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts index 4bee092753..64f4b752f4 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts @@ -3522,6 +3522,27 @@ export function buildNativeFactory( messages, }); } + if ( + url.pathname === "/inspector/queue" && + jsRequest.method === "DELETE" + ) { + await runtime.actorQueueReset(ctx); + return jsonResponse({}); + } + if ( + url.pathname === "/inspector/queue" && + jsRequest.method === "POST" + ) { + const body = await jsRequest.json() as { name?: string; body?: unknown }; + const name = body.name ?? ""; + const cbor = encodeCborCompat((body.body ?? null) as JsonCompatValue); + const message = await runtime.actorQueueSend(ctx, name, cbor); + return jsonResponse({ + id: Number(message.id()), + name: message.name(), + createdAtMs: message.createdAt(), + }); + } if ( url.pathname === "/inspector/traces" && jsRequest.method === "GET" diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts index 67f6b6a633..0a8ff9b68a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts @@ -505,6 +505,7 @@ export interface CoreRuntime { actorQueueInspectMessages( ctx: ActorContextHandle, ): Promise; + actorQueueReset(ctx: ActorContextHandle): Promise; actorScheduleAfter( ctx: ActorContextHandle, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts index 2ab9a9d0fd..d0b0bce75c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts @@ -820,6 +820,11 @@ export class WasmCoreRuntime implements CoreRuntime { return await callHandleAsync(queue, "inspectMessages"); } + async actorQueueReset(ctx: ActorContextHandle): Promise { + const queue = childHandle(asWasmActorContext(ctx), "queue"); + await callHandleAsync(queue, "reset"); + } + actorScheduleAfter( ctx: ActorContextHandle, durationMs: number | bigint,