# AUTOGENERATED FILE! PLEASE DON'T EDIT
import logging, warnings, os, time, re, json, k1lib, importlib, urllib.parse, math, base64, dill
import numpy as np, matplotlib.pyplot as plt, matplotlib as mpl
from typing import Any, List, Union, Tuple, Iterator, Dict
from functools import partial
try: import torch; hasTorch = True
except: hasTorch = False
# Public names of this module (what `from ... import *` exposes). Note that
# "pValue" is not listed here: it is appended further down, and only when
# scipy can be imported.
__all__ = ["_docsUrl", "isNumeric",
"patch", "wrapMod", "wraps", "squeeze", "raiseEx",
"numDigits", "limitLines",
"limitChars", "showLog", "cleanDiv",
"beep", "beepOnAvailable", "dontWrap",
"debounce", "scaleSvg", "now", "pushNotification", "dep", "ticks", "digraph", "graph",
"encode", "decode"]
# presumably the root url of the hosted documentation -- confirm against its users
_docsUrl = "https://k1lib.com"
def isNumeric(x) -> bool:
    """Tells whether ``x`` is a number.

    Accepted are python's ``int`` and ``float`` (``bool`` too, as it subclasses
    ``int``) and every numpy scalar deriving from ``np.number``.

    :param x: object to test
    :return: True if ``x`` is an instance of one of those types"""
    numericTypes = (int, float, np.number)
    return isinstance(x, numericTypes)
def patch(_class:type, name:str=None, docs:Union[str, Any]=None, static=False):
    """Decorator that attaches a function to a class/object.

    :param _class: object to patch the function onto. Can also be a type
    :param name: attribute name to patch under, if different from the function's own name
    :param docs: docs of the patched function. Either a string, or any object
        with a ``__doc__`` attribute to borrow the docs from
    :param static: whether to wrap the function inside :class:`staticmethod` first
    :return: decorator, which returns the (possibly wrapped) function it has just patched

    Intended to be used like this::

        class A:
            def methA(self):
                return "inside methA"

        @k1lib.patch(A)
        def methB(self):
            return "inside methB"

        a = A()
        a.methB() # returns "inside methB"

    You can do ``@property`` attributes like this::

        class A: pass

        @k1lib.patch(A, "propC")
        @property
        def propC(self): return self._propC

        @k1lib.patch(A, "propC")
        @propC.setter
        def propC(self, value): self._propC = value

        a = A(); a.propC = "abc"
        a.propC # returns "abc"

    For properties, the attribute name has to be given explicitly, as the name
    is otherwise read from the decorated object's ``__qualname__``. Static
    methods work like this::

        class A: pass

        @k1lib.patch(A, static=True)
        def staticD(arg1): return arg1

        A.staticD("def") # returns "def"
    """
    def inner(function):
        # docs resolution order: explicit `docs`, the function's own docs, the class' docs
        if docs is None or isinstance(docs, str): docText = docs
        else: docText = docs.__doc__
        docText = docText or function.__doc__ or _class.__doc__
        # explicit name wins, else the last component of the qualified name
        if name: attrName = name
        else: attrName = function.__qualname__.split(".")[-1]
        patched = staticmethod(function) if static else function
        patched.__doc__ = docText
        setattr(_class, attrName, patched)
        return patched
    return inner
class wrapMod:
    def __init__(self, m, moduleName=None):
        """Wraps around a module, and only suggest symbols in __all__ list
        defined inside the module. Example::

            from . import randomModule
            randomModule = wrapMod(randomModule)

        All entries of the module's ``__dict__`` are copied onto this object,
        so attribute access behaves like on the module at wrapping time.

        :param m: the imported module
        :param moduleName: optional new module name for elements (their __module__ attr).
            Elements that don't accept a new ``__module__`` are left untouched"""
        if moduleName is not None:
            for v in m.__dict__.values():
                # A module dict always holds plain values as well (`__name__` is a
                # str, `__doc__` may be None, ...) and those reject attribute
                # assignment, so skip them instead of failing the whole wrap.
                # NOTE(review): this also renames objects the module merely
                # imported from elsewhere -- confirm that is intended.
                try: v.__module__ = moduleName
                except (AttributeError, TypeError): pass
        self._wrapMod_m = m
        self.__dict__.update(m.__dict__)
        self._wrapMod_extraDirs = []
    # list(...) so that a module declaring `__all__` as a tuple works too
    def __dir__(self): return list(self._wrapMod_m.__all__) + list(self._wrapMod_extraDirs)
    def __str__(self): return str(self._wrapMod_m)
    def __repr__(self): return str(self)
def wraps(ogF):
    """Kinda like :meth:`functools.wraps`, but doesn't touch ``__annotations__``.

    Copies ``__doc__``, ``__name__``, ``__qualname__`` and ``__module__``
    from ``ogF`` onto the decorated function.

    :param ogF: function to copy the metadata from
    :return: decorator that returns the very same function it receives"""
    copiedAttrs = ("__doc__", "__name__", "__qualname__", "__module__")
    def inner(f):
        for attr in copiedAttrs:
            setattr(f, attr, getattr(ogF, attr))
        return f
    return inner
def squeeze(_list:Union[list, tuple, Any], hard=False):
    """If list only has 1 element, returns that element, else returns original
    list. Torch tensors (if torch is available) get ``.squeeze()`` called on
    them, everything else is returned unchanged.

    :param _list: list, tuple, tensor or any other object
    :param hard: If True, then if list/tuple, filters out None and empty strings,
        and takes the first element out even if that list/tuple has more than 1 element
    :raises IndexError: if ``hard`` is True and nothing is left after filtering"""
    if isinstance(_list, (tuple, list)):
        if hard:
            # Identity/type checks rather than `!=`: array-like elements compare
            # elementwise, which used to make this filter raise on them.
            kept = [e for e in _list if e is not None and not (isinstance(e, str) and e == "")]
            return kept[0]
        if len(_list) == 1: return _list[0]
    if hasTorch and isinstance(_list, torch.Tensor): return _list.squeeze()
    return _list
def raiseEx(ex:Exception):
    """Raises the given exception right away.

    ``raise`` is a statement, so this is handy wherever only an expression
    is allowed, like inside a lambda.

    :param ex: exception instance (or class) to raise"""
    raise ex
def numDigits(num) -> int:
    """Number of characters ``num`` takes up once formatted as a string.

    Works on any object, not just numbers; signs and decimal points count too.

    :param num: number (or any other object) to measure"""
    formatted = f"{num}"
    return len(formatted)
def limitLines(s:str, limit:int=10) -> str:
    """Keeps at most ``limit`` lines of the input string.

    If the string has more lines than that, the rest is cut off and a final
    line with an ellipsis (".....") is added. Otherwise the string is
    returned as-is.

    :param s: input string, lines separated by newline characters
    :param limit: maximum number of lines to keep"""
    lines = s.split("\n")
    if len(lines) <= limit: return s
    kept = lines[:limit]
    return "\n".join(kept) + "\n....."
def limitChars(s:str, limit:int=50):
    """Shortens the first line of the input to at most ``limit`` characters.

    Only the first line is considered. If that line is longer than ``limit``,
    it's cut to ``limit-3`` characters and "..." is appended. ``None`` gives an
    empty string, and non-string inputs are formatted to a string first.

    :param s: input string (or any object)
    :param limit: maximum length of the returned string"""
    if s is None: return ""
    firstLine = f"{s}".split("\n")[0]
    if len(firstLine) <= limit: return firstLine
    return firstLine[:limit-3] + "..."
def showLog(loggerName:str="", level:int=logging.DEBUG):
    """Prints out logs of a particular logger at a particular level.

    Sets the logger's level and attaches a :class:`logging.StreamHandler` to
    it. Calling this again for the same logger only updates the level, instead
    of stacking another handler (which would print every record several times).

    :param loggerName: name of the logger, empty string for the root logger
    :param level: level to set the logger to"""
    logger = logging.getLogger(loggerName); logger.setLevel(level)
    # handlers installed by this function carry a marker attribute, so they
    # can be told apart from handlers the user has set up themselves
    if any(getattr(h, "_showLogHandler", False) for h in logger.handlers): return
    sh = logging.StreamHandler(); sh.setLevel(logging.DEBUG)
    sh._showLogHandler = True
    logger.addHandler(sh)
def cleanDiv(_list:List[float], total:int) -> List[int]:
    """Cleanly divides :total: into int segments with weights specified by
    :_list:. Looks like this: ``(_list / _list.sum()) * total``, but
    everything is an ``int``.

    Each share is truncated to an int, then the last segment is overwritten
    with whatever remains, so the segments always add up to :total:.

    :param _list: weight of each segment
    :param total: amount to split up
    :return: the segments, as a numpy array of ints"""
    weights = np.array(_list)
    shares = weights*total/weights.sum()
    segments = shares.astype(int)
    # last segment absorbs what truncation has dropped
    segments[-1] = total - segments[:-1].sum()
    return segments
def beep(seconds=0.3):
    """Plays a beeping sound, may be useful as notification for long-running tasks.

    Tries to display a small html snippet through IPython that plays a
    generated wav tone. If that fails for whatever reason (IPython not
    installed, ...), falls back to sending the terminal bell character through
    the shell.

    :param seconds: length of the beep, only used by the IPython path"""
    try:
        # explicit import of the public names, instead of relying on
        # `import IPython` exposing its submodules as attributes
        from IPython.display import HTML, display_html
        display_html(HTML(f"""<script>(new Audio('data:audio/wav;base64,UklGRl9vT19XQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YU'+Array(Math.round(3.3333e3*{seconds})).join(123))).play();</script>"""))
    # `Exception` rather than a bare except, so Ctrl-C is not swallowed here
    except Exception: os.system("printf '\a'")
def beepOnAvailable(url:str, timeout=5, **kwargs):
    """Tries to connect with a url repeatedly, and if successful, plays
    a beep sound.

    Polls about once a second and stops at the first response whose ``ok``
    flag is set. Interrupting it (Ctrl-C) prints "Still not available" and
    returns.

    :param url: url to poll
    :param timeout: timeout of each request, in seconds
    :param kwargs: extra keyword arguments passed to ``requests.get``"""
    import requests
    try:
        while True:
            time.sleep(1)
            # A failed request just means "not available yet". Catching
            # `Exception` (not a bare except) lets a Ctrl-C pressed during the
            # request reach the KeyboardInterrupt handler below.
            try: successful = requests.get(url, timeout=timeout, **kwargs).ok
            except Exception: successful = False
            if successful:
                beep(); break
    except KeyboardInterrupt: print("Still not available")
def dontWrap():
    """Don't wrap horizontally when in a notebook. Normally, if you're
    displaying something long, like the output of ``print('a'*1000)`` in a notebook,
    it will display it in multiple lines. This may be undesirable, so this solves
    that by displaying some HTML with css styles so that the notebook doesn't wrap.

    Best effort: does nothing if IPython is not available or displaying fails."""
    try:
        # `IPython.display` is the public home of these names. Importing
        # `display` from `IPython.core.display` is deprecated, and would make
        # this function silently do nothing once it stops being importable.
        from IPython.display import display, HTML
        display(HTML("""<style>
div.jp-OutputArea-output pre {white-space: pre;}
div.output_area pre {white-space: pre;}
div.CodeMirror > div.highlight {overflow-y: auto;}
</style>"""))
    except Exception: pass # deliberately silent, this is purely cosmetic
import asyncio, functools
from threading import Timer as ThreadingTimer
class AsyncTimer: # rename if want to use
    """Minimal asyncio counterpart of :class:`threading.Timer`: calls
    ``callback`` once, ``timeout`` seconds after :meth:`start` was called."""
    def __init__(self, timeout, callback):
        self._timeout = timeout
        self._callback = callback
    async def _job(self):
        """Sleeps for the timeout, then fires the callback"""
        await asyncio.sleep(self._timeout)
        self._callback()
    def start(self):
        """Schedules the job using :func:`asyncio.ensure_future`"""
        self._task = asyncio.ensure_future(self._job())
    def cancel(self):
        """Cancels the scheduled job. :meth:`start` has to be called first"""
        self._task.cancel()
def debounce(wait, threading=False):
    """Decorator that will postpone a function's execution until after
    ``wait`` seconds have elapsed since the last time it was invoked. Taken
    from `ipywidgets <https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Events.html>`_.
    Example::

        import k1lib, time; value = 0
        @k1lib.debounce(0.5, True)
        def f(x): global value; value = x**2
        f(2); time.sleep(0.3); f(3)
        print(value) # prints "0"
        time.sleep(0.7)
        print(value) # prints "9"

    The decorated function returns None right away; the wrapped function's
    return value is not passed on.

    :param wait: wait time in seconds
    :param threading: if True, use multiple threads, else just use async stuff"""
    timerFactory = ThreadingTimer if threading else AsyncTimer
    def decorator(fn):
        pending = None # timer of the latest call that has not fired yet
        @functools.wraps(fn)
        def debounced(*args, **kwargs):
            nonlocal pending
            # a newer call supersedes the one still waiting
            if pending is not None: pending.cancel()
            pending = timerFactory(wait, lambda: fn(*args, **kwargs))
            pending.start()
        return debounced
    return decorator
def scaleSvg(svg:str, scale:float=None) -> str:
    """Scales an svg xml string by some amount.

    Looks for the first ``width="<int>pt"`` and ``height="<int>pt"``
    attributes, multiplies their values by ``scale`` (truncated to int) and
    writes them back.

    :param svg: svg xml string
    :param scale: scaling factor. If None, ``k1lib.settings.svgScale`` is used
    :raises IndexError: if the width or height attribute can't be found"""
    if scale is None: scale = k1lib.settings.svgScale
    # raw strings: "\d" inside a normal string literal is an invalid escape sequence
    oldWidth = re.findall(r'width="\d*pt"', svg)[0]
    oldHeight = re.findall(r'height="\d*pt"', svg)[0]
    w = int(int(re.findall(r"\d+", oldWidth)[0])*scale)
    h = int(int(re.findall(r"\d+", oldHeight)[0])*scale)
    # plain text replacement, the matched attribute is not meant to be a pattern
    svg = svg.replace(oldWidth, f'width="{w}pt"')
    svg = svg.replace(oldHeight, f'height="{h}pt"')
    return svg
# `pValue` only exists (and only gets exported) when scipy is importable. Any
# failure inside this block is swallowed, leaving the module without `pValue`.
try:
    from scipy import stats
    __all__.append("pValue")
    def pValue(zScore):
        """2-sided p value of a particular z score. Requires :mod:`scipy`.

        :param zScore: z score, sign does not matter as the absolute value is used
        :return: twice the standard normal survival function at ``abs(zScore)``"""
        return stats.norm.sf(abs(zScore))*2
except: pass
import datetime
def now():
    """Convenience function for returning the current local time as a simple
    ISO 8601 string, utc offset included."""
    localTime = datetime.datetime.now().astimezone()
    return localTime.isoformat()
# import-time call kept from the original, its result is discarded
# (presumably a leftover from the notebook this file is generated from)
now()
def pushNotification(title="Some title", content="Some content", url="https://k1lib.com"):
    """Sends push notification to your device.

    Setting things up:

    - Download this app: https://play.google.com/store/apps/details?id=net.xdroid.pn
    - Set the `settings.pushNotificationKey` key obtained from the app. Example key: `k-967fe9...`
    - Alternatively, set the environment variable `k1lib_pushNotificationKey` instead
    - Run the function as usual

    Prints "Pushed!" once the request has returned. The response itself is
    not inspected.

    :param title: title of the notification
    :param content: body text of the notification
    :param url: url sent along with the notification"""
    import requests
    params = {'k': k1lib.settings.pushNotificationKey, 't': title, 'c': content, 'u': url}
    query = urllib.parse.urlencode(params)
    requests.get("http://xdroid.net/api/message?" + query)
    print("Pushed!")
class Dependency:
    """Placeholder for a package that could not be imported. Creating it is
    harmless; looking up any attribute that isn't set on it raises."""
    def __init__(self, s):
        self.s = s # name of the missing package
    def __getattr__(self, attr):
        # only reached for attributes not found the normal way, so `self.s` is safe to read
        message = f"Python package `{self.s}` not found. Please install it"
        raise Exception(message)
def dep(s):
    """Imports a potentially unavailable package
    Example::

        graphviz = k1.dep("graphviz")
        # executes as normal, if graphviz is available, else throws an error
        g = graphviz.Digraph()

    I don't imagine this would be useful for everyday use though.
    This is mainly for writing this library, so that it can use
    optional dependencies.

    :param s: name of the package/module to import
    :return: the imported module, or a :class:`Dependency` placeholder that
        raises as soon as one of its attributes is accessed"""
    try: return importlib.import_module(s)
    # Any failure to import counts as "not available", but Ctrl-C/SystemExit
    # during a slow import must still get through, hence no bare except
    except Exception: return Dependency(s)
# Candidate values (0.1, 0.2, 0.25, 0.5, 1.0) used by `ticks()` both to match
# against the fractional part of log10(range) and as the multiplier of the
# power of 10 that becomes the tick interval
tickCheckpoints = np.array([1, 2, 2.5, 5, 10])/10
def ticks(x:float, y:float, rounding:int=6):
    """Get tick locations in a plot that look reasonable.
    Example::

        ticks(-5, 40) # returns [-10.0, -5.0, 0.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0]
        ticks(0.05, 0.07) # returns [0.05, 0.0525, 0.055, 0.0575, 0.06, 0.0625, 0.065, 0.0675, 0.07, 0.0725]
        ticks(-5, 5) # returns [-6.0, -5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

    So essentially, whenever you try to plot something, you want both the x and y axis
    to not have too many lines, and that the tick values snap to a nice number.

    Normally you don't have to do this, as matplotlib does this automatically behind
    the scenes, but sometimes you need to implement plotting again, in strange
    situations, so this function comes in handy

    ``y`` has to be bigger than ``x``, as the log of ``y-x`` is taken. The
    pipeline ends with ``cli.aS(np.array)``, so the result is presumably a
    numpy array rather than the plain lists shown above -- TODO confirm.

    :param x: start of interval
    :param y: end of interval
    :param rounding: internally, it rounds tick values to this number of
        digits, to fix tiny float overflows that make numbers ugly. So
        you can disable it if you're working with really small numbers"""
    # ld: log10 of the interval's length; scale: ld with its fractional part
    # removed (python's % is non-negative for a positive modulus, so this is a floor)
    cli = k1lib.cli; ld = math.log10(y-x); scale = ld-ld%1 # log delta
    # index of the checkpoint closest to the fractional part of ld
    sel = ((ld%1 - tickCheckpoints)**2).argmin() # selecting the best fit tick checkpoint
    interval = 10**scale*tickCheckpoints[sel] # interval between ticks
    # y, truncated to a multiple of 10**scale
    seed = int(y/10**scale)*10**scale # seed tick
    # NOTE(review): judging by the names, f1 keeps stepping down by `interval`
    # until it passes x-interval, and f2 steps up until it passes y+interval --
    # confirm against the k1lib.cli docs of applySerial/breakIf
    f1 = cli.applySerial(cli.op()-interval) | cli.breakIf(cli.op()<x-interval)
    f2 = cli.applySerial(cli.op()+interval) | cli.breakIf(cli.op()>y+interval)
    # finally, use the seed to expand in both directions to get all the ticks
    return seed | f1 & f2 | cli.joinStreams() | cli.aS(set) | cli.sort(None) | cli.apply(round, ndigits=rounding) | cli.deref() | cli.aS(np.array)
try:
    import graphviz
    def digraph():
        """Convenience method for creating a new graphviz Digraph, laid out
        top to bottom (``rankdir`` set to "TB").
        Example::

            g = k1lib.digraph()
            g("a", "b", "c")
            g # displays arrows from "a" to "b" and "a" to "c"

        The call syntax in the example is not plain graphviz, presumably it is
        patched in elsewhere in the library.
        """
        attrs = {"rankdir":"TB"}
        return graphviz.Digraph(graph_attr=attrs)
    def graph():
        """Convenience method for creating a new graphviz Graph, laid out
        top to bottom. See also: :meth:`digraph`"""
        attrs = {"rankdir":"TB"}
        return graphviz.Graph(graph_attr=attrs)
except ImportError:
    # without graphviz, both names point at one stub that prints a hint and returns None
    graph = lambda: print("Module `graphviz` not found! Please install it first, something like `pip install graphviz`")
    digraph = graph
def encode(obj:object) -> str:
    """Serialize random objects into bytes (using dill), then turn those bytes
    into a normal string using base64. Example::

        a = {"b": 3}
        encode(a) # returns "gASVCgAAAAAAAAB9lIwBYpRLA3Mu"
        decode(encode(a)) # returns {"b": 3}

    See also: :meth:`decode`

    :param obj: object to serialize
    :return: base64 text of the serialized object
    """
    serialized = dill.dumps(obj)
    return base64.b64encode(serialized).decode()
def decode(s:str) -> object:
    """Turns a string produced by :meth:`encode` back into a random object.

    The bytes are loaded with ``dill.loads``, which, like unpickling, can
    execute arbitrary code. Only feed it strings from sources you trust.

    :param s: base64 string, as returned by :meth:`encode`
    :return: the deserialized object"""
    serialized = base64.b64decode(s.encode())
    return dill.loads(serialized)