Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ OKTA_AUDIENCE=sprout-desktop
# OKTA_AUDIENCE=sprout-api
# OKTA_PUBKEY_CLAIM=nostr_pubkey

# -----------------------------------------------------------------------------
# Ephemeral Channels (TTL testing)
# -----------------------------------------------------------------------------
# Override the TTL for all ephemeral channels (in seconds). When set, any
# channel created with a TTL tag will use this value instead of the
# client-provided one. Unset to use the client-provided TTL.
# SPROUT_EPHEMERAL_TTL_OVERRIDE=60

# How often the reaper checks for expired ephemeral channels (default: 60s).
# SPROUT_REAPER_INTERVAL_SECS=5

# -----------------------------------------------------------------------------
# Logging / Tracing
# -----------------------------------------------------------------------------
Expand Down
81 changes: 71 additions & 10 deletions crates/sprout-db/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ pub struct ChannelRecord {
pub purpose_set_by: Option<Vec<u8>>,
/// When the purpose was last set.
pub purpose_set_at: Option<DateTime<Utc>>,
/// TTL in seconds for ephemeral channels. `None` means permanent.
pub ttl_seconds: Option<i32>,
/// Deadline by which a new message must arrive or the channel is auto-archived.
pub ttl_deadline: Option<DateTime<Utc>>,
}

/// A channel membership row as returned from the database.
Expand Down Expand Up @@ -85,6 +89,7 @@ pub async fn create_channel(
visibility: ChannelVisibility,
description: Option<&str>,
created_by: &[u8],
ttl_seconds: Option<i32>,
) -> Result<ChannelRecord> {
if created_by.len() != 32 {
return Err(DbError::InvalidData(format!(
Expand All @@ -99,8 +104,9 @@ pub async fn create_channel(

sqlx::query(
r#"
INSERT INTO channels (id, name, channel_type, visibility, description, created_by)
VALUES ($1, $2, $3::channel_type, $4::channel_visibility, $5, $6)
INSERT INTO channels (id, name, channel_type, visibility, description, created_by, ttl_seconds, ttl_deadline)
VALUES ($1, $2, $3::channel_type, $4::channel_visibility, $5, $6, $7,
CASE WHEN $7 IS NOT NULL THEN NOW() + ($7 || ' seconds')::interval ELSE NULL END)
"#,
)
.bind(id)
Expand All @@ -109,6 +115,7 @@ pub async fn create_channel(
.bind(visibility.as_str())
.bind(description)
.bind(created_by)
.bind(ttl_seconds)
.execute(&mut *tx)
.await?;

Expand All @@ -135,7 +142,8 @@ pub async fn create_channel(
created_by, created_at, updated_at, archived_at, deleted_at,
nip29_group_id, topic_required, max_members,
topic, topic_set_by, topic_set_at,
purpose, purpose_set_by, purpose_set_at
purpose, purpose_set_by, purpose_set_at,
ttl_seconds, ttl_deadline
FROM channels WHERE id = $1
"#,
)
Expand All @@ -152,6 +160,7 @@ pub async fn create_channel(
///
/// Returns `(record, true)` if the channel was newly created, or `(record, false)` if a
/// channel with `channel_id` already exists (duplicate — caller should reject the event).
#[allow(clippy::too_many_arguments)]
pub async fn create_channel_with_id(
pool: &PgPool,
channel_id: Uuid,
Expand All @@ -160,6 +169,7 @@ pub async fn create_channel_with_id(
visibility: ChannelVisibility,
description: Option<&str>,
created_by: &[u8],
ttl_seconds: Option<i32>,
) -> Result<(ChannelRecord, bool)> {
if created_by.len() != 32 {
return Err(DbError::InvalidData(format!(
Expand All @@ -172,8 +182,9 @@ pub async fn create_channel_with_id(

let rows_affected = sqlx::query(
r#"
INSERT INTO channels (id, name, channel_type, visibility, description, created_by)
VALUES ($1, $2, $3::channel_type, $4::channel_visibility, $5, $6)
INSERT INTO channels (id, name, channel_type, visibility, description, created_by, ttl_seconds, ttl_deadline)
VALUES ($1, $2, $3::channel_type, $4::channel_visibility, $5, $6, $7,
CASE WHEN $7 IS NOT NULL THEN NOW() + ($7 || ' seconds')::interval ELSE NULL END)
ON CONFLICT (id) DO NOTHING
"#,
)
Expand All @@ -183,6 +194,7 @@ pub async fn create_channel_with_id(
.bind(visibility.as_str())
.bind(description)
.bind(created_by)
.bind(ttl_seconds)
.execute(&mut *tx)
.await?
.rows_affected();
Expand Down Expand Up @@ -215,7 +227,8 @@ pub async fn create_channel_with_id(
created_by, created_at, updated_at, archived_at, deleted_at,
nip29_group_id, topic_required, max_members,
topic, topic_set_by, topic_set_at,
purpose, purpose_set_by, purpose_set_at
purpose, purpose_set_by, purpose_set_at,
ttl_seconds, ttl_deadline
FROM channels WHERE id = $1
"#,
)
Expand All @@ -237,7 +250,8 @@ pub async fn get_channel(pool: &PgPool, channel_id: Uuid) -> Result<ChannelRecor
created_by, created_at, updated_at, archived_at, deleted_at,
nip29_group_id, topic_required, max_members,
topic, topic_set_by, topic_set_at,
purpose, purpose_set_by, purpose_set_at
purpose, purpose_set_by, purpose_set_at,
ttl_seconds, ttl_deadline
FROM channels WHERE id = $1 AND deleted_at IS NULL
"#,
)
Expand Down Expand Up @@ -535,7 +549,8 @@ pub async fn list_channels(pool: &PgPool, visibility: Option<&str>) -> Result<Ve
created_by, created_at, updated_at, archived_at, deleted_at,
nip29_group_id, topic_required, max_members,
topic, topic_set_by, topic_set_at,
purpose, purpose_set_by, purpose_set_at
purpose, purpose_set_by, purpose_set_at,
ttl_seconds, ttl_deadline
FROM channels
WHERE deleted_at IS NULL AND visibility::text = $1
ORDER BY created_at DESC
Expand All @@ -553,7 +568,8 @@ pub async fn list_channels(pool: &PgPool, visibility: Option<&str>) -> Result<Ve
created_by, created_at, updated_at, archived_at, deleted_at,
nip29_group_id, topic_required, max_members,
topic, topic_set_by, topic_set_at,
purpose, purpose_set_by, purpose_set_at
purpose, purpose_set_by, purpose_set_at,
ttl_seconds, ttl_deadline
FROM channels
WHERE deleted_at IS NULL
ORDER BY created_at DESC
Expand Down Expand Up @@ -596,7 +612,8 @@ async fn get_channel_tx(
created_by, created_at, updated_at, archived_at, deleted_at,
nip29_group_id, topic_required, max_members,
topic, topic_set_by, topic_set_at,
purpose, purpose_set_by, purpose_set_at
purpose, purpose_set_by, purpose_set_at,
ttl_seconds, ttl_deadline
FROM channels WHERE id = $1 AND deleted_at IS NULL
"#,
)
Expand Down Expand Up @@ -685,6 +702,7 @@ pub async fn get_accessible_channels(
c.nip29_group_id, c.topic_required, c.max_members,
c.topic, c.topic_set_by, c.topic_set_at,
c.purpose, c.purpose_set_by, c.purpose_set_at,
c.ttl_seconds, c.ttl_deadline,
(cm.channel_id IS NOT NULL) AS is_member
FROM channels c
LEFT JOIN channel_members cm
Expand Down Expand Up @@ -807,6 +825,8 @@ fn row_to_channel_record(row: sqlx::postgres::PgRow) -> Result<ChannelRecord> {
let purpose: Option<String> = row.try_get("purpose").unwrap_or(None);
let purpose_set_by: Option<Vec<u8>> = row.try_get("purpose_set_by").unwrap_or(None);
let purpose_set_at: Option<DateTime<Utc>> = row.try_get("purpose_set_at").unwrap_or(None);
let ttl_seconds: Option<i32> = row.try_get("ttl_seconds").unwrap_or(None);
let ttl_deadline: Option<DateTime<Utc>> = row.try_get("ttl_deadline").unwrap_or(None);

Ok(ChannelRecord {
id,
Expand All @@ -829,6 +849,8 @@ fn row_to_channel_record(row: sqlx::postgres::PgRow) -> Result<ChannelRecord> {
purpose,
purpose_set_by,
purpose_set_at,
ttl_seconds,
ttl_deadline,
})
}

Expand Down Expand Up @@ -1086,3 +1108,42 @@ pub async fn get_member_role(
.await?;
Ok(row.map(|r| r.try_get("role")).transpose()?)
}

/// Bump the TTL deadline for an ephemeral channel after a new message.
///
/// No-op for permanent channels or channels that are already archived/deleted.
pub async fn bump_ttl_deadline(pool: &PgPool, channel_id: Uuid) -> Result<()> {
sqlx::query(
"UPDATE channels SET ttl_deadline = NOW() + (ttl_seconds || ' seconds')::interval \
WHERE id = $1 AND ttl_seconds IS NOT NULL AND archived_at IS NULL AND deleted_at IS NULL",
)
.bind(channel_id)
.execute(pool)
.await?;
Ok(())
}

/// Archive ephemeral channels whose TTL deadline has passed.
///
/// Returns the list of channel IDs that were archived. Idempotent — the
/// `archived_at IS NULL` guard prevents double-archiving even if called
/// concurrently from multiple relay pods.
pub async fn reap_expired_ephemeral_channels(pool: &PgPool) -> Result<Vec<Uuid>> {
let rows = sqlx::query(
"UPDATE channels SET archived_at = NOW() \
WHERE ttl_seconds IS NOT NULL \
AND ttl_deadline < NOW() \
AND archived_at IS NULL \
AND deleted_at IS NULL \
RETURNING id",
)
.fetch_all(pool)
.await?;

rows.into_iter()
.map(|row| {
let id: Uuid = row.try_get("id")?;
Ok(id)
})
.collect()
}
2 changes: 2 additions & 0 deletions crates/sprout-db/src/dm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ fn row_to_channel_record(row: sqlx::postgres::PgRow) -> Result<ChannelRecord> {
purpose: row.try_get("purpose").unwrap_or(None),
purpose_set_by: row.try_get("purpose_set_by").unwrap_or(None),
purpose_set_at: row.try_get("purpose_set_at").unwrap_or(None),
ttl_seconds: row.try_get("ttl_seconds").unwrap_or(None),
ttl_deadline: row.try_get("ttl_deadline").unwrap_or(None),
})
}

Expand Down
15 changes: 15 additions & 0 deletions crates/sprout-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ impl Db {
visibility: channel::ChannelVisibility,
description: Option<&str>,
created_by: &[u8],
ttl_seconds: Option<i32>,
) -> Result<channel::ChannelRecord> {
channel::create_channel(
&self.pool,
Expand All @@ -306,13 +307,15 @@ impl Db {
visibility,
description,
created_by,
ttl_seconds,
)
.await
}

/// Creates a channel with a client-supplied UUID.
///
/// Returns `(record, true)` if newly created, `(record, false)` if already exists.
#[allow(clippy::too_many_arguments)]
pub async fn create_channel_with_id(
&self,
channel_id: Uuid,
Expand All @@ -321,6 +324,7 @@ impl Db {
visibility: channel::ChannelVisibility,
description: Option<&str>,
created_by: &[u8],
ttl_seconds: Option<i32>,
) -> Result<(channel::ChannelRecord, bool)> {
channel::create_channel_with_id(
&self.pool,
Expand All @@ -330,6 +334,7 @@ impl Db {
visibility,
description,
created_by,
ttl_seconds,
)
.await
}
Expand Down Expand Up @@ -465,6 +470,16 @@ impl Db {
channel::get_member_role(&self.pool, channel_id, pubkey).await
}

/// Bump the TTL deadline for an ephemeral channel after a new message.
pub async fn bump_ttl_deadline(&self, channel_id: Uuid) -> Result<()> {
channel::bump_ttl_deadline(&self.pool, channel_id).await
}

/// Archive ephemeral channels whose TTL deadline has passed.
pub async fn reap_expired_ephemeral_channels(&self) -> Result<Vec<Uuid>> {
channel::reap_expired_ephemeral_channels(&self.pool).await
}

// ── Users ────────────────────────────────────────────────────────────────

/// Ensure a user record exists (upsert).
Expand Down
2 changes: 2 additions & 0 deletions crates/sprout-relay/src/api/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ fn channel_record_to_json(
"participants": participants,
"participant_pubkeys": participant_pubkeys,
"is_member": is_member,
"ttl_seconds": channel.ttl_seconds,
"ttl_deadline": channel.ttl_deadline.map(|t| t.to_rfc3339()),
})
}

Expand Down
2 changes: 2 additions & 0 deletions crates/sprout-relay/src/api/channels_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ fn channel_detail_to_json(record: &ChannelRecord, member_count: i64) -> serde_js
"topic_required": record.topic_required,
"max_members": record.max_members,
"nip29_group_id": record.nip29_group_id,
"ttl_seconds": record.ttl_seconds,
"ttl_deadline": record.ttl_deadline.map(|t| t.to_rfc3339()),
})
}

Expand Down
20 changes: 20 additions & 0 deletions crates/sprout-relay/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ pub struct Config {
pub pubkey_allowlist_enabled: bool,
/// Media storage configuration (S3/MinIO).
pub media: sprout_media::MediaConfig,

/// Optional override for ephemeral channel TTL (in seconds).
/// When set, any channel created with a TTL tag will use this value instead
/// of the client-provided one. Useful for testing ephemeral expiry quickly.
/// Example: `SPROUT_EPHEMERAL_TTL_OVERRIDE=60` → all ephemeral channels expire
/// 60 seconds after the last message.
pub ephemeral_ttl_override: Option<i32>,
}

impl Config {
Expand Down Expand Up @@ -194,6 +201,18 @@ impl Config {
}),
};

let ephemeral_ttl_override = std::env::var("SPROUT_EPHEMERAL_TTL_OVERRIDE")
.ok()
.and_then(|v| v.parse::<i32>().ok())
.filter(|&v| v > 0);

if let Some(ttl) = ephemeral_ttl_override {
warn!(
"SPROUT_EPHEMERAL_TTL_OVERRIDE={ttl}s — all ephemeral channels will use \
this TTL instead of the client-provided value."
);
}

Ok(Self {
bind_addr,
database_url,
Expand All @@ -213,6 +232,7 @@ impl Config {
metrics_port,
pubkey_allowlist_enabled,
media,
ephemeral_ttl_override,
})
}
}
Expand Down
14 changes: 14 additions & 0 deletions crates/sprout-relay/src/handlers/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,8 @@ pub async fn ingest_event(
}
});

let ttl_seconds = super::resolve_ttl(&event, state.config.ephemeral_ttl_override);

let actor_bytes = event.pubkey.serialize().to_vec();
let (_, was_created) = state
.db
Expand All @@ -1012,6 +1014,7 @@ pub async fn ingest_event(
visibility,
description.as_deref(),
&actor_bytes,
ttl_seconds,
)
.await
.map_err(|e| IngestError::Internal(format!("error: {e}")))?;
Expand Down Expand Up @@ -1261,6 +1264,17 @@ pub async fn ingest_event(
});
}

// ── 20b. Bump ephemeral channel TTL deadline ──────────────────────
// Any successfully stored channel-scoped event keeps the channel alive.
// Skip kind:9007 (create) — the deadline was just set during creation.
if let Some(ch_id) = channel_id {
if kind_u32 != KIND_NIP29_CREATE_GROUP {
if let Err(e) = state.db.bump_ttl_deadline(ch_id).await {
warn!(channel = %ch_id, "TTL deadline bump failed: {e}");
}
}
}

// ── 21. Side effects ─────────────────────────────────────────────────
if crate::handlers::side_effects::is_side_effect_kind(kind_u32) {
if let Err(e) =
Expand Down
Loading
Loading