# Source code for pyrolite.util.general

import datetime
import operator
import os
import shutil
import time
from collections.abc import Mapping
from pathlib import Path
from tempfile import mkdtemp

from .log import Handle

# Module-level logger, built by the package's own ``Handle`` helper (imported
# from the sibling ``log`` module) and keyed on this module's dotted name.
logger = Handle(__name__)

# Unique sentinel object: no caller-supplied value can be identical to it, so
# it can safely flag a "nothing seen yet" / first-pass state.
_FLAG_FIRST = object()


class Timewith:
    """
    Context manager which times a block of code and records named checkpoints.

    The timer starts when the instance is created. Each call to
    :meth:`checkpoint` logs the time elapsed since then and stores a
    ``(name, elapsed_seconds)`` tuple in :attr:`checkpoints`. Leaving the
    ``with`` block records one final checkpoint named ``"Finished"``.

    Parameters
    ----------
    name : :class:`str`
        Label for the timer, included in every logged checkpoint message.
    """

    def __init__(self, name=""):
        """Timewith context manager."""
        self.name = name
        self.start = time.time()  # wall-clock reference for all elapsed values
        self.checkpoints = []  # list of (checkpoint name, elapsed seconds)

    @property
    def elapsed(self):
        """Seconds of wall-clock time passed since the timer was created."""
        return time.time() - self.start

    def checkpoint(self, name=""):
        """
        Log the elapsed time and record it as a named checkpoint.

        Parameters
        ----------
        name : :class:`str`
            Label for this checkpoint.
        """
        # Read the clock once so the logged and the stored value agree.
        elapsed = self.elapsed
        msg = "{time} {timer}: {checkpoint} in {elapsed:.3f} s.".format(
            timer=self.name,
            time=datetime.datetime.now().strftime("%H:%M:%S"),
            checkpoint=name,
            elapsed=elapsed,
        ).strip()
        logger.info(msg)
        self.checkpoints.append((name, elapsed))

    def __enter__(self):
        """Object returned on entry."""
        return self

    def __exit__(self, exittype, value, traceback):
        """
        Code to execute on exit.

        Records the final ``"Finished"`` checkpoint. Returns :code:`None`, so
        an exception raised inside the ``with`` block is not suppressed.
        """
        # checkpoint() both logs and appends to self.checkpoints; appending a
        # second ("Finished", ...) tuple here would duplicate the final entry.
        self.checkpoint("Finished")
def temp_path(suffix=""):
    """
    Create a fresh temporary directory and return its location.

    Parameters
    ----------
    suffix : :class:`str`
        Text appended to the end of the generated directory name.

    Returns
    -------
    :class:`pathlib.Path`
        Path of the newly created directory. Nothing here removes it again;
        cleaning up is left to the caller.
    """
    return Path(mkdtemp(suffix=suffix))
def flatten_dict(d, climb=False, safemode=False):
    """
    Collapse a nested mapping into a single-level dictionary.

    Every value which is not itself a mapping is collected together with the
    tuple of keys leading to it. By default only the last key of that path is
    used in the output, so values stored under the same key in different
    places overwrite one another; use :code:`safemode=True` to keep the full
    key paths and avoid such collisions. Empty sub-mappings contribute nothing.

    Partially taken from https://stackoverflow.com/a/6043835.

    Parameters
    ----------
    d : :class:`dict`
        Nested mapping to flatten.
    climb : :class:`bool`, :code:`False`
        Decides which value survives when keys collide. With :code:`False`
        values are inserted shallowest-first, so the most deeply nested value
        is kept; with :code:`True` the order is reversed and the shallowest
        value is kept.
    safemode : :class:`bool`, :code:`False`
        Whether to index the output by the full tuple of keys rather than by
        the last key only, to avoid issues with conflicts.

    Returns
    -------
    :class:`dict`
        Flattened dictionary.
    """

    def walk(mapping, prefix):
        # Depth-first traversal yielding (key path, value) per non-mapping value.
        for key, value in mapping.items():
            path = prefix + (key,)
            if isinstance(value, Mapping):
                yield from walk(value, path)
            else:
                yield path, value

    # Order by nesting depth; sorted() is stable, so entries at the same depth
    # keep the order in which the traversal found them.
    leaves = sorted(walk(d, ()), key=lambda leaf: len(leaf[0]))
    if climb:
        leaves.reverse()

    flattened = {}
    for path, value in leaves:
        # Later assignments overwrite earlier ones that share the same key.
        flattened[path if safemode else path[-1]] = value
    return flattened
def swap_item(startlist: list, pull: object, push: object) -> list:
    """
    Swap a specified item in a list for another.

    Every element which compares equal to `pull` is replaced by `push`; all
    other elements are kept, in their original order. The input is not
    modified.

    Parameters
    ----------
    startlist : :class:`list`
        List to replace item within.
    pull
        Item to replace in the list.
    push
        Item to add into the list.

    Returns
    -------
    list
        New list with the replacements applied.
    """
    # A conditional expression only needs the comparison result to be truthy
    # or falsy. Using the result as a list index instead would fail for any
    # ``__eq__`` that returns a non-integer object.
    return [push if item == pull else item for item in startlist]
def copy_file(src, dst, ext=None, permissions=None):
    """
    Copy the contents of one file to another location.

    Both locations are given as full filepaths including the file name; the
    destination may alternatively be an existing directory, in which case the
    source file name is re-used inside it.

    Parameters
    ----------
    src : :class:`str` | :class:`pathlib.Path`
        Source filepath.
    dst : :class:`str` | :class:`pathlib.Path`
        Destination filepath or directory.
    ext : :class:`str`, :code:`None`
        Optional file extension specification. When given, the suffix of both
        the source and the destination path is replaced with it before copying.
    permissions : :class:`int`, :code:`None`
        Optional mode passed to :func:`os.chmod` for the destination file once
        the copy is complete.
    """
    source = Path(src)
    target = Path(dst)

    # An existing directory as destination means "same file name, in there".
    if target.is_dir():
        target = target / source.name

    if ext is not None:
        source, target = source.with_suffix(ext), target.with_suffix(ext)

    logger.debug("Copying from {} to {}".format(source, target))
    with open(str(source), "rb") as fin, open(str(target), "wb") as fout:
        shutil.copyfileobj(fin, fout)

    if permissions is not None:
        os.chmod(str(target), permissions)
def remove_tempdir(directory):
    """
    Remove a specific directory, contained files and sub-directories.

    Removal is best-effort with respect to permissions: a
    :class:`PermissionError` raised while deleting is logged at debug level
    and otherwise ignored. Other errors from :func:`shutil.rmtree` propagate
    to the caller.

    Parameters
    ----------
    directory: str, Path
        Path to directory.

    Raises
    ------
    AssertionError
        If the removal reported no error but the directory still exists.
    """
    directory = Path(directory)
    try:
        shutil.rmtree(str(directory))
        # Checked inside the try block so that a permission failure while
        # probing the path is treated like one raised during removal.
        still_present = directory.exists()
    except PermissionError as err:
        # Deliberately non-fatal, but leave a trace rather than swallowing it.
        logger.debug("Could not remove {}: {}".format(directory, err))
        return

    # Explicit check rather than `assert`, which is stripped when Python runs
    # with -O; the exception type is kept for existing callers.
    if still_present:
        raise AssertionError(
            "Directory still exists after removal: {}".format(directory)
        )