Plans

Plans that might be useful at the APS when using BlueSky

nscan(detectors, *motor_sets[, num, …])

Scan over n variables moved together, each in equally spaced steps.

run_blocker_in_plan(blocker, *args[, …])

plan: run blocking function blocker_(*args, **kwargs) from a Bluesky plan

run_in_thread(func)

(decorator) run func in thread

snapshot(obj_list[, stream, md])

bluesky plan: record current values of list of ophyd signals

sscan_1D(sscan[, poll_delay_s, …])

simple 1-D scan using EPICS synApps sscan record

TuneAxis(signals, axis[, signal_name])

tune an axis with a signal

tune_axes(axes)

BlueSky plan to tune a list of axes in sequence

class apstools.plans.TuneAxis(signals, axis, signal_name=None)[source]

tune an axis with a signal

This class provides a tuning object so that a Device or other entity may gain its own tuning process, keeping track of the particulars needed to tune this device again. For example, one could add a tuner to a motor stage:

motor = EpicsMotor("xxx:motor", "motor")
motor.tuner = TuneAxis([det], motor)

Then the motor could be tuned individually:

RE(motor.tuner.tune(md={"activity": "tuning"}))

or the tune() could be part of a plan with other steps.

Example:

tuner = TuneAxis([det], axis)
live_table = LiveTable(["axis", "det"])
RE(tuner.multi_pass_tune(width=2, num=9), live_table)
RE(tuner.tune(width=0.05, num=9), live_table)

Also see the jupyter notebook referenced here: Example: TuneAxis().

tune([width, num, md])

BlueSky plan to execute one pass through the current scan range

multi_pass_tune([width, step_factor, num, …])

BlueSky plan for tuning this axis with this signal

peak_detected()

returns True if a peak was detected, otherwise False

multi_pass_tune(width=None, step_factor=None, num=None, pass_max=None, snake=None, md=None)[source]

BlueSky plan for tuning this axis with this signal

Execute multiple passes to refine the centroid determination. Each subsequent pass will reduce the width of scan by step_factor. If snake=True then the scan direction will reverse with each subsequent pass.

PARAMETERS

widthfloat

width of the tuning scan in the units of self.axis Default value in self.width (initially 1)

numint

number of steps Default value in self.num (initially 10)

step_factorfloat

This reduces the width of the next tuning scan by the given factor. Default value in self.step_factor (initially 4)

pass_maxint

Maximum number of passes to be executed (avoids runaway scans when a centroid is not found). Default value in self.pass_max (initially 10)

snakebool

If True, reverse scan direction on next pass. Default value in self.snake (initially True)

mddict, optional

metadata

peak_detected()[source]

returns True if a peak was detected, otherwise False

The default algorithm identifies a peak when the maximum value is four times the minimum value. Change this routine by subclassing TuneAxis and override peak_detected().

tune(width=None, num=None, md=None)[source]

BlueSky plan to execute one pass through the current scan range

Scan self.axis centered about current position from -width/2 to +width/2 with num observations. If a peak was detected (default check is that max >= 4*min), then set self.tune_ok = True.

PARAMETERS

widthfloat

width of the tuning scan in the units of self.axis Default value in self.width (initially 1)

numint

number of steps Default value in self.num (initially 10)

mddict, optional

metadata

apstools.plans.nscan(detectors, *motor_sets, num=11, per_step=None, md=None)[source]

Scan over n variables moved together, each in equally spaced steps.

PARAMETERS

detectorslist

list of ‘readable’ objects

motor_setslist

sequence of one or more groups of: motor, start, finish

motorobject

any ‘settable’ object (motor, temp controller, etc.)

startfloat

starting position of motor

finishfloat

ending position of motor

numint

number of steps (default = 11)

per_stepcallable, optional

hook for customizing action of inner loop (messages per step) Expected signature: f(detectors, step_cache, pos_cache)

mddict, optional

metadata

See the nscan() example in a Jupyter notebook: https://github.com/BCDA-APS/apstools/blob/master/docs/source/resources/demo_nscan.ipynb

apstools.plans.run_blocker_in_plan(blocker, *args, _poll_s_=0.01, _timeout_s_=None, **kwargs)[source]

plan: run blocking function blocker_(*args, **kwargs) from a Bluesky plan

PARAMETERS

blockerfunc

function object to be called in a Bluesky plan

_poll_s_float

sleep interval in loop while waiting for completion (default: 0.01)

_timeout_s_float

maximum time for completion (default: None which means no timeout)

Example: use time.sleep as blocking function:

RE(run_blocker_in_plan(time.sleep, 2.14))

Example: in a plan, use time.sleep as blocking function:

def my_sleep(t=1.0):
    yield from run_blocker_in_plan(time.sleep, t)

RE(my_sleep())
apstools.plans.run_in_thread(func)[source]

(decorator) run func in thread

USAGE:

@run_in_thread
def progress_reporting():
    logger.debug("progress_reporting is starting")
    # ...

#...
progress_reporting()   # runs in separate thread
#...
apstools.plans.snapshot(obj_list, stream='primary', md=None)[source]

bluesky plan: record current values of list of ophyd signals

PARAMETERS

obj_listlist

list of ophyd Signal or EpicsSignal objects

streamstr

document stream, default: “primary”

mddict

metadata

apstools.plans.sscan_1D(sscan, poll_delay_s=0.001, phase_timeout_s=60.0, running_stream='primary', final_array_stream=None, device_settings_stream='settings', md={})[source]

simple 1-D scan using EPICS synApps sscan record

assumes the sscan record has already been setup properly for a scan

PARAMETERS

sscanDevice

one EPICS sscan record (instance of apstools.synApps_ophyd.sscanRecord)

running_streamstr or None

(default: "primary") Name of document stream to write positioners and detectors data made available while the sscan is running. This is typically the scan data, row by row. If set to None, this stream will not be written.

final_array_streamstr or None

(default: None) Name of document stream to write positioners and detectors data posted after the sscan has ended. If set to None, this stream will not be written.

device_settings_streamstr or None

(default: "settings") Name of document stream to write settings of the sscan device. This is all the information returned by sscan.read(). If set to None, this stream will not be written.

poll_delay_sfloat

(default: 0.001 seconds) How long to sleep during each polling loop while collecting interim data values and waiting for sscan to complete. Must be a number between zero and 0.1 seconds.

phase_timeout_sfloat

(default: 60 seconds) How long to wait after last update of the sscan.FAZE. When scanning, we expect the scan phase to update regularly as positioners move and detectors are triggered. If the scan hangs for some reason, this is a way to end the plan early. To cancel this feature, set it to None.

NOTE about the document stream names

Make certain the names for the document streams are different from each other. If you make them all the same (such as primary), you will have difficulty when reading your data later on.

Don’t cross the streams!

EXAMPLE

Assume that the chosen sscan record has already been setup.

from apstools.devices import sscanDevice scans = sscanDevice(P, name=”scans”)

from apstools.plans import sscan_1D RE(sscan_1D(scans.scan1), md=dict(purpose=”demo”))

apstools.plans.tune_axes(axes)[source]

BlueSky plan to tune a list of axes in sequence

EXAMPLE

Sequentially, tune a list of preconfigured axes:

RE(tune_axes([mr, m2r, ar, a2r])