Plans
Plans that might be useful at the APS when using BlueSky
nscan(): Scan over n variables moved together, each in equally spaced steps
run_blocker_in_plan(): plan: run blocking function
run_in_thread(): (decorator) run func in thread
snapshot(): bluesky plan: record current values of list of ophyd signals
sscan_1D(): simple 1-D scan using EPICS synApps sscan record
TuneAxis(): tune an axis with a signal
BlueSky plan to tune a list of axes in sequence

class apstools.plans.TuneAxis(signals, axis, signal_name=None)
tune an axis with a signal
This class provides a tuning object so that a Device or other entity may gain its own tuning process, keeping track of the particulars needed to tune this device again. For example, one could add a tuner to a motor stage:
    motor = EpicsMotor("xxx:motor", "motor")
    motor.tuner = TuneAxis([det], motor)
Then the motor could be tuned individually:
    RE(motor.tuner.tune(md={"activity": "tuning"}))
or the tune() could be part of a plan with other steps.
Example:
    tuner = TuneAxis([det], axis)
    live_table = LiveTable(["axis", "det"])
    RE(tuner.multi_pass_tune(width=2, num=9), live_table)
    RE(tuner.tune(width=0.05, num=9), live_table)
Also see the Jupyter notebook referenced here: Example: TuneAxis().
tune([width, num, md]): BlueSky plan to execute one pass through the current scan range
multi_pass_tune([width, step_factor, num, …]): BlueSky plan for tuning this axis with this signal
peak_detected(): returns True if a peak was detected, otherwise False

multi_pass_tune(width=None, step_factor=None, num=None, pass_max=None, snake=None, md=None)
BlueSky plan for tuning this axis with this signal
Execute multiple passes to refine the centroid determination. Each subsequent pass will reduce the width of the scan by step_factor. If snake=True, then the scan direction will reverse with each subsequent pass.
PARAMETERS
- width : float
  width of the tuning scan in the units of self.axis. Default value in self.width (initially 1)
- num : int
  number of steps. Default value in self.num (initially 10)
- step_factor : float
  This reduces the width of the next tuning scan by the given factor. Default value in self.step_factor (initially 4)
- pass_max : int
  Maximum number of passes to be executed (avoids runaway scans when a centroid is not found). Default value in self.pass_max (initially 10)
- snake : bool
  If True, reverse scan direction on next pass. Default value in self.snake (initially True)
- md : dict, optional
  metadata
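The way the scan width and direction change from pass to pass can be sketched in plain Python. This is a minimal illustration, not the apstools implementation: the function name `pass_schedule` is hypothetical, it assumes that "reduce the width by step_factor" means dividing the width by that factor, and it ignores the early exit that would occur when a pass finds no peak.

```python
def pass_schedule(width=1.0, step_factor=4.0, pass_max=10, snake=True):
    """Return a list of (pass_number, width, direction) tuples.

    Hypothetical helper: direction is +1 or -1 and flips on each
    pass only when snake is True.
    """
    schedule = []
    direction = 1
    for pass_number in range(1, pass_max + 1):
        schedule.append((pass_number, width, direction))
        width /= step_factor  # assumption: each pass is narrower by this factor
        if snake:
            direction = -direction  # reverse the scan direction for the next pass
    return schedule


if __name__ == "__main__":
    for entry in pass_schedule(width=2.0, step_factor=4.0, pass_max=3):
        print(entry)
```

With the documented defaults (width 1, step_factor 4), three passes would cover widths of 1, 0.25, and 0.0625 in the units of the axis.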

peak_detected()
returns True if a peak was detected, otherwise False
The default algorithm identifies a peak when the maximum value is at least four times the minimum value (max >= 4*min). To change this routine, subclass TuneAxis and override peak_detected().
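The default criterion can be illustrated with a standalone function. This is only a sketch of the max-versus-min comparison described above; the name `looks_like_peak` and its `ratio` argument are hypothetical and are not part of apstools, and the real method works on the data collected by the tuner rather than on a list passed in.

```python
def looks_like_peak(values, ratio=4.0):
    """Return True when max(values) is at least `ratio` times min(values).

    Hypothetical stand-in for the default check (max >= 4*min).
    An empty sequence is treated as "no peak".
    """
    if not values:
        return False
    return max(values) >= ratio * min(values)


if __name__ == "__main__":
    print(looks_like_peak([1.0, 2.0, 5.0, 2.0, 1.0]))
    print(looks_like_peak([3.0, 4.0, 5.0]))
```

A subclass that overrides peak_detected() could apply a different criterion in the same place, for example a different ratio.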

tune(width=None, num=None, md=None)
BlueSky plan to execute one pass through the current scan range
Scan self.axis centered about current position from -width/2 to +width/2 with num observations. If a peak was detected (default check is that max >= 4*min), then set self.tune_ok = True.
PARAMETERS
- width : float
  width of the tuning scan in the units of self.axis. Default value in self.width (initially 1)
- num : int
  number of steps. Default value in self.num (initially 10)
- md : dict, optional
  metadata
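The positions visited by one pass can be computed by hand. The sketch below assumes the num observations are equally spaced and include both end points of the range; the function name `tune_positions` is hypothetical and the calculation is independent of apstools.

```python
def tune_positions(center, width=1.0, num=10):
    """Return `num` equally spaced positions spanning center +/- width/2.

    Assumption: both end points are included, so the spacing
    is width / (num - 1).
    """
    if num < 1:
        raise ValueError("num must be at least 1")
    if num == 1:
        return [center]
    start = center - width / 2
    step = width / (num - 1)
    return [start + i * step for i in range(num)]


if __name__ == "__main__":
    print(tune_positions(0.0, width=2.0, num=5))
```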

apstools.plans.nscan(detectors, *motor_sets, num=11, per_step=None, md=None)
Scan over n variables moved together, each in equally spaced steps.
PARAMETERS
- detectors : list
  list of 'readable' objects
- motor_sets : list
  sequence of one or more groups of: motor, start, finish
  - motor : object
    any 'settable' object (motor, temp controller, etc.)
  - start : float
    starting position of motor
  - finish : float
    ending position of motor
- num : int
  number of steps (default = 11)
- per_step : callable, optional
  hook for customizing action of inner loop (messages per step). Expected signature: f(detectors, step_cache, pos_cache)
- md : dict, optional
  metadata
See the nscan() example in a Jupyter notebook: https://github.com/BCDA-APS/apstools/blob/master/docs/source/resources/demo_nscan.ipynb
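To show what "moved together, each in equally spaced steps" means for the motor_sets argument, here is a plain-Python sketch that only computes the target positions. The name `nscan_positions` is hypothetical, the motors are represented by strings, and the sketch assumes both start and finish are included among the num points.

```python
def nscan_positions(*motor_sets, num=11):
    """Yield one tuple of target positions per step, one entry per motor.

    motor_sets is a flat sequence of (motor, start, finish) groups,
    mirroring the argument layout described above.
    """
    if not motor_sets or len(motor_sets) % 3 != 0:
        raise ValueError("motor_sets must be groups of: motor, start, finish")
    groups = [motor_sets[i:i + 3] for i in range(0, len(motor_sets), 3)]
    for step in range(num):
        # fraction of the way from start to finish at this step
        fraction = step / (num - 1) if num > 1 else 0.0
        yield tuple(
            start + fraction * (finish - start)
            for _motor, start, finish in groups
        )


if __name__ == "__main__":
    for targets in nscan_positions("m1", 0.0, 1.0, "m2", 10.0, 20.0, num=3):
        print(targets)
```

Every motor reaches its own start at the first step and its own finish at the last step, so the step size can differ from motor to motor.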

apstools.plans.run_blocker_in_plan(blocker, *args, _poll_s_=0.01, _timeout_s_=None, **kwargs)
plan: run blocking function blocker(*args, **kwargs) from a Bluesky plan
PARAMETERS
- blocker : func
  function object to be called in a Bluesky plan
- _poll_s_ : float
  sleep interval in loop while waiting for completion (default: 0.01)
- _timeout_s_ : float
  maximum time for completion (default: None which means no timeout)
Example: use time.sleep as blocking function:
    RE(run_blocker_in_plan(time.sleep, 2.14))
Example: in a plan, use time.sleep as blocking function:
    def my_sleep(t=1.0):
        yield from run_blocker_in_plan(time.sleep, t)

    RE(my_sleep())
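The general idea of running a blocking call from a generator-based plan can be sketched with the standard library alone: start the call in a thread, then poll for completion at the requested interval. This is an illustration under stated assumptions, not the apstools source. The name `run_blocker_sketch` is hypothetical, raising TimeoutError on timeout is an assumption, and the generator sleeps by itself where a real plan would hand the wait to the RunEngine (for example with `yield from bluesky.plan_stubs.sleep(...)`).

```python
import threading
import time


def run_blocker_sketch(blocker, *args, _poll_s_=0.01, _timeout_s_=None, **kwargs):
    """Generator: run blocker(*args, **kwargs) in a thread and poll until done."""
    worker = threading.Thread(
        target=blocker, args=args, kwargs=kwargs, daemon=True
    )
    started = time.monotonic()
    worker.start()
    while worker.is_alive():
        elapsed = time.monotonic() - started
        if _timeout_s_ is not None and elapsed > _timeout_s_:
            # assumption: signal the timeout by raising
            raise TimeoutError(f"blocker still running after {_timeout_s_} s")
        # Stand-in for a RunEngine-managed sleep: wait here and yield
        # the interval so the caller can observe each poll.
        time.sleep(_poll_s_)
        yield _poll_s_


if __name__ == "__main__":
    polls = list(run_blocker_sketch(time.sleep, 0.1))
    print(f"polled {len(polls)} times")
```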

apstools.plans.run_in_thread(func)
(decorator) run func in thread
USAGE:
    @run_in_thread
    def progress_reporting():
        logger.debug("progress_reporting is starting")
        # ...

    #...
    progress_reporting()   # runs in separate thread
    #...
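A decorator of this kind can be written in a few lines with the threading module. The version below is a sketch for illustration and is named `run_in_thread_sketch` to make clear it is not the apstools source; returning the Thread object from the wrapper is an assumption that lets the caller join it.

```python
import functools
import threading


def run_in_thread_sketch(func):
    """Decorator: each call to func starts in a new thread."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        return thread  # assumption: hand back the thread so it can be joined

    return wrapper


if __name__ == "__main__":
    results = []

    @run_in_thread_sketch
    def record(value):
        results.append(value)

    record(42).join()  # wait for the background call to finish
    print(results)
```

Note that the decorated call returns immediately, so any return value of the wrapped function is not available to the caller.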

apstools.plans.snapshot(obj_list, stream='primary', md=None)
bluesky plan: record current values of list of ophyd signals
PARAMETERS
- obj_list : list
  list of ophyd Signal or EpicsSignal objects
- stream : str
  document stream, default: "primary"
- md : dict
  metadata

apstools.plans.sscan_1D(sscan, poll_delay_s=0.001, phase_timeout_s=60.0, running_stream='primary', final_array_stream=None, device_settings_stream='settings', md={})
simple 1-D scan using EPICS synApps sscan record
Assumes the sscan record has already been set up properly for a scan.
PARAMETERS
- sscan : Device
  one EPICS sscan record (instance of apstools.synApps_ophyd.sscanRecord)
- running_stream : str or None
  (default: "primary") Name of document stream to write positioners and detectors data made available while the sscan is running. This is typically the scan data, row by row. If set to None, this stream will not be written.
- final_array_stream : str or None
  (default: None) Name of document stream to write positioners and detectors data posted after the sscan has ended. If set to None, this stream will not be written.
- device_settings_stream : str or None
  (default: "settings") Name of document stream to write settings of the sscan device. This is all the information returned by sscan.read(). If set to None, this stream will not be written.
- poll_delay_s : float
  (default: 0.001 seconds) How long to sleep during each polling loop while collecting interim data values and waiting for sscan to complete. Must be a number between zero and 0.1 seconds.
- phase_timeout_s : float
  (default: 60 seconds) How long to wait after last update of the sscan.FAZE. When scanning, we expect the scan phase to update regularly as positioners move and detectors are triggered. If the scan hangs for some reason, this is a way to end the plan early. To cancel this feature, set it to None.
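The phase timeout behaves like a watchdog that is reset whenever the scan phase changes. The class below is a hypothetical, self-contained sketch of that idea and does not read any EPICS signal: `PhaseWatchdog`, `report()`, and `expired()` are illustrative names, and the injectable clock is there only to make the behavior easy to check.

```python
import time


class PhaseWatchdog:
    """Track the time since the last change of a reported phase value."""

    def __init__(self, timeout_s=60.0, clock=time.monotonic):
        self.timeout_s = timeout_s  # None disables the timeout
        self.clock = clock
        self.last_phase = None
        self.last_update = clock()

    def report(self, phase):
        """Record the current phase; restart the timer if it changed."""
        if phase != self.last_phase:
            self.last_phase = phase
            self.last_update = self.clock()

    def expired(self):
        """Return True when the phase has not changed for too long."""
        if self.timeout_s is None:
            return False
        return self.clock() - self.last_update > self.timeout_s


if __name__ == "__main__":
    dog = PhaseWatchdog(timeout_s=0.05)
    dog.report(1)
    time.sleep(0.1)  # no phase change during this interval
    print(dog.expired())
```

A polling loop would call report() with the latest phase on every iteration and end the plan early once expired() returns True.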
NOTE about the document stream names
Make certain the names for the document streams are different from each other. If you make them all the same (such as primary), you will have difficulty when reading your data later on.
Don't cross the streams!
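A quick check before starting the plan can catch duplicated stream names. The helper below is a hypothetical convenience, not part of apstools; it assumes that a stream set to None is simply skipped, as the parameter descriptions state, and it reuses the documented default names.

```python
def distinct_stream_names(
    running_stream="primary",
    final_array_stream=None,
    device_settings_stream="settings",
):
    """Return the stream names in use, raising ValueError on duplicates."""
    names = [
        name
        for name in (running_stream, final_array_stream, device_settings_stream)
        if name is not None  # None means that stream will not be written
    ]
    if len(set(names)) != len(names):
        raise ValueError(f"document stream names must be distinct: {names}")
    return names


if __name__ == "__main__":
    print(distinct_stream_names(final_array_stream="final"))
```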
EXAMPLE
Assume that the chosen sscan record has already been set up.
    from apstools.devices import sscanDevice
    scans = sscanDevice(P, name="scans")

    from apstools.plans import sscan_1D
    RE(sscan_1D(scans.scan1), md=dict(purpose="demo"))