Skip to content

Commit

Permalink
new stage integration complete
Browse files Browse the repository at this point in the history
  • Loading branch information
soham1202 committed Dec 31, 2024
1 parent e0a5bbe commit aa0bdef
Show file tree
Hide file tree
Showing 3 changed files with 562 additions and 38 deletions.
198 changes: 187 additions & 11 deletions software/control/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2061,6 +2061,9 @@ def set_segmentation_flag(self, flag):
def set_fluorescence_rtp_flag(self, flag):
self.do_fluorescence_rtp = flag

def set_focus_surface(self, surfaceFitter):
self.focus_surface = surfaceFitter

def set_crop(self,crop_width, crop_height):
self.crop_width = crop_width
self.crop_height = crop_height
Expand Down Expand Up @@ -2174,8 +2177,19 @@ def run_acquisition(self):
# run the acquisition
self.timestamp_acquisition_started = time.time()

# create a QThread object
if self.gen_focus_map and not self.do_reflection_af:
if self.focus_surface:
print("Using focus surface for Z interpolation")
for region_id in self.scan_region_names:
region_fov_coords = self.scan_region_fov_coords_mm[region_id]
# Convert each tuple to list for modification
for i, coords in enumerate(region_fov_coords):
x, y = coords[:2] # This handles both (x,y) and (x,y,z) formats
z = self.focus_surface.interpolate(x, y)
# Modify the list directly
region_fov_coords[i] = (x, y, z)
self.scanCoordinates.update_fov_z_level(region_id, i, z)

elif self.gen_focus_map and not self.do_reflection_af:
print("Generating focus map for multipoint grid")
starting_x_mm = self.stage.get_pos().x_mm
starting_y_mm = self.stage.get_pos().y_mm
Expand Down Expand Up @@ -2207,6 +2221,7 @@ def run_acquisition(self):
print("Invalid coordinates for focus map, aborting.")
return

# create a QThread object
self.thread = QThread()
# create a worker object
if DO_FLUORESCENCE_RTP:
Expand Down Expand Up @@ -2946,16 +2961,20 @@ def load_background_image(self, image_path):
def create_layers(self):
self.scan_overlay = np.zeros((self.image_height, self.image_width, 4), dtype=np.uint8)
self.fov_overlay = np.zeros((self.image_height, self.image_width, 4), dtype=np.uint8)
self.focus_point_overlay = np.zeros((self.image_height, self.image_width, 4), dtype=np.uint8)

self.scan_overlay_item = pg.ImageItem()
self.fov_overlay_item = pg.ImageItem()
self.focus_point_overlay_item = pg.ImageItem()

self.view.addItem(self.scan_overlay_item)
self.view.addItem(self.fov_overlay_item)
self.view.addItem(self.focus_point_overlay_item)

self.background_item.setZValue(-1) # Background layer at the bottom
self.scan_overlay_item.setZValue(0) # Scan overlay in the middle
self.fov_overlay_item.setZValue(1) # FOV overlay on top
self.fov_overlay_item.setZValue(1) # FOV overlay next
self.focus_point_overlay_item.setZValue(2) # # Focus points on top

def update_display_properties(self, sample):
if sample == 'glass slide':
Expand Down Expand Up @@ -3080,6 +3099,23 @@ def deregister_fov_to_image(self, x_mm, y_mm):
cv2.rectangle(self.scan_overlay, current_FOV_top_left, current_FOV_bottom_right, (0, 0, 0, 0), self.box_line_thickness)
self.scan_overlay_item.setImage(self.scan_overlay)

def register_focus_point(self, x_mm, y_mm):
"""Draw focus point marker as filled circle centered on the FOV"""
color = (0, 255, 0, 255) # Green RGBA
# Get FOV corner coordinates, then calculate FOV center pixel coordinates
current_FOV_top_left, current_FOV_bottom_right = self.get_FOV_pixel_coordinates(x_mm, y_mm)
center_x = (current_FOV_top_left[0] + current_FOV_bottom_right[0]) // 2
center_y = (current_FOV_top_left[1] + current_FOV_bottom_right[1]) // 2
# Draw a filled circle at the center
radius = 5 # Radius of circle in pixels
cv2.circle(self.focus_point_overlay, (center_x, center_y), radius, color, -1) # -1 thickness means filled
self.focus_point_overlay_item.setImage(self.focus_point_overlay)

def clear_focus_points(self):
"""Clear just the focus point overlay"""
self.focus_point_overlay = np.zeros((self.image_height, self.image_width, 4), dtype=np.uint8)
self.focus_point_overlay_item.setImage(self.focus_point_overlay)

def clear_slide(self):
self.background_image = self.background_image_copy.copy()
self.background_item.setImage(self.background_image)
Expand All @@ -3088,6 +3124,8 @@ def clear_slide(self):
def clear_overlay(self):
self.scan_overlay.fill(0)
self.scan_overlay_item.setImage(self.scan_overlay)
self.focus_point_overlay.fill(0)
self.focus_point_overlay_item.setImage(self.focus_point_overlay)

def handle_mouse_click(self, evt):
if not evt.double():
Expand Down Expand Up @@ -3296,8 +3334,12 @@ def scale_contrast_limits(self, target_dtype):
self.acquisition_dtype = target_dtype


class ScanCoordinates:
class ScanCoordinates(QObject):

signal_scan_coordinates_updated = Signal()

def __init__(self, objectiveStore, navigationViewer, stage: AbstractStage):
QObject.__init__(self)
# Wellplate settings
self.objectiveStore = objectiveStore
self.navigationViewer = navigationViewer
Expand Down Expand Up @@ -3349,11 +3391,11 @@ def get_selected_wells(self):
return None

selected_wells = np.array(self.well_selector.get_selected_cells())
region_centers = {}
well_centers = {}

# if no well selected
if len(selected_wells) == 0:
return region_centers
return well_centers
# populate the coordinates
rows = np.unique(selected_wells[:,0])
_increasing = True
Expand All @@ -3367,9 +3409,9 @@ def get_selected_wells(self):
x_mm = self.a1_x_mm + (column * self.well_spacing_mm) + self.wellplate_offset_x_mm
y_mm = self.a1_y_mm + (row * self.well_spacing_mm) + self.wellplate_offset_y_mm
well_id = self._index_to_row(row) + str(column+1)
region_centers[well_id] = (x_mm,y_mm)
well_centers[well_id] = (x_mm,y_mm)
_increasing = not _increasing
return region_centers
return well_centers

def set_live_scan_coordinates(self, x_mm, y_mm, scan_size_mm, overlap_percent, shape):
if shape != 'Manual' and self.format == 'glass slide':
Expand All @@ -3394,16 +3436,14 @@ def set_well_coordinates(self, scan_size_mm, overlap_percent, shape):
for well_id, (x, y) in new_region_centers.items():
if well_id not in self.region_centers:
self.add_region(well_id, x, y, scan_size_mm, overlap_percent, shape)
print(f"Updated region coordinates: {len(self.region_centers)} wells")

else:
print("Clear well coordinates")
self.clear_regions()

def set_manual_coordinates(self, manual_shapes, overlap_percent):
self.clear_regions()
if manual_shapes is not None:
# Handle manual ROIs
manual_region_added = False
for i, shape_coords in enumerate(manual_shapes):
scan_coordinates = self.add_manual_region(shape_coords, overlap_percent)
if scan_coordinates:
Expand All @@ -3414,6 +3454,10 @@ def set_manual_coordinates(self, manual_shapes, overlap_percent):
center = np.mean(shape_coords, axis=0)
self.region_centers[region_name] = [center[0], center[1]]
self.region_fov_coordinates[region_name] = scan_coordinates
manual_region_added = True
print(f"Added Manual Region: {region_name}")
if manual_region_added:
self.signal_scan_coordinates_updated.emit()
else:
print("No Manual ROI found")

Expand Down Expand Up @@ -3467,6 +3511,8 @@ def add_region(self, well_id, center_x, center_y, scan_size_mm, overlap_percent=

self.region_centers[well_id] = [float(center_x), float(center_y), float(self.stage.get_pos().z_mm)]
self.region_fov_coordinates[well_id] = scan_coordinates
self.signal_scan_coordinates_updated.emit()
print(f"Added Region: {well_id}")

def remove_region(self, well_id):
if well_id in self.region_centers:
Expand All @@ -3478,11 +3524,13 @@ def remove_region(self, well_id):
self.navigationViewer.deregister_fov_to_image(coord[0], coord[1])

print(f"Removed Region: {well_id}")
self.signal_scan_coordinates_updated.emit()

def clear_regions(self):
self.region_centers.clear()
self.region_fov_coordinates.clear()
self.navigationViewer.clear_overlay()
self.signal_scan_coordinates_updated.emit()
print("Cleared All Regions")

def add_flexible_region(self, region_id, center_x, center_y, center_z, Nx, Ny, overlap_percent=10):
Expand Down Expand Up @@ -3513,6 +3561,7 @@ def add_flexible_region(self, region_id, center_x, center_y, center_z, Nx, Ny, o
print(f"Added Flexible Region: {region_id}")
self.region_centers[region_id] = [center_x, center_y, center_z]
self.region_fov_coordinates[region_id] = scan_coordinates
self.signal_scan_coordinates_updated.emit()
else:
print(f"Region Out of Bounds: {region_id}")

Expand All @@ -3538,8 +3587,10 @@ def add_flexible_region_with_step_size(self, region_id, center_x, center_y, cent
scan_coordinates.extend(row)

if scan_coordinates: # Only add region if there are valid coordinates
print(f"Added Flexible Region: {region_id}")
self.region_centers[region_id] = [center_x, center_y, center_z]
self.region_fov_coordinates[region_id] = scan_coordinates
self.signal_scan_coordinates_updated.emit()
else:
print(f"Region Out of Bounds: {region_id}")

Expand Down Expand Up @@ -3710,6 +3761,131 @@ def update_fov_z_level(self, region_id, fov, new_z):
print(f"Updated z-level to {new_z} for region:{region_id}, fov:{fov}")


from scipy.interpolate import SmoothBivariateSpline, RBFInterpolator
class SurfaceFitter:
"""Handles fitting and interpolation of slide surfaces through measured focus points"""

def __init__(self, smoothing_factor=0.1):
self.smoothing_factor = smoothing_factor
self.surface_fit = None
self.method = 'spline' # can be 'spline' or 'rbf'
self.is_fitted = False
self.points = None

def set_method(self, method):
"""Set interpolation method
Args:
method (str): Either 'spline' or 'rbf' (Radial Basis Function)
"""
if method not in ['spline', 'rbf']:
raise ValueError("Method must be either 'spline' or 'rbf'")
self.method = method
self.is_fitted = False

def fit(self, points):
"""Fit surface through provided focus points
Args:
points (list): List of (x,y,z) tuples
Returns:
tuple: (mean_error, std_error) in mm
"""
if len(points) < 4:
raise ValueError("Need at least 4 points to fit surface")

self.points = np.array(points)
x = self.points[:,0]
y = self.points[:,1]
z = self.points[:,2]

if self.method == 'spline':
try:
self.surface_fit = SmoothBivariateSpline(
x, y, z,
kx=3, # cubic spline in x
ky=3, # cubic spline in y
s=self.smoothing_factor
)
except Exception as e:
print(f"Spline fitting failed: {str(e)}, falling back to RBF")
self.method = 'rbf'
self._fit_rbf(x, y, z)
else:
self._fit_rbf(x, y, z)

self.is_fitted = True
errors = self._calculate_fitting_errors()
return np.mean(errors), np.std(errors)

def _fit_rbf(self, x, y, z):
"""Fit using Radial Basis Function interpolation"""
xy = np.column_stack((x, y))
self.surface_fit = RBFInterpolator(
xy, z,
kernel='thin_plate_spline',
epsilon=self.smoothing_factor
)

def interpolate(self, x, y):
"""Get interpolated Z value at given (x,y) coordinates
Args:
x (float or array): X coordinate(s)
y (float or array): Y coordinate(s)
Returns:
float or array: Interpolated Z value(s)
"""
if not self.is_fitted:
raise RuntimeError("Must fit surface before interpolating")

if np.isscalar(x) and np.isscalar(y):
if self.method == 'spline':
return float(self.surface_fit.ev(x, y))
else:
return float(self.surface_fit([[x, y]]))
else:
x = np.asarray(x)
y = np.asarray(y)
if self.method == 'spline':
return self.surface_fit.ev(x, y)
else:
xy = np.column_stack((x.ravel(), y.ravel()))
z = self.surface_fit(xy)
return z.reshape(x.shape)

def _calculate_fitting_errors(self):
"""Calculate absolute errors at measured points"""
errors = []
for x, y, z_measured in self.points:
z_fit = self.interpolate(x, y)
errors.append(abs(z_fit - z_measured))
return np.array(errors)

def get_surface_grid(self, x_range, y_range, num_points=50):
"""Generate grid of interpolated Z values for visualization
Args:
x_range (tuple): (min_x, max_x)
y_range (tuple): (min_y, max_y)
num_points (int): Number of points per dimension
Returns:
tuple: (X grid, Y grid, Z grid)
"""
if not self.is_fitted:
raise RuntimeError("Must fit surface before generating grid")

x = np.linspace(x_range[0], x_range[1], num_points)
y = np.linspace(y_range[0], y_range[1], num_points)
X, Y = np.meshgrid(x, y)
Z = self.interpolate(X, Y)

return X, Y, Z


class LaserAutofocusController(QObject):

image_to_display = Signal(np.ndarray)
Expand Down
Loading

0 comments on commit aa0bdef

Please sign in to comment.