from typing import Tuple, Dict, Any, Optional
from json import load
from os.path import isfile
from multiprocessing.shared_memory import SharedMemory
import weakref
from ctypes import c_int, c_double
import numpy as np
from time import time, sleep
from os import name as os_name
from getpass import getuser
from prismapi.core import Measurement, Parameter
[docs]
class PrismError(Exception):
    """Root of the Prism API exception hierarchy.

    Catch this type to handle every error raised by this module.
    """
[docs]
class PrismConnectionError(PrismError):
    """Signals a failure to connect to the Prism hardware."""
[docs]
class PrismParameterError(PrismError):
    """Signals a problem with a parameter operation."""
[docs]
class PrismTimeoutError(PrismError):
    """Signals that an operation exceeded its timeout."""
[docs]
class Prism:
    """Object for controlling and collecting data from Prism hardware.

    This class provides a Python interface to the Prism hardware system,
    allowing control of various parameters and collection of measurement data.

    The hardware controller is communicated with through a shared memory
    segment. The layout of that segment is defined by a JSON file (the
    "memory map") that names the segment and lists, for every data source and
    every degree of freedom (DOF), the byte offset of its record. By default
    the file is looked up in a system-specific temp directory, but an explicit
    path can be passed instead.

    Instances can be used as context managers; leaving the ``with`` block
    calls :py:meth:`close`.

    Timeout convention: wherever a timeout is accepted, ``None`` means "no
    timeout" and any number (including ``0``) is a real limit in seconds.

    Attributes:
        memory_map (Dict): The loaded memory map configuration. Removed by
            :py:meth:`close`.
        measurements (Dict): Per data source, a dict holding live views into
            the shared memory (``target_time``, ``start_time``,
            ``finish_time``, ``valid_length``, ``x_data``, ``y_data``) plus
            the static ``name``, ``verbose``, ``x_unit`` and ``y_unit``.
        dofs (Dict): Per degree of freedom, a dict holding live ``target`` and
            ``actual`` views plus the static ``name``, ``verbose``, ``unit``,
            ``lo_limit`` and ``hi_limit``.
        sh_mem (SharedMemory): The attached shared memory segment.
        timeout (Optional[float]): Default timeout in seconds.
    """

    # Byte offsets of the fields of one data-source record, relative to the
    # record's ``mem_offset`` from the memory map.
    _TARGET_TIME_OFFSET = 0
    _START_TIME_OFFSET = 8
    _FINISH_TIME_OFFSET = 16
    _VALID_LENGTH_OFFSET = 24
    _DATA_OFFSET = 28
    # Byte offset of the ``actual`` value relative to a DOF's ``mem_offset``.
    _DOF_ACTUAL_OFFSET = 8
    # Seconds slept between two polls of the shared memory.
    _POLL_INTERVAL = 0.001

    def __init__(
        self,
        memory_map_fpath: Optional[str] = None,
        timeout: Optional[float] = None,
    ):
        """Initialize the Prism hardware interface.

        Args:
            memory_map_fpath: Path to the memory map JSON file. If None,
                defaults to a system-specific temp directory location.
            timeout: Default timeout in seconds for blocking operations. If
                None, no default timeout is applied.

        Raises:
            PrismConnectionError: If the memory map file is missing or cannot
                be parsed, if the shared memory cannot be opened, or if the
                memory map does not describe a usable layout.
        """
        self.timeout = timeout
        self._start_time = None
        if memory_map_fpath is None:
            if os_name == "nt":
                memory_map_fpath = (
                    f"C:\\Users\\{getuser()}\\AppData\\Local\\Temp\\prism_memory_map.json"
                )
            else:
                memory_map_fpath = "/tmp/prism_memory_map.json"
        if not isfile(memory_map_fpath):
            raise PrismConnectionError(
                f"Memory map file not found at {memory_map_fpath}. Is the hardware running?"
            )
        try:
            with open(memory_map_fpath, "r") as json_file:
                self.memory_map = load(json_file)
        except Exception as e:
            raise PrismConnectionError(f"Failed to load memory map: {str(e)}") from e
        try:
            self.sh_mem = SharedMemory(self.memory_map["name"])
        except FileNotFoundError as e:
            raise PrismConnectionError(
                "Could not open shared memory. Is the hardware running?"
            ) from e
        except Exception as e:
            raise PrismConnectionError(
                f"Error opening hardware direct memory access: {str(e)}"
            ) from e

        self.measurements = {}
        self.dofs = {}
        try:
            self._map_shared_memory()
        except (KeyError, TypeError, ValueError, AttributeError) as e:
            # Do not leave the segment attached when the layout is unusable.
            self._release(self.sh_mem, self.measurements, self.dofs)
            raise PrismConnectionError(f"Invalid memory map: {e!r}") from e

        # The callback must not reference ``self`` (a bound method would keep
        # the instance alive forever), so a static function is registered and
        # handed only the resources it has to release.
        self._finalizer = weakref.finalize(
            self, Prism._release, self.sh_mem, self.measurements, self.dofs
        )

    def __enter__(self) -> "Prism":
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection when the ``with`` block is left."""
        self.close()

    def _map_shared_memory(self) -> None:
        """Fill ``measurements`` and ``dofs`` with views into the shared memory."""
        buffer = self.sh_mem.buf
        for meas, meta in self.memory_map["data_sources"].items():
            offset = meta["mem_offset"]
            x_data_size = meta["x_data_size"]
            y_data_size = meta["y_data_size"]
            self.measurements[meas] = {
                "name": meas,
                "verbose": meta["verbose"],
                "target_time": c_double.from_buffer(
                    buffer, offset + self._TARGET_TIME_OFFSET
                ),
                "start_time": c_double.from_buffer(
                    buffer, offset + self._START_TIME_OFFSET
                ),
                "finish_time": c_double.from_buffer(
                    buffer, offset + self._FINISH_TIME_OFFSET
                ),
                "valid_length": c_int.from_buffer(
                    buffer, offset + self._VALID_LENGTH_OFFSET
                ),
                # Sizes in the memory map are divided by 8 to get the number
                # of float64 elements.
                "x_data": np.frombuffer(
                    buffer,
                    dtype=np.float64,
                    count=int(x_data_size / 8),
                    offset=offset + self._DATA_OFFSET,
                ),
                "y_data": np.frombuffer(
                    buffer,
                    dtype=np.float64,
                    count=int(y_data_size / 8),
                    offset=offset + self._DATA_OFFSET + x_data_size,
                ),
                "x_unit": meta["x_unit"],
                "y_unit": meta["y_unit"],
            }
        for name, meta in self.memory_map["dofs"].items():
            self.dofs[name] = {
                "name": name,
                "verbose": meta["verbose"],
                "unit": meta["unit"],
                "lo_limit": meta["lo_limit"],
                "hi_limit": meta["hi_limit"],
                "target": c_double.from_buffer(buffer, meta["mem_offset"]),
                "actual": c_double.from_buffer(
                    buffer, meta["mem_offset"] + self._DOF_ACTUAL_OFFSET
                ),
            }

    @staticmethod
    def _release(sh_mem, measurements, dofs) -> None:
        """Drop every view into the shared memory, then close the segment.

        Safe to call repeatedly and with ``None`` for any argument.
        """
        # The views must go first: closing fails with BufferError while
        # anything still exports the underlying buffer.
        for table in (measurements, dofs):
            if table is None:
                continue
            for record in table.values():
                if record is not None:
                    record.clear()
            table.clear()
        if sh_mem is None:
            return
        try:
            sh_mem.close()
        except FileNotFoundError:
            pass
        except BufferError:
            # Something unreachable may still hold a view; collect and retry
            # once, then give up silently (best-effort cleanup).
            import gc

            gc.collect()
            try:
                sh_mem.close()
            except (FileNotFoundError, BufferError):
                pass

    def close(self) -> None:
        """Clean up resources and close the shared memory connection.

        May be called more than once. Afterwards ``measurements`` and ``dofs``
        are empty and ``memory_map`` is removed.
        """
        self._release(
            getattr(self, "sh_mem", None),
            getattr(self, "measurements", None),
            getattr(self, "dofs", None),
        )
        if hasattr(self, "memory_map"):
            del self.memory_map

    def get_measurement_types(self) -> "Tuple[Measurement, ...]":
        """Get available measurement types.

        Returns:
            Tuple of Measurement objects without data, one per data source in
            the memory map.
        """
        sources = self.memory_map["data_sources"]
        return tuple(
            Measurement(
                name=meta["verbose"],
                key=meas_key,
                x_data=None,
                y_data=None,
                x_units=meta["x_unit"],
                y_units=meta["y_unit"],
                start_time=None,
                finish_time=None,
            )
            for meas_key, meta in sources.items()
        )

    def get_measurement_keys(self) -> Tuple[str, ...]:
        """Get available measurement keys.

        Returns:
            Tuple of measurement keys.
        """
        return tuple(self.memory_map["data_sources"].keys())

    def get_last_measurement(self, meas_key: str) -> "Measurement":
        """Get the most recent measurement data.

        The x and y buffers are copied, so the returned object does not change
        when the shared memory is updated later.

        Args:
            meas_key: Measurement type key.

        Returns:
            Measurement object containing the most recent measurement data.
            ``start_time`` is None unless it is earlier than ``finish_time``.

        Raises:
            PrismParameterError: If the measurement type key is invalid or is
                a camera.
        """
        if "camera" in meas_key:
            raise PrismParameterError("Use camera specific function for camera reads.")
        if meas_key not in self.measurements:
            raise PrismParameterError(f"Invalid measurement type: {meas_key}")
        data = self.measurements[meas_key]
        return Measurement(
            name=data["verbose"],
            key=meas_key,
            x_data=data["x_data"].copy(),
            y_data=data["y_data"].copy(),
            x_units=data["x_unit"],
            y_units=data["y_unit"],
            start_time=(
                data["start_time"].value
                if data["start_time"].value < data["finish_time"].value
                else None
            ),
            finish_time=data["finish_time"].value,
        )

    def single_measure(
        self, meas_key: str, timeout: Optional[float] = None
    ) -> "Measurement":
        """Perform a single measurement and wait for completion.

        Args:
            meas_key: Measurement type key.
            timeout: Timeout in seconds. If None, uses the default timeout.

        Returns:
            Measurement object containing the measurement data.

        Raises:
            PrismParameterError: If the measurement type is invalid.
            PrismTimeoutError: If the operation times out.
        """
        if meas_key not in self.measurements:
            raise PrismParameterError(f"Invalid measurement type: {meas_key}")
        timeout = timeout if timeout is not None else self.timeout
        record = self.measurements[meas_key]
        # Read the baseline BEFORE triggering, so a measurement that finishes
        # immediately after the trigger is not missed.
        baseline = record["finish_time"].value
        self._start_time = time()
        record["target_time"].value = time()
        try:
            self._wait_for_finish_after(meas_key, baseline, timeout)
        except PrismTimeoutError:
            raise PrismTimeoutError(
                f"Timeout waiting for {meas_key} measurement"
            ) from None
        return self.get_last_measurement(meas_key)

    def start_running_measure(self, meas_key: str) -> None:
        """Start continuous measurement collection.

        Writes ``0`` to the measurement's ``target_time``.

        Args:
            meas_key: Measurement type key.

        Raises:
            PrismParameterError: If the measurement type is invalid.
        """
        if meas_key not in self.measurements:
            raise PrismParameterError(f"Invalid measurement type: {meas_key}")
        self.measurements[meas_key]["target_time"].value = 0

    def stop_running_measure(self, meas_key: str) -> None:
        """Stop continuous measurement collection.

        Writes ``-1`` to the measurement's ``target_time``.

        Args:
            meas_key: Measurement type key.

        Raises:
            PrismParameterError: If the measurement type is invalid.
        """
        if meas_key not in self.measurements:
            raise PrismParameterError(f"Invalid measurement type: {meas_key}")
        self.measurements[meas_key]["target_time"].value = -1

    def get_all_parameters(
        self, list_read_only: bool = False
    ) -> "Tuple[Parameter, ...]":
        """Get information about available parameters.

        Args:
            list_read_only: If ``True`` the returned collection includes
                *read-only* parameters (those whose ``name`` contains the
                marker ``"*RO*"``). If ``False`` (default) read-only
                parameters are excluded.

        Returns:
            Tuple[Parameter, ...]: A tuple of :class:`prismapi.core.Parameter`
            instances matching the filter criterion.
        """
        parameters = tuple(
            self.get_parameter(param_name) for param_name in self.memory_map["dofs"]
        )
        if list_read_only:
            return parameters
        return tuple(param for param in parameters if "*RO*" not in param.name)

    def _validated_dof(self, param_key: str, value: Any) -> Dict:
        """Return the DOF record for ``param_key`` after range-checking ``value``.

        Raises:
            PrismParameterError: If the key is unknown or the value lies
                outside ``[lo_limit, hi_limit]``.
        """
        if param_key not in self.dofs:
            raise PrismParameterError(f"Invalid parameter name: {param_key}")
        dof = self.dofs[param_key]
        if not (dof["lo_limit"] <= value <= dof["hi_limit"]):
            raise PrismParameterError(
                f"Value {value} outside allowed range [{dof['lo_limit']}, {dof['hi_limit']}]"
            )
        return dof

    def _wait_for_dofs(
        self, dofs, started: float, timeout: Optional[float], message: str
    ) -> None:
        """Poll until every given DOF reports ``actual == target``.

        Raises:
            PrismTimeoutError: With ``message`` once more than ``timeout``
                seconds have passed since ``started`` (never if ``timeout``
                is None).
        """
        while any(dof["target"].value != dof["actual"].value for dof in dofs):
            if timeout is not None and (time() - started) > timeout:
                raise PrismTimeoutError(message)
            sleep(self._POLL_INTERVAL)

    def set_parameter(
        self, param_key: str, value: Any, timeout: Optional[float] = None
    ) -> None:
        """Set a parameter value and optionally wait for it to take effect.

        Args:
            param_key: Key of the parameter to set.
            value: New value for the parameter.
            timeout: Timeout in seconds. If None, the default timeout is used;
                if that is None as well, the call returns right after writing
                the target without waiting.

        Raises:
            PrismParameterError: If the parameter key is invalid or the value
                is outside the allowed range.
            PrismTimeoutError: If the operation times out.
        """
        dof = self._validated_dof(param_key, value)
        timeout = timeout if timeout is not None else self.timeout
        self._start_time = started = time()
        dof["target"].value = value
        if timeout is not None:
            self._wait_for_dofs(
                (dof,), started, timeout, f"Timeout waiting for parameter {param_key}"
            )

    def get_parameter(self, param_key: str) -> "Parameter":
        """Get the current state of a parameter.

        Args:
            param_key: Key of the parameter.

        Returns:
            Parameter object whose ``value`` is the DOF's ``actual`` reading.

        Raises:
            PrismParameterError: If the parameter key is invalid.
        """
        if param_key not in self.dofs:
            raise PrismParameterError(f"Invalid parameter key: {param_key}")
        dof = self.dofs[param_key]
        return Parameter(
            name=dof["verbose"],
            key=param_key,
            value=dof["actual"].value,
            units=dof["unit"],
            min=dof["lo_limit"],
            max=dof["hi_limit"],
        )

    def _capture_image(
        self, camera_key: str, unavailable_message: str, timeout_message: str
    ) -> np.ndarray:
        """Trigger the camera stored under ``camera_key`` and return its image.

        The image shape is taken from the record's ``x_data`` and the pixel
        values from the first ``valid_length // 8`` elements of ``y_data``.

        Raises:
            PrismParameterError: With ``unavailable_message`` if the camera is
                not a known data source.
            PrismTimeoutError: With ``timeout_message`` if the default timeout
                elapses before ``finish_time`` changes.
        """
        if camera_key not in self.measurements:
            raise PrismParameterError(unavailable_message)
        imager = self.measurements[camera_key]
        previous_finish = imager["finish_time"].value
        # Time the wait from this call, not from an earlier operation.
        self._start_time = started = time()
        imager["target_time"].value = started
        while previous_finish == imager["finish_time"].value:
            if self.timeout is not None and (time() - started) > self.timeout:
                raise PrismTimeoutError(timeout_message)
            sleep(self._POLL_INTERVAL)
        shape = imager["x_data"].astype(np.int32)
        element_count = imager["valid_length"].value // 8
        # astype() copies, so the result is detached from the shared memory.
        return imager["y_data"][:element_count].reshape(shape).astype(np.uint8)

    def image_bed(self) -> np.ndarray:
        """Capture an image from the bed camera.

        Returns:
            Numpy array (dtype uint8) containing the image data.

        Raises:
            PrismParameterError: If the bed camera is not available.
            PrismTimeoutError: If the default timeout elapses.
        """
        return self._capture_image(
            "bed_camera",
            "Bed camera not available",
            "Timeout waiting for bed camera image",
        )

    def pl_image(self) -> np.ndarray:
        """Capture an image from the PL camera.

        Returns:
            Numpy array (dtype uint8) containing the image data.

        Raises:
            PrismParameterError: If the PL camera is not available.
            PrismTimeoutError: If the default timeout elapses.
        """
        return self._capture_image(
            "pli_camera",
            "PL camera not available",
            "Timeout waiting for PL camera image",
        )

    def move(self, x: float, y: float, timeout: Optional[float] = None) -> None:
        """Move the bed to the specified position.

        Both coordinates are validated before either target is written, and
        both targets are written before waiting, so the axes are commanded
        together.

        Args:
            x: X position in millimeters.
            y: Y position in millimeters.
            timeout: Timeout in seconds. If None, uses the default timeout; if
                that is None as well, waits indefinitely.

        Raises:
            PrismParameterError: If the position is outside the allowed range.
            PrismTimeoutError: If the operation times out.
        """
        x_dof = self._validated_dof("x_position", x)
        y_dof = self._validated_dof("y_position", y)
        timeout = timeout if timeout is not None else self.timeout
        self._start_time = started = time()
        x_dof["target"].value = x
        y_dof["target"].value = y
        self._wait_for_dofs(
            (x_dof, y_dof), started, timeout, "Timeout waiting for bed movement"
        )

    def _wait_for_finish_after(
        self, meas_key: str, baseline: float, timeout: Optional[float]
    ) -> None:
        """Poll until the measurement's ``finish_time`` exceeds ``baseline``.

        The data check comes before the timeout check, so data that is already
        available is reported even when ``timeout`` is ``0``.

        Raises:
            PrismTimeoutError: If more than ``timeout`` seconds pass first
                (never if ``timeout`` is None).
        """
        finish_time = self.measurements[meas_key]["finish_time"]
        started = time()
        while True:
            if finish_time.value > baseline:
                return
            if timeout is not None and (time() - started) > timeout:
                raise PrismTimeoutError(f"Timeout waiting for {meas_key} data.")
            sleep(self._POLL_INTERVAL)

    def wait_for_new_data(self, meas_key: str, timeout: Optional[float] = None) -> None:
        """Wait for new data to be available for a measurement.

        "New" means the measurement's ``finish_time`` becomes greater than the
        value it has when this method is called.

        Args:
            meas_key: Key of the measurement type.
            timeout: Timeout in seconds. If None, blocks indefinitely.

        Raises:
            PrismParameterError: If the measurement key is invalid.
            PrismTimeoutError: If the operation times out.
        """
        if meas_key not in self.measurements:
            raise PrismParameterError(f"Invalid measurement key: {meas_key}")
        baseline = self.measurements[meas_key]["finish_time"].value
        self._wait_for_finish_after(meas_key, baseline, timeout)

    def get_next_measurement(
        self, meas_key: str, timeout: Optional[float] = None
    ) -> "Measurement":
        """Wait for the next measurement to finish and return it.

        Args:
            meas_key: Key of the measurement type.
            timeout: Timeout in seconds. If None, blocks indefinitely.

        Returns:
            Measurement object containing the next measurement data.

        Raises:
            PrismParameterError: If the measurement key is invalid.
            PrismTimeoutError: If the operation times out.
        """
        if meas_key not in self.measurements:
            raise PrismParameterError(f"Invalid measurement key: {meas_key}")
        self.wait_for_new_data(meas_key, timeout)
        return self.get_last_measurement(meas_key)