diff --git a/sotodlib/mapmaking/ml_mapmaker.py b/sotodlib/mapmaking/ml_mapmaker.py index 5c4cc557d..13e39b22e 100644 --- a/sotodlib/mapmaking/ml_mapmaker.py +++ b/sotodlib/mapmaking/ml_mapmaker.py @@ -650,6 +650,7 @@ def add_obs(self, id, obs, nmat, Nd): """ # First scan our mask to find which samples need this # treatment + ctime = obs.timestamps if self.recenter: rec = smutils.evaluate_recentering(self.recenter, ctime=ctime[len(ctime) // 2], geom=(self.mask.shape, self.mask.wcs), site=smutils.unarr(obs.site)) diff --git a/sotodlib/mapmaking/noise_model.py b/sotodlib/mapmaking/noise_model.py index 73165602d..b1334954b 100644 --- a/sotodlib/mapmaking/noise_model.py +++ b/sotodlib/mapmaking/noise_model.py @@ -111,6 +111,45 @@ def write(self, fname): def from_bunch(data): return NmatUncorr(spacing=data.spacing, nbin=data.nbin, nmin=data.nmin, bins=data.bins, ips_binned=data.ips_binned, ivar=data.ivar, window=data.window, nwin=data.nwin) +class NmatDebug(Nmat): + def __init__(self, ivar=None, alpha=-3, fknee=2.5, profile=None, bsize=256, downweight=1): + self.bsize = bsize + self.ivar = ivar + self.alpha = alpha + self.fknee = fknee + self.profile = profile + self.downweight = downweight + self.nwin = 0 + self.ready = ivar is not None + def build(self, tod, srate, **kwargs): + nsamp = tod.shape[1] + nblock = nsamp//self.bsize + var = np.median(np.var(tod[:,:nblock*self.bsize].reshape(-1,nblock,self.bsize),-1),-1) + with utils.nowarn(): + ivar = utils.without_nan(1/var)*self.downweight + f = np.fft.rfftfreq(nsamp, 1/srate) + with utils.nowarn(): + profile = 1/(1+(f/self.fknee)**self.alpha) + profile /= nsamp # fft normalization + return NmatDebug(ivar=ivar, alpha=self.alpha, fknee=self.fknee, bsize=self.bsize, profile=profile, downweight=self.downweight) + def apply(self, tod, inplace=True): + tod = self.white(tod, inplace=inplace) + ft = np.empty((tod.shape[0],tod.shape[1]//2+1),utils.complex_dtype(tod.dtype)) + fft.rfft(tod, ft) + ft *= self.profile + fft.irfft(ft, 
tod) + return tod + def white(self, tod, inplace=True, nwin=None): + if not inplace: tod = tod.copy() + tod *= self.ivar[:,None] + return tod + def write(self, fname): + self.check_ready() + data = bunch.Bunch(type="NmatDebug") + for field in ["ivar", "alpha", "fknee", "profile", "bsize", "downweight"]: + data[field] = getattr(self, field) + bunch.write(fname, data) + class NmatDetvecs(Nmat): def __init__(self, bin_edges=None, eig_lim=16, single_lim=0.55, mode_bins=[0.25,4.0,20], downweight=[], window=2, nwin=None, verbose=False, bins=None, @@ -174,7 +213,7 @@ def build_fourier(self, ftod, nsamp, srate, nwin=0, **kwargs): b = np.maximum(1,b) E[bi], D[bi], Nd[bi] = measure_detvecs(ftod[:,b[0]:b[1]], vecs) # Optionally downweight the lowest frequency bins - if self.downweight != None and len(self.downweight) > 0: + if self.downweight is not None and len(self.downweight) > 0: D[:len(self.downweight)] /= np.array(self.downweight)[:,None] # Instead of VEV' we can have just VV' if we bake sqrt(E) into V V = vecs[None]*E[:,None]**0.5 @@ -187,11 +226,10 @@ def build_fourier(self, ftod, nsamp, srate, nwin=0, **kwargs): # Also compute a representative white noise level bsize = bins[:,1]-bins[:,0] ivar = np.sum(iD*bsize[:,None],0)/np.sum(bsize) - # What about units? I haven't applied any fourier unit factors so far, - # so we're in plain power units. From the uncorrelated model I found - # that factor of tod.shape[1] is needed - iD *= nsamp - iV *= nsamp**0.5 + # I originally normalized iD and iV here, but then undid that normalization + # when actually applying the model. 
Let's just skip that + #iD *= nsamp + #iV *= nsamp**0.5 ivar *= nsamp # Fix dtype bins = np.ascontiguousarray(bins.astype(np.int32)) @@ -221,11 +259,12 @@ def apply_fourier(self, ftod, nsamp, slow=False): for bi, b in enumerate(self.bins): # Want to multiply by iD + siViV' ft = ftod[:,b[0]:b[1]] - iD = self.iD[bi]/nsamp - iV = self.iV[bi]/nsamp**0.5 + iD = self.iD[bi] #/nsamp + iV = self.iV[bi] #/nsamp**0.5 ft[:] = iD[:,None]*ft + self.s*iV.dot(iV.T.dot(ft)) else: - so3g.nmat_detvecs_apply(ftod.view(dtype), self.bins, self.iD, self.iV, float(self.s), float(nsamp)) + # using normalization 1 instead of nsamp, since I didn't multiply by nsamp earlier + so3g.nmat_detvecs_apply(ftod.view(dtype), self.bins, self.iD, self.iV, float(self.s), float(1)) # I divided by the normalization above instead of passing normalize=True # here to reduce the number of operations needed @@ -441,7 +480,7 @@ def build_fourier(self, ftod, nsamp, srate, **kwargs): b = np.maximum(1,b) E[bi], D[bi], Nd[bi] = measure_detvecs(ftod[:,b[0]:b[1]], vecs) # Optionally downweight the lowest frequency bins - if self.downweight != None and len(self.downweight) > 0: + if self.downweight is not None and len(self.downweight) > 0: D[:len(self.downweight)] /= np.array(self.downweight)[:,None] # Instead of VEV' we can have just VV' if we bake sqrt(E) into V V = vecs[None]*E[:,None]**0.5 @@ -454,11 +493,10 @@ def build_fourier(self, ftod, nsamp, srate, **kwargs): # Also compute a representative white noise level bsize = bins[:,1]-bins[:,0] ivar = np.sum(iD*bsize[:,None],0)/np.sum(bsize) - # What about units? I haven't applied any fourier unit factors so far, - # so we're in plain power units. From the uncorrelated model I found - # that factor of tod.shape[1] is needed - iD *= nsamp - iV *= nsamp**0.5 + # I originally normalized iD and iV here, but then undid that normalization + # when actually applying the model. 
Let's just skip that + #iD *= nsamp + #iV *= nsamp**0.5 ivar *= nsamp # Fix dtype bins = np.ascontiguousarray(bins.astype(np.int32)) @@ -486,11 +524,12 @@ def apply_fourier(self, ftod, nsamp, slow=False): for bi, b in enumerate(self.bins): # Want to multiply by iD + siViV' ft = ftod[:,b[0]:b[1]] - iD = self.iD[bi]/nsamp - iV = self.iV[bi]/nsamp**0.5 + iD = self.iD[bi] #/nsamp + iV = self.iV[bi] #/nsamp**0.5 ft[:] = iD[:,None]*ft + self.s*iV.dot(iV.T.dot(ft)) else: - so3g.nmat_detvecs_apply(ftod.view(dtype), self.bins, self.iD, self.iV, float(self.s), float(nsamp), dct_binning=True) + # using normalization 1 instead of nsamp, since I didn't multiply by nsamp earlier + so3g.nmat_detvecs_apply(ftod.view(dtype), self.bins, self.iD, self.iV, float(self.s), float(1), dct_binning=True) # I divided by the normalization above instead of passing normalize=True # here to reduce the number of operations needed diff --git a/sotodlib/mapmaking/utils.py b/sotodlib/mapmaking/utils.py index 4cdc9e6cc..36e44b353 100644 --- a/sotodlib/mapmaking/utils.py +++ b/sotodlib/mapmaking/utils.py @@ -1,3 +1,4 @@ +import sys import logging import os from typing import Optional, Any, Union @@ -155,10 +156,13 @@ def measure_cov(d, nmax=10000): def project_out(d, modes): return d-modes.T.dot(modes.dot(d)) def project_out_from_matrix(A, V): - # Used Woodbury to project out the given vectors from the covmat A + """This is the equivalent of deprojecting V in a timestream, + and then measuring the covariance of the resulting timestream. 
+ It will therefore always be positive semidefinite.""" if V.size == 0: return A - Q = A.dot(V) - return A - Q.dot(np.linalg.solve(np.conj(V.T).dot(Q), np.conj(Q.T))) + AV = A.dot(V) + # q = a-VV'a, so Q = A + VV'AVV' - AVV' - VV'A + return A + V.dot(V.T.dot(AV)).dot(V.T) - AV.dot(V.T) - V.dot(AV.T) def measure_power(d): return np.real(np.mean(d*np.conj(d),-1)) @@ -322,6 +326,18 @@ def get_subids_file(fname, context=None): sub_ids = expand_ids(sub_ids, context=context) return sub_ids +def get_obsinfo_subids(subids, context): + """Given a list of subids, return a ResultSet with one entry + from the obsdb for each subid. Ideally one would have this + from the original query that defined the subids, but since + subids are often read in directly from a file, we can't rely + on that. This function is a bit inefficient, since it starts + from the full obsdb and then looks up the subids from it""" + obsids = split_subids(subids)[0] + ids_str = ",".join(f"'{subid}'" for subid in obsids) + rs = context.obsdb.query("obs_id IN (%s)" % ids_str) + return rs + +def expand_ids(obs_ids, context=None, bands=None): """Given a list of ids that are either obs_ids or sub_ids, expand any obs_ids into sub_ids and return the resulting list. 
@@ -380,7 +396,7 @@ def expand_ids(obs_ids, context=None, bands=None): sub_ids.append("%s:ws%d:%s" % (obs_id, si, band)) return sub_ids -def filter_subids(subids, wafers=None, bands=None): +def filter_subids(subids, wafers=None, bands=None, ots=None): subids = np.asarray(subids) if wafers is not None: wafs = astr_tok(subids,":",1) @@ -388,6 +404,11 @@ def filter_subids(subids, wafers=None, bands=None): if bands is not None: bpass = astr_tok(subids,":",2) subids = subids[np.isin(bpass, bands)] + if ots is not None: + # Somewhat hacky implementation + obs_ids = astr_tok(subids, ":",0) + has_ot = np.prod([np.char.find(obs_ids, ot) for ot in ots], 0) + subids = subids[has_ot >= 0] return subids def astr_cat(*arrs): @@ -484,10 +505,16 @@ def get_pos(name, ctime, sys=None): elif name == "auto": return np.array([0,0]) # would use geom here else: - obj = getattr(ephem, name)() - djd = ctime/86400 + 40587.0 + 2400000.5 - 2415020 - obj.compute(djd) - return np.array([obj.a_ra, obj.a_dec]) + try: + planet = coords.planets.SlowSource.for_named_source( + name, ctime) + ra0, dec0 = planet.pos(np.mean(ctime)) + except: + obj = getattr(ephem, name)() + djd = ctime/86400 + 40587.0 + 2400000.5 - 2415020 + obj.compute(djd) + ra0, dec0 = obj.a_ra, obj.a_dec + return np.array([ra0, dec0]) else: return to_cel(name, sys, ctime, site, weather) p1 = get_pos(info["from"], ctime, info["from_sys"]) @@ -569,8 +596,11 @@ def rangemat_sum(rangemat): res[i] = np.sum(ra[:,1]-ra[:,0]) return res -def find_usable_detectors(obs, maxcut=0.1, glitch_flags: str = "flags.glitch_flags"): - ncut = rangemat_sum(obs[glitch_flags]) +def find_usable_detectors(obs, maxcut=0.1, glitch_flags: str = "flags.glitch_flags", to_null : str = "flags.expected_flags"): + flag = obs[glitch_flags] + if to_null and to_null in obs: + flag = flag*~obs[to_null] + ncut = rangemat_sum(flag) good = ncut < obs.samps.count * maxcut return obs.dets.vals[good] @@ -750,6 +780,35 @@ def setup_passes(downsample="1", maxiter="500", 
interpol="nearest", npass=None): passes.append(entry) return passes +def distribute_tods_ra(obsinfo, nsplit, site="so", weather="typical"): + """For each row in obsinfo, assign it to one of nsplit categories + such that each category is as compact in RA as possible, and each + category has as similar size as possible. Assumes that all obs + are at the same site.""" + if len(obsinfo) <= nsplit: + return np.arange(nsplit)[:len(obsinfo)] + else: + # Get a representative RA per entry + az = obsinfo["az_center"] * putils.degree + el = obsinfo["el_center"] * putils.degree + ctime = (obsinfo["start_time"] + obsinfo["stop_time"])/2 + ra = so3g.proj.CelestialSightLine.az_el(ctime, az, el, site=site, weather=weather).coords()[:,0] + # Put everything on the same copy of the sky + ra = putils.rewind_compact(ra) + # Partition sorted ras equally by weight. Would ideally include + # ndet too, but the effective ndet depends on the cuts etc. + order = np.argsort(ra) + weight= obsinfo["n_samples"][order] + cum = putils.cumsum(weight) + wtot = cum[-1]+weight[-1] # = np.sum(weight) + owners = np.zeros(len(obsinfo),int) + owners[order] = putils.floor(cum*nsplit/wtot) + # Make sure none ended up empty. 
If they did, fall back to unweighted, + # which will always succeed + if np.any(np.bincount(owners, minlength=nsplit) == 0): + owners[order] = np.arange(len(obsinfo))*nsplit/len(obsinfo) + return owners + Base = declarative_base() class AtomicInfo(Base): diff --git a/sotodlib/site_pipeline/make_ml_map.py b/sotodlib/site_pipeline/make_ml_map.py index 420b37eb4..92f94e05c 100644 --- a/sotodlib/site_pipeline/make_ml_map.py +++ b/sotodlib/site_pipeline/make_ml_map.py @@ -1,3 +1,6 @@ +import numpy as np +import sys + def get_parser(parser=None): # a config file to pass all parameters is pending import argparse @@ -6,9 +9,11 @@ def get_parser(parser=None): parser.add_argument("query") parser.add_argument("area") parser.add_argument("odir") + parser.add_argument("preprocess_config", help="Preprocess configuration file") parser.add_argument("prefix", nargs="?") parser.add_argument( "--comps", type=str, default="TQU",help="List of components to solve for. T, QU or TQU, but only TQU is consistent with the actual data") parser.add_argument("-W", "--wafers", type=str, default=None, help="Detector wafer subsets to map with. ,-sep") + parser.add_argument("-O", "--ots", type=str, default=None, help="Optics tubes to map with. ,-sep") parser.add_argument("-B", "--bands", type=str, default=None, help="Bandpasses to map. 
,-sep") parser.add_argument("-C", "--context", type=str, default="/mnt/so1/shared/todsims/pipe-s0001/v4/context.yaml") parser.add_argument( "--tods", type=str, default=None, help="Arbitrary slice to apply to the list of tods to analyse") @@ -19,32 +24,48 @@ def get_parser(parser=None): parser.add_argument("-v", "--verbose", action="count", default=0) parser.add_argument("-q", "--quiet", action="count", default=0) parser.add_argument("-@", "--center-at", type=str, default=None) - parser.add_argument("-w", "--window", type=float, default=0.0) + parser.add_argument("-w", "--window", type=float, default=2.0) parser.add_argument("-i", "--inject", type=str, default=None, help="Path to map to inject. Equatorial coordinates") parser.add_argument( "--nocal", action="store_true", help="Disable calibration. Useful for sims") parser.add_argument( "--nmat-dir", type=str, default="{odir}/nmats") parser.add_argument( "--nmat-mode", type=str, default="build", help="How to build the noise matrix. 'build': Always build from tod. 'cache': Use if available in nmat-dir, otherwise build and save. 'load': Load from nmat-dir, error if missing. 'save': Build from tod and save.") parser.add_argument("-d", "--downsample", type=str, default="1", help="Downsample TOD by this factor. ,-sep") + parser.add_argument("-D", "--downgrade", type=int, default=1, help="Downgrade the input area by this factor") parser.add_argument( "--maxiter", type=str, default="500",help="Max number of CG steps per pass. ,-sep") parser.add_argument( "--interpol", type=str, default="nearest", help="Pmat interpol per pass. ,-sep") parser.add_argument("-T", "--tiled" , type=int, default=1, help="0: untiled maps. Nonzero: tiled maps") parser.add_argument( "--srcsamp", type=str, default=None, help="path to mask file where True regions indicate where bright object mitigation should be applied. Mask is in equatorial coordinates. 
Not tiled, so should be low-res to not waste memory.") parser.add_argument( "--unit", type=str, default="uK", help="Unit of the maps") + parser.add_argument( "--iunit", type=str, default="K", help="Unit of the raw tod") + parser.add_argument( "--maxcut", type=float, default=.3, help="Maximum fraction of cut samples in a detector.") + parser.add_argument( "--no-sidelobe", action="store_true", help="Do not mask Moon/Sun sidelobes") + parser.add_argument( "--sun-mask", type=str, default="/global/cfs/cdirs/sobs/users/sigurdkn/masks/sidelobe/sun.fits", help="Location of Sun sidelobe mask") + parser.add_argument( "--moon-mask", type=str, default="/global/cfs/cdirs/sobs/users/sigurdkn/masks/sidelobe/moon.fits", help="Location of Moon sidelobe mask") + parser.add_argument("--hits", action="store_true", help="Write hits maps") + parser.add_argument("--cut-type", type=str, default="full") return parser +unit_defs = {"nK":1e-9, "uK":1e-6, "mK":1e-3, "K":1.0, "kK":1e3, "MK": 1e6, "GK":1e9} + +def exit(comm, code): + if comm is not None: # This check might need an expansion + comm.Abort(code) + else: + sys.exit(code) + def main(**args): - import numpy as np, sys, time, warnings, os, so3g - from sotodlib.core import Context, AxisManager, IndexAxis - from sotodlib.io import metadata # PerDetectorHdf5 work-around - from sotodlib import tod_ops, mapmaking, core - from sotodlib.tod_ops import filters - from sotodlib.mapmaking import log - from pixell import enmap, utils, fft, bunch, wcsutils, mpi, bench - import yaml - - #try: import moby2.analysis.socompat - #except ImportError: warnings.warn("Can't import moby2.analysis.socompat. 
ACT data input probably won't work") - class DataMissing(Exception): pass + import time, warnings, os, so3g + + from sotodlib import mapmaking, core + from sotodlib.coords import sidelobes + from sotodlib.preprocess import preprocess_util as pp_util + from sotodlib.site_pipeline.utils import depth1_utils as d1u + from sotodlib.site_pipeline.utils.config import _get_config + + from pixell import enmap, utils, bunch, wcsutils, mpi, bench + + LoaderError = d1u.LoaderError + DataMissing = d1u.DataMissing warnings.simplefilter('ignore') args = bunch.Bunch(**args) @@ -53,12 +74,13 @@ class DataMissing(Exception): pass comm = mpi.COMM_WORLD shape, wcs = enmap.read_map_geometry(args.area) + if args.downgrade > 1: + shape, wcs = enmap.downgrade_geometry(shape, wcs, args.downgrade) + # Reconstruct that wcs in case default fields have changed; otherwise # we risk adding information in MPI due to reconstruction, and that # can cause is_compatible failures. wcs = wcsutils.WCS(wcs.to_header()) - # Set shape to None to allow map to fit these TODs exactly. - #shape = None comps = args.comps ncomp = len(comps) @@ -75,12 +97,13 @@ class DataMissing(Exception): pass recenter = mapmaking.parse_recentering(args.center_at) with bench.mark('context'): - context = Context(args.context) + context = core.Context(args.context) + ots = args.ots.split(",") if args.ots else None wafers = args.wafers.split(",") if args.wafers else None bands = args.bands .split(",") if args.bands else None sub_ids = mapmaking.get_subids(args.query, context=context) - sub_ids = mapmaking.filter_subids(sub_ids, wafers=wafers, bands=bands) + sub_ids = mapmaking.filter_subids(sub_ids, wafers=wafers, bands=bands, ots=ots) # restrict tod selection further. E.g. --tods [0], --tods[:1], --tods[::100], --tods[[0,1,5,10]], etc. 
if args.tods: @@ -91,16 +114,30 @@ class DataMissing(Exception): pass if len(sub_ids) == 0: if comm.rank == 0: print("No tods found!") - sys.exit(1) + exit(comm, 1) L.info("Found %d tods" % (len(sub_ids))) + # Define our task distribution + obsinfo = mapmaking.get_obsinfo_subids(sub_ids, context) + owner = mapmaking.distribute_tods_ra(obsinfo, comm.size, site=SITE) + myinds = np.where(owner==comm.rank)[0] + if args.inject: map_to_inject = enmap.read_map(args.inject).astype(dtype_map) if args.srcsamp: srcsamp_mask = enmap.read_map(args.srcsamp) + # set up the preprocessing + try: + preproc = _get_config(args.preprocess_config) + except: + if comm.rank==0: + L.info(f"{args.preprocess_config} is not a valid config") + exit(comm, 1) + passes = mapmaking.setup_passes(downsample=args.downsample, maxiter=args.maxiter, interpol=args.interpol) + to_skip = [] for ipass, passinfo in enumerate(passes): L.info("Starting pass %d/%d maxit %d down %d interp %s" % (ipass+1, len(passes), passinfo.maxiter, passinfo.downsample, passinfo.interpol)) pass_prefix = prefix + "pass%d_" % (ipass+1) @@ -142,31 +179,34 @@ class DataMissing(Exception): pass if args.nmat == "uncorr": noise_model = mapmaking.NmatUncorr() elif args.nmat == "corr": noise_model = mapmaking.NmatDetvecs(verbose=verbose>1, window=args.window) elif args.nmat == "corr_dct": noise_model = mapmaking.NmatDetvecsDCT(verbose=verbose>1) + elif args.nmat == "debug": noise_model = mapmaking.NmatDebug() else: raise ValueError("Unrecognized noise model '%s'" % args.nmat) - signal_cut = mapmaking.SignalCut(comm, dtype=dtype_tod) + signal_cut = mapmaking.SignalCut(comm, dtype=dtype_tod, cut_type=args.cut_type) signal_map = mapmaking.SignalMap(shape, wcs, comm, comps=comps, dtype=dtype_map, recenter=recenter, tiled=args.tiled>0, interpol=args.interpol) signals = [signal_cut, signal_map] if args.srcsamp: + signal_srcsamp = mapmaking.SignalSrcsamp(comm, srcsamp_mask, recenter=recenter, dtype=dtype_tod) signal_srcsamp = 
mapmaking.SignalSrcsamp(comm, srcsamp_mask, dtype=dtype_tod) # This *must* come after all the other signals due to how it zeros out the # affected samples in the tod (inherited from SignalCut). Might have been better # to factorize out that zeroing into its own thing that's easier to control. signals.append(signal_srcsamp) mapmaker = mapmaking.MLMapmaker(signals, noise_model=noise_model, dtype=dtype_tod, verbose=verbose>0) + sidelobe_cutters = {} nkept = 0 - # TODO: Fix the task distribution. The current one doesn't care which mpi - # task gets which tods, which sabotages the pixel-saving effects of tiled maps! - # To be able to distribute the tods sensibly, we need a rough estimate of where - # on the sky each tod is. We should be able to get this using the central - # ctime, az and el for each tod. - for ind in range(comm.rank, len(sub_ids), comm.size): + + for ind in myinds: # Detsets correspond to separate files, so treat them as separate TODs. sub_id = sub_ids[ind] obs_id, wafer, band = sub_id.split(":") name = sub_id.replace(":", "_") L.debug("Processing %s" % sub_id) + if sub_id in to_skip: + L.debug("Skipped %s (Cut in previous pass)" % (sub_id)) + continue + try: meta = context.get_meta(sub_id) # Optionally restrict to maximum number of detectors. 
This is mainly @@ -179,8 +219,26 @@ class DataMissing(Exception): pass if len(my_dets) == 0: raise DataMissing("no dets left") # Actually read the data with bench.mark("read_obs %s" % sub_id): - obs = context.get_obs(sub_id, meta=meta) - + #obs = context.get_obs(sub_id, meta=meta) + obs, _ = pp_util.load_and_preprocess(obs_id, preproc, context=context, meta=meta) + if obs.dets.count < 50: + L.debug("Skipped %s (Not enough detectors)" % (sub_id)) + L.debug("Datacount: %s full" % (sub_id)) + to_skip += [sub_id] + continue + # Check nans + if not np.all(np.isfinite(obs.signal)): + L.debug("Skipped %s (there is a nan in signal)" % (sub_id)) + L.debug("Datacount: %s full" % (sub_id)) + to_skip += [sub_id] + continue + # Check all 0s + zero_dets = np.sum(obs.signal, axis=1) # sum the signal across the samples + if np.any(zero_dets == 0.0): + L.debug("%s has all 0s in at least 1 detector" % (sub_id)) + obs.restrict('dets', obs.dets.vals[np.logical_not(zero_dets == 0.0)]) + # Cut non-optical dets, this will be redundant if the preprocessing already cut them + obs.restrict('dets', obs.dets.vals[obs.det_info.wafer.type == 'OPTC']) # Fix boresight mapmaking.fix_boresight_glitches(obs) # Get our sample rate. Would have been nice to have this available in the axisman @@ -190,15 +248,15 @@ class DataMissing(Exception): pass obs.wrap("weather", np.full(1, "typical")) obs.wrap("site", np.full(1, SITE)) - # Prepare our data. FFT-truncate for faster fft ops - obs.restrict("samps", [0, fft.fft_len(obs.samps.count)]) - # Desolope to make it periodic. This should be done *before* # dropping to single precision, to avoid unnecessary loss of precision due # to potential high offses in the raw tod. 
utils.deslope(obs.signal, w=5, inplace=True) obs.signal = obs.signal.astype(dtype_tod) + # Convert to our target unit + obs.signal *= unit_defs[args.iunit]/unit_defs[args.unit] + if "flags" not in obs: obs.wrap("flags", core.AxisManager(obs.dets, obs.samps)) @@ -210,37 +268,25 @@ class DataMissing(Exception): pass # Optionally skip all the calibration. Useful for sims. if not args.nocal: + # measure rms, and convert to µKrts for comparison with sens_limits + rms = d1u.measure_rms(obs.signal, dt=1/srate) + rms *= unit_defs[args.unit]/unit_defs["uK"] + good = d1u.sensitivity_cut(rms, d1u.SENS_LIMITS[band]) + if np.logical_not(good).sum() / obs.dets.count > 0.5: + L.debug("Skipped %s (more than 50 percent of detectors cut by sens)" % (sub_id)) + L.debug("Datacount: %s full" % (sub_id)) + to_skip += [sub_id] + continue + else: + obs.restrict("dets", good) # Disqualify overly cut detectors - good_dets = mapmaking.find_usable_detectors(obs) + good_dets = mapmaking.find_usable_detectors(obs, args.maxcut) obs.restrict("dets", good_dets) if obs.dets.count == 0: + to_skip += [sub_id] L.debug("Skipped %s (all dets cut)" % (sub_id)) + L.debug("Datacount: %s full" % (sub_id)) continue - # Gapfill glitches. This function name isn't the clearest - tod_ops.get_gap_fill(obs, flags=obs.flags.glitch_flags, swap=True) - # Gain calibration - gain = 1 - for gtype in ["relcal","abscal"]: - gain *= obs[gtype][:,None] - obs.signal *= gain - # Fourier-space calibration - fsig = fft.rfft(obs.signal) - freq = fft.rfftfreq(obs.samps.count, 1/srate) - # iir filter - iir_filter = filters.iir_filter()(freq, obs) - fsig /= iir_filter - gain /= iir_filter[0].real # keep track of total gain for our record - fsig /= filters.timeconst_filter(None)(freq, obs) - fft.irfft(fsig, obs.signal, normalize=True) - del fsig - - # Apply pointing correction. 
- #obs.focal_plane.xi += obs.boresight_offset.xi - #obs.focal_plane.eta += obs.boresight_offset.eta - #obs.focal_plane.gamma += obs.boresight_offset.gamma - obs.focal_plane.xi += obs.boresight_offset.dx - obs.focal_plane.eta += obs.boresight_offset.dy - obs.focal_plane.gamma += obs.boresight_offset.gamma # Injecting at this point makes us insensitive to any bias introduced # in the earlier steps (mainly from gapfilling). The alternative is @@ -251,8 +297,16 @@ class DataMissing(Exception): pass mapmaking.inject_map(obs, map_to_inject, recenter=recenter, interpol=args.interpol) utils.deslope(obs.signal, w=5, inplace=True) + # sidelobes cuts + if not args.no_sidelobe: + cutss = sidelobes.get_cuts(obs, args, sidelobe_cutters) + for cut in cutss: + obs.flags.glitch_flags += cut + if passinfo.downsample != 1: obs = mapmaking.downsample_obs(obs, passinfo.downsample) + mmask = obs.flags.glitch_flags.mask() + L.debug(f"Datacount: {sub_id} added {obs.dets.count} {np.logical_not(mmask).sum()} ") # Maybe load precomputed noise model. # FIXME: How to handle multipass here? @@ -282,15 +336,17 @@ class DataMissing(Exception): pass print("Writing noise model %s" % nmat_file) utils.mkdir(nmat_dir) mapmaking.write_nmat(nmat_file, mapmaker.data[-1].nmat) - except (DataMissing,IndexError,ValueError) as e: + except (DataMissing,IndexError,ValueError,LoaderError,TypeError) as e: L.debug("Skipped %s (%s)" % (sub_id, str(e))) + L.debug("Datacount: %s full" % (sub_id)) continue - nkept = comm.allreduce(nkept) - if nkept == 0: + nkept_all = np.array([0],dtype='i') + comm.Allreduce(np.array([nkept],dtype='i'), nkept_all, op=mpi.SUM) + if nkept_all[0] == 0: if comm.rank == 0: L.info("All tods failed. 
Giving up") - sys.exit(1) + exit(comm, 1) L.info("Done building") @@ -302,6 +358,9 @@ class DataMissing(Exception): pass signal_map.write(prefix, "div", signal_map.div, unit=args.unit+'^-2') signal_map.write(prefix, "bin", enmap.map_mul(signal_map.idiv, signal_map.rhs), unit=args.unit) L.info("Wrote rhs, div, bin") + if args.hits: + signal_map.write(prefix, "hits", signal_map.hits, unit='hits') + L.info("Wrote hits") # Set up initial condition x0 = None if ipass == 0 else mapmaker.translate(mapmaker_prev, eval_prev.x_zip) diff --git a/tests/test_mapmaking_utils.py b/tests/test_mapmaking_utils.py new file mode 100644 index 000000000..5074d7202 --- /dev/null +++ b/tests/test_mapmaking_utils.py @@ -0,0 +1,103 @@ +# Copyright (c) 2023-2025 Simons Observatory. +# Full license can be found in the top level "LICENSE" file. + +import numpy as np +import so3g + +from sotodlib import core +from sotodlib.mapmaking.utils import filter_subids, find_usable_detectors + +from ._helpers import quick_tod + +# --------------------------------------------------------------------------- +# filter_subids +# --------------------------------------------------------------------------- + +SUBIDS = np.array([ + "obs_1750770479_satp3_1111111:ws3:f090", + "obs_1750774090_satp3_1111111:ws0:f150", + "obs_1750777688_satp3_1111011:ws0:f090", + "obs_1750781294_satp3_1110111:ws0:f150", + "obs_1750785970_satp3_1000000:ws0:f090" +]) + + +def test_filter_subids_no_filter(): + result = filter_subids(SUBIDS) + np.testing.assert_array_equal(result, SUBIDS) + + +def test_filter_subids_by_wafer(): + result = filter_subids(SUBIDS, wafers=["ws0"]) + assert all(":ws0:" in s for s in result) + assert len(result) == 4 + + +def test_filter_subids_by_band(): + result = filter_subids(SUBIDS, bands=["f090"]) + assert all(s.endswith("f090") for s in result) + assert len(result) == 3 + + +def test_filter_subids_by_ot(): + result = filter_subids(SUBIDS, ots=["satp3"]) + assert all("satp3" in s for s in result) + assert 
len(result) == 5 + + +def test_filter_subids_no_match(): + result = filter_subids(SUBIDS, wafers=["ws99"]) + assert len(result) == 0 + + +# --------------------------------------------------------------------------- +# find_usable_detectors +# --------------------------------------------------------------------------- + +def _add_glitch_flags(tod, cut_ranges_per_det): + """Add flags.glitch_flags to a quick_tod AxisManager. + + cut_ranges_per_det: list of lists of [start, end] pairs, one per detector. + """ + n_dets = tod.dets.count + n_samps = tod.samps.count + ranges = [] + for i in range(n_dets): + r = cut_ranges_per_det[i] + if r: + ri = so3g.RangesInt32.from_array( + np.array(r, dtype=np.int32).reshape(-1, 2), n_samps + ) + else: + ri = so3g.RangesInt32(n_samps) + ranges.append(ri) + glitch_flags = so3g.proj.ranges.RangesMatrix(ranges) + flags_aman = core.AxisManager(tod.dets, tod.samps) + flags_aman.wrap("glitch_flags", glitch_flags, [(0, "dets"), (1, "samps")]) + tod.wrap("flags", flags_aman) + return tod + + +def test_find_usable_detectors_all_good(): + tod = quick_tod(3, 1000) + tod = _add_glitch_flags(tod, [[], [], []]) + result = find_usable_detectors(tod) + np.testing.assert_array_equal(result, np.array(tod.dets.vals)) + + +def test_find_usable_detectors_one_cut(): + # det0001 has 20 % of samples flagged → removed with default maxcut=0.1 + tod = quick_tod(3, 1000) + tod = _add_glitch_flags(tod, [[], [[0, 200]], []]) + result = find_usable_detectors(tod) + assert tod.dets.vals[1] not in result + assert tod.dets.vals[0] in result + assert tod.dets.vals[2] in result + + +def test_find_usable_detectors_maxcut_threshold(): + # det0: 5 % flagged — kept at maxcut=0.1, removed at maxcut=0.04 + tod = quick_tod(2, 1000) + tod = _add_glitch_flags(tod, [[[0, 50]], []]) + assert tod.dets.vals[0] in find_usable_detectors(tod, maxcut=0.1) + assert tod.dets.vals[0] not in find_usable_detectors(tod, maxcut=0.04)