Source code for prismapi.prism

from typing import Tuple, Dict, Any, Optional
from json import load
from os.path import isfile
from multiprocessing.shared_memory import SharedMemory
import weakref
from ctypes import c_int, c_double
import numpy as np
from time import time, sleep
from os import name as os_name
from getpass import getuser
from prismapi.core import Measurement, Parameter


[docs] class PrismError(Exception): """Base exception class for Prism API errors.""" pass
[docs] class PrismConnectionError(PrismError): """Raised when there are issues connecting to the Prism hardware.""" pass
[docs] class PrismParameterError(PrismError): """Raised when there are issues with parameter operations.""" pass
[docs] class PrismTimeoutError(PrismError): """Raised when a timeout occurs.""" pass
[docs] class Prism: """Object for controlling and collecting data from Prism hardware. This class provides a Python interface to the Prism hardware system, allowing control of various parameters and collection of measurement data. The hardware controller is communicated with through a shared memory. The shared memory layout is defined by a JSON file that specifies the memory map configuration including data source locations and sizes. By default this file is located in a system-specific temp directory, but can be overridden by passing an explicit path. This location should not need to be modified. Attributes: memory_map (Dict): The loaded memory map configuration measurements (Dict): Dictionary of measurement data structures dofs (Dict): Dictionary of degree of freedom (DOF) data structures """
[docs] def __init__( self, memory_map_fpath: Optional[str] = None, timeout: Optional[float] = None, ): """Initialize the Prism hardware interface. Args: memory_map_fpath: Path to the memory map JSON file. If None, defaults to a system-specific temp directory location. Should not need to be modified under normal circumstances. timeout: Default timeout in seconds for blocking operations. If None, operations will block indefinitely. Raises: PrismConnectionError: If the shared memory cannot be opened or the memory map file is invalid. """ self.timeout = timeout self._start_time = None if memory_map_fpath is None: user = getuser() memory_map_fpath = ( f"C:\\Users\\{user}\\AppData\\Local\\Temp\\prism_memory_map.json" if (os_name == "nt") else f"/tmp/prism_memory_map.json" ) if not isfile(memory_map_fpath): raise PrismConnectionError( f"Memory map file not found at {memory_map_fpath}. Is the hardware running?" ) try: with open(memory_map_fpath, "r") as json_file: self.memory_map = load(json_file) except Exception as e: raise PrismConnectionError(f"Failed to load memory map: {str(e)}") try: self.sh_mem = SharedMemory(self.memory_map["name"]) except FileNotFoundError as e: raise PrismConnectionError( "Could not open shared memory. Is the hardware running?" ) except Exception as e: raise PrismConnectionError( f"Error opening hardware direct memory access: {str(e)}" ) buffer = self.sh_mem.buf self.measurements = {} for meas, meta in self.memory_map["data_sources"].items(): offset = meta["mem_offset"] x_data_size = meta["x_data_size"] y_data_size = meta["y_data_size"] self.measurements[meas] = { "name": meas, "verbose": meta["verbose"], "target_time": c_double.from_buffer(buffer, offset), "start_time": c_double.from_buffer(buffer, offset + 8), "finish_time": c_double.from_buffer(buffer, offset + 16), "valid_length": c_int.from_buffer(buffer, offset + 24), "x_data": np.frombuffer( buffer, dtype=np.float64, count=int(x_data_size / 8), offset=offset + 28, ), "y_data": np.frombuffer( buffer, dtype=np.float64, count=int(y_data_size / 8), offset=offset + 28 + x_data_size, ), "x_unit": meta["x_unit"], "y_unit": meta["y_unit"], } self.dofs = {} for name, meta in self.memory_map["dofs"].items(): self.dofs[name] = { "name": name, "verbose": meta["verbose"], "unit": meta["unit"], "lo_limit": meta["lo_limit"], "hi_limit": meta["hi_limit"], "target": c_double.from_buffer(buffer, meta["mem_offset"]), "actual": c_double.from_buffer(buffer, meta["mem_offset"] + 8), } self._finalizer = weakref.finalize(self, self.close)
[docs] def close(self) -> None: """Clean up resources and close the shared memory connection.""" # First clear all numpy array references for m in list(self.measurements.keys()): if self.measurements[m] is not None: del self.measurements[m]["x_data"] del self.measurements[m]["y_data"] del self.measurements[m] else: del self.measurements[m] # Clear all DOF references for d in list(self.dofs.keys()): if self.dofs[d] is not None: del self.dofs[d]["target"] del self.dofs[d]["actual"] del self.dofs[d] else: del self.dofs[d] if hasattr(self, "memory_map"): del self.memory_map try: if hasattr(self, "sh_mem"): self.sh_mem.close() except FileNotFoundError: pass except BufferError: import gc gc.collect() try: self.sh_mem.close() except (FileNotFoundError, BufferError): pass
[docs] def get_measurement_types(self) -> Tuple[Measurement, ...]: """Get available measurement types. Returns: Tuple of Measurement objects without data. """ return tuple( Measurement( name=self.memory_map["data_sources"][meas_key]["verbose"], key=meas_key, x_data=None, y_data=None, x_units=self.memory_map["data_sources"][meas_key]["x_unit"], y_units=self.memory_map["data_sources"][meas_key]["y_unit"], start_time=None, finish_time=None, ) for meas_key in self.memory_map["data_sources"].keys() )
[docs] def get_measurement_keys(self) -> Tuple[str, ...]: """Get available measurement keys. Returns: Tuple of measurement keys. """ return tuple(self.memory_map["data_sources"].keys())
[docs] def get_last_measurement(self, meas_key: str) -> Measurement: """Get the most recent measurement data. Args: meas_key: Measurement type key. Returns: Measurement object containing the most recent measurement data. Raises: PrismParameterError: If the measurement type key is invalid or is a camera. """ if "camera" in meas_key: raise PrismParameterError("Use camera specific function for camera reads.") if meas_key not in self.measurements: raise PrismParameterError(f"Invalid measurement type: {meas_key}") data = self.measurements[meas_key] return Measurement( name=data["verbose"], key=meas_key, x_data=data["x_data"].copy(), y_data=data["y_data"].copy(), x_units=data["x_unit"], y_units=data["y_unit"], start_time=( data["start_time"].value if data["start_time"].value < data["finish_time"].value else None ), finish_time=data["finish_time"].value, )
[docs] def single_measure( self, meas_key: str, timeout: Optional[float] = None ) -> Measurement: """Perform a single measurement and wait for completion. Args: meas_key: Measurement type key. timeout: Timeout in seconds. If None, uses the default timeout. Returns: Measurement object containing the measurement data. Raises: PrismParameterError: If the measurement type is invalid. PrismTimeoutError: If the operation times out. """ if meas_key not in self.measurements: raise PrismParameterError(f"Invalid measurement type: {meas_key}") timeout = timeout if timeout is not None else self.timeout self._start_time = time() self.measurements[meas_key]["target_time"].value = time() try: self.wait_for_new_data(meas_key, timeout) except PrismParameterError as e: if "Timeout" in str(e): raise PrismTimeoutError(f"Timeout waiting for {meas_key} measurement") raise return self.get_last_measurement(meas_key)
[docs] def start_running_measure(self, meas_key: str) -> None: """Start continuous measurement collection. Args: meas_key: Measurement type key. Raises: PrismParameterError: If the measurement type is invalid. """ if meas_key not in self.measurements: raise PrismParameterError(f"Invalid measurement type: {meas_key}") self.measurements[meas_key]["target_time"].value = 0
[docs] def stop_running_measure(self, meas_key: str) -> None: """Stop continuous measurement collection. Args: meas_key: Measurement type key. Raises: PrismParameterError: If the measurement type is invalid. """ if meas_key not in self.measurements: raise PrismParameterError(f"Invalid measurement type: {meas_key}") self.measurements[meas_key]["target_time"].value = -1
[docs] def get_all_parameters(self, list_read_only: bool = False) -> Tuple[Parameter, ...]: """Get information about available parameters. Args: list_read_only: If ``True`` the returned collection will include *read-only* parameters (those whose ``name`` contains the marker ``"*RO*"``). If ``False`` (default) read-only parameters are **excluded** so that only parameters that can be set by :py:meth:`set_parameter` are returned. Returns: Tuple[Parameter, ...]: A tuple of :class:`prismapi.core.Parameter` instances matching the filter criterion. """ parameters = tuple( self.get_parameter(param_name) for param_name in self.memory_map["dofs"] ) if list_read_only: # Return the full set including read-only parameters return parameters # Filter out parameters whose *name* field is marked as read-only return tuple(param for param in parameters if "*RO*" not in param.name)
[docs] def set_parameter( self, param_key: str, value: Any, timeout: Optional[float] = None ) -> None: """Set a parameter value and optionally wait for it to take effect. Args: param_key: Key of the parameter to set. value: New value for the parameter. timeout: Timeout in seconds. If None, does not block. Raises: PrismParameterError: If the parameter key is invalid or the value is outside the allowed range. PrismTimeoutError: If the operation times out. """ if param_key not in self.dofs: raise PrismParameterError(f"Invalid parameter name: {param_key}") dof = self.dofs[param_key] if not (dof["lo_limit"] <= value <= dof["hi_limit"]): raise PrismParameterError( f"Value {value} outside allowed range [{dof['lo_limit']}, {dof['hi_limit']}]" ) timeout = timeout if timeout is not None else self.timeout self._start_time = time() dof["target"].value = value if timeout is not None: while dof["target"].value != dof["actual"].value: if (time() - self._start_time) > timeout: raise PrismTimeoutError( f"Timeout waiting for parameter {param_key}" ) sleep(0.001)
[docs] def get_parameter(self, param_key: str) -> Parameter: """Get the current value of a parameter. Args: param_key: Key of the parameter. Returns: Current value of the parameter. Raises: PrismParameterError: If the parameter key is invalid. """ if param_key not in self.dofs: raise PrismParameterError(f"Invalid parameter key: {param_key}") dof = self.dofs[param_key] return Parameter( name=dof["verbose"], key=param_key, value=dof["actual"].value, units=dof["unit"], min=dof["lo_limit"], max=dof["hi_limit"], )
[docs] def image_bed(self) -> np.ndarray: """Capture an image from the bed camera. Returns: Numpy array containing the image data. Raises: PrismParameterError: If the bed camera is not available. PrismTimeoutError: If the operation times out. """ if "bed_camera" not in self.measurements: raise PrismParameterError("Bed camera not available") imager = self.measurements["bed_camera"] old_start = imager["finish_time"].value imager["target_time"].value = time() while old_start == imager["finish_time"].value: if self.timeout and (time() - self._start_time) > self.timeout: raise PrismTimeoutError("Timeout waiting for bed camera image") sleep(0.001) dimen = imager["x_data"] return ( imager["y_data"][: imager["valid_length"].value // 8] .reshape(dimen.astype(np.int32)) .astype(np.uint8) )
[docs] def pl_image(self) -> np.ndarray: """Capture an image from the PL camera. Returns: Numpy array containing the image data. Raises: PrismParameterError: If the PL camera is not available. PrismTimeoutError: If the operation times out. """ # Check to see if PL camera is available if "pli_camera" not in self.measurements: raise PrismParameterError("PL camera not available") imager = self.measurements["pli_camera"] old_start = imager["finish_time"].value imager["target_time"].value = time() while old_start == imager["finish_time"].value: if self.timeout and (time() - self._start_time) > self.timeout: raise PrismTimeoutError("Timeout waiting for PL camera image") sleep(0.001) dimen = imager["x_data"] return ( imager["y_data"][: imager["valid_length"].value // 8] .reshape(dimen.astype(np.int32)) .astype(np.uint8) )
[docs] def move(self, x: float, y: float, timeout: Optional[float] = None) -> None: """Move the bed to the specified position. Args: x: X position in millimeters. y: Y position in millimeters. timeout: Timeout in seconds. If None, uses the default timeout. Raises: PrismParameterError: If the position is outside the allowed range. PrismTimeoutError: If the operation times out. """ self.set_parameter("x_position", x, timeout=None) self.set_parameter("y_position", y, timeout=None) timeout = timeout if timeout is not None else self.timeout self._start_time = time() while ( self.dofs["x_position"]["target"].value != self.dofs["x_position"]["actual"].value or self.dofs["y_position"]["target"].value != self.dofs["y_position"]["actual"].value ): if timeout and (time() - self._start_time) > timeout: raise PrismTimeoutError("Timeout waiting for bed movement") sleep(0.001)
[docs] def wait_for_new_data(self, meas_key: str, timeout: Optional[float] = None) -> None: """Wait for new data to be available for a measurement. Args: meas_key: Key of the measurement type. timeout: Timeout in seconds. If None, blocks indefinitely. Raises: PrismParameterError: If the measurement key is invalid. PrismTimeoutError: If the operation times out. """ if meas_key not in self.measurements: raise PrismParameterError(f"Invalid measurement key: {meas_key}") current_finish_time = self.measurements[meas_key]["finish_time"].value start_time = time() while True: if timeout and (time() - start_time) > timeout: raise PrismTimeoutError(f"Timeout waiting for {meas_key} data.") if self.measurements[meas_key]["finish_time"].value > current_finish_time: return sleep(0.001)
[docs] def get_next_measurement( self, meas_key: str, timeout: Optional[float] = None ) -> Measurement: """Get the next measurement from the measurement queue. Args: meas_key: Key of the measurement type. timeout: Timeout in seconds. If None, blocks indefinitely. Returns: Measurement object containing the next measurement data. Raises: PrismParameterError: If the measurement key is invalid. PrismTimeoutError: If the operation times out. """ if meas_key not in self.measurements: raise PrismParameterError(f"Invalid measurement key: {meas_key}") self.wait_for_new_data(meas_key, timeout) return self.get_last_measurement(meas_key)