Source code for apstools.utils

"""
Various utilities

.. autosummary::
   
   ~cleanupText
   ~connect_pvlist
   ~EmailNotifications
   ~ExcelDatabaseFileBase
   ~ExcelDatabaseFileGeneric
   ~ipython_profile_name
   ~pairwise
   ~print_snapshot_list
   ~text_encode
   ~to_unicode_or_bust
   ~trim_string_for_EPICS
   ~unix_cmd

"""

#-----------------------------------------------------------------------------
# :author:    Pete R. Jemian
# :email:     jemian@anl.gov
# :copyright: (c) 2017-2019, UChicago Argonne, LLC
#
# Distributed under the terms of the Creative Commons Attribution 4.0 International Public License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------

from collections import OrderedDict
import datetime
from email.mime.text import MIMEText
import logging
import math
import os
import pandas
import pyRestTable
import re
import smtplib
import subprocess
import time

from .plans import run_in_thread


logger = logging.getLogger(__name__)

MAX_EPICS_STRINGOUT_LENGTH = 40


def cleanupText(text):
    """
    convert text so it can be used as a dictionary key

    Each item obtained by iterating ``text`` (each character, for a string)
    is kept when it starts with an ASCII letter, an ASCII digit, or an
    underscore; every other item is replaced by a single underscore.
    The pieces are then joined back into one string.
    """
    acceptable = re.compile("[a-zA-Z0-9_]")
    return "".join(
        piece if acceptable.match(piece) is not None else "_"
        for piece in text
    )
def pairwise(iterable):
    """
    break a list (or other iterable) into non-overlapping pairs

    ::

        s -> (s0, s1), (s2, s3), (s4, s5), ...

        In [71]: for item in pairwise("a b c d e fg".split()):
            ...:     print(item)
            ...:
        ('a', 'b')
        ('c', 'd')
        ('e', 'fg')

    A trailing item without a partner (odd number of items) is dropped.
    Returns a lazy ``zip`` iterator, not a list.
    """
    # The same iterator object appears twice, so each tuple built by zip
    # consumes two consecutive items from the input.
    return zip(*[iter(iterable)] * 2)
def text_encode(source):
    """
    encode ``source`` using the default codepoint

    Calls ``source.encode()`` with the default encoding; characters that
    cannot be encoded are silently dropped (``errors='ignore'``).
    """
    encoded = source.encode(errors='ignore')
    return encoded
def to_unicode_or_bust(obj, encoding='utf-8'):
    """
    return ``obj`` as text when it is a byte string, otherwise unchanged

    from: http://farmdev.com/talks/unicode/

    PARAMETERS

    obj : object
        value to be converted
    encoding : str
        codec used to decode byte strings, default: ``utf-8``

    RETURNS

    object
        ``str`` decoded from ``obj`` when ``obj`` is ``bytes`` or
        ``bytearray``; ``obj`` itself for every other type
        (including ``str``)

    RAISES

    UnicodeDecodeError
        when the bytes are not valid in ``encoding``
    """
    # The previous code tested ``isinstance(obj, str)`` twice (nested, once
    # negated), so the decode branch could never run.
    if isinstance(obj, (bytes, bytearray)):
        return str(obj, encoding)
    return obj
def trim_string_for_EPICS(msg):
    """
    string must not be too long for EPICS PV

    Returns ``msg`` unchanged when its length is at most
    ``MAX_EPICS_STRINGOUT_LENGTH``; otherwise returns only the leading
    ``MAX_EPICS_STRINGOUT_LENGTH`` characters.
    """
    limit = MAX_EPICS_STRINGOUT_LENGTH
    return msg if len(msg) <= limit else msg[:limit]
def unix_cmd(command_list):
    """
    run a UNIX command, returns (stdout, stderr)

    PARAMETERS

    command_list : list(str)
        program and its arguments, passed directly to ``subprocess.Popen``

    RETURNS

    tuple
        ``(stdout, stderr)`` as captured from the finished process
    """
    pipe = subprocess.PIPE
    child = subprocess.Popen(command_list, stdout=pipe, stderr=pipe)
    # communicate() waits for the process to end and already
    # returns the (stdout, stderr) pair
    return child.communicate()
def connect_pvlist(pvlist, wait=True, timeout=2, poll_interval=0.1):
    """
    given a list of EPICS PV names, return a dictionary of EpicsSignal objects

    Blank (or whitespace-only) entries in ``pvlist`` are skipped.  Each
    remaining name gets an ``EpicsSignal`` stored under the key
    ``signal_N`` where ``N`` is the number of signals created before it.

    PARAMETERS

    pvlist : list(str)
        list of EPICS PV names
    wait : bool
        should wait for EpicsSignal objects to connect, default: True
    timeout : float
        maximum time to wait for PV connections, seconds, default: 2.0
        (negative values are treated as 0)
    poll_interval : float
        time to sleep between checks for PV connections, seconds,
        default: 0.1 (values below 0.01 are raised to 0.01)

    RETURNS

    OrderedDict
        signal objects keyed by ``signal_N``.  When ``wait`` is true and
        some PVs are still not connected at the end of the waiting period,
        only the connected ones are returned (a message is printed for
        each of the others).

    RAISES

    RuntimeError
        when ``wait`` is true and no PV is connected at the end of the
        waiting period
    """
    from ophyd import EpicsSignal

    obj_dict = OrderedDict()
    for item in pvlist:
        pvname = item.strip()
        if not pvname:
            continue
        oname = "signal_{}".format(len(obj_dict))
        obj_dict[oname] = EpicsSignal(pvname, name=oname)

    if wait:
        # max(), not min(): min(0, timeout) is never positive, which made
        # the deadline expire immediately, and min(0.01, poll_interval)
        # capped the sleep at 10 ms instead of enforcing a lower bound.
        times_up = time.time() + max(0, timeout)
        poll_interval = max(0.01, poll_interval)
        waiting = True
        while waiting and time.time() < times_up:
            time.sleep(poll_interval)
            waiting = False in [o.connected for o in obj_dict.values()]
        if waiting:
            # ran out of time: keep only the signals that did connect
            n = OrderedDict()
            for k, v in obj_dict.items():
                if v.connected:
                    n[k] = v
                else:
                    print(f"Could not connect {v.pvname}")
            if len(n) == 0:
                raise RuntimeError("Could not connect any PVs in the list")
            obj_dict = n

    return obj_dict
class EmailNotifications(object):
    """
    send email notifications when requested

    use default OS mail utility (so no credentials needed)

    Messages are handed to the SMTP server named by ``smtp_host``
    (``localhost`` unless changed) and addressed to every entry of
    ``addresses``.

    PARAMETERS

    sender : str
        address used in the *From* field,
        default: ``nobody@localhost``
    """

    def __init__(self, sender=None):
        self.addresses = []                 # recipients, see add_addresses()
        self.notify_on_feedback = True      # not used inside this class
        self.sender = sender or "nobody@localhost"
        self.smtp_host = "localhost"

    def add_addresses(self, *args):
        """append each positional argument to the list of recipients"""
        self.addresses.extend(args)

    # NOTE(review): run_in_thread comes from .plans; presumably it runs the
    # call in a background thread -- confirm there.
    @run_in_thread
    def send(self, subject, message):
        """send ``message`` to all addresses"""
        msg = MIMEText(message)
        msg['Subject'] = subject
        msg['From'] = self.sender
        msg['To'] = ",".join(self.addresses)
        # context manager ends the SMTP session and closes the socket
        # even when sendmail() raises
        with smtplib.SMTP(self.smtp_host) as server:
            server.sendmail(self.sender, self.addresses, msg.as_string())
class ExcelDatabaseFileBase(object):
    """
    base class: read-only support for Excel files, treat them like databases

    The file is read once, when the object is constructed.  Every data row
    becomes a dictionary (column label -> cell value) that is passed to
    ``handle_single_entry()`` and then to ``handleExcelRowEntry()``; both
    must be overridden by the subclass.

    EXAMPLE

    Show how to read an Excel file where one of the columns
    contains a unique key.  This allows for random access to
    each row of data by use of the *key*.

    ::

        class ExhibitorsDB(ExcelDatabaseFileBase):
            '''
            content for Exhibitors, vendors, and Sponsors from the Excel file
            '''
            EXCEL_FILE = os.path.join("resources", "exhibitors.xlsx")
            LABELS_ROW = 2

            def handle_single_entry(self, entry):
                '''any special handling for a row from the Excel file'''
                pass

            def handleExcelRowEntry(self, entry):
                '''identify the unique key for this entry (row of the Excel file)'''
                key = entry["Name"]
                self.db[key] = entry

    """

    EXCEL_FILE = None       # subclass MUST define
    # EXCEL_FILE = os.path.join("abstracts", "index of abstracts.xlsx")
    LABELS_ROW = 3          # labels are on line LABELS_ROW+1 in the Excel file

    def __init__(self):
        self.db = OrderedDict()
        self.data_labels = None
        if self.EXCEL_FILE is None:
            raise ValueError("subclass must define EXCEL_FILE")
        # a relative EXCEL_FILE is resolved against the current directory
        self.fname = os.path.join(os.getcwd(), self.EXCEL_FILE)

        self.parse()

    def handle_single_entry(self, entry):       # subclass MUST override
        """called with the dictionary built from each data row"""
        raise NotImplementedError("subclass must override handle_single_entry() method")

    def handleExcelRowEntry(self, entry):       # subclass MUST override
        """called after handle_single_entry() with the same dictionary"""
        raise NotImplementedError("subclass must override handleExcelRowEntry() method")

    def parse(self, labels_row_num=None, data_start_row_num=None):
        """
        read the first sheet of the Excel file and process every data row

        PARAMETERS

        labels_row_num : int
            zero-based row holding the column labels,
            default: ``self.LABELS_ROW``
        data_start_row_num : int
            zero-based row where the data starts,
            default: the row after the labels
        """
        # Test against None (not truthiness) so that an explicit 0,
        # a valid row index, is honored.
        if labels_row_num is None:
            labels_row_num = self.LABELS_ROW
        xl = pandas.read_excel(self.fname, sheet_name=0, header=None)
        self.data_labels = list(xl.iloc[labels_row_num, :])
        if data_start_row_num is None:
            data_start_row_num = labels_row_num + 1
        grid = xl.iloc[data_start_row_num:, :]   # grid is a pandas DataFrame
        for row_number in range(len(grid)):
            row_data = grid.iloc[row_number, :]
            entry = {
                label: self._getExcelColumnValue(row_data, col)
                for col, label in enumerate(self.data_labels)
            }
            self.handle_single_entry(entry)
            self.handleExcelRowEntry(entry)

    def _getExcelColumnValue(self, row_data, col):
        """return the cell in column ``col``: None for NaN, stripped if text"""
        v = row_data.values[col]
        if self._isExcel_nan(v):
            v = None
        else:
            v = to_unicode_or_bust(v)
            if isinstance(v, str):
                v = v.strip()
        return v

    def _isExcel_nan(self, value):
        """True only when ``value`` is a float NaN"""
        if not isinstance(value, float):
            return False
        return math.isnan(value)
class ExcelDatabaseFileGeneric(ExcelDatabaseFileBase):
    """
    Generic (read-only) handling of Excel spreadsheet-as-database

    Table labels are given on Excel row ``N``, ``self.labels_row = N-1``

    Rows are stored in ``self.db`` keyed by a running counter
    (``"0"``, ``"1"``, ...) in the order they are handled.

    PARAMETERS

    filename : str
        Excel file to read (used when ``EXCEL_FILE`` is not already set)
    labels_row : int
        value assigned to ``LABELS_ROW``, default: 3
    """

    def __init__(self, filename, labels_row=3):
        # everything must be in place before the base constructor runs
        self.LABELS_ROW = labels_row
        self.EXCEL_FILE = self.EXCEL_FILE or filename
        self._index_ = 0
        ExcelDatabaseFileBase.__init__(self)

    def handle_single_entry(self, entry):
        """no special handling of a row"""
        pass

    def handleExcelRowEntry(self, entry):
        """use row number as the unique key"""
        self.db[str(self._index_)] = entry
        self._index_ = self._index_ + 1
def ipython_profile_name():
    """
    return the name of the current ipython profile or `None`

    `None` is returned when IPython is not installed or when the code is
    not running inside an IPython session.

    Example (add to default RunEngine metadata)::

        RE.md['ipython_profile'] = str(ipython_profile_name())
        print("using profile: " + RE.md['ipython_profile'])

    """
    try:
        from IPython import get_ipython
    except ImportError:
        return None

    shell = get_ipython()
    # get_ipython() gives None outside of an IPython session; the previous
    # code then failed with AttributeError instead of returning None.
    if shell is None:
        return None
    return shell.profile