"""
Ophyd support for the EPICS synApps sscan record
see: https://epics.anl.gov/bcda/synApps/sscan/SscanRecord.html
EXAMPLE::
import apstools.synApps
scans = apstools.synApps.SscanDevice("xxx:", name="scans")
scans.select_channels() # only the channels configured in EPICS
Public Structures
.. autosummary::
~SscanRecord
~SscanDevice
Private Structures
.. autosummary::
~sscanPositioner
~sscanDetector
~sscanTrigger
"""
# -----------------------------------------------------------------------------
# :author: Pete R. Jemian
# :email: jemian@anl.gov
# :copyright: (c) 2017-2022, UChicago Argonne, LLC
#
# Distributed under the terms of the Creative Commons Attribution 4.0 International Public License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
# -----------------------------------------------------------------------------
from collections import OrderedDict
from ophyd import Device
from ophyd import Component as Cpt
from ophyd import DynamicDeviceComponent as DDC
from ophyd import FormattedComponent as FC
from ophyd import EpicsSignal
from ophyd import EpicsSignalRO
from ophyd.status import DeviceStatus
from .. import utils as APS_utils
[docs]class sscanPositioner(Device):
"""
positioner of an EPICS sscan record
.. index:: Ophyd Device; synApps sscanPositioner
.. autosummary::
~defined_in_EPICS
~reset
"""
readback_pv = FC(EpicsSignal, "{self.prefix}.R{self._ch_num}PV", kind="config")
readback_value = FC(EpicsSignalRO, "{self.prefix}.R{self._ch_num}CV", kind="hinted")
array = FC(
EpicsSignalRO,
"{self.prefix}.P{self._ch_num}CA",
kind="normal", # TODO: which kind here?
)
setpoint_pv = FC(EpicsSignal, "{self.prefix}.P{self._ch_num}PV", kind="config")
setpoint_value = FC(EpicsSignalRO, "{self.prefix}.P{self._ch_num}DV", kind="normal")
start = FC(EpicsSignal, "{self.prefix}.P{self._ch_num}SP", kind="config")
center = FC(EpicsSignal, "{self.prefix}.P{self._ch_num}CP", kind="config")
end = FC(EpicsSignal, "{self.prefix}.P{self._ch_num}EP", kind="config")
step_size = FC(EpicsSignal, "{self.prefix}.P{self._ch_num}SI", kind="config")
width = FC(EpicsSignal, "{self.prefix}.P{self._ch_num}WD", kind="config")
abs_rel = FC(EpicsSignal, "{self.prefix}.P{self._ch_num}AR", kind="config")
mode = FC(EpicsSignal, "{self.prefix}.P{self._ch_num}SM", kind="config")
units = FC(EpicsSignalRO, "{self.prefix}.P{self._ch_num}EU", kind="config")
def __init__(self, prefix, num, **kwargs):
self._ch_num = num
super().__init__(prefix, **kwargs)
[docs] def reset(self):
"""set all fields to default values"""
self.readback_pv.put("")
self.setpoint_pv.put("")
self.start.put(0)
self.center.put(0)
self.end.put(0)
self.step_size.put(0)
self.width.put(0)
self.abs_rel.put("ABSOLUTE")
self.mode.put("LINEAR")
@property
def defined_in_EPICS(self):
"""True if defined in EPICS"""
return len(self.setpoint_pv.get().strip()) > 0
[docs]class sscanDetector(Device):
"""
detector of an EPICS sscan record
.. index:: Ophyd Device; synApps sscanDetector
.. autosummary::
~defined_in_EPICS
~reset
"""
input_pv = FC(EpicsSignal, "{self.prefix}.D{self._ch_num}PV", kind="config")
current_value = FC(EpicsSignal, "{self.prefix}.D{self._ch_num}CV", kind="hinted")
array = FC(EpicsSignal, "{self.prefix}.D{self._ch_num}CA", kind="omitted")
def __init__(self, prefix, num, **kwargs):
self._ch_num = num
super().__init__(prefix, **kwargs)
[docs] def reset(self):
"""set all fields to default values"""
self.input_pv.put("")
@property
def defined_in_EPICS(self):
"""True if defined in EPICS"""
return len(self.input_pv.get().strip()) > 0
[docs]class sscanTrigger(Device):
"""
detector trigger of an EPICS sscan record
.. index:: Ophyd Device; synApps sscanTrigger
.. autosummary::
~reset
"""
trigger_pv = FC(EpicsSignal, "{self.prefix}.T{self._ch_num}PV", kind="config")
trigger_value = FC(EpicsSignal, "{self.prefix}.T{self._ch_num}CD", kind="config")
def __init__(self, prefix, num, **kwargs):
self._ch_num = num
super().__init__(prefix, **kwargs)
[docs] def reset(self):
"""set all fields to default values"""
self.trigger_pv.put("")
self.trigger_value.put(1)
@property
def defined_in_EPICS(self):
"""True if defined in EPICS"""
return len(self.trigger_pv.get().strip()) > 0
def _sscan_positioners(channel_list):
defn = OrderedDict()
for chan in channel_list:
attr = "p{}".format(chan)
defn[attr] = (sscanPositioner, "", {"num": chan})
return defn
def _sscan_detectors(channel_list):
defn = OrderedDict()
for chan in channel_list:
attr = "d{}".format(chan)
defn[attr] = (sscanDetector, "", {"num": chan})
return defn
def _sscan_triggers(channel_list):
defn = OrderedDict()
for chan in channel_list:
attr = "t{}".format(chan)
defn[attr] = (sscanTrigger, "", {"num": chan})
return defn
[docs]class SscanRecord(Device):
"""
EPICS synApps sscan record: used as ``$(P):scan(N)``
.. index:: Ophyd Device; synApps SscanRecord
.. autosummary::
~defined_in_EPICS
~reset
~select_channels
"""
desc = Cpt(EpicsSignal, ".DESC", kind="config")
scan_phase = Cpt(EpicsSignalRO, ".FAZE", kind="config")
data_state = Cpt(EpicsSignalRO, ".DSTATE", kind="config")
data_ready = Cpt(EpicsSignalRO, ".DATA", kind="config")
scan_busy = Cpt(EpicsSignalRO, ".BUSY", kind="config")
alert_flag = Cpt(EpicsSignalRO, ".ALRT", kind="config")
alert_message = Cpt(EpicsSignalRO, ".SMSG", kind="config")
number_points = Cpt(EpicsSignal, ".NPTS", kind="config")
maximum_number_points = Cpt(EpicsSignal, ".MPTS", kind="config")
current_point = Cpt(EpicsSignalRO, ".CPT", kind="normal")
pasm = Cpt(EpicsSignal, ".PASM", kind="config")
execute_scan = Cpt(EpicsSignal, ".EXSC", kind="omitted")
bspv = Cpt(EpicsSignal, ".BSPV", kind="config")
bscd = Cpt(EpicsSignal, ".BSCD", kind="omitted")
bswait = Cpt(EpicsSignal, ".BSWAIT", kind="omitted")
cmnd = Cpt(EpicsSignal, ".CMND", kind="omitted")
detector_delay = Cpt(EpicsSignal, ".DDLY", kind="config")
positioner_delay = Cpt(EpicsSignal, ".PDLY", kind="config")
reference_detector = Cpt(EpicsSignal, ".REFD", kind="config")
wait = Cpt(EpicsSignal, ".WAIT", kind="config")
wcnt = Cpt(EpicsSignalRO, ".WCNT", kind="config")
awct = Cpt(EpicsSignal, ".AWCT", kind="config")
acqt = Cpt(EpicsSignal, ".ACQT", kind="config")
acqm = Cpt(EpicsSignal, ".ACQM", kind="config")
atime = Cpt(EpicsSignal, ".ATIME", kind="config")
copyto = Cpt(EpicsSignal, ".COPYTO", kind="config")
a1pv = Cpt(EpicsSignal, ".A1PV", kind="config")
a1nv = Cpt(EpicsSignal, ".A1NV", kind="config")
a1cd = Cpt(EpicsSignal, ".A1CD", kind="config")
aspv = Cpt(EpicsSignal, ".ASPV", kind="config")
ascd = Cpt(EpicsSignal, ".ASCD", kind="config")
positioners = DDC(_sscan_positioners("1 2 3 4".split()))
detectors = DDC(_sscan_detectors(APS_utils.itemizer("%02d", range(1, 71))))
triggers = DDC(_sscan_triggers("1 2 3 4".split()))
[docs] def set(self, value, **kwargs):
"""interface to use bps.mv()"""
if value != 1:
return
working_status = DeviceStatus(self)
started = False
def execute_scan_cb(value, timestamp, **kwargs):
value = int(value)
if started and value == 0:
working_status._finished()
self.execute_scan.subscribe(execute_scan_cb)
self.execute_scan.set(1)
started = True
return working_status
[docs] def reset(self):
"""set all fields to default values"""
self.desc.put(self.desc.pvname.split(".")[0])
self.number_points.put(1000)
for part in (self.positioners, self.detectors, self.triggers):
for ch_name in part.component_names:
channel = getattr(part, ch_name)
channel.reset()
self.a1pv.put("")
self.acqm.put("NORMAL")
if self.name.find("scanH") > 0:
self.acqt.put("1D ARRAY")
else:
self.acqt.put("SCALAR")
self.aspv.put("")
self.bspv.put("")
self.pasm.put("STAY")
self.bswait.put("Wait")
self.a1cd.put(1)
self.ascd.put(1)
self.bscd.put(1)
self.reference_detector.put(1)
self.atime.put(0)
self.awct.put(0)
self.copyto.put(0)
self.detector_delay.put(0)
self.positioner_delay.put(0)
while self.wcnt.get() > 0:
self.wait.put(0)
[docs] def select_channels(self):
"""
Select channels that are configured in EPICS
"""
for part in (self.positioners, self.detectors, self.triggers):
channel_names = [] # part.get_configured_channels()
# fmt: off
channel_names = [
ch
for ch in part.component_names
if getattr(part, ch).defined_in_EPICS
]
# fmt: on
part.configuration_attrs = channel_names
part.read_attrs = channel_names
part.kind = "normal"
@property
def defined_in_EPICS(self):
"""True if will be used in EPICS"""
self.select_channels()
channels = len(self.positioners.read_attrs)
channels += len(self.detectors.read_attrs)
# channels += len(self.triggers.read_attrs)
return channels > 0
[docs]class SscanDevice(Device):
"""
EPICS synApps XXX IOC setup of sscan records: ``$(P):scan$(N)``
.. index:: Ophyd Device; synApps SscanDevice
.. autosummary::
~reset
~select_channels
"""
scan_dimension = Cpt(EpicsSignalRO, "ScanDim", kind="config")
scan_pause = Cpt(EpicsSignal, "scanPause", kind="omitted")
abort_scans = Cpt(EpicsSignal, "AbortScans", kind="omitted")
scan1 = Cpt(SscanRecord, "scan1")
scan2 = Cpt(SscanRecord, "scan2")
scan3 = Cpt(SscanRecord, "scan3")
scan4 = Cpt(SscanRecord, "scan4")
scanH = Cpt(SscanRecord, "scanH")
resume_delay = Cpt(EpicsSignal, "scanResumeSEQ.DLY1", kind="config")
[docs] def reset(self):
"""set all fields to default values"""
for chnum in "1 2 3 4 H".split():
getattr(self, "scan" + chnum).reset()
[docs] def select_channels(self):
"""
Select only the scans that are configured in EPICS
"""
scans = ["scan" + ch for ch in "1 2 3 4 H".split()]
attrs = []
for nm in self.component_names:
if nm in scans and not getattr(self, nm).defined_in_EPICS:
continue
attrs.append(nm)
self.read_attrs = attrs