# Source code for apstools.utils.list_runs

"""
Directory of bluesky runs
+++++++++++++++++++++++++++++++++++++++

.. autosummary::

   ~getRunData
   ~getRunDataValue
   ~listRunKeys
   ~ListRuns
   ~listruns
   ~summarize_runs
"""

import databroker
import databroker._drivers.mongo_normalized
import databroker._drivers.msgpack
import databroker.queries
import dataclasses
import datetime
import logging
import pandas as pd
import pyRestTable
import time
import typing

from collections import defaultdict

from .query import db_query

from ._core import FIRST_DATA
from ._core import LAST_DATA
from ._core import MONGO_CATALOG_CLASSES


logger = logging.getLogger(__name__)


def getRunData(scan_id, db=None, stream="primary", query=None, use_v1=True):
    """
    Convenience function to get the run's data.  Default is the ``primary`` stream.

    PARAMETERS

    scan_id
        *int* or *str* :
        Scan (run) identifier.
        Positive integer value is ``scan_id`` from run's metadata.
        Negative integer value is since most recent run in databroker.
        String is run's ``uid`` unique identifier (can abbreviate to
        the first characters needed to assure it is unique).

    db
        *object* :
        Bluesky database, an instance of ``databroker.catalog``.
        Default: will search existing session for instance.

    stream
        *str* :
        Name of the bluesky data stream to obtain the data.
        Default: 'primary'

    query
        *dict* :
        mongo query dictionary, used to filter the results
        Default: ``{}``
        see: https://docs.mongodb.com/manual/reference/operator/query/

    use_v1
        *bool* :
        Chooses databroker API version between 'v1' or 'v2'.
        Default: ``True`` (meaning use the v1 API)

    (new in apstools 1.5.1)
    """
    # deferred import avoids a circular-import problem at module load
    from . import getCatalog

    catalog = getCatalog(db)
    if query:
        catalog = db_query(catalog, query)
    stream = stream or "primary"

    # ``use_v1=None`` is treated the same as ``True`` (prefer the v1 API)
    prefer_v1 = use_v1 is None or bool(use_v1)
    if prefer_v1:
        v1_run = catalog.v1[scan_id]
        if stream in v1_run.stream_names:
            return v1_run.table(stream_name=stream)
    else:
        v2_run = catalog.v2[scan_id]
        if hasattr(v2_run, stream):
            return v2_run[stream].read().to_dataframe()

    raise AttributeError(f"No such stream '{stream}' in run '{scan_id}'.")
def getRunDataValue(
    scan_id, key, db=None, stream="primary", query=None, idx=-1, use_v1=True
):
    """
    Convenience function to get value of key in run stream.

    Defaults are last value of key in primary stream.

    PARAMETERS

    scan_id
        *int* or *str* :
        Scan (run) identifier.
        Positive integer value is ``scan_id`` from run's metadata.
        Negative integer value is since most recent run in databroker.
        String is run's ``uid`` unique identifier (can abbreviate to
        the first characters needed to assure it is unique).

    key
        *str* :
        Name of the key (data column) in the table of the stream's data.
        Must match *identically*.

    db
        *object* :
        Bluesky database, an instance of ``databroker.catalog``.
        Default: will search existing session for instance.

    stream
        *str* :
        Name of the bluesky data stream to obtain the data.
        Default: 'primary'

    query
        *dict* :
        mongo query dictionary, used to filter the results
        Default: ``{}``
        see: https://docs.mongodb.com/manual/reference/operator/query/

    idx
        *int* or *str* :
        List index of value to be returned from column of table.
        Can be ``0`` for first value, ``-1`` for last value, ``"mean"``
        for average value, or ``"all"`` for the full list of values.
        Default: ``-1``

    use_v1
        *bool* :
        Chooses databroker API version between 'v1' or 'v2'.
        Default: ``True`` (meaning use the v1 API)

    (new in apstools 1.5.1)

    RAISES

    ``KeyError`` when ``idx`` is not understood, ``key`` is not in the
    stream, or ``idx`` is out of range for the data column.
    """
    if idx is None:
        idx = -1

    try:
        _idx = int(idx)
    except ValueError:
        _idx = str(idx).lower()
    if isinstance(_idx, str) and _idx not in ("all", "mean"):
        raise KeyError(f"Did not understand 'idx={idx}', use integer, 'all', or 'mean'.")

    stream = stream or "primary"
    # fix: forward use_v1 (previously accepted but silently ignored)
    table = getRunData(scan_id, db=db, stream=stream, query=query, use_v1=use_v1)

    if key not in table:
        raise KeyError(f"'{key}' not found in scan {scan_id} stream '{stream}'.")
    data = table[key]

    if _idx == "all":
        return data.values
    if _idx == "mean":
        return data.mean()
    # fix: validate negative indexes, too, so an out-of-range idx (or any
    # idx on an empty column) raises the documented KeyError, not IndexError
    if -len(data) <= _idx < len(data):
        return data.values[_idx]

    raise KeyError(
        f"Cannot reference idx={idx} in scan {scan_id} stream '{stream}' key={key}."
    )
def listRunKeys(
    scan_id,
    key_fragment="",
    db=None,
    stream="primary",
    query=None,
    strict=False,
    use_v1=True,
):
    """
    Convenience function to list all keys (column names) in the scan's stream
    (default: primary).

    PARAMETERS

    scan_id
        *int* or *str* :
        Scan (run) identifier.
        Positive integer value is ``scan_id`` from run's metadata.
        Negative integer value is since most recent run in databroker.
        String is run's ``uid`` unique identifier (can abbreviate to
        the first characters needed to assure it is unique).

    key_fragment
        *str* :
        Part or all of key name to be found in selected stream.
        For instance, if you specify ``key_fragment="lakeshore"``,
        it will return all the keys that include ``lakeshore``.

    db
        *object* :
        Bluesky database, an instance of ``databroker.catalog``.
        Default: will search existing session for instance.

    stream
        *str* :
        Name of the bluesky data stream to obtain the data.
        Default: 'primary'

    query
        *dict* :
        mongo query dictionary, used to filter the results
        Default: ``{}``
        see: https://docs.mongodb.com/manual/reference/operator/query/

    strict
        *bool* :
        Should the ``key_fragment`` be matched identically (``strict=True``)
        or matched by lower case comparison (``strict=False``)?
        Default: ``False``

    use_v1
        *bool* :
        Chooses databroker API version between 'v1' or 'v2'.
        Default: ``True`` (meaning use the v1 API)

    (new in apstools 1.5.1)
    """
    table = getRunData(scan_id, db=db, stream=stream, query=query, use_v1=use_v1)

    # no fragment: report every column
    if not len(key_fragment):
        return list(table.columns)

    # exact (case-sensitive) substring match
    if strict:
        return [column for column in table.columns if key_fragment in column]

    # case-insensitive substring match
    needle = key_fragment.lower()
    return [column for column in table.columns if needle in column.lower()]
@dataclasses.dataclass
class ListRuns:
    """
    List the runs from the given catalog according to some options.

    EXAMPLE::

        ListRuns(cat).to_dataframe()

    PUBLIC METHODS

    .. autosummary::

       ~to_dataframe
       ~to_table
       ~parse_runs

    INTERNAL METHODS

    .. autosummary::

       ~_get_by_key
       ~_check_cat
       ~_apply_search_filters
       ~_check_keys
    """

    # databroker catalog to search (found from the session when None)
    cat: object = None
    # extra mongo query terms merged into the time-range search
    query: dict = None
    # metadata keys to report (space-delimited str or list; None -> defaults)
    keys: str = None
    # text reported when a key (or its value) is not available
    missing: str = ""
    # maximum number of runs to report
    num: int = 20
    # sort in descending order by ``sortby`` when True
    reverse: bool = True
    # ISO8601 lower time bound (None -> FIRST_DATA)
    since: str = None
    # metadata key used for ordering the runs
    sortby: str = "time"
    # strftime format for "time" values ("raw" reports the float as-is)
    timefmt: str = "%Y-%m-%d %H:%M:%S"
    # ISO8601 upper time bound (None -> LAST_DATA)
    until: str = None
    # explicit list of runs (uid strings, scan_id ints, or offsets)
    ids: "typing.Any" = None
    # keys reported when ``self.keys`` is None
    _default_keys = "scan_id time plan_name detectors"

    def _get_by_key(self, md, key):
        """
        Get run's metadata value by key.

        Look in ``start`` document first.
        If not found, look in ``stop`` document.
        If not found, report using ``self.missing``.

        If ``key`` is found but value is None, report as ``self.missing``.

        The ``time`` key will be formatted by the ``self.timefmt`` value.
        See https://strftime.org/ for examples.  The special ``timefmt="raw"``
        is used to report time as the raw value (floating point time as used
        in python's ``time.time()``).

        A special syntax of ``key`` allows reporting of keys in other
        metadata subdictionaries.  The syntax is ``doc.key`` (such as
        specifying the ``time`` key from the ``stop`` document:
        ``stop.time``) where a ``.`` is used to separate the subdictionary
        name (``doc``) from the ``key``.

        **Note**: It is not possible to ``sortby`` the dotted-key syntax
        at this time.
        """
        v = None
        if key == "time":
            # "time" is always taken from the start document
            v = md["start"][key]
            if self.timefmt != "raw":
                ts = datetime.datetime.fromtimestamp(v)
                v = ts.strftime(self.timefmt)
        elif key in md["start"]:
            v = md["start"].get(key, self.missing)
        elif md["stop"] and key in md["stop"]:
            # md["stop"] may be None for an interrupted run
            v = md["stop"].get(key, self.missing)
        elif len(key.split(".")) == 2:
            # dotted-key syntax: "doc.key" selects subdictionary ``doc``
            doc, key = key.split(".")
            if md[doc] is not None:
                v = md[doc].get(key, self.missing)
                # a dotted "time" key gets the same strftime treatment
                if key == "time" and self.timefmt != "raw":
                    ts = datetime.datetime.fromtimestamp(v)
                    v = ts.strftime(self.timefmt)
        # falsy values (None, 0, "") are reported as ``self.missing``
        return v or self.missing

    def _check_cat(self):
        """Fill in ``self.cat`` from the session when not provided."""
        # deferred import avoids a circular-import problem at module load
        from . import getCatalog

        if self.cat is None:
            self.cat = getCatalog()

    def _apply_search_filters(self):
        """Search for runs from the catalog."""
        since = self.since or FIRST_DATA
        until = self.until or LAST_DATA
        self._check_cat()

        # time range first, then any user-supplied query terms
        query = {}
        query.update(databroker.queries.TimeRange(since=since, until=until))
        query.update(self.query or {})
        cat = self.cat.v2.search(query)

        return cat

    def parse_runs(self):
        """Parse the runs for the given metadata keys.  Return a dict."""
        self._check_keys()
        cat = self._apply_search_filters()

        def _sort(uid):
            """Sort runs in desired order based on metadata key."""
            md = self.cat[uid].metadata
            # look in start, then stop, for the sortby key
            for doc in "start stop".split():
                if md[doc] and self.sortby in md[doc]:
                    return md[doc][self.sortby] or self.missing
            return self.missing

        num_runs_requested = min(abs(self.num), len(cat))
        results = {k: [] for k in self.keys}

        sequence = ()  # iterable of run uids
        if self.ids is not None:
            # caller chose specific runs; keep only the resolvable ones
            sequence = []
            for k in self.ids:
                try:
                    cat[k]  # try to access the run using `k`
                    sequence.append(k)
                except Exception as exc:
                    logger.warning(
                        "Could not find run %s in search of catalog %s: %s",
                        k,
                        self.cat.name,
                        exc,
                    )
        else:
            if isinstance(cat, MONGO_CATALOG_CLASSES) and self.sortby == "time":
                # MongoDB already yields runs in reverse time order; avoid
                # the (slow) per-run metadata fetch needed by _sort()
                if self.reverse:
                    # the default rendering: from MongoDB in reverse time order
                    sequence = iter(cat)
                else:
                    # by increasing time order
                    sequence = [uid for uid in cat][::-1]
            else:
                # full search in Python (fetches metadata for every run)
                sequence = sorted(cat.keys(), key=_sort, reverse=self.reverse)

        count = 0
        for uid in sequence:
            run = cat[uid]
            for k in self.keys:
                results[k].append(self._get_by_key(run.metadata, k))
            count += 1
            if count >= num_runs_requested:
                break
        return results

    def _check_keys(self):
        """Check that self.keys is a list of strings."""
        self.keys = self.keys or self._default_keys
        if isinstance(self.keys, str) and self.keys.find(" ") >= 0:
            # convert a space-delimited string of names
            self.keys = self.keys.split()

    def to_dataframe(self):
        """Output as pandas DataFrame object"""
        dd = self.parse_runs()
        return pd.DataFrame(dd, columns=self.keys)

    def to_table(self, fmt=None):
        """Output as pyRestTable object."""
        dd = self.parse_runs()

        table = pyRestTable.Table()
        rows = []
        for label, values in dd.items():
            table.addLabel(label)
            rows.append(values)
        # transpose: dict of columns -> list of row tuples
        table.rows = list(zip(*rows))

        return table.reST(fmt=fmt or "simple")
def listruns(
    cat=None,
    keys=None,
    missing="",
    num=20,
    printing="smart",
    reverse=True,
    since=None,
    sortby="time",
    tablefmt="dataframe",
    timefmt="%Y-%m-%d %H:%M:%S",
    until=None,
    ids=None,
    **query,
):
    """
    List runs from catalog.

    This function provides a thin interface to the highly-reconfigurable
    ``ListRuns()`` class in this package.

    PARAMETERS

    cat
        *object* :
        Instance of databroker v1 or v2 catalog.

    keys
        *str* or *[str]* or None:
        Include these additional keys from the start document.
        (default: ``None`` means ``"scan_id time plan_name detectors"``)

    missing
        *str*:
        Text to report when a value is not available.
        (default: ``""``)

    ids
        *[int]* or *[str]*:
        List of ``uid`` or ``scan_id`` value(s).
        Can mix different kinds in the same list.
        Also can specify offsets (e.g., ``-1``).
        According to the rules for ``databroker`` catalogs,
        a string is a ``uid`` (partial representations allowed),
        an int is ``scan_id`` if positive or an offset if negative.
        (default: ``None``)

    num
        *int* :
        Make the table include the ``num`` most recent runs.
        (default: ``20``)

    printing
        *bool* or ``"smart"``:
        If ``True``, print the table to stdout.
        If ``"smart"``, then act as shown below.
        (default: ``"smart"``)

        ================ ===================
        session          action(s)
        ================ ===================
        python session   print and return ``None``
        Ipython console  return ``DataFrame`` object
        Jupyter notebook return ``DataFrame`` object
        ================ ===================

    reverse
        *bool* :
        If ``True``, sort in descending order by ``sortby``.
        (default: ``True``)

    since
        *str* :
        include runs that started on or after this ISO8601 time
        (default: ``"1995-01-01"``)

    sortby
        *str* :
        Sort columns by this key, found by exact match in either
        the ``start`` or ``stop`` document.
        (default: ``"time"``)

    tablefmt
        *str* :
        When returning an object, specify which type
        of object to return.
        (default: ``"dataframe"``)

        ========== ==============
        value      object
        ========== ==============
        dataframe  ``pandas.DataFrame``
        table      ``str(pyRestTable.Table)``
        ========== ==============

    timefmt
        *str* :
        The ``time`` key (also includes keys ``"start.time"`` and
        ``"stop.time"``) will be formatted by the ``self.timefmt`` value.
        See https://strftime.org/ for examples.  The special ``timefmt="raw"``
        is used to report time as the raw value (floating point time as used
        in python's ``time.time()``).
        (default: ``"%Y-%m-%d %H:%M:%S"``)

    until
        *str* :
        include runs that started before this ISO8601 time
        (default: ``2100-12-31``)

    ``**query``
        *dict* :
        Any additional keyword arguments will be passed to the databroker
        to refine the search for matching runs using the ``mongoquery``
        package.

    RETURNS

    object:
        ``None`` or ``str`` or ``pd.DataFrame()`` object

    (new in release 1.5.0)
    """
    lr = ListRuns(
        cat=cat,
        keys=keys,
        missing=missing,
        num=num,
        # fix: forward the **query kwargs (previously collected but dropped)
        query=query or None,
        reverse=reverse,
        since=since,
        sortby=sortby,
        timefmt=timefmt,
        until=until,
        ids=ids,
    )

    tablefmt = tablefmt or "dataframe"
    if tablefmt == "dataframe":
        obj = lr.to_dataframe()
    else:
        obj = lr.to_table()

    if printing:
        if lr.cat is not None:
            print(f"catalog: {lr.cat.name}")
        if printing == "smart":
            # fix: implement the documented "smart" behavior -- in an
            # IPython/Jupyter session, return the object so the console
            # or notebook renders it; otherwise print it here
            try:
                get_ipython()  # noqa: F821 -- defined only in IPython sessions
                return obj
            except NameError:
                pass  # plain python session: print below
        print(obj)
        return None
    return obj
def summarize_runs(since=None, db=None):
    """
    Report bluesky run metrics from the databroker.

    * How many different plans?
    * How many runs?
    * How many times each run was used?
    * How frequently?  (TODO:)

    PARAMETERS

    since
        *str* :
        Report all runs since this ISO8601 date & time
        (default: ``1995``)

    db
        *object* :
        Instance of ``databroker.Broker()``
        (default: ``db`` from the IPython shell)
    """
    # deferred import avoids a circular-import problem at module load
    from . import ipython_shell_namespace

    db = db or ipython_shell_namespace()["db"]
    # no APS X-ray experiment data before 1995!
    since = since or "1995"
    cat = db.v2.search(databroker.queries.TimeRange(since=since))

    # plan name -> list of dicts describing each run of that plan
    plans = defaultdict(list)
    t0 = time.time()
    for uid in cat:
        t1 = time.time()
        # next step is very slow (0.01 - 0.5 seconds each!)
        run = cat[uid]
        t2 = time.time()

        start_md = run.metadata["start"]
        plan_name = start_md.get("plan_name", "unknown")
        dt = datetime.datetime.fromtimestamp(start_md["time"]).isoformat()
        scan_id = start_md.get("scan_id", "unknown")

        plans[plan_name].append(
            dict(
                plan_name=plan_name,
                dt=dt,
                time_start=dt,
                uid=uid,
                scan_id=scan_id,
            )
        )
        logger.debug(
            "%s %s dt1=%4.01fus dt2=%5.01fms %s",
            scan_id,
            dt,
            (t1 - t0) * 1e6,
            (t2 - t1) * 1e3,
            plan_name,
        )
        t0 = time.time()

    def quantity(plan_name):
        """Number of runs recorded for this plan."""
        return len(plans[plan_name])

    table = pyRestTable.Table()
    table.labels = "plan quantity".split()
    for k in sorted(plans.keys(), key=quantity, reverse=True):
        table.addRow((k, quantity(k)))
    # fix: total from the collected data; the previous ``n + 1`` raised
    # NameError when the catalog search returned no runs
    table.addRow(("TOTAL", sum(quantity(k) for k in plans)))
    print(table)
# ----------------------------------------------------------------------------- # :author: Pete R. Jemian # :email: jemian@anl.gov # :copyright: (c) 2017-2022, UChicago Argonne, LLC # # Distributed under the terms of the Creative Commons Attribution 4.0 International Public License. # # The full license is in the file LICENSE.txt, distributed with this software. # -----------------------------------------------------------------------------