Skip to content

Commit

Permalink
Merge pull request #54 from Cephla-Lab/merge-celesta
Browse files Browse the repository at this point in the history
interface: start pulling out illumination control
  • Loading branch information
hongquanli authored Jan 2, 2025
2 parents e0a5bbe + fd5a0f9 commit 9be7749
Show file tree
Hide file tree
Showing 7 changed files with 586 additions and 59 deletions.
1 change: 1 addition & 0 deletions software/control/_def.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ class SOFTWARE_POS_LIMIT:
USE_LDI_SERIAL_CONTROL = False
LDI_INTENSITY_MODE = 'PC'
LDI_SHUTTER_MODE = 'PC'
USE_CELESTA_ETHENET_CONTROL = True

XLIGHT_EMISSION_FILTER_MAPPING = {405:1,470:2,555:3,640:4,730:5}
XLIGHT_SERIAL_NUMBER = "B00031BE"
Expand Down
206 changes: 206 additions & 0 deletions software/control/celesta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
"""
Generic Lumencor laser control via HTTP (ethernet connection).
Bogdan 3/19
revised HL 2/2024
"""
import urllib.request
import traceback
from squid.abc import LightSource
from control.microscope import LightSourceType, IntensityControlMode, ShutterControlMode

def lumencor_httpcommand(command = 'GET IP',ip = '192.168.201.200'):
"""
Sends commands to the lumencor system via http.
Plese find commands here:
http://lumencor.com/wp-content/uploads/sites/11/2019/01/57-10018.pdf
"""
command_full = r'http://'+ip+'/service/?command='+command.replace(' ','%20')
with urllib.request.urlopen(command_full) as response:
message = eval(response.read()) # the default is conveniently JSON so eval creates dictionary
return message

class CELESTA(LightSource):
"""
This controls a lumencor object (default: Celesta) using HTTP.
Please connect the provided cat5e, RJ45 ethernet cable between the PC and Lumencor system.
"""
def __init__(self, **kwds):
"""
Connect to the Lumencor system via HTTP and check if you get the right response.
"""
self.on = False
self.ip = kwds.get('ip', '192.168.201.200')
[self.pmin, self.pmax] = 0,1000
try:
# See if the system returns back the right IP.
self.message = self.get_IP()
assert (self.message['message'] == 'A IP '+self.ip)
self.n_lasers = self.get_number_lasers()
self.live = True
except:
print(traceback.format_exc())
self.live = False
print("Failed to connect to Lumencor Laser at ip:", ip)

if self.live:
[self.pmin, self.pmax] = self.get_intensity_range()
self.set_shutter_control_mode(True)
for i in range(self.n_lasers):
if (not self.get_shutter_state(i)):
self.set_shutter_state(i,False)

self.channel_mappings = {
405: 0,
470: 2,
488: 2,
545: 4,
550: 4,
555: 4,
561: 4,
638: 5,
640: 5,
730: 6,
735: 6,
750: 6
}

def initialize(self):
pass

def set_intensity_control_mode(self, mode):
pass

def get_intensity_control_mode(self):
pass

def get_number_lasers(self):
"""Return the number of lasers the current lumencor system can control"""
self.message = lumencor_httpcommand(command ='GET CHMAP', ip=self.ip)
if self.message['message'][0]=='A':
return len(self.message['message'].split(' '))-2
return 0

def get_color(self,laser_id):
"""Returns the color of the current laser"""
self.message = lumencor_httpcommand(command ='GET CHMAP', ip=self.ip)
colors = self.message['message'].split(' ')[2:]
print(colors)
return colors[int(laser_id)]

def get_IP(self):
self.message = lumencor_httpcommand(command = 'GET IP', ip=self.ip)
return self.message

def get_shutter_control_mode(self):
"""
Return True/False the lasers can be controlled with TTL.
"""
self.message = lumencor_httpcommand(command = 'GET TTLENABLE', ip=self.ip)
response = self.message['message']
if response[-1]=='1':
return ShutterControlMode.TTL
else:
return ShutterControlMode.Software

def set_shutter_control_mode(self, mode):
"""
Turn on/off external TTL control mode.
"""
if mode == ShutterControlMode.TTL:
ttl_enable = '1'
else:
ttl_enable = '0'
self.message = lumencor_httpcommand(command = 'SET TTLENABLE '+ttl_enable,ip=self.ip)

def get_shutter_state(self,laser_id):
"""
Return True/False the laser is on/off.
"""
self.message = lumencor_httpcommand(command = 'GET CH '+str(laser_id), ip=self.ip)
response = self.message['message']
self.on = response[-1]=='1'
return self.on

def get_intensity_range(self):
"""
Return [minimum power, maximum power].
"""
max_int =1000 # default
self.message = lumencor_httpcommand(command = 'GET MAXINT', ip=self.ip)
if self.message['message'][0]=='A':
max_int = float(self.message['message'].split(' ')[-1])
return [0, max_int]

def get_intensity(self,laser_id):
"""
Return the current laser power.
"""
self.message = lumencor_httpcommand(command = 'GET CHINT '+str(laser_id), ip=self.ip)
# print(command = 'GET CHINT '+str(laser_id), ip=self.ip)
response = self.message['message']
power = float(response.split(' ')[-1])
return power

def set_shutter_state(self, laser_id, on):
"""
Turn the laser on/off.
"""
if on:
self.message = lumencor_httpcommand(command = 'SET CH '+str(laser_id)+' 1', ip=self.ip)
self.on = True
else:
self.message = lumencor_httpcommand(command = 'SET CH '+str(laser_id)+' 0', ip=self.ip)
self.on = False
print("Turning On/Off", self.on, self.message)

def set_intensity(self, laser_id, power_in_mw):
"""
power_in_mw - The desired laser power in mW.
"""
print("Setting Power", power_in_mw, self.message)
if power_in_mw > self.pmax:
power_in_mw = self.pmax
self.message = lumencor_httpcommand(command ='SET CHINT '+str(laser_id)+' '+ str(int(power_in_mw)), ip=self.ip)
if self.message['message'][0]=='A':
return True
return False

def shut_down(self):
"""
Turn the laser off.
"""
if self.live:
for i in range(self.n_lasers):
self.set_intensity(i,0)
self.set_shutter_state(i,False)

def get_status(self):
"""
Get the status
"""
return self.live

#
# The MIT License
#
# Copyright (c) 2013 Zhuang Lab, Harvard University
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
26 changes: 8 additions & 18 deletions software/control/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import imageio as iio
import squid.abc


class ObjectiveStore:
def __init__(self, objectives_dict=OBJECTIVES, default_objective=DEFAULT_OBJECTIVE, parent=None):
self.objectives_dict = objectives_dict
Expand Down Expand Up @@ -430,7 +429,7 @@ def __init__(self,mode_id=None,name=None,color=None,camera_sn=None,exposure_time


class LiveController(QObject):
def __init__(self,camera,microcontroller,configurationManager,parent=None,control_illumination=True,use_internal_timer_for_hardware_trigger=True,for_displacement_measurement=False):
def __init__(self,camera,microcontroller,configurationManager,illuminationController,parent=None,control_illumination=True,use_internal_timer_for_hardware_trigger=True,for_displacement_measurement=False):
QObject.__init__(self)
self.microscope = parent
self.camera = camera
Expand All @@ -441,6 +440,7 @@ def __init__(self,camera,microcontroller,configurationManager,parent=None,contro
self.is_live = False
self.control_illumination = control_illumination
self.illumination_on = False
self.illuminationController = illuminationController
self.use_internal_timer_for_hardware_trigger = use_internal_timer_for_hardware_trigger # use QTimer vs timer in the MCU
self.for_displacement_measurement = for_displacement_measurement

Expand Down Expand Up @@ -471,17 +471,17 @@ def __init__(self,camera,microcontroller,configurationManager,parent=None,contro

# illumination control
def turn_on_illumination(self):
if USE_LDI_SERIAL_CONTROL and 'Fluorescence' in self.currentConfiguration.name and LDI_SHUTTER_MODE == 'PC':
self.ldi.set_active_channel_shutter(1)
if self.illuminationController is not None and not 'LED matrix' in self.currentConfiguration.name:
self.illuminationController.turn_on_illumination(int(self.configurationManager.extract_wavelength(self.currentConfiguration.name)))
elif SUPPORT_SCIMICROSCOPY_LED_ARRAY and 'LED matrix' in self.currentConfiguration.name:
self.led_array.turn_on_illumination()
else:
self.microcontroller.turn_on_illumination()
self.illumination_on = True

def turn_off_illumination(self):
if USE_LDI_SERIAL_CONTROL and 'Fluorescence' in self.currentConfiguration.name and LDI_SHUTTER_MODE == 'PC':
self.ldi.set_active_channel_shutter(0)
if self.illuminationController is not None and not 'LED matrix' in self.currentConfiguration.name:
self.illuminationController.turn_off_illumination(int(self.configurationManager.extract_wavelength(self.currentConfiguration.name)))
elif SUPPORT_SCIMICROSCOPY_LED_ARRAY and 'LED matrix' in self.currentConfiguration.name:
self.led_array.turn_off_illumination()
else:
Expand Down Expand Up @@ -526,18 +526,8 @@ def set_illumination(self,illumination_source,intensity,update_channel_settings=
self.microcontroller.set_illumination_led_matrix(illumination_source,r=(intensity/100)*LED_MATRIX_R_FACTOR,g=(intensity/100)*LED_MATRIX_G_FACTOR,b=(intensity/100)*LED_MATRIX_B_FACTOR)
else:
# update illumination
if USE_LDI_SERIAL_CONTROL and 'Fluorescence' in self.currentConfiguration.name:
if LDI_SHUTTER_MODE == 'PC':
# set LDI active channel
print('set active channel to ' + str(illumination_source))
self.ldi.set_active_channel(int(illumination_source))
if LDI_INTENSITY_MODE == 'PC':
if update_channel_settings:
# set intensity for active channel
print('set intensity')
self.ldi.set_intensity(int(illumination_source),intensity)
if LDI_SHUTTER_MODE == "EXT" or LDI_INTENSITY_MODE == "EXT":
self.microcontroller.set_illumination(illumination_source,intensity)
if self.illuminationController is not None:
self.illuminationController.set_intensity(int(self.configurationManager.extract_wavelength(self.currentConfiguration.name)), intensity)
elif ENABLE_NL5 and NL5_USE_DOUT and 'Fluorescence' in self.currentConfiguration.name:
wavelength = int(self.currentConfiguration.name[13:16])
self.microscope.nl5.set_active_channel(NL5_WAVENLENGTH_MAP[wavelength])
Expand Down
38 changes: 24 additions & 14 deletions software/control/gui_hcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import squid.stage.cephla
import squid.stage.utils
import control.microscope
from control.microscope import LightSourceType, IntensityControlMode, ShutterControlMode

log = squid.logging.get_logger(__name__)

Expand Down Expand Up @@ -175,6 +176,7 @@ def __init__(self, is_simulation=False, live_only_mode=False, *args, **kwargs):
settings_menu.addAction(led_matrix_action)

def loadObjects(self, is_simulation):
self.illuminationController = None
if is_simulation:
self.loadSimulationObjects()
else:
Expand All @@ -189,11 +191,12 @@ def loadObjects(self, is_simulation):
self.configurationManager = core.ConfigurationManager(filename='./channel_configurations.xml')
self.contrastManager = core.ContrastManager()
self.streamHandler = core.StreamHandler(display_resolution_scaling=DEFAULT_DISPLAY_CROP/100)
self.liveController = core.LiveController(self.camera, self.microcontroller, self.configurationManager, parent=self)

self.liveController = core.LiveController(self.camera, self.microcontroller, self.configurationManager, self.illuminationController, parent=self)
self.stage: squid.abc.AbstractStage = squid.stage.cephla.CephlaStage(microcontroller = self.microcontroller, stage_config = squid.config.get_stage_config())
self.slidePositionController = core.SlidePositionController(self.stage, self.liveController, is_for_wellplate=True)
self.autofocusController = core.AutoFocusController(self.camera, self.stage, self.liveController, self.microcontroller)
self.slidePositionController = core.SlidePositionController(self.stage, self.liveController, is_for_wellplate=True)
self.autofocusController = core.AutoFocusController(self.camera, self.stage, self.liveController, self.microcontroller)
self.imageSaver = core.ImageSaver()
self.imageDisplay = core.ImageDisplay()
if ENABLE_TRACKING:
Expand All @@ -219,6 +222,7 @@ def loadObjects(self, is_simulation):

def loadSimulationObjects(self):
self.log.debug("Loading simulated hardware objects...")
self.microcontroller = microcontroller.Microcontroller(existing_serial=microcontroller.SimSerial())
# Initialize simulation objects
if ENABLE_SPINNING_DISK_CONFOCAL:
self.xlight = serial_peripherals.XLight_Simulation()
Expand All @@ -231,19 +235,24 @@ def loadSimulationObjects(self):
self.camera_focus = camera_fc.Camera_Simulation()
if USE_LDI_SERIAL_CONTROL:
self.ldi = serial_peripherals.LDI_Simulation()
self.illuminationController = control.microscope.IlluminationController(self.microcontroller, self.ldi.intensity_mode, self.ldi.shutter_mode, LightSourceType.LDI, self.ldi)
self.camera = camera.Camera_Simulation(rotate_image_angle=ROTATE_IMAGE_ANGLE, flip_image=FLIP_IMAGE)
self.camera.set_pixel_format(DEFAULT_PIXEL_FORMAT)
if USE_ZABER_EMISSION_FILTER_WHEEL:
self.emission_filter_wheel = serial_peripherals.FilterController_Simulation(115200, 8, serial.PARITY_NONE, serial.STOPBITS_ONE)
if USE_OPTOSPIN_EMISSION_FILTER_WHEEL:
self.emission_filter_wheel = serial_peripherals.Optospin_Simulation(SN=None)

self.microcontroller = microcontroller.Microcontroller(existing_serial=microcontroller.SimSerial())
self.emission_filter_wheel = serial_peripherals.Optospin_Simulation(SN=None)
if USE_SQUID_FILTERWHEEL:
self.squid_filter_wheel = filterwheel.SquidFilterWheelWrapper_Simulation(None)

def loadHardwareObjects(self):
# Initialize hardware objects
try:
self.microcontroller = microcontroller.Microcontroller(version=CONTROLLER_VERSION, sn=CONTROLLER_SN)
except Exception:
self.log.error(f"Error initializing Microcontroller")
raise

if ENABLE_SPINNING_DISK_CONFOCAL:
try:
self.xlight = serial_peripherals.XLight(XLIGHT_SERIAL_NUMBER, XLIGHT_SLEEP_TIME_FOR_WHEEL)
Expand Down Expand Up @@ -272,13 +281,20 @@ def loadHardwareObjects(self):
if USE_LDI_SERIAL_CONTROL:
try:
self.ldi = serial_peripherals.LDI()
self.ldi.run()
self.ldi.set_intensity_mode(LDI_INTENSITY_MODE)
self.ldi.set_shutter_mode(LDI_SHUTTER_MODE)
self.illuminationController = control.microscope.IlluminationController(self.microcontroller, self.ldi.intensity_mode, self.ldi.shutter_mode, LightSourceType.LDI, self.ldi)
except Exception:
self.log.error("Error initializing LDI")
raise

if USE_CELESTA_ETHENET_CONTROL:
try:
import control.celesta
self.celesta = control.celesta.CELESTA()
self.illuminationController = control.microscope.IlluminationController(self.microcontroller, IntensityControlMode.Software, ShutterControlMode.TTL, LightSourceType.CELESTA, self.celesta)
except Exception:
self.log.error("Error initializing CELESTA")
raise

if SUPPORT_LASER_AUTOFOCUS:
try:
sn_camera_focus = camera_fc.get_sn_by_model(FOCUS_CAMERA_MODEL)
Expand Down Expand Up @@ -319,12 +335,6 @@ def loadHardwareObjects(self):
self.log.error("Error initializing Prior Stage")
raise

try:
self.microcontroller = microcontroller.Microcontroller(version=CONTROLLER_VERSION, sn=CONTROLLER_SN)
except Exception:
self.log.error(f"Error initializing Microcontroller")
raise

def setupHardware(self):
# Setup hardware components
if USE_ZABER_EMISSION_FILTER_WHEEL:
Expand Down
Loading

0 comments on commit 9be7749

Please sign in to comment.