Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions sotodlib/preprocess/pcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ def extend(self, index, other):
def __setitem__(self, index, item):
super().__setitem__(index, self._check_item(item))

def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
def run(self, aman, proc_aman=None, full_aman=None, select=True,
sim=False, update_plot=False):
"""
The main workhorse function for the pipeline class. This function takes
an AxisManager TOD and successively runs the pipeline of preprocessing
Expand All @@ -460,6 +461,12 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
returned this preprocess axismanager. In this case, calls to
``process.calc_and_save()`` are skipped as the information is
expected to be present in this AxisManager.
full_aman: AxisManager (Optional)
A preprocess axismanager. This axis manager stores the outputs of
preprocessing functions (proc_aman) but without any of the detector
or samps restrictions applied, thus maintaining its original shape.
This is returned at the end of the pipeline. If not passed it is
instantiated with the same number of dets and samps as aman.
select: boolean (Optional)
if True, the aman detector axis is restricted as described in
each preprocess module. Most pipelines are developed with
Expand All @@ -475,18 +482,22 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):

Returns
-------
proc_aman: AxisManager
full_aman: AxisManager
A preprocess axismanager that contains all data products calculated
throughout the running of the pipeline

throughout the running of the pipeline.
success: str
A string that stores the name of the last process step that the pipeline
completed. If the pipeline successfully finishes all steps, success = 'end'.
"""
if proc_aman is None:
if 'preprocess' in aman:
proc_aman = aman.preprocess.copy()
full = aman.preprocess.copy()
if full_aman is None:
full_aman = aman.preprocess.copy()
else:
proc_aman = core.AxisManager(aman.dets, aman.samps)
full = core.AxisManager( aman.dets, aman.samps)
if full_aman is None:
full_aman = core.AxisManager( aman.dets, aman.samps)
run_calc = True
update_plot = False
else:
Expand All @@ -495,7 +506,8 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
det_list = [det for det in proc_aman.dets.vals if det in aman.dets.vals]
aman.restrict('dets', det_list)
proc_aman.restrict('dets', det_list)
full = proc_aman.copy()
if full_aman is None:
full_aman = proc_aman.copy()
run_calc = False

if 'frequency_cutoffs' not in proc_aman:
Expand Down Expand Up @@ -527,30 +539,30 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
if run_calc:
aman, proc_aman = process.calc_and_save(aman, proc_aman)
process.plot(aman, proc_aman, filename=os.path.join(self.plot_dir, '{ctime}/{obsid}', f'{step+1}_{{name}}.png'))
update_full_aman( proc_aman, full, self.wrap_valid)
update_full_aman( proc_aman, full_aman, self.wrap_valid)
if update_plot:
process.plot(aman, proc_aman, filename=os.path.join(self.plot_dir, '{ctime}/{obsid}', f'{step+1}_{{name}}.png'))
plt.close()
if select:
process.select(aman, proc_aman)
proc_aman.restrict('dets', aman.dets.vals)
self.logger.debug(f"{proc_aman.dets.count} detectors remaining")

if aman.dets.count == 0:
success = process.name
break

if run_calc:
_wrap_valid_ranges(proc_aman, full, valid_name='valid_data',
_wrap_valid_ranges(proc_aman, full_aman, valid_name='valid_data',
wrap_name='valid_data')

# copy updated frequency cutoffs to full
if "frequency_cutoffs" in full:
full.move("frequency_cutoffs", None)
full.wrap("frequency_cutoffs", proc_aman["frequency_cutoffs"])
# copy updated frequency cutoffs to full_aman
if "frequency_cutoffs" in full_aman:
full_aman.move("frequency_cutoffs", None)
full_aman.wrap("frequency_cutoffs", proc_aman["frequency_cutoffs"])

return full_aman, success

return full, success


class _FracFlaggedMixIn(object):

Expand Down
Loading