"""
BlueSky callback that writes SPEC data files
.. autosummary::
~SpecWriterCallback
~spec_comment
EXAMPLE : the :ref:`specfile_example() <example_specfile>` writes one or more scans to a SPEC data file using a jupyter notebook.
EXAMPLE : use as BlueSky callback::
from apstools.filewriters import SpecWriterCallback
specwriter = SpecWriterCallback()
RE.subscribe(specwriter.receiver)
EXAMPLE : use as writer from Databroker::
from apstools.filewriters import SpecWriterCallback
specwriter = SpecWriterCallback()
for key, doc in db.get_documents(db[-1]):
specwriter.receiver(key, doc)
print("Look at SPEC data file: "+specwriter.spec_filename)
EXAMPLE : use as writer from Databroker with customizations::
from apstools.filewriters import SpecWriterCallback
# write into file: /tmp/cerium.spec
specwriter = SpecWriterCallback(filename="/tmp/cerium.spec")
for key, doc in db.get_documents(db[-1]):
specwriter.receiver(key, doc)
# write into file: /tmp/barium.dat
specwriter.newfile("/tmp/barium.dat")
for key, doc in db.get_documents(db["b46b63d4"]):
specwriter.receiver(key, doc)
"""
#-----------------------------------------------------------------------------
# :author: Pete R. Jemian
# :email: jemian@anl.gov
# :copyright: (c) 2017-2019, UChicago Argonne, LLC
#
# Distributed under the terms of the Creative Commons Attribution 4.0 International Public License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------
from collections import OrderedDict
from datetime import datetime
import getpass
import logging
import os
import socket
import time
# Module-level logger.  ``addHandler()`` returns None, so it must not be
# chained onto the assignment: that would bind ``logger`` to None.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

# Programmer's Note: subclassing from `object` avoids the need
# to import `bluesky.callbacks.core.CallbackBase`.
# One less import when only accessing the Databroker.
# The *only* advantage to subclassing from CallbackBase
# seems to be a simpler setup call to RE.subscribe().
#
# superclass   | subscription code
# ------------ | -------------------------------
# object       | RE.subscribe(specwriter.receiver)
# CallbackBase | RE.subscribe(specwriter)

# strftime() format of the date/time stamps written in #D and #C lines
SPEC_TIME_FORMAT = "%a %b %d %H:%M:%S %Y"

# scan_id applied by newfile() when it is called with scan_id=True
SCAN_ID_RESET_VALUE = 1
def _rebuild_scan_command(doc):
    """
    reconstruct the scan command for SPEC data file #S line

    Parameters

    doc : dict
        *start* document.  These keys are read, all of them optional:
        ``plan_name``, ``plan_args``, ``scan_id``, ``detectors``, ``motors``.

    Returns

    str
        ``"<scan_id> <plan_name>(<key>=<value>, ...)"`` where ``scan_id``
        falls back to 1 when it is missing (or zero/empty).
    """

    def get_name(src):
        """
        get name field from object representation

        given: EpicsMotor(prefix='xxx:m1', name='m1', settle_time=0.0,
                    timeout=None, read_attrs=['user_readback', 'user_setpoint'],
                    configuration_attrs=['motor_egu', 'velocity', 'acceleration',
                    'user_offset', 'user_offset_dir'])
        return: "'m1'"  (the text following ``name=``, quotes included)

        Anything that does not look like ``Class(key=value, ...)`` with
        a ``name`` key is returned as ``str(src)``.
        """
        text = str(src)
        paren = text.find("(")
        if paren > 0:       # only if an open parenthesis is found
            for item in text[paren+1:].rstrip(")").split(","):
                # should be key=value pairs; ignore items without "="
                key, sep, value = item.strip().partition("=")
                if sep and key == "name":
                    return value
        return text

    args = []
    for key, value in (doc.get("plan_args") or {}).items():
        if key == "detectors":
            # prefer the names recorded in the start document; keep the
            # plan_args value when the document has no such key
            value = doc.get(key, value)
        elif key.startswith("motor"):
            value = doc.get("motors", value)
        elif key == "args":
            value = "[" + ", ".join(map(get_name, value)) + "]"
        args.append(f"{key}={value}")

    cmd = "{}({})".format(doc.get("plan_name", ""), ", ".join(args))
    scan_id = doc.get("scan_id") or 1
    return f"{scan_id} {cmd}"
[docs]class SpecWriterCallback(object):
"""
collect data from BlueSky RunEngine documents to write as SPEC data
This gathers data from all documents and appends scan to the file
when the *stop* document is received.
Parameters
filename : string, optional
Local, relative or absolute name of SPEC data file to be used.
If `filename=None`, defaults to format of YYYYmmdd-HHMMSS.dat
derived from the current system time.
auto_write : boolean, optional
If True (default), `write_scan()` is called when *stop* document
is received.
If False, the caller is responsible for calling `write_scan()`
before the next *start* document is received.
RE : instance of bluesky.RunEngine or None
reset_scan_id : boolean, optional
If True, and filename exists, then sets RE.md.scan_id to
highest scan number in existing SPEC data file.
default: False
User Interface methods
.. autosummary::
~receiver
~newfile
~usefile
~make_default_filename
~clear
~prepare_scan_contents
~write_scan
Internal methods
.. autosummary::
~write_header
~start
~descriptor
~event
~bulk_events
~datum
~resource
~stop
"""
def __init__(self, filename=None, auto_write=True, RE=None, reset_scan_id=False):
    """
    set up the writer and choose the SPEC data file

    When ``filename`` is None or names a file that does not exist,
    ``newfile()`` is called.  Otherwise ``usefile()`` is called and,
    if ``RE`` is given and ``reset_scan_id`` is true, the scan number
    it returns is stored in ``RE.md["scan_id"]``.
    """
    self.clear()
    self.buffered_comments = self._empty_comments_dict()

    # caller's choices
    self.spec_filename = filename
    self.auto_write = auto_write
    self.RE = RE
    self.reset_scan_id = reset_scan_id

    # file header content
    self.write_file_header = False
    self.spec_epoch = None      # for both #E & #D line in header, also offset for all scans
    self.spec_host = None
    self.spec_user = None

    # bookkeeping
    self.uid_short_length = 8
    self._datetime = None       # most recent document time as datetime object
    self._streams = {}          # descriptor documents, keyed by uid

    have_file = filename is not None and os.path.exists(filename)
    if not have_file:
        self.newfile(filename)
        return

    highest_scan_id = self.usefile(filename)
    if reset_scan_id and RE is not None:
        RE.md["scan_id"] = highest_scan_id
def clear(self):
    """reset all scan data defaults"""
    # identification and timing of the scan
    self.uid = None
    self.scan_epoch = None      # absolute epoch to report in scan #D line
    self.time = None            # full time from document
    self.scan_command = None    # #S line
    self.scanning = False
    self.num_primary_data = 0

    self.comments = self._empty_comments_dict()

    # note: for one scan, #O & #P information is not provided
    # unless collecting baseline data
    # wait for case with baseline data that needs #O/#P lines
    #
    # Each of these gets its own, empty OrderedDict:
    #   data         data in the scan
    #   detectors    names of detectors in the scan
    #   hints        why?
    #   metadata     #MD lines in header
    #   motors       names of motors in the scan
    #   positioners  names in #O, values in #P
    #   columns      #L in scan
    containers = (
        "data",
        "detectors",
        "hints",
        "metadata",
        "motors",
        "positioners",
        "columns",
    )
    for attribute in containers:
        setattr(self, attribute, OrderedDict())
def _empty_comments_dict(self):
    """return a new dict holding one empty comment list per document type"""
    document_types = ("start", "event", "descriptor", "resource", "datum", "stop")
    return {name: [] for name in document_types}
def _cmt(self, key, text):
    """
    enter a comment

    The comment is prefixed with a time stamp (time of the most recent
    document, or now if there is none) and stored under ``key``: with
    the scan's comments while scanning, else with the buffered comments.
    """
    when = self._datetime or datetime.now()
    stamp = when.strftime(SPEC_TIME_FORMAT)
    target = self.comments if self.scanning else self.buffered_comments
    target[key].append(f"{stamp}. {text}")
def receiver(self, key, document):
    """
    BlueSky callback: receive all documents for handling

    Documents named start, descriptor, event, bulk_events, datum,
    resource or stop go to the method of the same name, after the
    document time (or the current time, if it has none) is noted.
    Any other document name is only logged as a warning.
    """
    log = logging.getLogger(__name__)
    handled = (
        "start",
        "descriptor",
        "event",
        "bulk_events",
        "datum",
        "resource",
        "stop",
    )
    handlers = {name: getattr(self, name) for name in handled}

    handler = handlers.get(key)
    if handler is None:
        message = f"custom_callback encountered: {key} : {document}"
        # raise ValueError(message)
        log.warning(message)
        return

    uid = document.get("uid") or document.get("datum_id")
    log.debug("%s document, uid=%s", key, str(uid))

    timestamp = document.get("time")
    if timestamp is None:
        self._datetime = datetime.now()
    else:
        self._datetime = datetime.fromtimestamp(timestamp)

    handler(document)
def start(self, doc):
    """
    handle *start* documents

    Resets the per-scan state, then records from ``doc``: uid, time,
    scan_id (0 if absent), every key not listed in ``known_properties``
    as metadata, the names found under ``detectors``, ``hints`` and
    ``motors``, the plan type comment, and the #S scan command.

    ``doc`` must provide ``uid`` and ``time``.
    """
    known_properties = """
        uid time project sample scan_id group owner
        detectors hints
        plan_type plan_name plan_args
    """.split()

    self.clear()
    # Descriptors are only looked up by events of the same run.  Forget
    # the previous run's so the dict does not grow with every scan and
    # so a run can be replayed without a "duplicate descriptor" error.
    self._streams = {}
    self.scanning = True
    self.uid = doc["uid"]
    self._cmt("start", f"uid = {self.uid}")
    for doc_type, buffered in self.buffered_comments.items():
        # bring in any comments collected when not scanning
        self.comments[doc_type] += buffered
    self.buffered_comments = self._empty_comments_dict()
    self.time = doc["time"]
    self.scan_epoch = int(self.time)
    # .get(): agree with _rebuild_scan_command(), which tolerates
    # a start document without scan_id
    self.scan_id = doc.get("scan_id") or 0

    # Which reference? fixed counting time or fixed monitor count?
    # Can this be omitted?
    self.T_or_M = None          # for now
    # TODO: how to get "T" or "M" (and its value) from the document stream?

    # metadata
    for key in sorted(doc.keys()):
        if key not in known_properties:
            self.metadata[key] = doc[key]

    # various dicts
    for item in "detectors hints motors".split():
        names = doc.get(item)
        if names:
            obj = getattr(self, item)
            for key in names:
                obj[key] = None

    # plan_type is optional in a start document
    plan_type = doc.get("plan_type", "not available")
    # _datetime is only set by receiver(); fall back to the document time
    when = self._datetime or datetime.fromtimestamp(self.time)
    ts = datetime.strftime(when, SPEC_TIME_FORMAT)
    self.comments["start"].insert(0, f"{ts}. plan_type = {plan_type}")
    self.scan_command = _rebuild_scan_command(doc)
def descriptor(self, doc):
    """
    handle *descriptor* documents

    prepare for primary scan data, ignore any other data stream

    Every descriptor is remembered by its uid so that event documents
    can find their stream.  For the stream named "primary", the data
    columns are created in this order: motors, Epoch_float, Epoch,
    any other data keys, then the hinted fields of the detectors.

    :raises KeyError: if a descriptor with the same uid was seen before
    """
    uid = doc["uid"]
    if uid in self._streams:
        fmt = "duplicate descriptor UID {} found"
        raise KeyError(fmt.format(uid))

    # log descriptor documents by uid
    # referenced by event and bulk_events documents
    self._streams[uid] = doc

    if doc["name"] != "primary":
        return

    keyset = list(doc["data_keys"].keys())
    available = set(keyset)

    # independent variable(s) first, dependent variable(s) last, others in middle
    first_keys = [k for k in self.motors if k in available]

    # "hints" is optional in a descriptor.  A hinted field without a
    # data key can never be filled by event(), so it gets no column.
    hints = doc.get("hints") or {}
    last_keys = []
    for detector in self.detectors:
        fields = (hints.get(detector) or {}).get("fields", [])
        last_keys += [field for field in fields if field in available]

    # get remaining keys from keyset
    claimed = set(first_keys) | set(last_keys)
    middle_keys = [k for k in keyset if k not in claimed]
    epoch_keys = "Epoch_float Epoch".split()

    self.data.update({k: [] for k in first_keys+epoch_keys+middle_keys+last_keys})
def event(self, doc):
    """
    handle *event* documents

    Appends one row to the data columns when the event belongs to the
    "primary" stream: ``Epoch_float`` is the event time relative to the
    scan start, ``Epoch`` is that value rounded to an integer, and all
    other columns are read from the event data.  Events of any other
    stream are ignored.

    :raises KeyError: if the event's descriptor is unknown or the event
        has a data key for which there is no column
    """
    stream = self._streams.get(doc["descriptor"])
    if stream is None:
        raise KeyError("descriptor UID {} not found".format(doc["descriptor"]))

    if stream["name"] != "primary":
        return

    readings = doc["data"]
    for name in readings:
        if name not in self.data:
            # not our expected event data
            raise KeyError(f"unexpected failure here, key {name} not found")

    for name, column in self.data.items():
        if name == "Epoch":
            value = int(doc["time"] - self.time + 0.5)
        elif name == "Epoch_float":
            value = doc["time"] - self.time
        else:
            value = readings[name]
        column.append(value)

    self.num_primary_data += 1
def bulk_events(self, doc):
    """handle *bulk_events* documents (ignored: nothing is recorded)"""
    return None
def datum(self, doc):
    """handle *datum* documents: keep the document text as a comment"""
    text = f"datum {doc!s}"
    self._cmt("datum", text)
def resource(self, doc):
    """handle *resource* documents: keep the document text as a comment"""
    text = f"resource {doc!s}"
    self._cmt("resource", text)
def stop(self, doc):
    """
    handle *stop* documents

    Adds comments for the number of events per stream (if reported)
    and the exit status, then writes the scan when ``auto_write`` is
    set.  The writer is marked as not scanning afterwards, even when
    writing the scan raises an exception.
    """
    for stream, count in (doc.get("num_events") or {}).items():
        self._cmt("stop", f"num_events_{stream} = {count}")

    exit_status = doc.get("exit_status", "not available")
    self._cmt("stop", f"exit_status = {exit_status}")

    try:
        if self.auto_write:
            self.write_scan()
    finally:
        # the run is over whether or not it could be written; comments
        # that arrive from now on must be buffered for the next scan
        self.scanning = False
def prepare_scan_contents(self):
    """
    format the scan for a SPEC data file

    The lines are, in order: a blank line, #S, #D, the optional #T/#M
    line, #C comments from the start and descriptor documents, #MD
    metadata, #N, #L with one data row per event (text values are
    replaced by the row number and reported in a following #U line),
    then the #C comments from event, resource, datum and stop documents.

    :returns: [str] a list of lines to append to the data file
    """
    def comment_lines(*sections):
        """#C lines for the comments of the given document types"""
        return ["#C " + text for name in sections for text in self.comments[name]]

    dt = datetime.fromtimestamp(self.scan_epoch)
    lines = [
        "",
        "#S " + self.scan_command,
        "#D " + datetime.strftime(dt, SPEC_TIME_FORMAT),
    ]
    if self.T_or_M is not None:
        lines.append(f"#{self.T_or_M} {self.T_or_M_value}")

    #C Wed Feb 03 16:51:38 2016.  do ./usaxs.mac.
    lines += comment_lines("start", "descriptor")

    # "#MD" is our ad hoc SPEC data tag
    lines += [f"#MD {k} = {v}" for k, v in self.metadata.items()]

    labels = list(self.data.keys())
    # SPEC: #N is the number of data columns (not the number of rows)
    lines.append("#N " + str(len(labels)))
    if labels:
        # SPEC separates column labels by two spaces
        lines.append("#L " + "  ".join(labels))
        for row in range(self.num_primary_data):
            text_values = OrderedDict()
            cells = []
            for label in labels:
                value = self.data[label][row]
                if isinstance(value, str):
                    # SPEC scan data is expected to be numbers
                    # this is text, substitute the row number
                    # and report after this line in a #U line
                    text_values[label] = value
                    value = row
                cells.append(str(value))
            lines.append(" ".join(cells))
            # report the text data
            lines += [f"#U {row} {label} {text}" for label, text in text_values.items()]
    else:
        lines.append("#C no data column labels identified")

    lines += comment_lines("event", "resource", "datum", "stop")
    return lines
def _write_lines_(self, lines, mode="a"):
    """
    write (more) lines to the file

    ``lines`` are joined with newlines (none is added after the last
    one) and written to ``self.spec_filename``, opened with ``mode``.
    """
    with open(self.spec_filename, mode) as spec_file:
        text = "\n".join(lines)
        spec_file.write(text)
def write_scan(self):
    """
    write the most recent (completed) scan to the file

    * creates file if not existing
    * writes header if needed
    * appends scan data

    note:  does nothing if there are no lines to be written

    :raises ValueError: if the file already contains the uid of this scan
    """
    logger = logging.getLogger(__name__)

    if self.uid is not None and os.path.exists(self.spec_filename):
        # search line by line: no need to hold the whole file in memory
        with open(self.spec_filename) as f:
            if any(self.uid in line for line in f):
                msg = f"{self.spec_filename} already contains uid={self.uid}"
                raise ValueError(msg)

    lines = self.prepare_scan_contents()
    if not lines:
        return

    # the empty last item ends the file with a newline once joined
    lines = list(lines) + [""]
    if self.write_file_header:
        self.write_header()
        logger.info("wrote header to SPEC file: %s", self.spec_filename)
    self._write_lines_(lines, mode="a")
    # %s: scan_id comes from the start document, it may not be an int
    logger.info("wrote scan %s to SPEC file: %s", self.scan_id, self.spec_filename)
def make_default_filename(self):
    """generate a file name to be used as default: YYYYmmdd-HHMMSS.dat (current time)"""
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return f"{timestamp}.dat"
def newfile(self, filename=None, scan_id=None, RE=None):
    """
    prepare to use a new SPEC data file

    but don't create it until we have data

    Parameters

    filename : string, optional
        Name of the new file.  If not given, the name comes from
        `make_default_filename()`.
    scan_id : int, boolean or None, optional
        Value to store in ``RE.md["scan_id"]``.  For backwards
        compatibility, True means SCAN_ID_RESET_VALUE and False
        means None (leave the scan_id alone).
    RE : object with a ``md`` dictionary or None, optional
        Where ``scan_id`` is stored; nothing is stored when None.

    :returns: the name of the file
    :raises ValueError: if the file exists (the writer is left unchanged)
    """
    filename = filename or self.make_default_filename()
    if os.path.exists(filename):
        raise ValueError(f"file {filename} exists")

    self.clear()
    self.spec_filename = filename
    self.spec_epoch = int(time.time())  # ! no roundup here!!!
    self.spec_host = socket.gethostname() or 'localhost'
    try:
        user = getpass.getuser()
    except (OSError, KeyError, ImportError):
        # getuser() raises when the user name cannot be determined
        user = None
    self.spec_user = user or 'BlueSkyUser'
    self.write_file_header = True       # don't write the file yet

    # backwards-compatibility
    # identity tests: `0 == False` and `1 == True` in Python, an
    # equality test would turn scan_id=0 into "do not set"
    if scan_id is True:
        scan_id = SCAN_ID_RESET_VALUE
    elif scan_id is False:
        scan_id = None
    if scan_id is not None and RE is not None:
        # assume isinstance(RE, bluesky.run_engine.RunEngine)
        RE.md["scan_id"] = scan_id
        print(f"scan ID set to {scan_id}")
    return self.spec_filename
def usefile(self, filename):
    """
    read from existing SPEC data file

    The first four lines of the file must start with #F, #E, #D and #C,
    in that order.  The epoch is taken from the #E line and the user
    name from the #C line (when that has the form
    ``#C <word> user = <name> ...``, else "BlueSkyUser").  On success,
    the writer is set to append scans to this file without writing
    another header.

    :returns: highest scan number found in the #S lines of the file,
        0 if the file has no scans
    :raises IOError: if the file does not exist
    :raises ValueError: if the header is not as expected or
        a #S line has no integer scan number
    """
    if not os.path.exists(filename):
        raise IOError(f"file {filename} does not exist")

    def header_line(f, line_number, key):
        """read the next line, it must start with the given control key"""
        line = f.readline().strip()
        if not line.startswith(key + " "):
            raise ValueError(f"line {line_number} does not start with {key}")
        return line

    with open(filename, "r") as f:
        header_line(f, 1, "#F")
        epoch = int(header_line(f, 2, "#E").split()[-1])
        header_line(f, 3, "#D")     # ignore content, it is derived from #E line
        p = header_line(f, 4, "#C").split()
        username = "BlueSkyUser"
        if len(p) > 4 and p[2] == "user":
            username = p[4]

        # find the highest scan number used
        scan_ids = []
        for line in f:
            parts = line.split()
            if line.startswith("#S ") and len(parts) > 1:
                scan_ids.append(int(parts[1]))

    # a file with a header but no scans yet is valid
    scan_id = max(scan_ids, default=0)

    self.spec_filename = filename
    self.spec_epoch = epoch
    self.spec_user = username
    # the file has its header (verified above): do not write another
    self.write_file_header = False
    return scan_id