diff --git a/src/backend/app/main.py b/src/backend/app/main.py
index be880c84..85c74d1e 100644
--- a/src/backend/app/main.py
+++ b/src/backend/app/main.py
@@ -99,3 +99,11 @@ async def lifespan(app: FastAPI):
 from app.routes.ws.state import router as ws_router
 
 app.include_router(ws_router)
+
+from app.routes.http.reverse import router as reverse_router
+
+app.include_router(reverse_router)
+
+from app.routes.ws.reverse import router as ws_reverse_router
+
+app.include_router(ws_reverse_router)
diff --git a/src/backend/app/routes/http/download.py b/src/backend/app/routes/http/download.py
index fe77c9c8..f837154f 100644
--- a/src/backend/app/routes/http/download.py
+++ b/src/backend/app/routes/http/download.py
@@ -13,7 +13,7 @@
 router = APIRouter()
 
 
-@router.get("/download/{key}")
+@router.get("/download/{key:path}")
 async def download_files(
     key: str,
     session: SessionDep,
diff --git a/src/backend/app/routes/http/reverse.py b/src/backend/app/routes/http/reverse.py
new file mode 100644
index 00000000..463d689a
--- /dev/null
+++ b/src/backend/app/routes/http/reverse.py
@@ -0,0 +1,274 @@
+import json
+import uuid
+from datetime import datetime, timezone
+from urllib.parse import quote
+
+from botocore.exceptions import ClientError
+from fastapi import APIRouter, Header, HTTPException, UploadFile, status, Request
+from fastapi.responses import StreamingResponse
+from sqlmodel import col, select
+from types_aiobotocore_s3.type_defs import CompletedPartTypeDef
+
+from app.converter.bytes import ByteSize
+from app.deps import S3Dep, SessionDep
+from app.models.files import File
+from app.schemas.reverse import (
+    AddHostOut,
+    RoomCreate,
+    RoomCreateOut,
+    RoomFileEntry,
+    RoomOut,
+)
+from app.settings import settings
+from app.states.room import RoomState
+from app.states.app import UploadProgress
+from app.tasks.clean_file import delete_expired_file
+from app.tasks.persist_file import persist_file_record
+
+router = APIRouter(prefix="/reverse")
+
+CHUNK_SIZE = ByteSize(mb=8).total_bytes()
+
+
+@router.post("/rooms", status_code=status.HTTP_201_CREATED)
+async def create_room(body: RoomCreate) -> RoomCreateOut:
+    return await RoomState.create(name=body.name, expire_after=body.expire_after)
+
+
+@router.get("/rooms/{room_id}")
+async def get_room(room_id: str) -> RoomOut:
+    room = await RoomState.get(room_id)
+    if room is None:
+        raise HTTPException(status_code=404, detail="Room not found or expired")
+    return room
+
+
+@router.delete("/rooms/{room_id}", status_code=status.HTTP_204_NO_CONTENT)
+async def delete_room(
+    room_id: str,
+    session: SessionDep,
+    x_host_token: str = Header(),
+) -> None:
+    if not await RoomState.verify_host(room_id, x_host_token):
+        raise HTTPException(status_code=403, detail="Invalid host token")
+
+    room = await RoomState.get(room_id)
+    if room is None:
+        raise HTTPException(status_code=404, detail="Room not found or expired")
+
+    # Schedule file cleanup (S3 + DB) via the existing file cleanup task
+    if room.files:
+        keys = [f.key for f in room.files]
+        result = await session.exec(select(File).where(col(File.key).in_(keys)))
+        for file_record in result.all():
+            delete_expired_file.delay(str(file_record.id))
+
+    await RoomState.publish_event(room_id, json.dumps({"type": "room_destroyed"}))
+    await RoomState.delete(room_id)
+
+
+@router.post("/rooms/{room_id}/hosts", status_code=status.HTTP_201_CREATED)
+async def add_host(
+    room_id: str,
+    x_host_token: str = Header(),
+) -> AddHostOut:
+    """An existing host can invite a new host to the room."""
+    if not await RoomState.verify_host(room_id, x_host_token):
+        raise HTTPException(status_code=403, detail="Invalid host token")
+
+    result = await RoomState.add_host(room_id)
+    if result is None:
+        raise HTTPException(status_code=404, detail="Room not found or expired")
+    return result
+
+
+@router.post("/rooms/{room_id}/upload")
+async def upload_file_to_room(
+    room_id: str,
+    file: UploadFile,
+    request: Request,
+    s3: S3Dep,
+    
x_host_token: str = Header(), +) -> RoomFileEntry: + if not await RoomState.verify_host(room_id, x_host_token): + raise HTTPException(status_code=403, detail="Invalid host token") + + room = await RoomState.get(room_id) + if room is None: + raise HTTPException(status_code=404, detail="Room not found or expired") + + # Check room hasn't expired by time + if datetime.now(timezone.utc) > room.expires_at: + raise HTTPException(status_code=410, detail="Room has expired") + + # Namespace S3 key under the room + file_key = f"rooms/{room_id}/{uuid.uuid7()}" + filename = file.filename or str(uuid.uuid7()) + + # Track upload progress in global AppState so websockets and other + # observers can see in-flight uploads (mirrors upload.py behavior). + await UploadProgress.start(upload_key=file_key, filename=filename) + + # Multipart upload to RustFS + # Store filename in S3 metadata so the shareable download link works + # even after the room expires. + resp = await s3.create_multipart_upload( + Bucket=settings.RUSTFS_BUCKET_NAME, + Key=file_key, + ContentType=file.content_type or "application/octet-stream", + Metadata={"filename": filename}, + ) + upload_id = resp["UploadId"] + # boto3 type defs expect a sequence of CompletedPartTypeDef + parts: list[CompletedPartTypeDef] = [] + part_number = 1 + uploaded_size = 0 + + try: + while True: + chunk = await file.read(CHUNK_SIZE) + if not chunk: + break + + part_resp = await s3.upload_part( + Bucket=settings.RUSTFS_BUCKET_NAME, + Key=file_key, + UploadId=upload_id, + PartNumber=part_number, + Body=chunk, + ) + part_entry: CompletedPartTypeDef = { + "ETag": part_resp["ETag"], + "PartNumber": part_number, + } + parts.append(part_entry) + uploaded_size += len(chunk) + part_number += 1 + + # Update upload progress in global state + await UploadProgress.update( + upload_key=file_key, uploaded_bytes=uploaded_size + ) + + # If the client disconnected mid-upload, abort and evict + if await request.is_disconnected(): + await 
s3.abort_multipart_upload( + Bucket=settings.RUSTFS_BUCKET_NAME, + Key=file_key, + UploadId=upload_id, + ) + await UploadProgress.cancel(upload_key=file_key) + raise HTTPException(status_code=499, detail="Client disconnected") + + if not parts: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Empty file", + ) + + await s3.complete_multipart_upload( + Bucket=settings.RUSTFS_BUCKET_NAME, + Key=file_key, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, + ) + await UploadProgress.finish(upload_key=file_key, final_size=uploaded_size) + except HTTPException: + # Ensure failed uploads are removed from AppState + await UploadProgress.cancel(upload_key=file_key) + raise + except Exception: + await s3.abort_multipart_upload( + Bucket=settings.RUSTFS_BUCKET_NAME, + Key=file_key, + UploadId=upload_id, + ) + # Ensure failed uploads are removed from AppState + await UploadProgress.cancel(upload_key=file_key) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Upload failed", + ) + + now = datetime.now(timezone.utc) + + # Persist File record + schedule cleanup in background (non-blocking) + persist_file_record.delay( + key=file_key, + filename=filename, + size=uploaded_size, + created_at=now.isoformat(), + expires_at=room.expires_at.isoformat(), + content_type=file.content_type or "application/octet-stream", + ) + + entry = RoomFileEntry( + key=file_key, + filename=filename, + size=uploaded_size, + uploaded_at=now, + download_url=f"{settings.ROOT_PATH}/download/{file_key}", + ) + + # Persist in room state + fan-out event via pub/sub (atomic append) + added = await RoomState.add_file(room_id, entry) + if not added: + # Room vanished mid-upload — clean up the S3 object + await s3.delete_object(Bucket=settings.RUSTFS_BUCKET_NAME, Key=file_key) + # Remove in-flight upload from global state + await UploadProgress.cancel(upload_key=file_key) + raise HTTPException(status_code=404, detail="Room expired during upload") 
+ + return entry + + +@router.get("/rooms/{room_id}/files") +async def list_room_files(room_id: str) -> list[RoomFileEntry]: + """Late-joining clients call this to discover all files already in the room.""" + room = await RoomState.get(room_id) + if room is None: + raise HTTPException(status_code=404, detail="Room not found or expired") + return room.files + + +@router.get("/rooms/{room_id}/files/{file_key:path}/download") +async def download_room_file( + room_id: str, + file_key: str, + s3: S3Dep, +) -> StreamingResponse: + """Stream a room file from RustFS. Designed for late or slow clients.""" + room = await RoomState.get(room_id) + if room is None: + raise HTTPException(status_code=404, detail="Room not found or expired") + + # Verify the file belongs to this room + entry = next((f for f in room.files if f.key == file_key), None) + if entry is None: + raise HTTPException(status_code=404, detail="File not found in room") + + try: + s3_response = await s3.get_object( + Bucket=settings.RUSTFS_BUCKET_NAME, Key=file_key + ) + except ClientError as e: + code = e.response.get("Error", {}).get("Code") + if code == "NoSuchKey": + raise HTTPException(status_code=404, detail="File not found in storage") + raise HTTPException(status_code=500, detail="Storage error") + + async def _stream(): + try: + async for chunk in s3_response["Body"]: + yield chunk + finally: + s3_response["Body"].close() + + safe_filename = quote(entry.filename) + return StreamingResponse( + _stream(), + media_type=s3_response.get("ContentType", "application/octet-stream"), + headers={ + "Content-Disposition": f"attachment; filename*=UTF-8''{safe_filename}", + }, + ) diff --git a/src/backend/app/routes/http/speedtest.py b/src/backend/app/routes/http/speedtest.py index 93b84d15..aa6e006a 100644 --- a/src/backend/app/routes/http/speedtest.py +++ b/src/backend/app/routes/http/speedtest.py @@ -7,14 +7,14 @@ from app.schemas.speedtest import UploadPayload from app.settings import settings -router = 
APIRouter() +router = APIRouter(prefix="/speedtest") # 1 MB chunk of random data CHUNK_SIZE = 1024 * 1024 RANDOM_BYTES = os.urandom(CHUNK_SIZE) -@router.get("/speedtest/download", tags=["Speedtest"]) +@router.get("/download", tags=["Speedtest"]) async def speedtest_download( size: Annotated[ int, @@ -59,7 +59,7 @@ async def iter_content() -> AsyncIterator[bytes]: ) -@router.post("/speedtest/upload", tags=["Speedtest"]) +@router.post("/upload", tags=["Speedtest"]) async def speedtest_upload(request: Request): """ Upload speedtest endpoint. diff --git a/src/backend/app/routes/http/token.py b/src/backend/app/routes/http/token.py index 7f24821b..8621e050 100644 --- a/src/backend/app/routes/http/token.py +++ b/src/backend/app/routes/http/token.py @@ -2,10 +2,10 @@ from app.deps import CurrentUser -router = APIRouter() +router = APIRouter(prefix="/token") -@router.get("/token/validate", response_model=dict) +@router.get("/validate", response_model=dict) async def validate_token(_: CurrentUser): """ Validates the JWT and returns the corresponding user info. 
diff --git a/src/backend/app/routes/http/upload.py b/src/backend/app/routes/http/upload.py index 8f9004a9..9c42c0ce 100644 --- a/src/backend/app/routes/http/upload.py +++ b/src/backend/app/routes/http/upload.py @@ -6,6 +6,7 @@ APIRouter, BackgroundTasks, Form, + Request, HTTPException, UploadFile, status, @@ -53,6 +54,7 @@ async def upload_file( filename: Annotated[str | None, Form()], expire_after_n_download: Annotated[int, Form()], expire_after: Annotated[int, Form()], + request: Request, # Dependency Injection s3: S3Dep, session: SessionDep, @@ -141,6 +143,16 @@ async def upload_file( upload_key=str(key), uploaded_bytes=uploaded_size ) + # If the client disconnected mid-upload, abort and evict + if await request.is_disconnected(): + await s3.abort_multipart_upload( + Bucket=settings.RUSTFS_BUCKET_NAME, + Key=str(key), + UploadId=upload_id, + ) + await UploadProgress.cancel(upload_key=str(key)) + raise HTTPException(status_code=499, detail="Client disconnected") + await s3.complete_multipart_upload( Bucket=settings.RUSTFS_BUCKET_NAME, Key=str(key), diff --git a/src/backend/app/routes/ws/reverse.py b/src/backend/app/routes/ws/reverse.py new file mode 100644 index 00000000..15ee0f3f --- /dev/null +++ b/src/backend/app/routes/ws/reverse.py @@ -0,0 +1,207 @@ +import asyncio +import json +import logging +from contextlib import suppress + +import aioboto3 +import redis.asyncio as aioredis +from botocore.exceptions import ClientError +from fastapi import APIRouter, WebSocket, WebSocketDisconnect + +from app.schemas.reverse import RoomFileEntry, RoomFileEvent +from app.settings import settings +from app.states.room import RoomState + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +async def _stream_file_to_ws(ws: WebSocket, entry: RoomFileEntry) -> None: + """Fetch a file from RustFS and relay every chunk to *ws*. + + Mirrors the streaming pattern in ``download.py`` — the client receives raw + bytes without ever knowing about the backing S3 store. 
+ """ + session = aioboto3.Session() + async with session.client( + "s3", + endpoint_url=settings.RUSTFS_ENDPOINT_URL, + aws_access_key_id=settings.RUSTFS_ACCESS_KEY, + aws_secret_access_key=settings.RUSTFS_SECRET_ACCESS_KEY, + ) as s3: + try: + s3_response = await s3.get_object( + Bucket=settings.RUSTFS_BUCKET_NAME, Key=entry.key + ) + except ClientError: + await ws.send_text( + json.dumps( + { + "type": "file_error", + "key": entry.key, + "detail": "Not found in storage", + } + ) + ) + return + + # Header — tells the client which file is coming and how large it is + await ws.send_text( + json.dumps( + { + "type": "file_start", + "key": entry.key, + "filename": entry.filename, + "size": entry.size, + } + ) + ) + + # Binary chunks — same as ``async for chunk in s3_response["Body"]`` + try: + async for chunk in s3_response["Body"]: + await ws.send_bytes(chunk) + finally: + s3_response["Body"].close() + + # Footer — signals the client that this file is complete + await ws.send_text(json.dumps({"type": "file_end", "key": entry.key})) + + +@router.websocket("/ws/reverse/rooms/{room_id}") +async def room_ws(ws: WebSocket, room_id: str, host_token: str | None = None): + # Quick existence check before accepting the connection + room = await RoomState.get(room_id) + if room is None: + await ws.close(code=4004, reason="Room not found or expired") + return + + # Determine if this connection is the host + is_host = False + if host_token is not None: + is_host = await RoomState.verify_host(room_id, host_token) + if not is_host: + await ws.close(code=4003, reason="Invalid host token") + return + + await ws.accept() + + # ── subscribe FIRST so no events are lost between snapshot and listen ── + sub_client = aioredis.from_url( + settings.REDIS_ENDPOINT, encoding="utf-8", decode_responses=True + ) + pubsub = sub_client.pubsub() + channel = RoomState.channel_for(room_id) + await pubsub.subscribe(channel) + + # ── snapshot (re-read after subscribe to guarantee consistency) ─── + 
room = await RoomState.get(room_id) + if room is None: + await pubsub.unsubscribe(channel) + await pubsub.aclose() + await sub_client.aclose() + await ws.close(code=4004, reason="Room expired") + return + + await ws.send_text( + json.dumps({"type": "snapshot", "room": room.model_dump(mode="json")}) + ) + + # Queue that serialises all file streams (catch-up + real-time). + # Existing files from the snapshot are enqueued first; new events + # arriving through pub/sub are appended in order. + file_queue: asyncio.Queue[RoomFileEntry | None] = asyncio.Queue() + seen_keys: set[str] = set() + + # Enqueue existing files so late clients get them streamed + for entry in room.files: + seen_keys.add(entry.key) + file_queue.put_nowait(entry) + + async def _listen_events() -> None: + """Read room events from Redis pub/sub and either queue file + streams or forward lightweight notifications. + + Events for files already in the snapshot are deduplicated via + ``seen_keys`` so late clients don't receive the same file twice. 
+        """
+        try:
+            async for message in pubsub.listen():
+                if message["type"] != "message":
+                    continue
+
+                raw = message["data"]
+                try:
+                    data = json.loads(raw)
+                except Exception:
+                    continue
+
+                # Host destroyed the room — notify client and close
+                if data.get("type") == "room_destroyed":
+                    await ws.send_text(json.dumps({"type": "room_destroyed"}))
+                    file_queue.put_nowait(None)
+                    await ws.close(code=4001, reason="Room destroyed by host")
+                    return
+
+                # Host count changed — forward directly
+                if data.get("type") == "host_count":
+                    await ws.send_text(json.dumps(data))
+                    continue
+
+                try:
+                    event = RoomFileEvent.model_validate(data)
+                except Exception:
+                    logger.exception("Bad event on room channel %s", room_id)
+                    continue
+
+                if event.event == "file_added":
+                    if event.file.key in seen_keys:
+                        # Already queued from the snapshot — skip duplicate
+                        continue
+                    seen_keys.add(event.file.key)
+                    file_queue.put_nowait(event.file)
+                elif event.event == "file_removed":
+                    await ws.send_text(
+                        json.dumps(
+                            {
+                                "type": "file_removed",
+                                "key": event.file.key,
+                                "filename": event.file.filename,
+                            }
+                        )
+                    )
+        except (WebSocketDisconnect, asyncio.CancelledError):
+            pass
+
+    # ── streamer — pulls from queue and streams one file at a time ───
+    async def _stream_queued_files() -> None:
+        try:
+            while True:
+                entry = await file_queue.get()
+                if entry is None:
+                    break
+                await _stream_file_to_ws(ws, entry)
+        except (WebSocketDisconnect, asyncio.CancelledError):
+            pass
+
+    listen_task = asyncio.create_task(_listen_events())
+    stream_task = asyncio.create_task(_stream_queued_files())
+
+    try:
+        # Keep alive — read (and discard) client pings / messages
+        while True:
+            await ws.receive_text()
+    except WebSocketDisconnect:
+        pass
+    finally:
+        listen_task.cancel()
+        file_queue.put_nowait(None)  # unblock streamer
+        stream_task.cancel()
+        with suppress(asyncio.CancelledError):
+            await listen_task
+        with suppress(asyncio.CancelledError):
+            await stream_task
+        await pubsub.unsubscribe(channel)
+        await
pubsub.aclose() + await sub_client.aclose() diff --git a/src/backend/app/schemas/reverse.py b/src/backend/app/schemas/reverse.py new file mode 100644 index 00000000..a555d108 --- /dev/null +++ b/src/backend/app/schemas/reverse.py @@ -0,0 +1,42 @@ +from datetime import datetime + +from pydantic import BaseModel + + +class RoomFileEntry(BaseModel): + key: str + filename: str + size: int + uploaded_at: datetime + download_url: str + + +class RoomOut(BaseModel): + id: str + name: str + created_at: datetime + expires_at: datetime + files: list[RoomFileEntry] = [] + host_count: int = 1 + + +class RoomCreateOut(RoomOut): + """Returned only once at room creation — includes the host secret.""" + + host_token: str + + +class RoomCreate(BaseModel): + name: str + expire_after: int # seconds from now + + +class AddHostOut(BaseModel): + host_token: str + + +class RoomFileEvent(BaseModel): + """Pushed to WebSocket subscribers on file activity.""" + + event: str # "file_added" | "file_removed" + file: RoomFileEntry diff --git a/src/backend/app/settings.py b/src/backend/app/settings.py index c73f4ef7..e2807397 100644 --- a/src/backend/app/settings.py +++ b/src/backend/app/settings.py @@ -74,6 +74,8 @@ def SQLALCHEMY_DATABASE_URI(self) -> PostgresDsn | str: STATE_REDIS_KEY: str = "chithi:global_state" STATE_CHANNEL: str = "chithi:state_changed" + # Upload cleanup: stale in-flight uploads older than this (seconds) are cleared + UPLOAD_STALE_SECONDS: int = 600 settings = Settings() # type: ignore diff --git a/src/backend/app/states/app.py b/src/backend/app/states/app.py index 39fe451d..e836e7ef 100644 --- a/src/backend/app/states/app.py +++ b/src/backend/app/states/app.py @@ -1,4 +1,5 @@ from typing import Any +from datetime import datetime, timezone import redis.asyncio as redis from pydantic import BaseModel @@ -14,46 +15,68 @@ class UploadProgress(GlobalState, BaseModel): filename: str uploaded_bytes: int = 0 done: bool = False + last_updated: datetime | None = None @classmethod - 
async def start(cls, upload_key: str, filename: str) -> AppState: - """Register a new in-flight upload.""" + async def start(cls, upload_key: str, filename: str) -> "AppState": + """Register a new in-flight upload and set last_updated.""" current = await AppState.get() - current.active_uploads.append(cls(upload_key=upload_key, filename=filename)) + now = datetime.now(timezone.utc) + current.active_uploads.append( + cls(upload_key=upload_key, filename=filename, last_updated=now) + ) await AppState.set(current) return current @classmethod - async def update(cls, upload_key: str, uploaded_bytes: int) -> AppState: - """Update the byte count for an in-flight upload.""" + async def update(cls, upload_key: str, uploaded_bytes: int) -> "AppState": + """Update the byte count and touch last_updated for an in-flight upload.""" current = await AppState.get() + now = datetime.now(timezone.utc) for upload in current.active_uploads: if upload.upload_key == upload_key: + # Add the delta to total_space_used so partial uploads are accounted + delta = uploaded_bytes - (upload.uploaded_bytes or 0) + if delta > 0: + current.total_space_used += delta upload.uploaded_bytes = uploaded_bytes + upload.last_updated = now break await AppState.set(current) return current @classmethod - async def finish(cls, upload_key: str, final_size: int) -> AppState: - """Mark an upload as done and add its size to total_space_used.""" + async def finish(cls, upload_key: str, final_size: int) -> "AppState": + """Mark an upload as done, touch last_updated, and add its size to total_space_used.""" current = await AppState.get() + now = datetime.now(timezone.utc) for upload in current.active_uploads: if upload.upload_key == upload_key: + # If we tracked partial bytes earlier, only add the remaining delta + delta = final_size - (upload.uploaded_bytes or 0) + if delta > 0: + current.total_space_used += delta upload.done = True upload.uploaded_bytes = final_size + upload.last_updated = now break - 
current.total_space_used += final_size await AppState.set(current) return current @classmethod - async def cancel(cls, upload_key: str) -> AppState: - """Remove a failed/disconnected upload from the active list.""" + async def cancel(cls, upload_key: str) -> "AppState": + """Remove a failed/disconnected upload from the active list without adjusting totals.""" current = await AppState.get() - current.active_uploads = [ - u for u in current.active_uploads if u.upload_key != upload_key - ] + # Subtract any tracked uploaded bytes from totals when cancelling + remaining = [] + freed = 0 + for u in current.active_uploads: + if u.upload_key == upload_key: + freed += u.uploaded_bytes or 0 + else: + remaining.append(u) + current.active_uploads = remaining + current.total_space_used = max(0, current.total_space_used - freed) await AppState.set(current) return current diff --git a/src/backend/app/states/room.py b/src/backend/app/states/room.py new file mode 100644 index 00000000..e184fe71 --- /dev/null +++ b/src/backend/app/states/room.py @@ -0,0 +1,196 @@ +import hashlib +import json +import secrets +import uuid +from datetime import datetime, timedelta, timezone + +from app.schemas.reverse import ( + AddHostOut, + RoomCreateOut, + RoomFileEntry, + RoomFileEvent, + RoomOut, +) +from app.states._global import GlobalState + +# Lua script: atomically remove an element from a JSON array by matching +# a field value, and return the removed element (if any). 
+_LUA_REMOVE_FILE = """ +local raw = redis.call('JSON.GET', KEYS[1], '$.files') +if not raw then return nil end +local outer = cjson.decode(raw) +local files = outer[1] +local removed = nil +local new_files = {} +for _, f in ipairs(files) do + if f['key'] == ARGV[1] then + removed = cjson.encode(f) + else + new_files[#new_files + 1] = f + end +end +if removed then + redis.call('JSON.SET', KEYS[1], '$.files', cjson.encode(new_files)) +end +return removed +""" + + +def _room_key(room_id: str) -> str: + return f"chithi:room:{room_id}" + + +def _room_channel(room_id: str) -> str: + return f"chithi:room:{room_id}:events" + + +class RoomState(GlobalState): + """Manage rooms stored in Redis.""" + + @classmethod + async def create(cls, name: str, expire_after: int) -> RoomCreateOut: + room_id = str(uuid.uuid7()) + now = datetime.now(timezone.utc) + + # Generate a cryptographically random host token; store its + # SHA-256 hash in Redis so the plaintext is never persisted. + host_token = secrets.token_urlsafe(32) + token_hash = hashlib.sha256(host_token.encode()).hexdigest() + + # The Redis document includes host_token_hashes (internal) but + # RoomOut (public) never exposes it. 
+ payload = { + "id": room_id, + "name": name, + "created_at": now.isoformat(), + "expires_at": (now + timedelta(seconds=expire_after)).isoformat(), + "files": [], + "host_token_hashes": [token_hash], + } + key = _room_key(room_id) + await cls._json_set(key, payload) + await cls._client().expire(key, expire_after) + + return RoomCreateOut( + id=room_id, + name=name, + created_at=now, + expires_at=now + timedelta(seconds=expire_after), + files=[], + host_token=host_token, + ) + + @classmethod + async def get(cls, room_id: str) -> RoomOut | None: + data = await cls._json_get(_room_key(room_id)) + if data is None: + return None + # Compute host_count from internal field before stripping it + hashes = data.pop("host_token_hashes", []) + data.pop("host_token_hash", None) # compat with old format + data["host_count"] = len(hashes) if isinstance(hashes, list) else 1 + return RoomOut.model_validate(data) + + @classmethod + async def verify_host(cls, room_id: str, host_token: str) -> bool: + """Return True if *host_token* matches any stored hash for this room.""" + data = await cls._json_get(_room_key(room_id)) + if data is None: + return False + provided_hash = hashlib.sha256(host_token.encode()).hexdigest() + # Support new list format and legacy single-hash format + hashes = data.get("host_token_hashes", []) + if isinstance(hashes, list): + return any(secrets.compare_digest(h, provided_hash) for h in hashes) + stored_hash = data.get("host_token_hash", "") + return secrets.compare_digest(stored_hash, provided_hash) + + @classmethod + async def add_host(cls, room_id: str) -> AddHostOut | None: + """Generate a new host token and append its hash to the room.""" + key = _room_key(room_id) + data = await cls._json_get(key) + if data is None: + return None + + host_token = secrets.token_urlsafe(32) + token_hash = hashlib.sha256(host_token.encode()).hexdigest() + + client = cls._client() + try: + await client.execute_command( + "JSON.ARRAPPEND", + key, + "$.host_token_hashes", + 
json.dumps(token_hash), + ) + except Exception: + return None + + # Broadcast host count change so all clients update + hashes = data.get("host_token_hashes", []) + new_count = (len(hashes) if isinstance(hashes, list) else 1) + 1 + await cls._publish( + _room_channel(room_id), + json.dumps({"type": "host_count", "count": new_count}), + ) + + return AddHostOut(host_token=host_token) + + @classmethod + async def delete(cls, room_id: str) -> bool: + result = await cls._client().delete(_room_key(room_id)) + return result > 0 + + @classmethod + async def add_file(cls, room_id: str, entry: RoomFileEntry) -> bool: + """Atomically append a file entry to the room's files array. + + Uses ``JSON.ARRAPPEND`` so concurrent uploads from different FastAPI + instances never clobber each other. + + Returns True on success, False if the room no longer exists. + """ + key = _room_key(room_id) + client = cls._client() + try: + await client.execute_command( + "JSON.ARRAPPEND", + key, + "$.files", + json.dumps(entry.model_dump(mode="json")), + ) + except Exception: + # Key doesn't exist (room expired) or path missing + return False + + # Fan-out notification — every subscribed instance picks this up + event = RoomFileEvent(event="file_added", file=entry) + await cls._publish(_room_channel(room_id), event.model_dump_json()) + return True + + @classmethod + async def remove_file(cls, room_id: str, file_key: str) -> RoomFileEntry | None: + """Atomically remove a file by key using a Lua script. + + Returns the removed entry, or None if not found / room expired. 
+ """ + key = _room_key(room_id) + client = cls._client() + result = await client.eval(_LUA_REMOVE_FILE, 1, key, file_key) # type: ignore[misc] + if result is None: + return None + + removed_entry = RoomFileEntry.model_validate_json(result) + + event = RoomFileEvent(event="file_removed", file=removed_entry) + await cls._publish(_room_channel(room_id), event.model_dump_json()) + return removed_entry + + @classmethod + def channel_for(cls, room_id: str) -> str: + return _room_channel(room_id) + + @classmethod + async def publish_event(cls, room_id: str, message: str) -> None: + await cls._publish(_room_channel(room_id), message) diff --git a/src/backend/app/tasks/__init__.py b/src/backend/app/tasks/__init__.py index 64fbf70c..6598a4cd 100644 --- a/src/backend/app/tasks/__init__.py +++ b/src/backend/app/tasks/__init__.py @@ -1 +1,2 @@ from .clean_file import delete_expired_file as delete_expired_file +from .persist_file import persist_file_record as persist_file_record diff --git a/src/backend/app/tasks/persist_file.py b/src/backend/app/tasks/persist_file.py new file mode 100644 index 00000000..d6a33b2b --- /dev/null +++ b/src/backend/app/tasks/persist_file.py @@ -0,0 +1,44 @@ +from datetime import datetime + +from app.celery import celery +from app.db import AsyncSessionLocal +from app.models.files import File +from app.tasks.clean_file import delete_expired_file + + +@celery.task +async def persist_file_record( + key: str, + filename: str, + size: int, + created_at: str, + expires_at: str, + content_type: str, +) -> str: + """Create a File record in the DB and schedule its cleanup. + + Runs as a Celery background task so the upload HTTP response is not + blocked by database I/O. 
+ """ + created = datetime.fromisoformat(created_at) + expires = datetime.fromisoformat(expires_at) + + file_record = File( + key=key, + filename=filename, + size=size, + created_at=created, + expires_at=expires, + expire_after_n_download=2**31 - 1, # effectively unlimited + download_count=0, + ) + + async with AsyncSessionLocal() as session: + session.add(file_record) + await session.commit() + await session.refresh(file_record) + + # Schedule file cleanup at expiry + delete_expired_file.apply_async((str(file_record.id),), eta=expires) + + return str(file_record.id) diff --git a/src/backend/pyproject.toml b/src/backend/pyproject.toml index 6d809790..4eaf8e75 100644 --- a/src/backend/pyproject.toml +++ b/src/backend/pyproject.toml @@ -7,13 +7,13 @@ requires-python = ">=3.14" dependencies = [ # Fastapi "fastapi>=0.127.0", - "python-multipart>=0.0.21", # Faster Loops "uvloop>=0.22.1; sys_platform == 'linux'", # ASGI server "uvicorn>=0.40.0", "websockets>=15.0.1", - # Password Hashinh + "python-multipart>=0.0.21", + # Password Hashing "pwdlib[argon2]>=0.3.0", # Pydantic based settings manager "pydantic-settings>=2.12.0", @@ -32,17 +32,17 @@ dependencies = [ "celery>=5.6.1", "celery-aio-pool>=0.1.0rc8", # Redis for caching + "celery-types>=0.24.0", "redis[hiredis]>=6.4.0", # Mandatory type "types-aiobotocore-s3>=2.25.2", + "types-aioboto3>=15.5.0", ] [dependency-groups] dev = [ - "celery-types>=0.24.0", "fastapi-cli>=0.0.16", "ruff>=0.14.10", - "types-aioboto3>=15.5.0", # Task runner "poethepoet>=0.39.0", ] diff --git a/src/backend/scripts/start_celery_beat.sh b/src/backend/scripts/start_celery_beat.sh new file mode 100644 index 00000000..49db1ebb --- /dev/null +++ b/src/backend/scripts/start_celery_beat.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +export CELERY_CUSTOM_WORKER_POOL='celery_aio_pool.pool:AsyncIOPool' +exec celery -A app.celery beat --loglevel=info \ No newline at end of file diff --git a/src/backend/uv.lock b/src/backend/uv.lock index cb071393..6f7c4953 100644 --- 
a/src/backend/uv.lock +++ b/src/backend/uv.lock @@ -395,6 +395,7 @@ dependencies = [ { name = "alembic" }, { name = "celery" }, { name = "celery-aio-pool" }, + { name = "celery-types" }, { name = "fastapi" }, { name = "pwdlib", extra = ["argon2"] }, { name = "pydantic-settings" }, @@ -404,6 +405,7 @@ dependencies = [ { name = "sqlalchemy", extra = ["postgresql-asyncpg"] }, { name = "sqlmodel" }, { name = "typer" }, + { name = "types-aioboto3" }, { name = "types-aiobotocore-s3" }, { name = "uvicorn" }, { name = "uvloop", marker = "sys_platform == 'linux'" }, @@ -412,11 +414,9 @@ dependencies = [ [package.dev-dependencies] dev = [ - { name = "celery-types" }, { name = "fastapi-cli" }, { name = "poethepoet" }, { name = "ruff" }, - { name = "types-aioboto3" }, ] [package.metadata] @@ -425,6 +425,7 @@ requires-dist = [ { name = "alembic", specifier = ">=1.17.2" }, { name = "celery", specifier = ">=5.6.1" }, { name = "celery-aio-pool", specifier = ">=0.1.0rc8" }, + { name = "celery-types", specifier = ">=0.24.0" }, { name = "fastapi", specifier = ">=0.127.0" }, { name = "pwdlib", extras = ["argon2"], specifier = ">=0.3.0" }, { name = "pydantic-settings", specifier = ">=2.12.0" }, @@ -434,6 +435,7 @@ requires-dist = [ { name = "sqlalchemy", extras = ["postgresql-asyncpg"], specifier = ">=2.0.45" }, { name = "sqlmodel", specifier = ">=0.0.27" }, { name = "typer", specifier = ">=0.20.1" }, + { name = "types-aioboto3", specifier = ">=15.5.0" }, { name = "types-aiobotocore-s3", specifier = ">=2.25.2" }, { name = "uvicorn", specifier = ">=0.40.0" }, { name = "uvloop", marker = "sys_platform == 'linux'", specifier = ">=0.22.1" }, @@ -442,11 +444,9 @@ requires-dist = [ [package.metadata.requires-dev] dev = [ - { name = "celery-types", specifier = ">=0.24.0" }, { name = "fastapi-cli", specifier = ">=0.0.16" }, { name = "poethepoet", specifier = ">=0.39.0" }, { name = "ruff", specifier = ">=0.14.10" }, - { name = "types-aioboto3", specifier = ">=15.5.0" }, ] [[package]] diff --git 
a/src/frontend/src/lib/components/ui/sonner/sonner.svelte b/src/frontend/src/lib/components/ui/sonner/sonner.svelte index f75e34c4..742b14d7 100644 --- a/src/frontend/src/lib/components/ui/sonner/sonner.svelte +++ b/src/frontend/src/lib/components/ui/sonner/sonner.svelte @@ -1,7 +1,7 @@ + +
+
+
+

Chithi

+

+ Encrypt and send files with a link that automatically expires. What would you like to do? +

+
+ + + + {#if showReconnect} + + + + + Reconnect to Room + + + Paste the host link you received when creating the room (the URL with the # token). + + + +
+ + e.key === 'Enter' && handleReconnect()} + /> +
+
+ + + + +
+ {/if} +
+
diff --git a/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/reverse/+page.svelte b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/reverse/+page.svelte new file mode 100644 index 00000000..446f99b7 --- /dev/null +++ b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/reverse/+page.svelte @@ -0,0 +1,187 @@ + + +
+
+
+

Reverse File Share

+

+ Host a room to push files to everyone — clients receive them in real time or download via a + permanent link. +

+
+ + {#if landingView === 'main'} +
+ (landingView = 'create')} + > + + + + Create a Room + + + Host a room and share files. Guests receive them in real time via WebSocket. + + + + + (landingView = 'join')} + > + + + + Join a Room + + + Enter a room ID or follow a shared link to receive files from the host. + + + +
+ {:else if landingView === 'create'} + + + + + Create a Room + + + +
+ + e.key === 'Enter' && createRoom()} + /> +
+
+ + +

+ {#if expireAfter >= 3600} + {(expireAfter / 3600).toFixed(1)} hour(s) + {:else} + {Math.round(expireAfter / 60)} minute(s) + {/if} +

+
+
+ + + + +
+ {:else if landingView === 'join'} + + + + + Join a Room + + + +
+ + e.key === 'Enter' && goJoin()} + /> +
+
+ + + + +
+ {/if} +
+
diff --git a/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/reverse/[room_id]/+page.svelte b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/reverse/[room_id]/+page.svelte new file mode 100644 index 00000000..98c9b6c9 --- /dev/null +++ b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/reverse/[room_id]/+page.svelte @@ -0,0 +1,826 @@ + + + +{#if loadStatus === 'loading'} +
+
+ + Loading room… +
+
+ + +{:else if loadStatus === 'not_found'} +
+
+

Room Not Found

+

This room doesn't exist or has expired.

+ +
+
+ + +{:else if loadStatus === 'error'} +
+
+

Something went wrong

+

Failed to load the room. Please try again.

+
+ + +
+
+
+ + +{:else if loadStatus === 'loaded' && room} +
+ +
+
+
+

{room.name}

+ + {isHost ? 'Host' : 'Guest'} + + + + + + + {hostCount} + {hostCount === 1 ? 'host' : 'hosts'} + + + + {hostCount} host{hostCount === 1 ? '' : 's'} can upload to this room + + + + + + + {#if wsConnected} + + {:else} + + {/if} + + + {wsConnected ? 'WebSocket connected' : 'Disconnected'} + + + +
+

+ Expires: {new Date(room.expires_at).toLocaleString()} +

+
+ +
+ + + + + + {shareUrl} + + + + {#if isHost} + + + + + + Generate and copy a host invite link + + + {/if} + + +
+
+ + + + + {#if isHost} + + + + + Upload Files + + + Files you upload are pushed to all connected clients via WebSocket. A permanent download + link is also generated for each file. + + + + +
fileInput?.click()} + onkeydown={(e) => e.key === 'Enter' && fileInput?.click()} + role="button" + tabindex="0" + ondragover={(e) => e.preventDefault()} + ondrop={(e) => { + e.preventDefault(); + addFiles(e.dataTransfer?.files ?? null); + }} + > + +

Click or drop files here

+ addFiles((e.currentTarget as HTMLInputElement).files)} + /> +
+ + + {#if pendingFiles.length > 0} +
+

+ Queued — {formatFileSize(totalUploadSize)} +

+ {#each pendingFiles as file, i} +
+ + {file.name} + {formatFileSize(file.size)} + +
+ {/each} +
+ {/if} + + + {#if isUploading || (uploads.length > 0 && completedUploads < totalUploads)} +
+
+ Overall — {completedUploads}/{totalUploads} files + {overallProgress.current.toFixed(0)}% +
+ +
+ {/if} + + + {#if uploads.length > 0} +
+ {#each uploads as u} +
+
+ + {u.file.name} + {formatFileSize(u.file.size)} + {#if u.status === 'done'} + + {:else if u.status === 'error'} + + {:else if u.status === 'uploading'} + + {/if} +
+ {#if u.status === 'uploading' || u.status === 'pending'} + + {/if} + {#if u.status === 'done' && u.entry} + {@const fileKey = u.entry.key} + {@const downloadUrl = u.entry.download_url} +
+ {downloadUrl} + + + + +
+ {/if} +
+ {/each} +
+ {/if} +
+ + + +
+ {/if} + + + {#if !isHost && receiveState.type === 'streaming'} + + + + + Receiving: {receiveState.filename} + + + +
+ + {formatFileSize(receiveState.received)} / {formatFileSize(receiveState.size)} + + {streamProgress.toFixed(0)}% +
+ +
+
+ {/if} + + + + + + + + {isHost ? 'Shared Files' : 'Available Files'} + + {roomFiles.length} + + {#if !isHost} + + Files are streamed to you automatically. Use the download links to save them — links + remain valid until the room expires. + + {/if} + + + {#if roomFiles.length === 0} +
+ + {isHost ? 'No files uploaded yet.' : 'Waiting for host to share files…'} +
+ {:else} +
+ {#each roomFiles as f} + {@const downloaded = downloadedFiles.find((d) => d.key === f.key)} +
+
+ +
+

{f.filename}

+

{formatFileSize(f.size)}

+
+ + + + + + + + Copy permanent download link + + + + + {#if f.key} + + + + {/if} + + + {#if !isHost && downloaded?.objectUrl} + + + + {/if} +
+
+ {/each} +
+ {/if} +
+
+ + + {#if !isHost && downloadedFiles.length > 0} + + + Received Files + + Files streamed to you this session. "Save" downloads from browser memory, "Permalink" + fetches from the server link — which remains valid until the room expires. + + + +
+ {#each downloadedFiles as df} +
+ +
+

{df.filename}

+

{formatFileSize(df.size)}

+
+ {#if df.objectUrl} + + + + {/if} + {#if df.key} + + + + {/if} +
+ {/each} +
+
+
+ {/if} +
+{/if} diff --git a/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/reverse/[room_id]/+page.ts b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/reverse/[room_id]/+page.ts new file mode 100644 index 00000000..42a828c1 --- /dev/null +++ b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/reverse/[room_id]/+page.ts @@ -0,0 +1 @@ +export const trailingSlash = 'ignore'; diff --git a/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/(upload)/+page.svelte b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/upload/+page.svelte similarity index 100% rename from src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/(upload)/+page.svelte rename to src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/upload/+page.svelte diff --git a/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/(upload)/+page.ts b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/upload/+page.ts similarity index 100% rename from src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/(upload)/+page.ts rename to src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/upload/+page.ts diff --git a/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/(upload)/recent_upload.svelte b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/upload/recent_upload.svelte similarity index 100% rename from src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/(upload)/recent_upload.svelte rename to src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/upload/recent_upload.svelte diff --git a/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/(upload)/upload_showcase.svelte b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/upload/upload_showcase.svelte similarity index 98% rename from src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/(upload)/upload_showcase.svelte rename to 
src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/upload/upload_showcase.svelte index fbe19b59..e80a74ee 100644 --- a/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/(upload)/upload_showcase.svelte +++ b/src/frontend/src/routes/(needs_onboarding)/(navbar_and_footer)/upload/upload_showcase.svelte @@ -267,7 +267,7 @@ {/if} - + {#if uploadPcts.length > 0} {@const activeStartPct = finishedPct.current > 0 ? Math.max(finishedPct.current, MIN_SEGMENT_PCT) : 0} @@ -285,7 +285,7 @@ > {/if} - + {#if appState.current.total_available_space} {@const filledPct = (finishedPct.current > 0 ? Math.max(finishedPct.current, MIN_SEGMENT_PCT) : 0) +