diff --git a/src/segger/io/fields.py b/src/segger/io/fields.py index 8dc30a9..26c4161 100644 --- a/src/segger/io/fields.py +++ b/src/segger/io/fields.py @@ -56,10 +56,16 @@ class XeniumBoundaryFields: @dataclass class MerscopeTranscriptFields: filename: str = 'detected_transcripts.csv' + fallback_filename: str = 'transcripts.parquet' x: str = 'global_x' y: str = 'global_y' feature: str = 'gene' cell_id: str = 'cell_id' + cell_boundary_id: str = 'cell_boundaries_id' + nucleus_boundary_id: str = 'nucleus_boundaries_id' + filter_substrings = [ + 'BLANK_*', + ] @dataclass class MerscopeBoundaryFields: diff --git a/src/segger/io/preprocessor.py b/src/segger/io/preprocessor.py index 9bbe62d..2c09870 100644 --- a/src/segger/io/preprocessor.py +++ b/src/segger/io/preprocessor.py @@ -8,6 +8,7 @@ import polars as pl import pandas as pd import json +import csv import warnings import logging @@ -56,6 +57,90 @@ def decorator(cls): return cls return decorator +def _lazyframe_column_names(lf: pl.LazyFrame) -> list[str]: + """Return column names for a LazyFrame across Polars versions.""" + try: + return lf.collect_schema().names() + except AttributeError: + return lf.columns + + +def _first_existing(columns: list[str] | set[str], candidates: list[str]) -> str | None: + """Return the first candidate column name present in `columns`.""" + column_set = set(columns) + for candidate in candidates: + if candidate in column_set: + return candidate + return None + + +def _build_boundary_index(boundaries: pd.DataFrame) -> pd.Index: + """Return the canonical string index used for cell/nucleus boundaries.""" + std = StandardBoundaryFields() + boundary_suffix = boundaries[std.boundary_type].map({ + std.nucleus_value: "0", + std.cell_value: "1", + }) + if boundary_suffix.isnull().any(): + unknown_values = sorted( + { + str(value) + for value in boundaries.loc[boundary_suffix.isnull(), std.boundary_type].unique() + } + ) + raise ValueError( + "Unsupported boundary_type values while building boundary index: " + + ", ".join(unknown_values) + ) + boundary_ids = boundaries[std.id].copy() + missing_ids = boundary_ids.isnull() + boundary_ids = boundary_ids.astype(str) + if missing_ids.any(): + fallback = pd.Series(boundaries.index, index=boundaries.index).astype(str) + boundary_ids.loc[missing_ids] = "missing_" + fallback.loc[missing_ids] + + boundary_index = boundary_ids + "_" + boundary_suffix + duplicate_counts = boundary_index.groupby(boundary_index).cumcount() + boundary_index = boundary_index.where( + duplicate_counts.eq(0), + boundary_index + "_dup" + duplicate_counts.astype(str), + ) + return pd.Index(boundary_index, dtype="object") + + +def _empty_boundaries() -> gpd.GeoDataFrame: + """Return an empty boundary frame with the canonical schema.""" + std = StandardBoundaryFields() + empty = gpd.GeoDataFrame( + { + std.id: pd.Series(dtype="object"), + std.boundary_type: pd.Series(dtype="object"), + }, + geometry=gpd.GeoSeries([], dtype="geometry"), + ) + return empty.set_index(std.id) + + +def _clean_assignment_expr(column_name: str) -> pl.Expr: + """Normalize assignment values and map null-like tokens to null.""" + cleaned = pl.col(column_name).cast(pl.String, strict=False).str.strip_chars() + lowered = cleaned.str.to_lowercase() + return ( + pl.when( + cleaned.is_null() + | cleaned.eq("").fill_null(False) + | cleaned.eq("-1").fill_null(False) + | cleaned.eq("-1.0").fill_null(False) + | lowered.is_in( + ["none", "nan", "null", "na", "n/a", "unassigned", "unknown"] + ).fill_null(False) + ) + .then(None) + .otherwise(cleaned) + ) + + + class ISTPreprocessor(ABC): """ Abstract base class for platform-specific preprocessing of spatial @@ -534,9 +619,355 @@ class MerscopePreprocessor(ISTPreprocessor): """ Preprocessor for Vizgen MERSCOPE datasets. """ + + @staticmethod + def _cell_assignment_candidates(raw: MerscopeTranscriptFields) -> list[str]: + return [ + raw.cell_boundary_id, + raw.cell_id, + "cell", + "cell.id", + "EntityID", + "entity_id", + ] + + @staticmethod + def _nucleus_assignment_candidates(raw: MerscopeTranscriptFields) -> list[str]: + return [ + raw.nucleus_boundary_id, + "nucleus_id", + "nucleus.id", + "NucleusID", + ] + + @staticmethod + def _resolve_assignment_columns(columns: list[str] | set[str]) -> tuple[str | None, str | None]: + raw = MerscopeTranscriptFields() + cell_col = _first_existing(columns, MerscopePreprocessor._cell_assignment_candidates(raw)) + nucleus_col = _first_existing(columns, MerscopePreprocessor._nucleus_assignment_candidates(raw)) + return cell_col, nucleus_col + @staticmethod def _validate_directory(data_dir: Path): - raise NotImplementedError() + raw_tx = MerscopeTranscriptFields() + raw_bd = MerscopeBoundaryFields() + + tx_path = MerscopePreprocessor._resolve_transcripts_path(data_dir) + tx_lf = MerscopePreprocessor._scan_transcripts_file(tx_path) + tx_columns = _lazyframe_column_names(tx_lf) + + # Keep auto-inference strict: only match MERSCOPE when native markers exist. + has_native_file = len(list(data_dir.glob(raw_tx.filename))) == 1 + has_native_schema = {raw_tx.x, raw_tx.y, raw_tx.feature}.issubset(set(tx_columns)) + if not (has_native_file or has_native_schema): + raise IOError( + "Directory does not look like a MERSCOPE output layout " + "(missing native MERSCOPE transcript markers)." + ) + + x_col = _first_existing(tx_columns, [raw_tx.x, "x", "x_location"]) + y_col = _first_existing(tx_columns, [raw_tx.y, "y", "y_location"]) + feature_col = _first_existing(tx_columns, [raw_tx.feature, "feature_name", "target"]) + if x_col is None or y_col is None or feature_col is None: + missing_core = [] + if x_col is None: + missing_core.append("x") + if y_col is None: + missing_core.append("y") + if feature_col is None: + missing_core.append("feature") + raise IOError( + f"MERSCOPE transcripts file '{tx_path.name}' does not look like " + "a minimum-usable schema. Missing required core columns: " + f"{missing_core}." + ) + + cell_boundary_matches = list(data_dir.glob(raw_bd.cell_filename)) + nucleus_boundary_matches = list(data_dir.glob(raw_bd.nucleus_filename)) + if len(cell_boundary_matches) > 1 or len(nucleus_boundary_matches) > 1: + raise IOError( + "MERSCOPE sample directory must contain at most one boundary file " + "for each of cell_boundaries.parquet and nucleus_boundaries.parquet." + ) + + if len(cell_boundary_matches) == 0 and len(nucleus_boundary_matches) == 0: + cell_assignment_col, nucleus_assignment_col = MerscopePreprocessor._resolve_assignment_columns( + tx_columns + ) + if cell_assignment_col is None and nucleus_assignment_col is None: + assignment_candidates = sorted( + { + *MerscopePreprocessor._cell_assignment_candidates(raw_tx), + *MerscopePreprocessor._nucleus_assignment_candidates(raw_tx), + } + ) + raise IOError( + "MERSCOPE input requires either boundary parquet files " + "(cell_boundaries.parquet / nucleus_boundaries.parquet) " + "or transcript assignment columns " + f"{assignment_candidates}." + ) + available_assignment_cols = [ + c for c in (cell_assignment_col, nucleus_assignment_col) if c is not None + ] + has_any_assignment = ( + tx_lf + .with_columns([ + _clean_assignment_expr(c).alias(f"__clean_{i}") + for i, c in enumerate(available_assignment_cols) + ]) + .filter( + pl.any_horizontal( + [ + pl.col(f"__clean_{i}").is_not_null() + for i in range(len(available_assignment_cols)) + ] + ) + ) + .limit(1) + .collect() + .height + > 0 + ) + if not has_any_assignment: + raise IOError( + "MERSCOPE transcripts contain assignment columns but no assigned " + "cell/nucleus values were found." + ) + + @staticmethod + def _resolve_transcripts_path(data_dir: Path) -> Path: + raw_tx = MerscopeTranscriptFields() + matches_by_pattern: dict[str, list[Path]] = {} + for pattern in (raw_tx.filename, raw_tx.fallback_filename): + matches_by_pattern[pattern] = sorted(data_dir.glob(pattern)) + + for pattern, matches in matches_by_pattern.items(): + if len(matches) > 1: + raise IOError( + f"MERSCOPE sample directory must contain at most one file " + f"matching '{pattern}', but found {len(matches)}." + ) + + primary = matches_by_pattern[raw_tx.filename] + fallback = matches_by_pattern[raw_tx.fallback_filename] + if len(primary) == 1: + return primary[0] + if len(fallback) == 1: + return fallback[0] + raise IOError( + "MERSCOPE sample directory must contain either " + f"'{raw_tx.filename}' or '{raw_tx.fallback_filename}'." + ) + + @staticmethod + def _scan_transcripts_file(path: Path) -> pl.LazyFrame: + if path.suffix.lower() == ".csv": + with path.open("r", encoding="utf-8", newline="") as handle: + header = next(csv.reader(handle), []) + + # Some MERSCOPE CSVs duplicate coordinate columns (e.g. `x`), which + # Polars rejects. Rename only the duplicates and preserve first copies. + seen: dict[str, int] = {} + normalized: list[str] = [] + for idx, name in enumerate(header): + base = str(name) + if base == "": + base = f"unnamed_{idx}" + dup_idx = seen.get(base, 0) + seen[base] = dup_idx + 1 + normalized.append(base if dup_idx == 0 else f"{base}__dup{dup_idx}") + + has_duplicate_names = len(set(header)) != len(header) + has_blank_names = any(str(name) == "" for name in header) + if has_duplicate_names or has_blank_names: + warnings.warn( + f"MERSCOPE transcript CSV '{path.name}' has duplicate/blank column names; " + "auto-renaming duplicate headers for robust parsing.", + RuntimeWarning, + stacklevel=2, + ) + return pl.scan_csv(path, has_header=True, new_columns=normalized) + return pl.scan_csv(path) + if path.suffix.lower() == ".parquet": + return pl.scan_parquet(path, parallel="row_groups") + raise ValueError(f"Unsupported MERSCOPE transcript file format: {path}") + + @staticmethod + def _clean_id_expr(column_name: str) -> pl.Expr: + return _clean_assignment_expr(column_name) + + @cached_property + def transcripts(self) -> pl.DataFrame: + raw = MerscopeTranscriptFields() + std = StandardTranscriptFields() + + source_path = self._resolve_transcripts_path(self.data_dir) + lf = self._scan_transcripts_file(source_path) + columns = _lazyframe_column_names(lf) + + x_col = _first_existing(columns, [raw.x, "x", "x_location"]) + y_col = _first_existing(columns, [raw.y, "y", "y_location"]) + feature_col = _first_existing(columns, [raw.feature, "feature_name", "target"]) + + if x_col is None or y_col is None or feature_col is None: + raise ValueError( + "MERSCOPE transcripts missing required columns. " + f"Need x/y/feature; available columns: {columns}" + ) + + cell_assignment_col, nucleus_assignment_col = self._resolve_assignment_columns(columns) + + if cell_assignment_col is not None: + cell_id_expr = self._clean_id_expr(cell_assignment_col) + cell_present_expr = cell_id_expr.is_not_null() + elif nucleus_assignment_col is not None: + cell_id_expr = self._clean_id_expr(nucleus_assignment_col) + cell_present_expr = cell_id_expr.is_not_null() + else: + assignment_candidates = sorted( + { + *self._cell_assignment_candidates(raw), + *self._nucleus_assignment_candidates(raw), + } + ) + raise ValueError( + "MERSCOPE transcripts missing any cell assignment column " + f"{assignment_candidates}." + ) + + nucleus_present_expr = ( + self._clean_id_expr(nucleus_assignment_col).is_not_null() + if nucleus_assignment_col is not None + else pl.lit(False) + ) + compartment_expr = ( + pl.when(nucleus_present_expr) + .then(std.nucleus_value) + .when(cell_present_expr) + .then(std.cytoplasmic_value) + .otherwise(std.extracellular_value) + .alias(std.compartment) + ) + + lf = lf.filter( + pl.col(feature_col).str.contains("|".join(raw.filter_substrings)).not_() + ) + + select_exprs: list[pl.Expr] = [ + pl.col(std.row_index), + pl.col(x_col).alias(std.x), + pl.col(y_col).alias(std.y), + pl.col(feature_col).alias(std.feature), + cell_id_expr.alias(std.cell_id), + compartment_expr, + ] + + lf = lf.with_row_index(name=std.row_index) + return lf.select(select_exprs).collect() + + @staticmethod + def _empty_boundaries() -> gpd.GeoDataFrame: + return _empty_boundaries() + + @staticmethod + def _load_boundary_file(path: Path, boundary_type: str) -> gpd.GeoDataFrame: + raw = MerscopeBoundaryFields() + std = StandardBoundaryFields() + + try: + gdf = gpd.read_parquet(path) + except Exception: + gdf = None + + if gdf is not None and hasattr(gdf, "geometry"): + tmp = gdf.copy() + if std.id not in tmp.columns: + if raw.id in tmp.columns: + tmp = tmp.rename(columns={raw.id: std.id}) + else: + tmp = tmp.reset_index() + idx_col = tmp.columns[0] + tmp = tmp.rename(columns={idx_col: std.id}) + tmp[std.id] = tmp[std.id].astype(str) + tmp = tmp.dropna(subset=[std.id]).drop_duplicates(subset=[std.id], keep="first") + tmp = fix_invalid_geometry(tmp) + tmp[std.boundary_type] = boundary_type + return tmp.set_index(std.id) + + bd = pl.read_parquet(path, parallel="row_groups") + id_col = _first_existing(bd.columns, [raw.id, std.id, "EntityID", "cell_id", "id"]) + x_col = _first_existing(bd.columns, ["vertex_x", "x", "global_x"]) + y_col = _first_existing(bd.columns, ["vertex_y", "y", "global_y"]) + if id_col is None or x_col is None or y_col is None: + raise ValueError( + f"Could not parse MERSCOPE boundary file '{path}'. " + f"Expected geometry column or contour columns with id/x/y." + ) + + tmp = contours_to_polygons( + x=bd[x_col].to_numpy(), + y=bd[y_col].to_numpy(), + ids=bd[id_col].to_numpy(), + ) + tmp = fix_invalid_geometry(tmp) + tmp = tmp.reset_index(drop=False, names=std.id) + tmp[std.id] = tmp[std.id].astype(str) + tmp[std.boundary_type] = boundary_type + return tmp.set_index(std.id) + + + @cached_property + def boundaries(self) -> gpd.GeoDataFrame: + raw_bd = MerscopeBoundaryFields() + std = StandardBoundaryFields() + + cell_boundary_matches = sorted(self.data_dir.glob(raw_bd.cell_filename)) + nucleus_boundary_matches = sorted(self.data_dir.glob(raw_bd.nucleus_filename)) + tx_path = self._resolve_transcripts_path(self.data_dir) + tx_columns = _lazyframe_column_names(self._scan_transcripts_file(tx_path)) + cell_assignment_col, nucleus_assignment_col = self._resolve_assignment_columns(tx_columns) + + cells = ( + self._load_boundary_file(cell_boundary_matches[0], std.cell_value) + if len(cell_boundary_matches) == 1 + else self._empty_boundaries() + ) + nuclei = ( + self._load_boundary_file(nucleus_boundary_matches[0], std.nucleus_value) + if len(nucleus_boundary_matches) == 1 + else self._empty_boundaries() + ) + + # Fall back to mirrored boundaries when one type is unavailable. + if len(cells) == 0 and len(nuclei) > 0: + cells = nuclei.copy() + cells[std.boundary_type] = std.cell_value + if len(nuclei) == 0 and len(cells) > 0: + nuclei = cells.copy() + nuclei[std.boundary_type] = std.nucleus_value + if len(cells) == 0 and len(nuclei) == 0: + raise ValueError("Could not construct any MERSCOPE boundaries.") + + cell_ids = pd.Index(cells.index.astype(str)) + nucleus_ids = pd.Index(nuclei.index.astype(str)) + shared_ids = cell_ids.intersection(nucleus_ids) + + cells[std.contains_nucleus] = False + if len(shared_ids) > 0: + cells.loc[shared_ids, std.contains_nucleus] = True + nuclei[std.contains_nucleus] = True + + bd = pd.concat( + [ + cells.reset_index(drop=False, names=std.id), + nuclei.reset_index(drop=False, names=std.id), + ], + ignore_index=True, + ) + bd[std.id] = bd[std.id].astype(str) + bd.index = _build_boundary_index(bd) + return bd def _infer_platform(data_dir: Path) -> str: