From b6563eae4f1c3b181e6fa735cbee357832d07c2b Mon Sep 17 00:00:00 2001 From: teutoburg Date: Fri, 25 Aug 2023 19:37:15 +0200 Subject: [PATCH] No more mutable default arguments! Also collections.abc. And some de-nesting, and some renaming, and various small stuff. --- scopesim/detector/detector_array.py | 6 +- scopesim/optics/fov_manager.py | 156 ++++++++++++++------------- scopesim/optics/fov_manager_utils.py | 4 +- 3 files changed, 87 insertions(+), 79 deletions(-) diff --git a/scopesim/detector/detector_array.py b/scopesim/detector/detector_array.py index c073b27b..b53df817 100644 --- a/scopesim/detector/detector_array.py +++ b/scopesim/detector/detector_array.py @@ -19,7 +19,7 @@ def __init__(self, detector_list=None, **kwargs): self.detectors = [] self.latest_exposure = None - def readout(self, image_planes, array_effects=[], dtcr_effects=[], **kwargs): + def readout(self, image_planes, array_effects=None, dtcr_effects=None, **kwargs): """ Read out the detector array into a FITS file @@ -54,8 +54,8 @@ def readout(self, image_planes, array_effects=[], dtcr_effects=[], **kwargs): # - add ImageHDUs # - add ASCIITableHDU with Effects meta data in final table extension - self.array_effects = array_effects - self.dtcr_effects = dtcr_effects + self.array_effects = array_effects or [] + self.dtcr_effects = dtcr_effects or [] self.meta.update(kwargs) # 0. Get the image plane that corresponds to this detector array diff --git a/scopesim/optics/fov_manager.py b/scopesim/optics/fov_manager.py index 819385e9..89d0f859 100644 --- a/scopesim/optics/fov_manager.py +++ b/scopesim/optics/fov_manager.py @@ -43,10 +43,11 @@ # """ from copy import deepcopy -import numpy as np from typing import TextIO from io import StringIO +from collections.abc import Iterable, MutableSequence +import numpy as np from astropy import units as u from . import image_plane_utils as ipu @@ -72,7 +73,7 @@ class FOVManager: All observation parameters as passed from UserCommands """ - def __init__(self, effects=[], **kwargs): + def __init__(self, effects=None, **kwargs): self.meta = {"area": "!TEL.area", "pixel_scale": "!INST.pixel_scale", "plate_scale": "!INST.plate_scale", @@ -94,9 +95,9 @@ def __init__(self, effects=[], **kwargs): params["meta"] = from_currsys({key: self.meta[key] for key in fvl_meta}) self.volumes_list = FovVolumeList(initial_volume=params) - self.effects = effects + self.effects = effects or [] self._fovs_list = [] - self.is_spectroscope = eu.is_spectroscope(effects) + self.is_spectroscope = eu.is_spectroscope(self.effects) if from_currsys(self.meta["preload_fovs"]) is True: self._fovs_list = self.generate_fovs_list() @@ -173,11 +174,11 @@ def fovs(self): return self._fovs_list @property - def fov_footprints(self, which="both"): + def fov_footprints(self): return None -class FovVolumeList(FOVSetupBase): +class FovVolumeList(FOVSetupBase, MutableSequence): """ List of FOV volumes for FOVManager @@ -211,9 +212,9 @@ def __init__(self, initial_volume=None): "yd_min": 0, "yd_max": 0} - def split(self, axis, value, aperture_id=None): + def split(self, axis, value, aperture_id=None) -> None: """ - Splits the all volumes that include axis=value into two. + Split the all volumes that include axis=value into two. - Loop through all volume dict - Find any entries where min < value < max @@ -221,8 +222,8 @@ def split(self, axis, value, aperture_id=None): Parameters ---------- - axis : str, list of str - "wave", "x", "y" + axis : {"wave", "x", "y"}, or list thereof + Which axis (``str``) or axes (``list[str]``) to use. value : float, list of floats aperture_id : int, optional Default None. If ``None``, split all volumes. If ``int``, only split @@ -244,20 +245,25 @@ def split(self, axis, value, aperture_id=None): if isinstance(axis, (tuple, list)): for ax, val in zip(axis, value): self.split(ax, val) - elif isinstance(value, (tuple, list, np.ndarray)): + return + + if isinstance(value, Iterable): for val in value: self.split(axis, val) - else: - for i, vol_old in enumerate(self.volumes): - if aperture_id in (vol_old["meta"]["aperture_id"], None) \ - and vol_old[f"{axis}_min"] < value \ - and vol_old[f"{axis}_max"] > value: - vol_new = deepcopy(vol_old) - vol_new[f"{axis}_min"] = value - vol_old[f"{axis}_max"] = value - self.volumes.insert(i+1, vol_new) - - def shrink(self, axis, values, aperture_id=None): + return + + for vol in self: + if (aperture_id is not None and + aperture_id != vol["meta"]["aperture_id"]): + continue + if vol[f"{axis}_min"] >= value or vol[f"{axis}_max"] <= value: + continue + new_vol = vol.copy() + new_vol[f"{axis}_min"] = value + vol[f"{axis}_max"] = value + self.insert(self.index(vol) + 1, new_vol) + + def shrink(self, axis, values, aperture_id=None) -> None: """ - Loop through all volume dict - Replace any entries where min < values.min @@ -265,8 +271,8 @@ def shrink(self, axis, values, aperture_id=None): Parameters ---------- - axis : str - "wave", "x", "y" + axis : {"wave", "x", "y"} or list thereof + Which axis (``str``) or axes (``list[str]``) to use. values : list of 2 floats [min, max], [min, None], [None, max] aperture_id : int, optional @@ -283,47 +289,52 @@ def shrink(self, axis, values, aperture_id=None): """ + # FIXME: Isn't this method just the same as setting self.volumes to the + # output list of self.extract()?? Except the None values. if isinstance(axis, (tuple, list)): for ax, val in zip(axis, values): self.shrink(ax, val) - else: - to_pop = [] + return + + to_pop = [] + + for vol in self: + if (aperture_id is not None and + aperture_id != vol["meta"]["aperture_id"]): + continue if values[0] is not None: - for i, vol in enumerate(self.volumes): - if aperture_id in (vol["meta"]["aperture_id"], None): - if vol[f"{axis}_max"] <= values[0]: - to_pop.append(i) - elif vol[f"{axis}_min"] < values[0]: - vol[f"{axis}_min"] = values[0] + if vol[f"{axis}_max"] <= values[0]: + to_pop.append(self.index(vol)) + continue + vol[f"{axis}_min"] = max(values[0], vol[f"{axis}_min"]) if values[1] is not None: - for i, vol in enumerate(self.volumes): - if aperture_id in (vol["meta"]["aperture_id"], None): - if vol[f"{axis}_min"] >= values[1]: - to_pop.append(i) - if vol[f"{axis}_max"] > values[1]: - vol[f"{axis}_max"] = values[1] + if vol[f"{axis}_min"] >= values[1]: + to_pop.append(self.index(vol)) + continue + vol[f"{axis}_max"] = min(values[1], vol[f"{axis}_max"]) - for i in sorted(to_pop)[::-1]: - self.volumes.pop(i) + for idx in reversed(sorted(to_pop)): + self.pop(idx) def extract(self, axes, edges, aperture_id=None): """ - Returns new volumes from within all existing volumes + Return new volumes from within all existing volumes. This method DOES NOT alter the existing self.volumes list To include the returned volumes, add them to the self.volumes list Parameters ---------- - axes : str, list of str - "wave", "x", "y" + axes : list of either {"wave", "x", "y"} + Which axis (``list`` of single ``str``) or axes (``list[str]``) + to use. Must be ``list`` in either case. edges : list, tuple of lists Edge points for each axes listed aperture_id : int, optional Default None. If ``None``, extract from all volumes. If ``int``, - only extract from volumes with this ``aperture_id`` in the meta dict + only extract from volumes with this `aperture_id` in the meta dict Examples -------- @@ -345,38 +356,36 @@ def extract(self, axes, edges, aperture_id=None): A list of all new volumes extracted from existing volumes """ - new_vols = [] - for old_vol in self.volumes: - if aperture_id in (old_vol["meta"]["aperture_id"], None): - add_flag = True - new_vol = deepcopy(old_vol) + def _get_new_vols(): + for vol in self: + if (aperture_id is not None and + aperture_id != vol["meta"]["aperture_id"]): + continue + if not all(_volume_in_range(vol, axis, edge) for axis, edge + in zip(axes, edges)): + continue + new_vol = vol.copy() for axis, edge in zip(axes, edges): - if edge[0] <= old_vol[f"{axis}_max"] and \ - edge[1] >= old_vol[f"{axis}_min"]: - new_vol[f"{axis}_min"] = max(edge[0], old_vol[f"{axis}_min"]) - new_vol[f"{axis}_max"] = min(edge[1], old_vol[f"{axis}_max"]) - else: - add_flag = False + new_vol[f"{axis}_min"] = max(edge[0], vol[f"{axis}_min"]) + new_vol[f"{axis}_max"] = min(edge[1], vol[f"{axis}_max"]) + yield new_vol - if add_flag is True: - new_vols.append(new_vol) - - return new_vols + return list(_get_new_vols()) def __len__(self): return len(self.volumes) - def __iter__(self): - return iter(self.volumes) + def __getitem__(self, index): + return self.volumes[index] - def __getitem__(self, key): - return self.volumes[key] + def __setitem__(self, index, value): + self.volumes[index] = value - def __setitem__(self, key, value): - self.volumes[item] = value + def __delitem__(self, index): + del self.volumes[index] - def __delitem__(self, key): - del self.volumes[key] + def insert(self, index, value): + self.volumes.insert(index, value) def write_string(self, stream: TextIO) -> None: """Write formatted string representation to I/O stream""" @@ -403,21 +412,18 @@ def __str__(self) -> str: output = str_stream.getvalue() return output - def __iadd__(self, other): - if isinstance(other, list): - self.volumes += other - else: - raise ValueError(f"Can only add lists of volumes: {other}") - - return self - def __add__(self, other): + # TODO: Is this functionality actually used anywhere? new_self = deepcopy(self) new_self += other return new_self +def _volume_in_range(vol: dict, axis: str, edge) -> bool: + return edge[0] <= vol[f"{axis}_max"] and edge[1] >= vol[f"{axis}_min"] + + # Spectroscopy FOV setup # ====================== # diff --git a/scopesim/optics/fov_manager_utils.py b/scopesim/optics/fov_manager_utils.py index 05165939..c03e6084 100644 --- a/scopesim/optics/fov_manager_utils.py +++ b/scopesim/optics/fov_manager_utils.py @@ -326,7 +326,9 @@ def get_spectroscopy_headers(effects, **kwargs): return fov_headers -def get_spectroscopy_fovs(headers, shifts, effects=[], **kwargs): +def get_spectroscopy_fovs(headers, shifts, effects=None, **kwargs): + if effects is None: + effects = [] shift_waves = shifts["wavelengths"] # in [um] shift_dx = shifts["x_shifts"] # in [deg]