diff --git a/software/squid/abc.py b/software/squid/abc.py index f9f25577..8aa3c3da 100644 --- a/software/squid/abc.py +++ b/software/squid/abc.py @@ -1,5 +1,15 @@ from abc import ABC, abstractmethod -from typing import Tuple +from typing import Callable, Optional, Tuple +import abc +import enum +import time + +import pydantic +import numpy as np + +import squid.logging +from squid.config import AxisConfig, StageConfig, CameraConfig +from squid.exceptions import SquidTimeout class LightSource(ABC): @@ -122,17 +132,6 @@ def shut_down(self): pass -import abc -import time -from typing import Optional - -import pydantic - -import squid.logging -from squid.config import AxisConfig, StageConfig -from squid.exceptions import SquidTimeout - - class Pos(pydantic.BaseModel): x_mm: float y_mm: float @@ -225,3 +224,285 @@ def wait_for_idle(self, timeout_s): self._log.error(error_message) raise SquidTimeout(error_message) + + +class CameraAcquisitionMode(enum.Enum): + SOFTWARE_TRIGGER = "SOFTWARE_TRIGGER" + HARDWARE_TRIGGER = "HARDWARE_TRIGGER" + CONTINUOUS = "CONTINUOUS" + + +class CameraFrameFormat(enum.Enum): + """ + This is all known camera frame formats in the Cephla world, but not all cameras will + support all of these. + """ + + RAW = "RAW" + RGB = "RGB" + + +class AbstractCamera(metaclass=abc.ABCMeta): + def __init__(self, camera_config: CameraConfig): + """ + Init should open the camera, configure it as needed based on camera_config and reasonable + defaults, and make it immediately available for use in grabbing frames. + """ + self._config = camera_config + self._log = squid.logging.get_logger(self.__class__.__name__) + self._hw_trigger_fn = None + + # Frame callbacks is a list of (id, callback) managed by add_frame_callback and remove_frame_callback. + # Your frame receiving functions should call self._send_frame_to_callbacks(frame), and doesn't need + # to do more than that. + self._frame_callbacks = [] + + def add_frame_callback(self, frame_callback: Callable[[np.ndarray], None]) -> int: + """ + Adds a new callback that will be called with the receipt of every new frame. This callback + should not block for a long time because it will be called in the frame receiving hot path! + + Returns the callback ID that can be used to remove the callback later if needed. + """ + try: + next_id = max(t[0] for t in self._frame_callbacks) + 1 + except ValueError: + next_id = 1 + + self._frame_callbacks.append((next_id, frame_callback)) + + return next_id + + def remove_frame_callback(self, callback_id): + try: + idx_to_remove = [t[0] for t in self._frame_callbacks].index(callback_id) + self._log.debug(f"Removing callback with id={callback_id} at idx={idx_to_remove}.") + del self._frame_callbacks[idx_to_remove] + except ValueError: + self._log.warning(f"No callback with id={callback_id}, cannot remove it.") + + def _propogate_frame(self, frame): + """ + Implementations can call this to propogate a new frame to all registered callbacks. + """ + for cb in self._frame_callbacks: + cb(frame) + + @abc.abstractmethod + def set_exposure_time(self, exposure_time_ms: float): + pass + + @abc.abstractmethod + def get_exposure_time(self) -> float: + """ + Returns the current exposure time in milliseconds. + """ + pass + + @abc.abstractmethod + def set_frame_format(self, frame_format: CameraFrameFormat): + """ + If this camera supports the given frame format, set it and make sure that + all subsequent frames are in this format. + + If not, throw a ValueError. + """ + pass + + @abc.abstractmethod + def get_frame_format(self) -> CameraFrameFormat: + pass + + @abc.abstractmethod + def set_pixel_format(self, pixel_format: squid.config.CameraPixelFormat): + """ + If this camera supports the given pixel format, enable it and make sure that all + subsequent captures use this pixel format. + + If not, throw a ValueError. + """ + pass + + @abc.abstractmethod + def get_pixel_format(self) -> squid.config.CameraPixelFormat: + pass + + @abc.abstractmethod + def set_resolution(self, width: int, height: int): + """ + If the camera supports this width x height pixel format, set it and make sure + all subsequent frames are of this resolution. + + If not, throw a ValueError. + """ + pass + + @abc.abstractmethod + def get_resolution(self) -> Tuple[int, int]: + """ + Return the (width, height) resolution of captures made by the camera right now. + """ + pass + + @abc.abstractmethod + def set_analog_gain(self, analog_gain: float): + """ + Set analog gain as an input multiple. EG 1 = no gain, 100 = 100x gain. + """ + pass + + @abc.abstractmethod + def get_analog_gain(self) -> float: + """ + Returns gain in the same units as set_analog_gain. + """ + pass + + @abc.abstractmethod + def start_streaming(self): + """ + This starts camera frame streaming. Whether this results in frames immediately depends + on the current triggering mode. If frames require triggering, no frames will come until + triggers are sent. If the camera is in continuous mode, frames will start immediately. + """ + pass + + @abc.abstractmethod + def stop_streaming(self): + """ + Stops camera frame streaming, which means frames will only come in with a call go get_frame + """ + pass + + @abc.abstractmethod + def get_frame(self) -> np.ndarray: + """ + If needed, send a trigger to request a frame. Then block and wait until the next frame comes in, + and return it. + """ + pass + + @abc.abstractmethod + def get_frame_id(self) -> int: + """ + Returns the frame id of the current frame. This should increase by 1 with every frame received + from the camera + """ + pass + + @abc.abstractmethod + def get_white_balance_gains(self) -> Tuple[float, float, float]: + """ + Returns the (R, G, B) white balance gains + """ + pass + + @abc.abstractmethod + def set_white_balance_gains(self, red_gain: float, green_gain: float, blue_gain: float): + """ + Set the (R, G, B) white balance gains. + """ + pass + + @abc.abstractmethod + def set_auto_white_balance_gains(self) -> Tuple[float, float, float]: + """ + Runs auto white balance, then returns the resulting updated gains. + """ + pass + + @abc.abstractmethod + def set_black_level(self, black_level: float): + """ + Sets the black level of captured images. + """ + pass + + @abc.abstractmethod + def get_black_level(self) -> float: + """ + Gets the black level set on the camera. + """ + pass + + def set_acquisition_mode( + self, acquisition_mode: CameraAcquisitionMode, hw_trigger_fn: Optional[Callable[[None], None]] + ): + """ + Sets the acquisition mode. If you are specifying hardware trigger, and an external + system needs to send the trigger, you must specify a hw_trigger_fn. This function must be callable in such + a way that it immediately sends a hardware trigger, and only returns when the trigger has been sent. + + hw_trigger_fn must be valid for the duration of this camera's acquisition mode being set to HARDWARE + """ + if acquisition_mode is CameraAcquisitionMode.HARDWARE_TRIGGER: + if not hw_trigger_fn: + raise ValueError("Cannot set HARDWARE_TRIGGER camera acquisition mode without a hw_trigger_fn.") + self._hw_trigger_fn = hw_trigger_fn + else: + self._hw_trigger_fn = None + + return self._set_acquisition_mode_imp(acquisition_mode=acquisition_mode) + + @abc.abstractmethod + def _set_acquisition_mode_imp(self, acquisition_mode: CameraAcquisitionMode): + """ + Your subclass must implement this such that it switches the camera to this acquisition mode. The top level + set_acquisition_mode handles storing the self._hw_trigger_fn for you so you are guaranteed to have a valid + callable self._hw_trigger_fn if in hardware trigger mode. + """ + pass + + @abc.abstractmethod + def get_acquisition_mode(self) -> CameraAcquisitionMode: + """ + Returns the current acquisition mode. + """ + pass + + @abc.abstractmethod + def send_trigger(self): + """ + If in an acquisition mode that needs triggering, send a trigger. If in HARDWARE_TRIGGER mode, you are + guaranteed to have a self._hw_trigger_fn and should call that. If in CONTINUOUS mode, this can be + a no-op. + + When this returns, it does not mean it is safe to immediately send another trigger. + """ + + @abc.abstractmethod + def cancel_exposure(self): + """ + If in the middle of an exposure, cancel it. This can be a No-Op if the camera does not support this, + as long as it's valid to do arbitrary operations on the camera after this call. + """ + pass + + @abc.abstractmethod + def set_region_of_interest(self, offset_x: int, offset_y: int, width: int, height: int): + """ + Set the region of interest of the camera so that returned frames only contain this subset of the full sensor image. + """ + pass + + @abc.abstractmethod + def get_region_of_interest(self) -> Tuple[int, int, int, int]: + """ + Returns the region of interest as a tuple of (x corner, y corner, width, height) + """ + pass + + @abc.abstractmethod + def set_temperature(self, temperature_deg_c: Optional[float]): + """ + Set the desired temperature of the camera in degrees C. If None is given as input, use + a sane default for the camera. + """ + pass + + @abc.abstractmethod + def get_temperature(self) -> float: + """ + Get the current temperature of the camera in deg C. + """ + pass diff --git a/software/squid/camera/__init__.py b/software/squid/camera/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/software/squid/camera/utils.py b/software/squid/camera/utils.py new file mode 100644 index 00000000..e47521eb --- /dev/null +++ b/software/squid/camera/utils.py @@ -0,0 +1,182 @@ +import functools +from typing import Optional, Tuple + +import numpy as np + +from squid.config import CameraConfig, CameraPixelFormat +from squid.abc import AbstractCamera, CameraAcquisitionMode, CameraFrameFormat + + +def get_camera(config: CameraConfig, simulated: bool = False) -> AbstractCamera: + """ + Try to import, and then build, the requested camera. We import on a case-by-case basis + because some cameras require system level installations, and so in many cases camera + driver imports will fail. + """ + if simulated: + return SimulatedCamera(config) + + raise NotImplementedError(f"Camera of type={config.camera_type} not yet supported.") + + +class SimulatedCamera(AbstractCamera): + @staticmethod + def debug_log(method): + import inspect + + @functools.wraps(method) + def _logged_method(self, *args, **kwargs): + kwargs_pairs = tuple(f"{k}={v}" for (k, v) in kwargs.items()) + args_str = tuple(str(a) for a in args) + self._log.debug(f"{inspect.currentframe().f_code.co_name}({','.join(args_str + kwargs_pairs)})") + return method(self, *args, **kwargs) + + return _logged_method + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._frame_id = 1 + self._current_frame = None + + self._exposure_time = None + self._frame_format = CameraFrameFormat.RAW + self._pixel_format = None + self.set_pixel_format(self._config.default_pixel_format) + self._resolution = None + self.set_resolution(self._config.default_resolution[0], self._config.default_resolution[1]) + self._analog_gain = None + self._white_balance_gains = None + self._black_level = None + self._acquisition_mode = None + self._roi = (0, 0, self.get_resolution()[0], self.get_resolution()[1]) + self._temperature_setpoint = None + + @debug_log + def set_exposure_time(self, exposure_time_ms: float): + self._exposure_time = exposure_time_ms + + @debug_log + def get_exposure_time(self) -> float: + return self._exposure_time + + @debug_log + def set_frame_format(self, frame_format: CameraFrameFormat): + self._frame_format = frame_format + + @debug_log + def get_frame_format(self) -> CameraFrameFormat: + return self._frame_format + + @debug_log + def set_pixel_format(self, pixel_format: CameraPixelFormat): + self._pixel_format = pixel_format + + @debug_log + def get_pixel_format(self) -> CameraPixelFormat: + return self._pixel_format + + @debug_log + def set_resolution(self, width: int, height: int): + self._resolution = (width, height) + + @debug_log + def get_resolution(self) -> Tuple[int, int]: + return self._resolution + + @debug_log + def set_analog_gain(self, analog_gain: float): + self._analog_gain = analog_gain + + @debug_log + def get_analog_gain(self) -> float: + return self._analog_gain + + @debug_log + def start_streaming(self): + raise NotImplementedError("Streaming is not implemented on the sim camera yet") + + @debug_log + def stop_streaming(self): + pass + + @debug_log + def get_frame(self) -> np.ndarray: + self.send_trigger() + return self._current_frame + + @debug_log + def get_white_balance_gains(self) -> Tuple[float, float, float]: + return self._white_balance_gains + + @debug_log + def set_white_balance_gains(self, red_gain: float, green_gain: float, blue_gain: float): + self._white_balance_gains = (red_gain, green_gain, blue_gain) + + @debug_log + def set_auto_white_balance_gains(self) -> Tuple[float, float, float]: + self.set_white_balance_gains(1.0, 1.0, 1.0) + + return self.get_white_balance_gains() + + @debug_log + def set_black_level(self, black_level: float): + self._black_level = black_level + + @debug_log + def get_black_level(self) -> float: + return self._black_level + + @debug_log + def _set_acquisition_mode_imp(self, acquisition_mode: CameraAcquisitionMode): + self._acquisition_mode = acquisition_mode + + @debug_log + def get_acquisition_mode(self) -> CameraAcquisitionMode: + return self._acquisition_mode + + @debug_log + def send_trigger(self): + (height, width) = self.get_resolution() + if self.get_frame_id() == 1: + if self.get_pixel_format() == CameraPixelFormat.MONO8: + self._current_frame = np.random.randint(255, size=(height, width), dtype=np.uint8) + self._current_frame[height // 2 - 99 : height // 2 + 100, width // 2 - 99 : width // 2 + 100] = 200 + elif self.get_pixel_format() == CameraPixelFormat.MONO12: + self._current_frame = np.random.randint(4095, size=(height, width), dtype=np.uint16) + self._current_frame[height // 2 - 99 : height // 2 + 100, width // 2 - 99 : width // 2 + 100] = 200 * 16 + self._current_frame = self._current_frame << 4 + elif self.get_pixel_format() == CameraPixelFormat.MONO16: + self._current_frame = np.random.randint(65535, size=(height, width), dtype=np.uint16) + self._current_frame[height // 2 - 99 : height // 2 + 100, width // 2 - 99 : width // 2 + 100] = ( + 200 * 256 + ) + else: + raise NotImplementedError(f"Simulated camera does not support pixel_format={self.get_pixel_format()}") + else: + self._current_frame = np.roll(self._current_frame, 10, axis=0) + + self._frame_id += 1 + self._propogate_frame(self._current_frame) + + @debug_log + def cancel_exposure(self): + pass + + @debug_log + def set_region_of_interest(self, offset_x: int, offset_y: int, width: int, height: int): + self._roi = (offset_x, offset_y, width, height) + + @debug_log + def get_region_of_interest(self) -> Tuple[int, int, int, int]: + return self._roi + + @debug_log + def set_temperature(self, temperature_deg_c: Optional[float]): + self._temperature_setpoint = temperature_deg_c + + @debug_log + def get_temperature(self) -> float: + return self._temperature_setpoint + + def get_frame_id(self) -> int: + return self._frame_id diff --git a/software/squid/config.py b/software/squid/config.py index b4c58068..484ae738 100644 --- a/software/squid/config.py +++ b/software/squid/config.py @@ -1,6 +1,6 @@ import enum import math -from typing import Optional +from typing import Optional, Tuple import pydantic @@ -139,10 +139,74 @@ class StageConfig(pydantic.BaseModel): ), ) -""" -Returns the StageConfig that existed at process startup. -""" - -def get_stage_config(): +def get_stage_config() -> StageConfig: + """ + Returns the StageConfig that existed at process startup. + """ return _stage_config + + +class CameraType(enum.Enum): + TOUPCAM = "TOUPCAM" + FLIR = "FLIR" + HAMAMATSU = "HAMAMATSU" + IDS = "IDS" + + +def _old_camera_type_to_enum(old_string) -> CameraType: + if old_string == "Toupcam": + return CameraType.TOUPCAM + elif old_string == "FLIR": + return CameraType.FLIR + elif old_string == "Hamamatsu": + return CameraType.HAMAMATSU + elif old_string == "iDS": + return CameraType.IDS + raise ValueError(f"Unknown old camera type {old_string=}") + + +class CameraPixelFormat(enum.Enum): + """ + This is all known Pixel Formats in the Cephla world, but not all cameras will support + all of these. + """ + + MONO8 = "MONO8" + MONO12 = "MONO12" + MONO14 = "MONO14" + MONO16 = "MONO16" + RGB24 = "RGB24" + RGB32 = "RGB32" + RGB48 = "RGB48" + + +class CameraConfig(pydantic.BaseModel): + """ + Most camera parameters are runtime configurable, so CameraConfig is more about defining what + camera must be available and used for a particular function in the system. + + If we want to capture the settings a camera used for a particular capture, another model called + CameraState, or something, might be more appropriate. + """ + + # NOTE(imo): Not "type" because that's a python builtin and can cause confusion + camera_type: CameraType + + default_resolution: Tuple[int, int] + + default_pixel_format: CameraPixelFormat + + +_camera_config = CameraConfig( + camera_type=_old_camera_type_to_enum(_def.CAMERA_TYPE), + default_resolution=(_def.Acquisition.CROP_WIDTH, _def.Acquisition.CROP_HEIGHT), + default_pixel_format=(_def.DEFAULT_PIXEL_FORMAT), +) + + +def get_camera_config() -> CameraConfig: + """ + Returns the CameraConfig that existed at process startup. + """ + return _camera_config diff --git a/software/tests/squid/test_camera.py b/software/tests/squid/test_camera.py new file mode 100644 index 00000000..ecfa13dd --- /dev/null +++ b/software/tests/squid/test_camera.py @@ -0,0 +1,23 @@ +import squid.camera.utils +import squid.config + + +def test_create_simulated_camera(): + sim_cam = squid.camera.utils.get_camera(squid.config.get_camera_config(), simulated=True) + + +def test_simulated_camera(): + sim_cam = squid.camera.utils.get_camera(squid.config.get_camera_config(), simulated=True) + + # Really basic tests to make sure the simulated camera does what is expected. + assert sim_cam.get_frame() is not None + frame_id = sim_cam.get_frame_id() + assert sim_cam.get_frame() is not None + assert sim_cam.get_frame_id() != frame_id + + frame = sim_cam.get_frame() + (frame_width, frame_height, *_) = frame.shape + (res_width, res_height) = sim_cam.get_resolution() + + assert frame_width == res_width + assert frame_height == res_height