Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions controller/breeder_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ def _assign_watermark_slot(self, breeder_config):
breeder_list = self.metadata_repo.fetch_breeders_list()
if breeder_list:
for row in breeder_list:
# breeder_list returns (id, name, creation_tsz) — fetch config separately
existing_breeder_id = row[0]
meta_row = self.metadata_repo.fetch_meta_data(existing_breeder_id)
if meta_row and len(meta_row) > 0:
Expand All @@ -250,21 +251,16 @@ def _assign_watermark_slot(self, breeder_config):
slot = existing_config.get('breeder', {}).get('watermark_slot')
if slot is not None:
used_slots.add(slot)
logger.info(f"Existing watermark slots in use: {sorted(used_slots)}")
except Exception as e:
logger.error(f"Failed to read existing breeder slots from metadata DB: {e}")
raise RuntimeError(f"Cannot assign watermark slot — metadata DB unavailable: {e}") from e
logger.warning(f"Could not read existing breeder slots: {e}. Falling back.")

if len(used_slots) < max_slots:
# Assign lowest available slot
assigned_slot = min(s for s in range(max_slots) if s not in used_slots)
breeder_config['breeder']['watermark_slot'] = assigned_slot
logger.info(f"Assigned watermark slot {assigned_slot} (used: {sorted(used_slots)})")
else:
raise RuntimeError(
f"All {max_slots} watermark slots in use ({sorted(used_slots)}). "
f"Cannot create breeder without a collision-free slot."
)
logger.warning(f"All {max_slots} watermark slots in use. Breeder may share frequencies.")

def _resolve_target_refs(self, breeder_config):
"""Resolve targetRefs to inline targets from the targets catalog
Expand Down Expand Up @@ -469,6 +465,12 @@ def create_breeder(self, breeder_config, name):
meta_state=breeder_config
)

# Create detection_rounds table and insert a round for this breeder.
# Each breeder gets a round as sender — breeders coordinate through
# this table: sender impulses, receivers hold still.
self.archive_repo.ensure_detection_rounds_table()
self.archive_repo.insert_detection_round(sender_id=breeder_uuid)

# Launch worker scripts with error handling
worker_launch_failures = []
target_count = 0
Expand Down
39 changes: 39 additions & 0 deletions controller/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,45 @@ def drop_database(self, breeder_id):
execute_ddl_query(db_config, query)
logger.info(f"Dropped archive database: {breeder_id}")

def ensure_detection_rounds_table(self):
"""Create the detection_rounds table in archive_db if it doesn't exist.

The detection_rounds table coordinates impulse detection between breeders.
Each row represents a round where one breeder (sender) pushes an impulse
while all others hold still. The controller creates this table and inserts
rows at breeder creation time.
"""
db_config = self.base_config.copy()
db_config['database'] = "archive_db"

query = """
CREATE TABLE IF NOT EXISTS detection_rounds (
round_id SERIAL PRIMARY KEY,
sender_id VARCHAR(255) NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
created_at TIMESTAMPTZ DEFAULT NOW(),
completed_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_detection_rounds_active
ON detection_rounds (status) WHERE status = 'active';
"""

execute_query(db_config, query)
logger.info("Ensured detection_rounds table exists in archive_db")

def insert_detection_round(self, sender_id):
"""Insert a detection round for a sender breeder.

Args:
sender_id: UUID of the breeder that will send the impulse
"""
db_config = self.base_config.copy()
db_config['database'] = "archive_db"

query = f"INSERT INTO detection_rounds (sender_id) VALUES ('{sender_id}');"
execute_query(db_config, query)
logger.info(f"Inserted detection round for sender: {sender_id}")

def get_connection_url(self, breeder_id):
"""Get PostgreSQL connection URL for a breeder database"""
return (
Expand Down
Loading