Source code for apstools.callbacks


"""
Callbacks that might be useful at the APS using BlueSky

.. autosummary::
   
   ~document_contents_callback
   ~DocumentCollectorCallback
   ~SnapshotReport

FILE WRITER CALLBACK

see :class:`SpecWriterCallback()`

"""

#-----------------------------------------------------------------------------
# :author:    Pete R. Jemian
# :email:     jemian@anl.gov
# :copyright: (c) 2017-2019, UChicago Argonne, LLC
#
# Distributed under the terms of the Creative Commons Attribution 4.0 International Public License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------

import datetime
import logging
import pyRestTable
from bluesky.callbacks.core import CallbackBase


logger = logging.getLogger(__name__).addHandler(logging.NullHandler())


[docs]def document_contents_callback(key, doc): """ prints document contents -- use for diagnosing a document stream """ print(key) for k, v in doc.items(): print(f"\t{k}\t{v}")
[docs]class DocumentCollectorCallback(object): """ BlueSky callback to collect *all* documents from most-recent plan Will reset when it receives a *start* document. EXAMPLE:: from apstools.callbacks import DocumentCollector doc_collector = DocumentCollectorCallback() RE.subscribe(doc_collector.receiver) ... RE(some_plan()) print(doc_collector.uids) print(doc_collector.documents["stop"]) """ data_event_names = "descriptor event resource datum bulk_events".split() def __init__(self): self.documents = {} # key: name, value: document self.uids = [] # chronological list of UIDs as-received
[docs] def receiver(self, key, document): """keep all documents from recent plan in memory""" uid = document.get("uid") or document.get("datum_id") if "uid" is None: raise KeyError("No uid in '{}' document".format(key)) self.uids.append(uid) logger = logging.getLogger(__name__) logger.debug("%s document uid=%s", key, str(uid)) if key == "start": self.documents = {key: document} elif key in self.data_event_names: if key not in self.documents: self.documents[key] = [] self.documents[key].append(document) elif key == "stop": self.documents[key] = document print("exit status:", document["exit_status"]) for item in self.data_event_names: if item in self.documents: print( "# {}(s):".format(item), len(self.documents[item]) ) else: txt = "custom_callback encountered: %s\n%s" logger.warning(txt, key, document) if key not in self.documents: self.documents[key] = [] self.documents[key].append(document) return
class SnapshotReport(CallbackBase): """ show the data from a ``apstools.plans.snapshot()`` Find most recent snapshot between certain dates:: headers = db(plan_name="snapshot", since="2018-12-15", until="2018-12-21") h = list(headers)[0] # pick the first one, it's the most recent apstools.callbacks.SnapshotReport().print_report(h) Use as callback to a snapshot plan:: RE( apstools.plans.snapshot(ophyd_objects_list), apstools.callbacks.SnapshotReport() ) """ xref = None def start(self, doc): if doc.get("plan_name", "nope") == "snapshot": self.xref = {} # key=source, value=dict(value, iso8601 timestamp) else: self.xref = None def descriptor(self, doc): """ special case: the data is both in the descriptor AND the event docs due to the way our plan created it """ if self.xref is None: # not from a snapshot plan return # The only way we have a stream that is not "primary" # is when the snapshot has been made from python code. # The command line tool will not create additional streams. if doc["name"] == "primary": for k, v in doc["configuration"].items(): ts = v["timestamps"][k] dt = datetime.datetime.fromtimestamp(ts).isoformat().replace("T", " ") pvname = v["data_keys"][k]["source"] value = v["data"][k] self.xref[pvname] = dict(value=value, timestamp=dt) def stop(self, doc): if self.xref is None: # not from a snapshot plan return t = pyRestTable.Table() t.addLabel("timestamp") t.addLabel("source") t.addLabel("name") t.addLabel("value") for k, v in sorted(self.xref.items()): p = k.find(":") t.addRow((v["timestamp"], k[:p], k[p+1:], v["value"])) print(t) for k, v in sorted(doc.items()): print(f"{k}: {v}") def print_report(self, header): """ simplify the job of writing our custom data table method: play the entire document stream through this callback """ print() print("="*40) print("snapshot:", header.start["iso8601"]) print("="*40) print() for k, v in sorted(header.start.items()): print(f"{k}: {v}") print() for key, doc in header.documents(): self(key, doc) print()