Source code for k1lib._basics

# AUTOGENERATED FILE! PLEASE DON'T EDIT
import torch, logging, warnings, os, time, re, json, k1lib
import numpy as np, matplotlib.pyplot as plt, matplotlib as mpl
from typing import Any, List, Union, Tuple, Iterator, Dict
from functools import partial
# Names exported by ``from k1lib._basics import *``. Optional symbols (e.g.
# "pValue") are appended further down only when their dependency imports fine.
__all__ = ["_docsUrl",
           "textToHtml", "clearLine", "isNumeric", "close",
           "patch", "wrapMod", "wraps", "squeeze", "raiseEx",
           "numDigits", "limitLines",
           "limitChars", "showLog", "cleanDiv", "graph", "digraph",
           "beep", "beepOnAvailable", "dontWrap",
           "debounce", "scaleSvg", "now", "sameStorage"]
# NOTE(review): presumably the root URL of the online documentation (inferred
# from the name only) - confirm against its users.
_docsUrl = "https://k1lib.github.io"
def textToHtml(text:str) -> str:
    """Converts plain text to HTML that renders the way ``print()`` shows it.

    Every newline becomes ``<br>`` and every space becomes ``&nbsp;``. Nothing
    else is touched, so HTML special characters are *not* escaped.

    :param text: text to convert
    :return: html string"""
    htmlLines = ("&nbsp;".join(line.split(" ")) for line in text.split("\n"))
    return "<br>".join(htmlLines)
def clearLine():
    """Wipes the current terminal line.

    Prints a carriage return, 80 spaces and another carriage return (without a
    trailing newline), so the cursor ends up at the start of a blanked line."""
    blank = " " * 80
    print(f"\r{blank}\r", end="")
def isNumeric(x) -> bool:
    """Returns whether object is actually a number.

    True for instances of :class:`int`, :class:`float` and
    :class:`numpy.number` (so :class:`bool` counts too, as it subclasses
    :class:`int`)."""
    return isinstance(x, (int, float, np.number))
def close(a, b):
    """Returns whether 2 values are really close to each other.

    Plain numbers are compared with an absolute tolerance of ``1e-6``.
    Anything else (lists, numpy arrays, tensors, or a mix with numbers) is
    converted to tensors and compared using :meth:`torch.allclose`.

    :param a: number or tensor-like object
    :param b: number or tensor-like object
    :return: True if both values are close to each other"""
    if isNumeric(a) and isNumeric(b): return abs(a - b) < 1e-6
    # as_tensor() avoids the copy (and the warning) that torch.tensor() emits
    # when the input already is a tensor
    ta, tb = torch.as_tensor(a), torch.as_tensor(b)
    if ta.dtype != tb.dtype:
        # torch.allclose() refuses mismatched dtypes (e.g. int list vs float
        # list), so promote both sides to their common dtype first
        common = torch.promote_types(ta.dtype, tb.dtype)
        ta, tb = ta.to(common), tb.to(common)
    return torch.allclose(ta, tb)
def patch(_class:type, name:str=None, docs:Union[str, Any]=None, static=False):
    """Decorator that attaches the decorated function to a class/object.

    :param _class: object to patch function. Can also be a type
    :param name: name of patched function, if different from current
    :param docs: docs of patched function. Can be object with defined __doc__ attr
    :param static: whether to wrap this inside :class:`staticmethod` or not
    :return: modified function just before patching

    Intended to be used like this::

        class A:
            def methA(self):
                return "inside methA"

        @k1lib.patch(A)
        def methB(self):
            return "inside methB"

        a = A()
        a.methB() # returns "inside methB"

    You can do ``@property`` attributes like this::

        class A: pass

        @k1lib.patch(A, "propC")
        @property
        def propC(self): return self._propC

        @k1lib.patch(A, "propC")
        @propC.setter
        def propC(self, value): self._propC = value

        a = A(); a.propC = "abc"
        a.propC # returns "abc"

    The attribute name has to be given explicitly in that case, as it is not
    extracted from the property object.

    You can also do static methods like this::

        class A: pass

        @k1lib.patch(A, static=True)
        def staticD(arg1): return arg1

        A.staticD("def") # returns "def"
    """
    def inner(function):
        # docs can be a string, or any object carrying a __doc__
        if docs is None or isinstance(docs, str): resolvedDocs = docs
        else: resolvedDocs = docs.__doc__
        # nothing usable given: fall back to the function's docs, then the class's
        if not resolvedDocs: resolvedDocs = function.__doc__ or _class.__doc__
        # default attribute name is the last component of the qualified name
        attrName = name if name else function.__qualname__.split(".")[-1]
        target = function
        if static: target = staticmethod(function)
        target.__doc__ = resolvedDocs
        setattr(_class, attrName, target)
        return target
    return inner
class wrapMod:
    def __init__(self, m, moduleName=None):
        """Wraps around a module, and only suggest symbols in __all__ list
        defined inside the module. Example::

            from . import randomModule
            randomModule = wrapMod(randomModule)

        :param m: the imported module
        :param moduleName: optional new module name for elements (their __module__ attr)"""
        if moduleName is not None:
            # A module's namespace also holds strings, None, lists, builtin
            # types, ... which refuse a new __module__. Skip those instead of
            # crashing on the first one.
            for v in list(m.__dict__.values()):
                try: v.__module__ = moduleName
                except (AttributeError, TypeError): pass
        self._wrapMod_m = m
        # mirror the module's namespace so attribute access works as before
        self.__dict__.update(m.__dict__)
        self._wrapMod_extraDirs = []
    def __dir__(self):
        """Only advertises the module's ``__all__`` plus any extra names."""
        # list() so that a tuple-valued __all__ works as well
        return list(self._wrapMod_m.__all__) + self._wrapMod_extraDirs
    def __str__(self): return str(self._wrapMod_m)
    def __repr__(self): return str(self)
def wraps(ogF):
    """Kinda like :meth:`functools.wraps`, but leaves ``__annotations__`` alone.

    Copies ``__doc__``, ``__name__``, ``__qualname__`` and ``__module__`` from
    ``ogF`` onto the decorated function and returns that same function.

    :param ogF: original function to copy the metadata from"""
    copied = ("__doc__", "__name__", "__qualname__", "__module__")
    def inner(f):
        for attr in copied:
            setattr(f, attr, getattr(ogF, attr))
        return f
    return inner
def squeeze(_list:Union[list, tuple, torch.Tensor, Any], hard=False):
    """If list only has 1 element, returns that element, else returns original list.

    Tensors are squeezed using :meth:`torch.Tensor.squeeze`; any other object
    is returned as-is.

    :param hard: If True, then if list/tuple, filters out None and empty
        strings, and takes the first element out even if that list/tuple has
        more than 1 element
    :raises IndexError: if ``hard`` is True and nothing is left after filtering"""
    if isinstance(_list, (tuple, list)):
        if hard:
            for e in _list:
                # Identity/type checks rather than `!=`, so that array-like
                # elements (whose `!=` is elementwise) don't blow up
                if e is None or (isinstance(e, str) and e == ""): continue
                return e
            raise IndexError("squeeze(hard=True): no element left after filtering out None and empty strings")
        if len(_list) == 1: return _list[0]
    if isinstance(_list, torch.Tensor): return _list.squeeze()
    return _list
def raiseEx(ex:Exception):
    """Raises the given exception.

    Handy where only an expression is allowed (e.g. inside a lambda), since
    ``raise`` itself is a statement.

    :param ex: exception to raise"""
    raise ex
def numDigits(num) -> int:
    """Returns how many characters the default formatting of ``num`` takes up.

    :param num: number, or really any object that can be formatted"""
    rendered = format(num, "")
    return len(rendered)
def limitLines(s:str, limit:int=10) -> str:
    """Keeps at most ``limit`` lines of a string.

    If there are more lines than that, the first ``limit`` lines are kept and
    a final ``.....`` line is added. Shorter strings are returned untouched.

    :param s: input string
    :param limit: max number of lines to keep"""
    lines = s.split("\n")
    if len(lines) <= limit: return s
    head = "\n".join(lines[:limit])
    return f"{head}\n....."
def limitChars(s:str, limit:int=50):
    """Shortens a value to a single line of at most ``limit`` characters.

    Only the first line of the formatted value is considered. If it is longer
    than ``limit``, it is cut to ``limit-3`` characters and ``...`` is added.
    ``None`` gives an empty string.

    :param s: string (or any object, it gets formatted first)
    :param limit: max number of characters"""
    if s is None: return ""
    firstLine = format(s, "").split("\n", 1)[0]
    if len(firstLine) <= limit: return firstLine
    return firstLine[:limit-3] + "..."
def showLog(loggerName:str="", level:int=logging.DEBUG):
    """Prints out logs of a particular logger at a particular level.

    Sets the logger's level and attaches a :class:`logging.StreamHandler` to
    it. Calling this again for the same logger only updates the level, so
    messages don't get printed multiple times.

    :param loggerName: name of the logger, empty string for the root logger
    :param level: level to set on the logger"""
    logger = logging.getLogger(loggerName); logger.setLevel(level)
    # reuse the handler installed by an earlier call instead of stacking one
    # more on top, which would duplicate every message
    if any(getattr(h, "_k1lib_showLog", False) for h in logger.handlers): return
    sh = logging.StreamHandler(); sh.setLevel(logging.DEBUG)
    sh._k1lib_showLog = True # marker used by the check above
    logger.addHandler(sh)
def cleanDiv(_list:List[float], total:int) -> List[int]:
    """Splits ``total`` into integer segments proportional to the weights in
    ``_list``. Conceptually ``(_list / _list.sum()) * total``, except that
    every segment is an ``int`` and the segments add up to ``total`` exactly.

    :param _list: weights of the segments
    :param total: amount to divide up
    :return: numpy array of ints"""
    weights = np.array(_list)
    segments = (weights*total/weights.sum()).astype(int) # truncates each share
    # the last segment absorbs whatever the truncation above left over
    segments[-1] = total - segments[:-1].sum()
    return segments
try:
    import graphviz
    def digraph():
        """Convenience method for creating a new graphviz Digraph, laid out
        top to bottom. Example::

            g = k1lib.digraph()
            g("a", "b", "c")
            g # displays arrows from "a" to "b" and "a" to "c"
        """
        return graphviz.Digraph(graph_attr={"rankdir":"TB"})
    def graph():
        """Convenience method for creating a new graphviz Graph, laid out top
        to bottom. See also: :meth:`digraph`"""
        return graphviz.Graph(graph_attr={"rankdir":"TB"})
except ImportError:
    # graphviz is optional: both names just print an install hint without it
    digraph = graph = lambda: print("Module `graphviz` not found! Please install it first, something like `pip install graphviz`")
def beep(seconds=0.3):
    """Plays a beeping sound, may be useful as notification for long-running tasks.

    Tries to display an html snippet that plays a tone through IPython. If
    that fails for any reason (IPython not installed, ...), falls back to
    sending the terminal bell character through ``printf``.

    :param seconds: duration of the beep, used by the html snippet only"""
    try:
        # import the public display API explicitly, instead of relying on
        # `import IPython` having loaded the submodules as a side effect
        from IPython.display import HTML, display_html
        display_html(HTML(f"""<script>(new Audio('data:audio/wav;base64,UklGRl9vT19XQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YU'+Array(Math.round(3.3333e3*{seconds})).join(123))).play();</script>"""))
    except Exception: # not a bare except, so Ctrl-C and SystemExit still propagate
        os.system("printf '\a'")
def beepOnAvailable(url:str, timeout=5, **kwargs):
    """Tries to connect with a url repeatedly, and if successful, plays a beep sound.

    Polls once per second until the request comes back with an ok status.
    Interrupting it (Ctrl-C) prints "Still not available" and returns.

    :param url: url to poll
    :param timeout: timeout of each request, in seconds
    :param kwargs: passed along to :meth:`requests.get`"""
    import requests
    try:
        while True:
            time.sleep(1)
            try: successful = requests.get(url, timeout=timeout, **kwargs).ok
            # Only swallow regular errors (connection refused, timeouts, ...).
            # A bare except here would also eat the KeyboardInterrupt that the
            # outer handler is waiting for whenever Ctrl-C lands mid-request.
            except Exception: successful = False
            if successful: beep(); break
    except KeyboardInterrupt: print("Still not available")
def dontWrap():
    """Don't wrap horizontally when in a notebook. Normally, if you're
    displaying something long, like the output of ``print('a'*1000)`` in a
    notebook, it will display it in multiple lines. This may be undesirable,
    so this solves that by displaying some HTML with css styles so that the
    notebook doesn't wrap.

    Prints "Can't run dontWrap()" if the styles can't be displayed (e.g.
    IPython is not installed)."""
    try:
        # `IPython.display` is the public home of these names; importing
        # `display` from `IPython.core.display` is deprecated
        from IPython.display import display, HTML
        display(HTML("""<style> div.jp-OutputArea-output pre {white-space: pre;} div.output_area pre {white-space: pre;} div.CodeMirror > div.highlight {overflow-y: auto;} </style>"""))
    except Exception: # not a bare except, so Ctrl-C and SystemExit still propagate
        print("Can't run dontWrap()")
import asyncio, functools
from threading import Timer as ThreadingTimer
class AsyncTimer: # rename if want to use
    """asyncio-based timer exposing the same ``start()``/``cancel()`` pair as
    :class:`threading.Timer`: runs a callback once, after a delay."""
    def __init__(self, timeout, callback):
        """:param timeout: delay in seconds before the callback runs
        :param callback: function to call, takes no arguments"""
        self._timeout = timeout
        self._callback = callback
    async def _job(self):
        """Sleeps for the configured delay, then runs the callback."""
        await asyncio.sleep(self._timeout)
        self._callback()
    def start(self):
        """Schedules the job as an asyncio future."""
        self._task = asyncio.ensure_future(self._job())
    def cancel(self):
        """Cancels the scheduled job. Only valid after :meth:`start`."""
        self._task.cancel()
def debounce(wait, threading=False):
    """Decorator that will postpone a function's execution until after
    ``wait`` seconds have elapsed since the last time it was invoked. Taken
    from `ipywidgets <https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Events.html>`_.
    Example::

        import k1lib, time; value = 0
        @k1lib.debounce(0.5, True)
        def f(x): global value; value = x**2
        f(2); time.sleep(0.3); f(3)
        print(value) # prints "0"
        time.sleep(0.7)
        print(value) # prints "9"

    The debounced function returns None right away; the wrapped function's
    return value is not passed back.

    :param wait: wait time in seconds
    :param threading: if True, use multiple threads, else just use async stuff"""
    if threading: timerFactory = ThreadingTimer
    else: timerFactory = AsyncTimer
    def decorator(fn):
        pending = None # timer of the most recent call, if any
        @functools.wraps(fn)
        def debounced(*args, **kwargs):
            nonlocal pending
            # a newer call supersedes the one still waiting
            if pending is not None: pending.cancel()
            pending = timerFactory(wait, functools.partial(fn, *args, **kwargs))
            pending.start()
        return debounced
    return decorator
def scaleSvg(svg:str, scale:float=None) -> str:
    """Scales an svg xml string by some amount.

    Rewrites the first ``width="...pt"`` and the first ``height="...pt"``
    attribute (whole-number point sizes), multiplying each by ``scale`` and
    truncating to an int. Other attributes are left alone.

    :param svg: svg xml string
    :param scale: scale factor. If None, uses ``k1lib.settings.svgScale``
    :raises IndexError: if the svg has no such width or height attribute"""
    if scale is None: scale = k1lib.settings.svgScale
    for attr in ("width", "height"):
        # raw pattern (no invalid "\d" escape), and at least one digit required
        old = re.findall(rf'{attr}="\d+pt"', svg)[0]
        size = int(int(re.findall(r"\d+", old)[0])*scale)
        # plain-text replace of that first occurrence only, so that inner
        # elements with an identical attribute are not modified as well
        svg = svg.replace(old, f'{attr}="{size}pt"', 1)
    return svg
try:
    from scipy import stats
    __all__.append("pValue")
    def pValue(zScore):
        """2-sided p value of a particular z score. Requires :mod:`scipy`.

        :param zScore: z score, sign doesn't matter"""
        return stats.norm.sf(abs(zScore))*2
# scipy is optional, so any failure while importing it just leaves pValue
# undefined. Not a bare except, so Ctrl-C and SystemExit still propagate.
except Exception: pass
import datetime
def now():
    """Convenience function for returning a simple time string, with timezone
    and whatnot.

    :return: current local time as an ISO 8601 string that includes the utc offset"""
    localTime = datetime.datetime.now().astimezone() # attaches the local timezone
    return localTime.isoformat()
def sameStorage(a, b):
    """Check whether 2 (:class:`np.ndarray` or :class:`torch.Tensor`) has the
    same storage or not. Example::

        a = np.linspace(2, 3, 50)
        # returns True
        k1lib.sameStorage(a, a[:5])
        # returns True
        k1lib.sameStorage(a[:10], a[:5])
        # returns False
        k1lib.sameStorage(a[:10], np.linspace(3, 4))

    All examples above should work with PyTorch tensors as well. For any
    other combination of objects, this falls back to an identity check.

    :return: True if both objects are backed by the same storage"""
    def storagePtr(t):
        # Address of the underlying storage. Tensor.data_ptr() points at the
        # tensor's first element instead, which differs for offset views.
        storage = t.untyped_storage() if hasattr(t, "untyped_storage") else t.storage()
        return storage.data_ptr()
    def owner(arr):
        # Follow .base up to the object that actually owns the memory
        while getattr(arr, "base", None) is not None: arr = arr.base
        return arr
    if isinstance(a, torch.Tensor) and isinstance(b, torch.Tensor):
        return storagePtr(a) == storagePtr(b)
    if isinstance(a, np.ndarray) and isinstance(b, np.ndarray):
        # Comparing owners rather than `a.base is b.base`, which is also true
        # (None is None) for 2 completely unrelated arrays
        return owner(a) is owner(b)
    return a is b