Skip to content
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 143 additions & 8 deletions src/dysh/fits/gbtfitsload.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from ..util.selection import Flag, Selection # noqa: F811
from ..util.weatherforecast import GBTWeatherForecast
from . import conf, core
from .index_file import create_index_metadata, get_index_path, write_index
from .sdfitsload import FITSBackend, SDFITSLoad, _log_mem

try:
Expand Down Expand Up @@ -3849,6 +3850,7 @@ def write(
fileobj,
multifile=True,
flags=True,
write_index=False,
verbose=False,
output_verify="exception",
overwrite=False,
Expand All @@ -3871,6 +3873,9 @@ def write(
Otherwise, write to a single SDFITS file.
flags: bool, optional
If True, write the applied flags to a `FLAGS` column in the binary table.
write_index: bool, optional
If True, write a sparrow3/GBTIDL-compatible .index file alongside each output FITS file.
When multifile=True and multiple files are written, also writes a parent directory index.
verbose: bool, optional
If True, print out some information about number of rows written per file
output_verify : str
Expand All @@ -3892,11 +3897,121 @@ def write(
e.g., `ifnum=1, plnum=[2,3]` etc.
"""
if HAS_FITSIO:
self._write_fitsio(fileobj, multifile, flags, verbose, overwrite, **kwargs)
self._write_fitsio(fileobj, multifile, flags, write_index, verbose, overwrite, **kwargs)
else:
self._write_astropy(fileobj, multifile, flags, verbose, output_verify, overwrite, checksum, **kwargs)
self._write_astropy(
fileobj, multifile, flags, write_index, verbose, output_verify, overwrite, checksum, **kwargs
)

def _build_index_metadata(self, df):
"""Extract observer/backend from index DataFrame and create IndexMetadata."""
observer = "Unknown"
if "OBSERVER" in df.columns:
obs_vals = df["OBSERVER"].dropna().unique()
if len(obs_vals) > 0:
observer = str(obs_vals[0])
backend = "Unknown"
if "BACKEND" in df.columns:
be_vals = df["BACKEND"].dropna().unique()
if len(be_vals) > 0:
backend = str(be_vals[0])
return create_index_metadata(observer=observer, backend=backend)

def _build_and_write_index(self, outfile, per_file_df, metadata, overwrite):
"""Build and write a .index file for a single output FITS file.

Parameters
----------
outfile : str or Path
Path to the output FITS file
per_file_df : pd.DataFrame
Index rows for this file (already filtered by FITSINDEX if applicable)
metadata : IndexMetadata
Pre-constructed metadata
overwrite : bool
Whether to overwrite existing index files

Returns
-------
pd.DataFrame
The prepared index DataFrame (for use in parent index aggregation)
"""
outfile = Path(outfile)
index_path = get_index_path(outfile)
if index_path.exists() and not overwrite:
raise OSError(f"Index file already exists: {index_path}")

# Renumber ROW and HDU/BINTABLE for the output file
index_rows = []
bintables = sorted(per_file_df.BINTABLE.unique())
new_extension = 1
for b in bintables:
bt_rows = per_file_df[per_file_df.BINTABLE == b].sort_values("ROW")
for new_row_idx, (_, row) in enumerate(bt_rows.iterrows()):
row_copy = row.copy()
row_copy["ROW"] = new_row_idx
row_copy["HDU"] = new_extension
row_copy["BINTABLE"] = new_extension - 1
index_rows.append(row_copy)
new_extension += 1

index_df = pd.DataFrame(index_rows).reset_index(drop=True)
index_df["INDEX"] = range(len(index_df))
index_df["FILE"] = outfile.name

# Remove internal columns not part of the standard index format
for col in ["FITSINDEX", "CHAN"]:
if col in index_df.columns:
index_df = index_df.drop(columns=col)

write_index(index_path, metadata, index_df)
return index_df

def _build_and_write_parent_index(self, fileobj, per_file_dfs, metadata, overwrite):
"""Write a parent directory index aggregating all per-file indices.

def _write_fitsio(self, fileobj, multifile, flags, verbose, overwrite, **kwargs):
Parameters
----------
fileobj : str or Path
The base output file path (e.g., 'output.fits'); used to derive parent index name
per_file_dfs : list of pd.DataFrame
List of prepared per-file index DataFrames (from _build_and_write_index)
metadata : IndexMetadata
Pre-constructed metadata
overwrite : bool
Whether to overwrite existing index files
"""
fileobj = Path(fileobj)
parent_index_path = get_index_path(fileobj)
if parent_index_path.exists() and not overwrite:
raise OSError(f"Parent index file already exists: {parent_index_path}")

combined_df = pd.concat(per_file_dfs, ignore_index=True)
combined_df["INDEX"] = range(len(combined_df))
write_index(parent_index_path, metadata, combined_df)

@staticmethod
def _multifile_name(fileobj, count):
"""Generate output filename for multi-file writes using alphabetic suffixes.

Parameters
----------
fileobj : str or Path
Base output file path
count : int
File index (0-based, maps to A, B, C, ...)

Returns
-------
Path
Output path with alphabetic suffix (e.g., 'output.A.fits')
"""
if count > 25:
raise ValueError("Cannot write more than 26 multi-file outputs with alphabetic suffixes")
p = Path(fileobj)
return p.parent / (p.stem + "." + chr(ord("A") + count) + p.suffix)

def _write_fitsio(self, fileobj, multifile, flags, write_index_file, verbose, overwrite, **kwargs):
"""Write using the fitsio chunked path (memory-efficient for large files)."""
chunk_size = kwargs.pop("chunk_size", 5000)
logger.debug(kwargs)
Expand All @@ -3912,8 +4027,11 @@ def _write_fitsio(self, fileobj, multifile, flags, verbose, overwrite, **kwargs)
fi = _final["FITSINDEX"].unique()
logger.debug(f"fitsindex {fi} ")
total_rows_written = 0
if write_index_file:
metadata = self._build_index_metadata(_final)
if multifile:
count = 0
all_index_dfs = []
for k in fi:
# Build bintable groups for this FITSINDEX
df = select_from("FITSINDEX", k, _final)
Expand All @@ -3927,8 +4045,7 @@ def _write_fitsio(self, fileobj, multifile, flags, verbose, overwrite, **kwargs)
continue

if len(fi) > 1:
p = Path(fileobj)
outfile = p.parent / (p.stem + str(count) + p.suffix)
outfile = self._multifile_name(fileobj, count)
count += 1
else:
outfile = fileobj
Expand All @@ -3939,6 +4056,11 @@ def _write_fitsio(self, fileobj, multifile, flags, verbose, overwrite, **kwargs)
total_rows_written += rows_written
if verbose:
logger.info(f"Writing {rows_written} rows to {outfile}.")
if write_index_file:
prepared_df = self._build_and_write_index(outfile, df, metadata, overwrite)
all_index_dfs.append(prepared_df)
if write_index_file and len(fi) > 1:
self._build_and_write_parent_index(fileobj, all_index_dfs, metadata, overwrite)
if verbose:
logger.info(f"Total of {total_rows_written} rows written to files.")
else:
Expand All @@ -3963,8 +4085,12 @@ def _write_fitsio(self, fileobj, multifile, flags, verbose, overwrite, **kwargs)
)
if verbose:
logger.info(f"Writing {total_rows_written} to {fileobj}")
if write_index_file:
self._build_and_write_index(fileobj, _final, metadata, overwrite)

def _write_astropy(self, fileobj, multifile, flags, verbose, output_verify, overwrite, checksum, **kwargs):
def _write_astropy(
self, fileobj, multifile, flags, write_index_file, verbose, output_verify, overwrite, checksum, **kwargs
):
"""Write using the astropy path (fallback when fitsio is unavailable)."""
logger.debug(kwargs)
selection = Selection(self._index)
Expand All @@ -3979,8 +4105,11 @@ def _write_astropy(self, fileobj, multifile, flags, verbose, output_verify, over
fi = _final["FITSINDEX"].unique()
logger.debug(f"fitsindex {fi} ")
total_rows_written = 0
if write_index_file:
metadata = self._build_index_metadata(_final)
if multifile:
count = 0
all_index_dfs = []
for k in fi:
this_rows_written = 0
hdu = self._sdf[k]._hdu[0].copy()
Expand All @@ -4004,8 +4133,7 @@ def _write_astropy(self, fileobj, multifile, flags, verbose, output_verify, over
total_rows_written += lr
this_rows_written += lr
if len(fi) > 1:
p = Path(fileobj)
outfile = p.parent / (p.stem + str(count) + p.suffix)
outfile = self._multifile_name(fileobj, count)
count += 1
else:
outfile = fileobj
Expand All @@ -4016,6 +4144,11 @@ def _write_astropy(self, fileobj, multifile, flags, verbose, output_verify, over
if verbose:
logger.info(f"Writing {this_rows_written} rows to {outfile}.")
outhdu.writeto(outfile, output_verify=output_verify, overwrite=overwrite, checksum=checksum)
if write_index_file:
prepared_df = self._build_and_write_index(outfile, df, metadata, overwrite)
all_index_dfs.append(prepared_df)
if write_index_file and len(fi) > 1:
self._build_and_write_parent_index(fileobj, all_index_dfs, metadata, overwrite)
if verbose:
logger.info(f"Total of {total_rows_written} rows written to files.")
else:
Expand Down Expand Up @@ -4048,6 +4181,8 @@ def _write_astropy(self, fileobj, multifile, flags, verbose, output_verify, over
if verbose:
logger.info(f"Writing {total_rows_written} to {fileobj}")
outhdu.writeto(fileobj, output_verify=output_verify, overwrite=overwrite, checksum=checksum)
if write_index_file:
self._build_and_write_index(fileobj, _final, metadata, overwrite)

def _update_radesys(self):
"""
Expand Down
Loading
Loading