Source code for tensorpack.utils.utils

# -*- coding: utf-8 -*-
# File: utils.py


import inspect
import numpy as np
import re
import os
import sys
from contextlib import contextmanager
from datetime import datetime, timedelta
from tqdm import tqdm

from . import logger
from .concurrency import subproc_call

__all__ = ['change_env',
           'get_rng',
           'fix_rng_seed',
           'get_tqdm',
           'execute_only_once',
           'humanize_time_delta'
           ]


def humanize_time_delta(sec):
    """Humanize timedelta given in seconds

    Args:
        sec (float): time difference in seconds. Must be positive.

    Returns:
        str - time difference as a readable string

    Example:

    .. code-block:: python

        print(humanize_time_delta(1))                                   # 1 second
        print(humanize_time_delta(60 + 1))                              # 1 minute 1 second
        print(humanize_time_delta(87.6))                                # 1 minute 27 seconds
        print(humanize_time_delta(0.01))                                # 0.01 seconds
        print(humanize_time_delta(60 * 60 + 1))                         # 1 hour 1 second
        print(humanize_time_delta(60 * 60 * 24 + 1))                    # 1 day 1 second
        print(humanize_time_delta(60 * 60 * 24 + 60 * 2 + 60*60*9 + 3)) # 1 day 9 hours 2 minutes 3 seconds
    """
    if sec < 0:
        logger.warn("humanize_time_delta() obtains negative seconds!")
        return "{:.3g} seconds".format(sec)
    if sec == 0:
        return "0 second"
    time = datetime(2000, 1, 1) + timedelta(seconds=int(sec))
    units = ['day', 'hour', 'minute', 'second']
    vals = [int(sec // 86400), time.hour, time.minute, time.second]
    if sec < 60:
        vals[-1] = sec

    def _format(v, u):
        return "{:.3g} {}{}".format(v, u, "s" if v > 1 else "")

    ans = []
    for v, u in zip(vals, units):
        if v > 0:
            ans.append(_format(v, u))
    return " ".join(ans)


[docs]@contextmanager def change_env(name, val): """ Args: name(str): name of the env var val(str or None): the value, or set to None to clear the env var. Returns: a context where the environment variable ``name`` being set to ``val``. It will be set back after the context exits. """ oldval = os.environ.get(name, None) if val is None: try: del os.environ[name] except KeyError: pass else: os.environ[name] = val yield if oldval is None: try: del os.environ[name] except KeyError: pass else: os.environ[name] = oldval
_RNG_SEED = None
[docs]def fix_rng_seed(seed): """ Call this function at the beginning of program to fix rng seed within tensorpack. Args: seed (int): Note: See https://github.com/tensorpack/tensorpack/issues/196. Example: Fix random seed in both tensorpack and tensorflow. .. code-block:: python seed = 42 utils.fix_rng_seed(seed) tesnorflow.set_random_seed(seed) # run trainer """ global _RNG_SEED _RNG_SEED = int(seed)
[docs]def get_rng(obj=None): """ Get a good RNG seeded with time, pid and the object. Args: obj: some object to use to generate random seed. Returns: np.random.RandomState: the RNG. """ seed = (id(obj) + os.getpid() + int(datetime.now().strftime("%Y%m%d%H%M%S%f"))) % 4294967295 if _RNG_SEED is not None: seed = _RNG_SEED return np.random.RandomState(seed)
_EXECUTE_HISTORY = set() def execute_only_once(): """ Each called in the code to this function is guaranteed to return True the first time and False afterwards. Returns: bool: whether this is the first time this function gets called from this line of code. Example: .. code-block:: python if execute_only_once(): # do something only once """ f = inspect.currentframe().f_back ident = (f.f_code.co_filename, f.f_lineno) if ident in _EXECUTE_HISTORY: return False _EXECUTE_HISTORY.add(ident) return True def _pick_tqdm_interval(file): # Heuristics to pick a update interval for progress bar that's nice-looking for users. isatty = file.isatty() # Jupyter notebook should be recognized as tty. # Wait for https://github.com/ipython/ipykernel/issues/268 try: from ipykernel import iostream if isinstance(file, iostream.OutStream): isatty = True except ImportError: pass if isatty: return 0.5 else: # When run under mpirun/slurm, isatty is always False. # Here we apply some hacky heuristics for slurm. if 'SLURM_JOB_ID' in os.environ: if int(os.environ.get('SLURM_JOB_NUM_NODES', 1)) > 1: # multi-machine job, probably not interactive return 60 else: # possibly interactive, so let's be conservative return 15 if 'OMPI_COMM_WORLD_SIZE' in os.environ: return 60 # If not a tty, don't refresh progress bar that often return 180 def get_tqdm_kwargs(**kwargs): """ Return default arguments to be used with tqdm. Args: kwargs: extra arguments to be used. Returns: dict: """ default = dict( smoothing=0.5, dynamic_ncols=True, ascii=True, bar_format='{l_bar}{bar}|{n_fmt}/{total_fmt}[{elapsed}<{remaining},{rate_noinv_fmt}]' ) try: # Use this env var to override the refresh interval setting interval = float(os.environ['TENSORPACK_PROGRESS_REFRESH']) except KeyError: interval = _pick_tqdm_interval(kwargs.get('file', sys.stderr)) default['mininterval'] = interval default.update(kwargs) return default
[docs]def get_tqdm(*args, **kwargs): """ Similar to :func:`tqdm.tqdm()`, but use tensorpack's default options to have consistent style. """ return tqdm(*args, **get_tqdm_kwargs(**kwargs))
def find_library_full_path(name): """ Similar to `from ctypes.util import find_library`, but try to return full path if possible. """ from ctypes.util import find_library if os.name == "posix" and sys.platform == "darwin": # on Mac, ctypes already returns full path return find_library(name) def _use_proc_maps(name): """ Find so from /proc/pid/maps Only works with libraries that has already been loaded. But this is the most accurate method -- it finds the exact library that's being used. """ procmap = os.path.join('/proc', str(os.getpid()), 'maps') if not os.path.isfile(procmap): return None try: with open(procmap, 'r') as f: for line in f: line = line.strip().split(' ') sofile = line[-1] basename = os.path.basename(sofile) if 'lib' + name + '.so' in basename: if os.path.isfile(sofile): return os.path.realpath(sofile) except IOError: # can fail in certain environment (e.g. chroot) # if the pids are incorrectly mapped pass # The following two methods come from https://github.com/python/cpython/blob/master/Lib/ctypes/util.py def _use_ld(name): """ Find so with `ld -lname -Lpath`. It will search for files in LD_LIBRARY_PATH, but not in ldconfig. """ cmd = "ld -t -l{} -o {}".format(name, os.devnull) ld_lib_path = os.environ.get('LD_LIBRARY_PATH', '') for d in ld_lib_path.split(':'): cmd = cmd + " -L " + d result, ret = subproc_call(cmd + '|| true') expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name) res = re.search(expr, result.decode('utf-8')) if res: res = res.group(0) if not os.path.isfile(res): return None return os.path.realpath(res) def _use_ldconfig(name): """ Find so in `ldconfig -p`. It does not handle LD_LIBRARY_PATH. """ with change_env('LC_ALL', 'C'), change_env('LANG', 'C'): ldconfig, ret = subproc_call("ldconfig -p") ldconfig = ldconfig.decode('utf-8') if ret != 0: return None expr = r'\s+(lib%s\.[^\s]+)\s+\(.*=>\s+(.*)' % (re.escape(name)) res = re.search(expr, ldconfig) if not res: return None else: ret = res.group(2) return os.path.realpath(ret) if sys.platform.startswith('linux'): return _use_proc_maps(name) or _use_ld(name) or _use_ldconfig(name) or find_library(name) return find_library(name) # don't know what to do