Source code for k1lib._basics

# AUTOGENERATED FILE! PLEASE DON'T EDIT
import logging, warnings, os, time, re, json, k1lib, importlib, urllib.parse, math
import numpy as np, matplotlib.pyplot as plt, matplotlib as mpl
from typing import Any, List, Union, Tuple, Iterator, Dict
from functools import partial
try: import torch; hasTorch = True
except: hasTorch = False
# Names exported by a star-import of this module. Optional entries (e.g. "pValue")
# are appended further down, only when their backing dependency can be imported.
__all__ = ["_docsUrl", "isNumeric",
           "patch", "wrapMod", "wraps", "squeeze", "raiseEx",
           "numDigits", "limitLines",
           "limitChars", "showLog", "cleanDiv", "graph", "digraph",
           "beep", "beepOnAvailable", "dontWrap",
           "debounce", "scaleSvg", "now", "pushNotification", "dep", "ticks"]
# NOTE(review): presumably the base url of the online docs, going by the name - confirm against users of this constant
_docsUrl = "https://k1lib.com"
def isNumeric(x) -> bool:
    """Tells whether ``x`` is a number, meaning a Python :class:`int` or
:class:`float`, or any numpy scalar number type"""
    numericTypes = (int, float, np.number)
    return isinstance(x, numericTypes)
def patch(_class:type, name:str=None, docs:Union[str, Any]=None, static=False):
    """Attaches the decorated function to a class/object as an attribute.

:param _class: object (or type) that receives the function
:param name: attribute name to patch under, if different from the function's own name
:param docs: docs of the patched function. Either a string, or any object with
    a ``__doc__`` attribute to borrow the docs from
:param static: whether to wrap the function in :class:`staticmethod` first
:return: the (possibly wrapped) function that was attached

Intended to be used like this::

    class A:
        def methA(self):
            return "inside methA"

    @k1lib.patch(A)
    def methB(self):
        return "inside methB"

    a = A()
    a.methB() # returns "inside methB"

You can do ``@property`` attributes like this::

    class A: pass

    @k1lib.patch(A, "propC")
    @property
    def propC(self): return self._propC

    @k1lib.patch(A, "propC")
    @propC.setter
    def propC(self, value): self._propC = value

    a = A(); a.propC = "abc"
    a.propC # returns "abc"

The attribute name has to be given explicitly in that case, because a property
object does not carry the original function's name.

You can also do static methods like this::

    class A: pass

    @k1lib.patch(A, static=True)
    def staticD(arg1): return arg1

    A.staticD("def") # returns "def"
"""
    def inner(function):
        # docs priority: explicit string, then `docs.__doc__`, then the function's own, then the class'
        if docs is None or isinstance(docs, str): resolvedDocs = docs
        else: resolvedDocs = docs.__doc__
        resolvedDocs = resolvedDocs or function.__doc__ or _class.__doc__
        # only fall back to the function's name when no explicit name was given
        attrName = name if name else function.__qualname__.split(".")[-1]
        if static: target = staticmethod(function)
        else: target = function
        target.__doc__ = resolvedDocs
        setattr(_class, attrName, target)
        return target
    return inner
class wrapMod:
    def __init__(self, m, moduleName=None):
        """Wraps around a module, and only suggest symbols in __all__ list
defined inside the module. Example::

    from . import randomModule
    randomModule = wrapMod(randomModule)

:param m: the imported module
:param moduleName: optional new module name for elements (their __module__ attr).
    Elements that don't accept a new ``__module__`` (strings, numbers, ``None``,
    builtin types...) are left untouched"""
        if moduleName is not None:
            for v in m.__dict__.values():
                # every module dict holds plain values like `__name__` (a str) that refuse
                # attribute assignment, so an unguarded assignment would always raise
                try: v.__module__ = moduleName
                except (AttributeError, TypeError): pass
        self._wrapMod_m = m
        # mirror the module's namespace so attribute access on the wrapper just works
        self.__dict__.update(m.__dict__)
        self._wrapMod_extraDirs = []
    def __dir__(self):
        """Names suggested for autocompletion: the module's ``__all__`` (or its
regular ``dir()`` if it declares no ``__all__``), plus any extra names registered
in ``_wrapMod_extraDirs``"""
        exported = getattr(self._wrapMod_m, "__all__", None)
        if exported is None: exported = dir(self._wrapMod_m)
        # `__all__` may legally be a tuple, so normalize before concatenating
        return list(exported) + self._wrapMod_extraDirs
    def __str__(self): return str(self._wrapMod_m)
    def __repr__(self): return str(self)
def wraps(ogF):
    """Kinda like :meth:`functools.wraps`, but leaves ``__annotations__`` alone.
Copies ``__doc__``, ``__name__``, ``__qualname__`` and ``__module__`` from ``ogF``
onto the decorated function, and returns that same function."""
    copiedAttrs = ("__doc__", "__name__", "__qualname__", "__module__")
    def inner(f):
        for attr in copiedAttrs: setattr(f, attr, getattr(ogF, attr))
        return f
    return inner
def squeeze(_list:Union[list, tuple, Any], hard=False):
    """If list only has 1 element, returns that element, else returns original list.
Torch tensors get their ``.squeeze()`` called instead.

:param hard: If True, then if list/tuple, filters out ``None`` and empty strings,
    and takes the first remaining element out even if that list/tuple has more
    than 1 element
:raises IndexError: if ``hard`` is True and nothing is left after filtering"""
    if isinstance(_list, (tuple, list)):
        if hard:
            # identity/type checks instead of `!=`, because `!=` is elementwise on arrays
            # and tensors, which makes the filter blow up on "ambiguous truth value"
            kept = [e for e in _list if e is not None and not (isinstance(e, str) and e == "")]
            return kept[0]
        elif len(_list) == 1: return _list[0]
    if hasTorch and isinstance(_list, torch.Tensor): return _list.squeeze()
    return _list
def raiseEx(ex:Exception):
    """Raises the given exception object. Handy inside lambdas, where a
``raise`` statement is not allowed"""
    raise ex
def numDigits(num) -> int:
    """Counts the characters in the default string formatting of this number/object"""
    rendered = format(num, "")
    return len(rendered)
def limitLines(s:str, limit:int=10) -> str:
    """Keeps at most ``limit`` lines of the input string. If anything was cut
off, a final line of dots is added; otherwise the string comes back unchanged"""
    lines = s.split("\n")
    if len(lines) <= limit: return s
    kept = "\n".join(lines[:limit])
    return kept + "\n....."
def limitChars(s:str, limit:int=50):
    """Shortens the first line of the input to at most ``limit`` characters.
``None`` becomes an empty string, non-strings are formatted first, and a cut
off line ends in "..." (which counts towards the limit)"""
    if s is None: return ""
    firstLine = f"{s}".split("\n")[0]
    if len(firstLine) > limit: return firstLine[:limit-3] + "..."
    return firstLine
def showLog(loggerName:str="", level:int=logging.DEBUG):
    """Makes a logger print its records. Sets the logger's level to ``level``
and attaches a new stream handler (accepting everything from DEBUG up) to it.

:param loggerName: name of the logger, empty string for the root logger
:param level: level to set on the logger"""
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    target = logging.getLogger(loggerName)
    target.setLevel(level)
    target.addHandler(handler)
def cleanDiv(_list:List[float], total:int) -> List[int]:
    """Splits ``total`` into whole-number segments, proportional to the weights
in ``_list``. Roughly ``(_list / _list.sum()) * total``, except every segment
is an ``int`` and the segments add up to ``total`` exactly.

:param _list: weights of the segments
:param total: amount to divide up
:return: numpy array of ints, one per weight"""
    weights = np.array(_list)
    segments = (weights*total/weights.sum()).astype(int)
    # the last segment absorbs whatever truncation took away from the others
    segments[-1] = total - segments[:-1].sum()
    return segments
try:
    import graphviz
    def digraph():
        """Convenience method for creating a new graphviz Digraph, laid out
top to bottom. Example::

    g = k1lib.digraph()
    g("a", "b", "c")
    g # displays arrows from "a" to "b" and "a" to "c"
"""
        return graphviz.Digraph(graph_attr={"rankdir":"TB"})
    def graph():
        """Convenience method for creating a new graphviz Graph, laid out
top to bottom. See also: :meth:`digraph`"""
        return graphviz.Graph(graph_attr={"rankdir":"TB"})
except ImportError:
    # graphviz is optional: keep both names callable, but only tell the user what's missing
    digraph = graph = lambda: print("Module `graphviz` not found! Please install it first, something like `pip install graphviz`")
def beep(seconds=0.3):
    """Plays a beeping sound, may be useful as notification for long-running tasks.
Tries to play a generated tone through IPython's html display first, and falls
back to ringing the terminal bell if that fails for any reason.

:param seconds: duration of the tone. Only used by the IPython path"""
    try:
        import IPython
        IPython.core.display.display_html(IPython.display.HTML(f"""<script>(new Audio('data:audio/wav;base64,UklGRl9vT19XQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YU'+Array(Math.round(3.3333e3*{seconds})).join(123))).play();</script>"""))
    except Exception: # best effort on purpose, but let KeyboardInterrupt/SystemExit through
        os.system("printf '\a'") # "\a" is the BEL character, which rings the terminal bell
def beepOnAvailable(url:str, timeout=5, **kwargs):
    """Tries to connect with a url repeatedly (once a second), and if successful,
plays a beep sound. Press Ctrl-C to give up, which prints "Still not available".

:param url: url to poll
:param timeout: timeout of each individual request, in seconds
:param kwargs: extra keyword arguments passed to :meth:`requests.get`"""
    import requests
    try:
        while True:
            time.sleep(1)
            # a failed attempt just means "not available yet". Catch `Exception` only,
            # so that Ctrl-C during a request still reaches the handler below
            try: successful = requests.get(url, timeout=timeout, **kwargs).ok
            except Exception: successful = False
            if successful: beep(); break
    except KeyboardInterrupt: print("Still not available")
def dontWrap():
    """Don't wrap horizontally when in a notebook. Normally, if you're
displaying something long, like the output of ``print('a'*1000)`` in a notebook,
it will display it in multiple lines. This may be undesirable, so this solves
that by displaying some HTML with css styles so that the notebook doesn't wrap.

Prints "Can't run dontWrap()" if IPython is unavailable or displaying fails."""
    try:
        # `IPython.display` is the public home of these names; importing `display`
        # from `IPython.core.display` is deprecated since IPython 7.14
        from IPython.display import display, HTML
        display(HTML("""<style>
    div.jp-OutputArea-output pre {white-space: pre;}
    div.output_area pre {white-space: pre;}
    div.CodeMirror > div.highlight {overflow-y: auto;}
</style>"""))
    except Exception: print("Can't run dontWrap()")
import asyncio, functools
from threading import Timer as ThreadingTimer
class AsyncTimer: # rename if want to use
    """asyncio counterpart of :class:`threading.Timer`: runs ``callback`` once,
``timeout`` seconds after :meth:`start`, unless :meth:`cancel` is called first"""
    def __init__(self, timeout, callback):
        self._timeout = timeout
        self._callback = callback
    async def _job(self):
        """Sleeps for the timeout, then fires the callback"""
        await asyncio.sleep(self._timeout)
        self._callback()
    def start(self):
        """Schedules the job as a task"""
        self._task = asyncio.ensure_future(self._job())
    def cancel(self):
        """Cancels the task scheduled by :meth:`start`"""
        self._task.cancel()
def debounce(wait, threading=False):
    """Decorator that postpones a function's execution until ``wait`` seconds
have passed since the last time it was invoked. Every new call cancels the
pending one, so only the most recent arguments are ever used. The decorated
function itself returns nothing. Taken from `ipywidgets
<https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Events.html>`_.
Example::

    import k1lib, time; value = 0

    @k1lib.debounce(0.5, True)
    def f(x): global value; value = x**2

    f(2); time.sleep(0.3); f(3)
    print(value) # prints "0"
    time.sleep(0.7)
    print(value) # prints "9"

:param wait: wait time in seconds
:param threading: if True, use multiple threads, else just use async stuff"""
    if threading: Timer = ThreadingTimer
    else: Timer = AsyncTimer
    def decorator(fn):
        pending = None # timer of the latest call that has not fired yet
        @functools.wraps(fn)
        def debounced(*args, **kwargs):
            nonlocal pending
            if pending is not None: pending.cancel()
            pending = Timer(wait, lambda: fn(*args, **kwargs))
            pending.start()
        return debounced
    return decorator
def scaleSvg(svg:str, scale:float=None) -> str:
    """Scales an svg xml string by some amount. Looks for the first
``width="...pt"`` and ``height="...pt"`` attributes with whole-number values
and multiplies them by ``scale``, truncating to ints.

:param svg: svg xml string
:param scale: scale factor, defaults to ``k1lib.settings.svgScale``
:raises IndexError: if the svg has no such width or height attribute"""
    if scale is None: scale = k1lib.settings.svgScale
    # raw strings: "\d" in a normal literal is an invalid escape sequence
    wS = re.findall(r'width="\d+pt"', svg)[0]
    hS = re.findall(r'height="\d+pt"', svg)[0]
    w = int(int(re.findall(r"\d+", wS)[0])*scale)
    h = int(int(re.findall(r"\d+", hS)[0])*scale)
    # the matched text is a literal, so substitute it literally rather than as a regex
    svg = svg.replace(wS, f'width="{w}pt"')
    svg = svg.replace(hS, f'height="{h}pt"')
    return svg
try:
    from scipy import stats
except ImportError:
    pass # scipy is an optional dependency: without it, `pValue` is simply not defined
else:
    # kept outside the `try` so that only a missing scipy is tolerated, not
    # unrelated errors raised while defining/exporting the function
    __all__.append("pValue")
    def pValue(zScore):
        """2-sided p value of a particular z score. Requires :mod:`scipy`.

:param zScore: the z score, sign does not matter"""
        return stats.norm.sf(abs(zScore))*2
import datetime
def now():
    """Convenience function for returning a simple time string, with timezone
and whatnot. The string is the current local time in ISO 8601 format,
including the utc offset."""
    return datetime.datetime.now().astimezone().isoformat()
def pushNotification(title="Some title", content="Some content", url="https://k1lib.com"):
    """Sends push notification to your device.

Setting things up:
- Download this app: https://play.google.com/store/apps/details?id=net.xdroid.pn
- Set the `settings.pushNotificationKey` key obtained from the app. Example key: `k-967fe9...`
- Alternatively, set the environment variable `k1lib_pushNotificationKey` instead
- Run the function as usual

Prints "Pushed!" if the server accepted the request, else prints the http
status code it answered with.

:param title: title of the notification
:param content: body text of the notification
:param url: url attached to the notification"""
    import requests
    key = k1lib.settings.pushNotificationKey
    query = urllib.parse.urlencode({'k': key, 't': title, 'c': content, 'u': url})
    # NOTE(review): the key travels over plain http here - confirm whether the service offers https
    # explicit timeout, as requests waits forever by default if the server never answers
    res = requests.get("http://xdroid.net/api/message?" + query, timeout=10)
    if res.ok: print("Pushed!")
    else: print(f"Push failed, server responded with status {res.status_code}")
class Dependency:
    """Placeholder for a package that could not be imported. Any attribute
access on it raises an error telling the user to install the package."""
    def __init__(self, s, cause=None):
        """:param s: name of the package
:param cause: optional exception that the failed import raised"""
        self.s = s; self.cause = cause
    def __getattr__(self, attr):
        # only reached for attributes that don't exist, aka everything except `s` and `cause`.
        # ImportError is still an Exception, so existing `except Exception` handlers keep working
        raise ImportError(f"Python package `{self.s}` not found. Please install it") from self.cause
def dep(s):
    """Imports a potentially unavailable package
Example::

    graphviz = k1.dep("graphviz")
    # executes as normal, if graphviz is available, else throws an error
    g = graphviz.Digraph()

I don't imagine this would be useful for everyday use though. This is mainly for
writing this library, so that it can use optional dependencies.

:param s: name of the package to import
:return: the imported module, or a :class:`Dependency` placeholder that raises
    on first use if the import failed"""
    try: return importlib.import_module(s)
    # still tolerant of any import-time failure, but no longer eats KeyboardInterrupt/SystemExit
    except Exception as e: return Dependency(s, e)
# candidate tick spacings, as fractions of a power of 10: 0.1, 0.2, 0.25, 0.5 and 1.0
tickCheckpoints = np.array([1, 2, 2.5, 5, 10])/10
def ticks(x:float, y:float, rounding:int=6):
    """Get tick locations in a plot that look reasonable. Example::

    ticks(-5, 40) # returns [-10.0, -5.0, 0.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0]
    ticks(0.05, 0.07) # returns [0.05, 0.0525, 0.055, 0.0575, 0.06, 0.0625, 0.065, 0.0675, 0.07, 0.0725]
    ticks(-5, 5) # returns [-6.0, -5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

So essentially, whenever you try to plot something, you want both the x and y axis
to not have too many lines, and that the tick values snap to a nice number.

Normally you don't have to do this, as matplotlib does this automatically behind
the scenes, but sometimes you need to implement plotting again, in strange
situations, so this function comes in handy

:param x: start of interval
:param y: end of interval
:param rounding: internally, it rounds tick values to this number of digits, to
    fix tiny float overflows that make numbers ugly. So you can disable it if
    you're working with really small numbers
:return: numpy array of tick values (the last pipeline stage is ``np.array``)
:raises ValueError: from :meth:`math.log10`, if ``y`` is not greater than ``x``"""
    # NOTE(review): the docs say rounding can be disabled, but no special "off" value is
    # handled here, `rounding` goes straight into `round(..., ndigits=rounding)` - confirm
    # `ld - ld%1` rounds ld down to a whole number (Python's `%` result is never negative
    # for a positive divisor), so `scale` is the order of magnitude of the interval width
    cli = k1lib.cli; ld = math.log10(y-x); scale = ld-ld%1 # log delta
    # index of the checkpoint value closest to the fractional part of ld
    sel = ((ld%1 - tickCheckpoints)**2).argmin() # selecting the best fit tick checkpoint
    interval = 10**scale*tickCheckpoints[sel] # interval between ticks
    # `int()` truncates toward zero, leaving a whole multiple of 10**scale
    seed = int(y/10**scale)*10**scale # seed tick
    # NOTE(review): presumably f1 steps down from the seed by `interval` until it passes
    # `x-interval`, and f2 steps up until it passes `y+interval` - the exact semantics of
    # `applySerial`/`breakIf` live in k1lib.cli, verify there
    f1 = cli.applySerial(cli.op()-interval) | cli.breakIf(cli.op()<x-interval)
    f2 = cli.applySerial(cli.op()+interval) | cli.breakIf(cli.op()>y+interval)
    # finally, use the seed to expand in both directions to get all the ticks
    return seed | f1 & f2 | cli.joinStreams() | cli.aS(set) | cli.sort(None) | cli.apply(round, ndigits=rounding) | cli.deref() | cli.aS(np.array)