diff --git a/sotodlib/preprocess/pcore.py b/sotodlib/preprocess/pcore.py
index 8443e170f..a9a1cf3b1 100644
--- a/sotodlib/preprocess/pcore.py
+++ b/sotodlib/preprocess/pcore.py
@@ -436,7 +436,8 @@ def extend(self, index, other):
     def __setitem__(self, index, item):
         super().__setitem__(index, self._check_item(item))
 
-    def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
+    def run(self, aman, proc_aman=None, full_aman=None, select=True,
+            sim=False, update_plot=False):
         """ The main workhorse function for the pipeline class. This function
         takes an AxisManager TOD and successively runs the pipeline of preprocessing
@@ -460,6 +461,12 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
             returned this preprocess axismanager. In this case, calls to
             ``process.calc_and_save()`` are skipped as the information is
             expected to be present in this AxisManager.
+        full_aman: AxisManager (Optional)
+            A preprocess axismanager. This axis manager stores the outputs of
+            preprocessing functions (proc_aman) but without any of the detector
+            or samps restrictions applied, thus maintaining its original shape.
+            This is returned at the end of the pipeline. If not passed, it is
+            instantiated with the same number of dets and samps as aman.
         select: boolean (Optional)
             if True, the aman detector axis is restricted as described in
             each preprocess module. Most pipelines are developed with
@@ -475,18 +482,22 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
 
         Returns
         -------
-        proc_aman: AxisManager
+        full_aman: AxisManager
             A preprocess axismanager that contains all data products calculated
-            throughout the running of the pipeline
-
+            throughout the running of the pipeline.
+        success: str
+            The name of the last process step that the pipeline completed. If
+            the pipeline successfully finishes all steps, ``success = 'end'``.
         """
         if proc_aman is None:
             if 'preprocess' in aman:
                 proc_aman = aman.preprocess.copy()
-                full = aman.preprocess.copy()
+                if full_aman is None:
+                    full_aman = aman.preprocess.copy()
             else:
                 proc_aman = core.AxisManager(aman.dets, aman.samps)
-                full = core.AxisManager( aman.dets, aman.samps)
+                if full_aman is None:
+                    full_aman = core.AxisManager( aman.dets, aman.samps)
             run_calc = True
             update_plot = False
         else:
@@ -495,7 +506,8 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
             det_list = [det for det in proc_aman.dets.vals if det in aman.dets.vals]
             aman.restrict('dets', det_list)
             proc_aman.restrict('dets', det_list)
-            full = proc_aman.copy()
+            if full_aman is None:
+                full_aman = proc_aman.copy()
             run_calc = False
 
         if 'frequency_cutoffs' not in proc_aman:
@@ -527,7 +539,7 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
             if run_calc:
                 aman, proc_aman = process.calc_and_save(aman, proc_aman)
                 process.plot(aman, proc_aman, filename=os.path.join(self.plot_dir, '{ctime}/{obsid}', f'{step+1}_{{name}}.png'))
-                update_full_aman( proc_aman, full, self.wrap_valid)
+                update_full_aman( proc_aman, full_aman, self.wrap_valid)
             if update_plot:
                 process.plot(aman, proc_aman, filename=os.path.join(self.plot_dir, '{ctime}/{obsid}', f'{step+1}_{{name}}.png'))
                 plt.close()
@@ -535,22 +547,22 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
                 process.select(aman, proc_aman)
                 proc_aman.restrict('dets', aman.dets.vals)
                 self.logger.debug(f"{proc_aman.dets.count} detectors remaining")
-
+
                 if aman.dets.count == 0:
                     success = process.name
                     break
 
         if run_calc:
-            _wrap_valid_ranges(proc_aman, full, valid_name='valid_data',
+            _wrap_valid_ranges(proc_aman, full_aman, valid_name='valid_data',
                                wrap_name='valid_data')
-            # copy updated frequency cutoffs to full
-            if "frequency_cutoffs" in full:
-                full.move("frequency_cutoffs", None)
-                full.wrap("frequency_cutoffs", proc_aman["frequency_cutoffs"])
+            # copy updated frequency cutoffs to full_aman
+            if "frequency_cutoffs" in full_aman:
+                full_aman.move("frequency_cutoffs", None)
+                full_aman.wrap("frequency_cutoffs", proc_aman["frequency_cutoffs"])
+
+        return full_aman, success
 
-        return full, success
-
 
 class _FracFlaggedMixIn(object):
 
diff --git a/sotodlib/preprocess/preprocess_util.py b/sotodlib/preprocess/preprocess_util.py
index 43de196cc..7e5622068 100644
--- a/sotodlib/preprocess/preprocess_util.py
+++ b/sotodlib/preprocess/preprocess_util.py
@@ -439,54 +439,29 @@ def swap_archive(config, fpath):
     return tc
 
 
-def load_preprocess_det_select(obs_id, configs, context=None,
-                               dets=None, meta=None, logger=None):
-    """Loads the metadata information for the Observation and runs through any
-    data selection specified by the Preprocessing Pipeline.
-
-    Arguments
-    ----------
-    obs_id : multiple
-        Passed to `context.get_obs` to load AxisManager, see Notes for
-        `context.get_obs`
-    configs : string or dictionary
-        Config file or loaded config directory
-    context : core.Context
-        The Context file to use.
-    dets : dict
-        Dets to restrict on from info in det_info. See context.get_meta.
-    meta : AxisManager
-        Contains supporting metadata to use for loading.
-        Can be pre-restricted in any way. See context.get_meta.
-    logger : PythonLogger
-        Optional. Logger object. If None, a new logger
-        is created.
-
-    Returns
-    -------
-    list
-        Restricted list of detector vals.
+def load_and_preprocess_det_select(meta, pipe):
     """
+    Helper function to accumulate det selections from a preprocessing pipeline.
+
+    Arguments
+    ----------
+    meta : AxisManager
+        Metadata AxisManager containing the ``preprocess`` metadata that the
+        pipeline's select functions act on.
+    pipe : Pipeline
+        Preprocessing pipeline whose select steps are accumulated.
+
+    Returns
+    -------
+    keep_all : numpy.ndarray
+        Boolean mask over ``meta.dets`` marking the detectors that pass all
+        selections.
+    """
+    if (
+        'valid_data' in meta.preprocess and
+        isinstance(meta.preprocess.valid_data, core.AxisManager)
+    ):
+        keep_all = has_any_cuts(meta.preprocess.valid_data.valid_data)
+    else:
+        keep_all = np.ones(meta.dets.count, dtype=bool)
+    for process in pipe[:]:
+        keep = process.select(meta, in_place=False)
+        if isinstance(keep, np.ndarray):
+            keep_all &= keep
 
-    if logger is None:
-        logger = init_logger("preprocess")
-
-    configs, context = get_preprocess_context(configs, context)
-    pipe = Pipeline(configs["process_pipe"], logger=logger)
-
-    if meta is None:
-        meta = context.get_meta(obs_id, dets=dets)
-    logger.info("Restricting detectors on all processes")
-    keep_all = np.ones(meta.dets.count,dtype=bool)
-    for process in pipe[:]:
-        keep = process.select(meta, in_place=False)
-        if isinstance(keep, np.ndarray):
-            keep_all &= keep
-    return meta.dets.vals[keep_all]
+    return keep_all
 
 
-def load_and_preprocess(obs_id, configs, context=None, dets=None, meta=None,
-                        no_signal=None, logger=None):
+def load_and_preprocess(obs_id, configs_init, configs_proc=None, context=None,
+                        dets=None, meta=None, no_signal=None, logger=None,
+                        return_full_aman=False, init_only=False,
+                        ignore_cfg_check=False):
     """Loads the saved information from the preprocessing pipeline and runs the
     processing section of the pipeline.
 
@@ -513,152 +488,104 @@ def load_and_preprocess(obs_id, configs, context=None, dets=None, meta=None,
     logger : PythonLogger
         Optional. Logger object. If None, a new logger
         is created.
+    return_full_aman : bool
+        Optional. If True, return the unrestricted axis manager alongside the
+        restricted aman; otherwise the second return value is ``None``.
+    init_only : bool
+        Optional. If True, stop after running the initial pipeline and do not
+        run the dependent pipeline.
+    ignore_cfg_check : bool
+        If True, do not attempt to validate that configs_init is the same as
+        the config used to create the existing init db.
 
     Returns
     -------
     aman : core.AxisManager or None
         Loaded and restricted axis manager with preprocessing metadata. Returns
        ``None`` if all detectors cut.
+    full_aman : core.AxisManager or None
+        Unrestricted preprocessing axis manager, returned only when
+        ``return_full_aman`` is True. Used when running the multilayer pipeline
+        to ensure the saved detector axis keeps its full size when saving
+        metadata.
     """
     if logger is None:
         logger = init_logger("preprocess")
 
-    configs, context = get_preprocess_context(configs, context)
-    meta = context.get_meta(obs_id, dets=dets, meta=meta)
-    if (
-        'valid_data' in meta.preprocess and
-        isinstance(meta.preprocess.valid_data, core.AxisManager)
-    ):
-        keep = has_any_cuts(meta.preprocess.valid_data.valid_data)
-        meta.restrict("dets", keep)
-    else:
-        det_vals = load_preprocess_det_select(obs_id, configs=configs, context=context,
-                                              dets=dets, meta=meta, logger=logger)
-        meta.restrict("dets", [d for d in meta.dets.vals if d in det_vals])
+    configs_init, context_init = get_preprocess_context(configs_init)
+    meta_init = context_init.get_meta(obs_id, dets=dets, meta=meta)
+    group_by_init = np.atleast_1d(configs_init['subobs'].get('use', 'detset'))
 
-    if meta.dets.count == 0:
-        logger.info(f"No detectors left after cuts in obs {obs_id}")
+    if meta_init.dets.count == 0:
+        logger.warning(f"No detectors in init db for obs {obs_id}")
-        return None
+        return None, None
+
+    if return_full_aman:
+        full_aman = meta_init.preprocess.copy()
     else:
-        pipe = Pipeline(configs["process_pipe"], logger=logger)
-        aman = context.get_obs(meta, no_signal=no_signal)
-        pipe.run(aman, aman.preprocess, select=False)
-        return aman
+        full_aman = None
 
+    if configs_proc is not None:
+        configs_proc, context_proc = get_preprocess_context(configs_proc)
+        meta_proc = context_proc.get_meta(obs_id, dets=dets, meta=meta)
+        group_by_proc = np.atleast_1d(configs_proc['subobs'].get('use', 'detset'))
 
-def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
-                                   dets=None, meta=None, no_signal=None,
-                                   logger=None, init_only=False,
-                                   ignore_cfg_check=False):
-    """Loads the saved information from the preprocessing pipeline from a
-    reference and a dependent database and runs the processing section of
-    the pipeline for each.
+        if (group_by_init != group_by_proc).any():
+            raise ValueError('init and proc groups do not match')
 
-    Assumes preprocess_tod and multilayer_preprocess_tod have already been run
-    on the requested observation.
+        if meta_proc.dets.count == 0:
+            logger.warning(f"No detectors in proc db for obs {obs_id}")
+            return None, None
 
-    Arguments
-    ----------
-    obs_id : multiple
-        Passed to `context.get_obs` to load AxisManager, see Notes for
-        `context.get_obs`
-    configs_init : string or dictionary
-        Config file or loaded config directory
-    configs_proc : string or dictionary
-        Second config file or loaded config dictionary to load
-        dependent databases generated using multilayer_preprocess_tod.py.
-    dets : dict
-        Dets to restrict on from info in det_info. See context.get_meta.
-    meta : AxisManager
-        Contains supporting metadata to use for loading.
-        Can be pre-restricted in any way. See context.get_meta.
-    no_signal : bool
-        If True, signal will be set to None.
-        This is a way to get the axes and pointing info without
-        the (large) TOD blob. Not all loaders may support this.
-    logger : PythonLogger
-        Optional. Logger object or None will generate a new one.
-    init_only : bool
-        Optional. If True, do not run the dependent pipeline.
-    ignore_cfg_check : bool
-        If True, do not attempt to validate that configs_init is the same as
-        the config used to create the existing init db.
+        if return_full_aman:
+            if 'valid_data' in full_aman:
+                full_aman.move('valid_data', None)
+            full_aman.merge(meta_proc.preprocess.copy())
 
-    Returns
-    -------
-    aman : core.AxisManager or None
-        Loaded and restricted axis manager with preprocessing metadata. Returns
-        ``None`` if all detectors cut.
-    """
+    pipe_init = Pipeline(configs_init["process_pipe"], logger=logger)
+    if configs_proc is not None and not ignore_cfg_check:
+        aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
+        cfg_check = check_cfg_match(aman_cfgs_ref, meta_proc.preprocess['pcfg_ref'],
+                                    logger=logger)
+        if not cfg_check:
+            raise ValueError('Dependency check between configs failed.')
 
-    if logger is None:
-        logger = init_logger("preprocess")
+    logger.info("Restricting detectors on all init pipeline processes")
+    keep_all = load_and_preprocess_det_select(meta_init, pipe_init)
+    meta_init.restrict("dets", meta_init.dets.vals[keep_all])
 
-    configs_init, context_init = get_preprocess_context(configs_init)
-    meta_init = context_init.get_meta(obs_id, dets=dets, meta=meta)
+    if configs_proc is not None:
+        meta_proc.restrict("dets", meta_init.dets.vals)
 
-    configs_proc, context_proc = get_preprocess_context(configs_proc)
-    meta_proc = context_proc.get_meta(obs_id, dets=dets, meta=meta)
+        pipe_proc = Pipeline(configs_proc["process_pipe"], logger=logger)
+        logger.info("Restricting detectors on all proc pipeline processes")
+        keep_all = load_and_preprocess_det_select(meta_proc, pipe_proc)
 
-    group_by_init = np.atleast_1d(configs_init['subobs'].get('use', 'detset'))
-    group_by_proc = np.atleast_1d(configs_proc['subobs'].get('use', 'detset'))
+        meta_proc.restrict("dets", meta_proc.dets.vals[keep_all])
+        meta_init.restrict('dets', meta_proc.dets.vals)
 
-    if (group_by_init != group_by_proc).any():
-        raise ValueError('init and proc groups do not match')
+    aman = context_init.get_obs(meta_init, no_signal=no_signal)
+    logger.info("Running initial pipeline")
+    pipe_init.run(aman, aman.preprocess, select=False)
 
-    if meta_init.dets.count == 0 or meta_proc.dets.count == 0:
-        logger.info(f"No detectors in obs {obs_id}")
-        return None
-    else:
-        pipe_init = Pipeline(configs_init["process_pipe"], logger=logger)
-        aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
+    if init_only:
+        return aman, full_aman
 
-        if (
-            ignore_cfg_check or
-            check_cfg_match(aman_cfgs_ref, meta_proc.preprocess['pcfg_ref'],
-                            logger=logger)
-        ):
-            pipe_proc = Pipeline(configs_proc["process_pipe"], logger=logger)
-
-            logger.info("Restricting detectors on all proc pipeline processes")
-            if (
-                'valid_data' in meta_proc.preprocess and
-                isinstance(meta_proc.preprocess.valid_data, core.AxisManager)
-            ):
-                keep_all = has_any_cuts(meta_proc.preprocess.valid_data.valid_data)
-            else:
-                keep_all = np.ones(meta_proc.dets.count, dtype=bool)
-            for process in pipe_proc[:]:
-                keep = process.select(meta_proc, in_place=False)
-                if isinstance(keep, np.ndarray):
-                    keep_all &= keep
-            meta_proc.restrict("dets", meta_proc.dets.vals[keep_all])
-            meta_init.restrict('dets', meta_proc.dets.vals)
-
-            aman = context_init.get_obs(meta_init, no_signal=no_signal)
-            logger.info("Running initial pipeline")
-            pipe_init.run(aman, aman.preprocess, select=False)
-            if init_only:
-                return aman
-
-            logger.info("Running dependent pipeline")
-            proc_aman = context_proc.get_meta(obs_id, meta=aman)
-
-            if 'valid_data' in aman.preprocess:
-                aman.preprocess.move('valid_data', None)
-            aman.preprocess.merge(proc_aman.preprocess)
-            pipe_proc.run(aman, aman.preprocess, select=False)
+    if configs_proc is not None:
+        logger.info("Running dependent pipeline")
 
-            return aman
-        else:
-            raise ValueError('Dependency check between configs failed.')
+        if 'valid_data' in aman.preprocess:
+            aman.preprocess.move('valid_data', None)
+        aman.preprocess.merge(meta_proc.preprocess)
+        pipe_proc.run(aman, aman.preprocess, select=False)
+
+    return aman, full_aman
 
 
 def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc,
                                        sim_map, meta=None, logger=None,
                                        init_only=False,
-                                       t2ptemplate_aman=None):
+                                       t2ptemplate_aman=None,
+                                       ignore_cfg_check=False):
     """Loads the saved information from the preprocessing pipeline from a
     reference and a dependent database, loads the signal from a (simulated)
     map into the AxisManager and runs the processing section of the pipeline
@@ -693,6 +620,9 @@ def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc,
     t2ptemplate_aman : AxisManager
         Optional. AxisManager to use as a template for t2p leakage
         deprojection.
+    ignore_cfg_check : bool
+        If True, do not attempt to validate that configs_init is the same as
+        the config used to create the existing init db.
 
     Returns
     -------
@@ -716,62 +646,72 @@ def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc,
         raise ValueError('init and proc groups do not match')
 
     if meta_init.dets.count == 0 or meta_proc.dets.count == 0:
-        logger.info(f"No detectors in obs {obs_id}")
+        logger.warning(f"No detectors in obs {obs_id}")
         return None
-    else:
-        pipe_init = Pipeline(configs_init["process_pipe"], logger=logger)
-        aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
-        if check_cfg_match(aman_cfgs_ref, meta_proc.preprocess['pcfg_ref'],
-                           logger=logger):
-            pipe_proc = Pipeline(configs_proc["process_pipe"], logger=logger)
-
-            logger.info("Restricting detectors on all proc pipeline processes")
-            keep_all = np.ones(meta_proc.dets.count, dtype=bool)
-            for process in pipe_proc[:]:
-                keep = process.select(meta_proc, in_place=False)
-                if isinstance(keep, np.ndarray):
-                    keep_all &= keep
-            meta_proc.restrict("dets", meta_proc.dets.vals[keep_all])
-            meta_init.restrict('dets', meta_proc.dets.vals)
-            aman = context_init.get_obs(meta_proc, no_signal=True)
-            aman = hwp_angle_model.apply_hwp_angle_model(aman)
-            aman.move("signal", None)
-
-            logger.info("Reading in simulated map")
-            demod_mm.from_map(aman, sim_map, wrap=True, modulated=True)
-
-            logger.info("Running initial pipeline")
-            pipe_init.run(aman, aman.preprocess, sim=True)
-
-            if init_only:
-                return aman
-
-            if t2ptemplate_aman is not None:
-                # Replace Q,U with simulated timestreams
-                t2ptemplate_aman.wrap("demodQ", aman.demodQ, [(0, 'dets'), (1, 'samps')], overwrite=True)
-                t2ptemplate_aman.wrap("demodU", aman.demodU, [(0, 'dets'), (1, 'samps')], overwrite=True)
-
-                t2p_aman = t2pleakage.get_t2p_coeffs(
-                    t2ptemplate_aman,
-                    merge_stats=False
-                )
-                t2pleakage.subtract_t2p(
-                    aman,
-                    t2p_aman,
-                    T_signal=t2ptemplate_aman.dsT
-                )
-
-            logger.info("Running dependent pipeline")
-            proc_aman = context_proc.get_meta(obs_id, meta=aman)
-
-            if 'valid_data' in aman.preprocess:
-                aman.preprocess.move('valid_data', None)
-            aman.preprocess.merge(proc_aman.preprocess)
-            pipe_proc.run(aman, aman.preprocess, sim=True)
-
-            return aman
-        else:
-            raise ValueError('Dependency check between configs failed.')
+    pipe_init = Pipeline(configs_init["process_pipe"], logger=logger)
+    aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
+
+    if (
+        ignore_cfg_check or
+        check_cfg_match(aman_cfgs_ref, meta_proc.preprocess['pcfg_ref'],
+                        logger=logger)
+    ):
+        pipe_proc = Pipeline(configs_proc["process_pipe"], logger=logger)
+
+        logger.info("Restricting detectors on all init pipeline processes")
+        keep_all = load_and_preprocess_det_select(meta_init, pipe_init)
+        meta_init.restrict("dets", meta_init.dets.vals[keep_all])
+        meta_proc.restrict("dets", meta_init.dets.vals)
+
+        logger.info("Restricting detectors on all proc pipeline processes")
+        keep_all = load_and_preprocess_det_select(meta_proc, pipe_proc)
+
+        meta_proc.restrict("dets", meta_proc.dets.vals[keep_all])
+        meta_init.restrict('dets', meta_proc.dets.vals)
+        aman = context_init.get_obs(meta_proc, no_signal=True)
+        aman = hwp_angle_model.apply_hwp_angle_model(aman)
+        aman.move("signal", None)
+
+        logger.info("Reading in simulated map")
+        demod_mm.from_map(aman, sim_map, wrap=True, modulated=True)
+
+        logger.info("Running initial pipeline")
+        pipe_init.run(aman, aman.preprocess, sim=True)
+
+        if init_only:
+            return aman
+
+        if t2ptemplate_aman is not None:
+            # Replace Q,U with simulated timestreams
+            t2ptemplate_aman.wrap("demodQ", aman.demodQ,
+                                  [(0, 'dets'), (1, 'samps')],
+                                  overwrite=True)
+            t2ptemplate_aman.wrap("demodU", aman.demodU,
+                                  [(0, 'dets'), (1, 'samps')],
+                                  overwrite=True)
+
+            t2p_aman = t2pleakage.get_t2p_coeffs(
+                t2ptemplate_aman,
+                merge_stats=False
+            )
+            t2pleakage.subtract_t2p(
+                aman,
+                t2p_aman,
+                T_signal=t2ptemplate_aman.dsT
+            )
+
+        logger.info("Running dependent pipeline")
+        proc_aman = context_proc.get_meta(obs_id, meta=aman)
+        if 'valid_data' in aman.preprocess:
+            aman.preprocess.move('valid_data', None)
+        aman.preprocess.merge(proc_aman.preprocess)
+        pipe_proc.run(aman, aman.preprocess, sim=True)
+
+        return aman
+    else:
+        raise ValueError('Dependency check between configs failed.')
 
 
 def find_db(obs_id, configs, dets, context=None, logger=None):
@@ -1188,9 +1128,14 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None,
     if db_init_exist and not db_proc_exist:
         out_dict_init = None
         try:
+            # need unrestricted proc aman for second layer
+            return_full_aman = configs_proc is not None
             logger.info(f"Loading and applying preprocessing for initial layer db on {obs_id}:{group}")
-            aman = load_and_preprocess(obs_id=obs_id, dets=dets, configs=configs_init,
-                                       logger=logger)
+            aman, proc_aman = load_and_preprocess(obs_id=obs_id, dets=dets, configs_init=configs_init,
+                                                  logger=logger, return_full_aman=return_full_aman)
         except Exception as e:
             errmsg, tb = PreprocessErrors.get_errors(e)
             logger.error(f"Initial layer Pipeline Load Error for {obs_id}: {group}\n{errmsg}\n{tb}")
@@ -1205,9 +1150,8 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None,
     elif db_init_exist and db_proc_exist:
         try:
             logger.info(f"Loading and applying preprocessing for both dbs on {obs_id}:{group}")
-            aman = multilayer_load_and_preprocess(obs_id=obs_id, dets=dets, configs_init=configs_init,
-                                                  configs_proc=configs_proc, logger=logger,
-                                                  ignore_cfg_check=ignore_cfg_check)
+            aman, _ = load_and_preprocess(obs_id=obs_id, dets=dets, configs_init=configs_init,
+                                          configs_proc=configs_proc, logger=logger,
+                                          ignore_cfg_check=ignore_cfg_check)
             logger.info(f"preproc_or_load_group finished successfully for {obs_id}:{group}")
             return aman, None, None, (PreprocessErrors.LoadSuccess, None, None)
         except Exception as e:
@@ -1295,7 +1239,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None,
         pipe_proc = Pipeline(configs_proc["process_pipe"],
                              plot_dir=configs_proc["plot_dir"],
                              logger=logger)
-        proc_aman, success = pipe_proc.run(aman)
+        proc_aman, success = pipe_proc.run(aman, full_aman=proc_aman)
 
         pipe_init = Pipeline(configs_init["process_pipe"],
                              plot_dir=configs_init["plot_dir"],
                              logger=logger)
diff --git a/sotodlib/site_pipeline/multilayer_preprocess_tod.py b/sotodlib/site_pipeline/multilayer_preprocess_tod.py
index 5fc05cb4c..4f362ec43 100644
--- a/sotodlib/site_pipeline/multilayer_preprocess_tod.py
+++ b/sotodlib/site_pipeline/multilayer_preprocess_tod.py
@@ -464,7 +464,7 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"],
             n_groups_fail += 1
 
     if n_groups_fail > 0:
-        raise RuntimeError(f"preprocess_tod ended with {n_obs_fail}/{len(obs_errors)} "
+        raise RuntimeError(f"multilayer_preprocess_tod ended with {n_obs_fail}/{len(obs_errors)} "
                            f"failed obsids and {n_groups_fail}/{len(run_list)} failed groups")
 
     logger.info("multilayer_preprocess_tod is done")
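Usage sketch for the new ``Pipeline.run`` signature. Only ``Pipeline``, the ``full_aman`` keyword, and the ``(full_aman, success)`` return come from this patch; ``configs`` (a loaded config dict) and the TOD ``aman`` are assumed to come from the surrounding pipeline setup:

    import logging
    from sotodlib import core
    from sotodlib.preprocess.pcore import Pipeline

    logger = logging.getLogger("preprocess")
    pipe = Pipeline(configs["process_pipe"], logger=logger)

    # Seed an unrestricted output container; if omitted, run() builds one
    # with the same dets/samps axes as aman.
    full = core.AxisManager(aman.dets, aman.samps)

    full_aman, success = pipe.run(aman, full_aman=full)
    if success != 'end':
        # success holds the name of the step after which no detectors remained
        logger.warning(f"pipeline stopped at step {success}")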
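Sketch of the merged loader, which now covers both the single-layer and multilayer cases in one entry point (``obs_id`` and the YAML paths are placeholders):

    from sotodlib.preprocess import preprocess_util as pp_util

    # Single layer: equivalent to the old load_and_preprocess() call.
    aman, _ = pp_util.load_and_preprocess(obs_id, configs_init="config_init.yaml")

    # Multilayer: pass the dependent config, and request the unrestricted
    # aman when the full-size detector axis is needed for saving metadata.
    aman, full_aman = pp_util.load_and_preprocess(
        obs_id,
        configs_init="config_init.yaml",
        configs_proc="config_proc.yaml",
        return_full_aman=True,
    )
    if aman is None:
        # All detectors were cut in the init or proc database.
        raise RuntimeError(f"all detectors cut for {obs_id}")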
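Sketch of the selection helper shared by the loaders above: it seeds the mask from ``valid_data`` when present and folds in each process's ``select()`` result. ``configs``, ``logger``, and the loaded ``meta`` are assumed context:

    from sotodlib.preprocess.pcore import Pipeline
    from sotodlib.preprocess import preprocess_util as pp_util

    pipe = Pipeline(configs["process_pipe"], logger=logger)
    keep = pp_util.load_and_preprocess_det_select(meta, pipe)  # bool mask over meta.dets
    meta.restrict("dets", meta.dets.vals[keep])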