# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import logging, warnings, os, time, re, json, k1lib, importlib, urllib.parse, math, base64, dill, inspect, threading, datetime
import numpy as np
from typing import Any, List, Union, Tuple, Iterator, Dict
from functools import partial
try: import torch; hasTorch = True
except: hasTorch = False
# Names exported by a star-import of this module. Helpers defined in this file
# but not listed here (e.g. AsyncTimer, Dependency) stay out of star-imports.
__all__ = [
    "_docsUrl", "isNumeric", "patch", "cron", "wrapMod", "wraps", "squeeze",
    "raiseEx", "numDigits", "limitLines", "limitChars", "showLog", "cleanDiv",
    "beep", "beepOnAvailable", "dontWrap", "debounce", "scaleSvg", "now",
    "pushNotification", "dep", "ticks", "digraph", "graph", "encode", "decode",
    "hash", "resolve",
]
# presumably the base url of the online documentation; not used by the visible code
_docsUrl = "https://k1lib.com"
def isNumeric(x) -> bool:
    """Tells whether ``x`` is a number.

    Accepts python ``int`` and ``float`` instances (``bool`` included, as it
    subclasses ``int``) and any numpy scalar deriving from ``np.number``.

    :param x: object to check
    :return: True if ``x`` is numeric, else False"""
    numericTypes = (int, float, np.number)
    return isinstance(x, numericTypes)
def patch(_class:type, name:str=None, docs:Union[str, Any]=None, static=False):
    """Patches a function to a class/object.

    :param _class: object to patch function. Can also be a type
    :param name: name of patched function, if different from current
    :param docs: docs of patched function. Can be object with defined __doc__ attr
    :param static: whether to wrap this inside :class:`staticmethod` or not
    :return: modified function just before patching

    Intended to be used like this::

        class A:
            def methA(self):
                return "inside methA"

        @k1lib.patch(A)
        def methB(self):
            return "inside methB"

        a = A()
        a.methB() # returns "inside methB"

    You can do ``@property`` attributes like this::

        class A: pass

        @k1lib.patch(A)
        @property
        def propC(self): return self._propC

        @k1lib.patch(A)
        @propC.setter
        def propC(self, value): self._propC = value

        a = A(); a.propC = "abc"
        a.propC # returns "abc"

    For properties, the attribute name is taken from the property's
    getter (or setter/deleter if there is no getter). Passing ``name``
    explicitly still works and takes precedence. You can also do static
    methods like this::

        class A: pass

        @k1lib.patch(A, static=True)
        def staticD(arg1): return arg1

        A.staticD("def") # returns "def"
    """
    def inner(function):
        # docs priority: explicit `docs` (or its __doc__), then the function's, then the class's
        _docs = docs
        if _docs is not None and not isinstance(_docs, str): _docs = _docs.__doc__
        _docs = _docs or function.__doc__ or _class.__doc__
        _name = name
        if not _name:
            named = function
            # property objects carry no __qualname__, so read it off one of the accessors
            if isinstance(function, property): named = function.fget or function.fset or function.fdel
            _name = named.__qualname__.split(".")[-1]
        _function = staticmethod(function) if static else function
        _function.__doc__ = _docs
        setattr(_class, _name, _function)
        return _function
    return inner
def cron(f):
    """Sets up a cron job in another thread, running the decorated
    function whenever ``f`` goes from False to True. Example::

        @k1.cron(lambda minute: minute == 0)
        def f1(): # runs every hour
            ... # do some stuff

        @k1.cron(lambda second: second % 5 == 0)
        def f2(): # runs every 5 seconds
            ... # do some stuff

    So, the first function will run every hour, and the second function will
    run every 5 seconds. The timing function ``f`` can be as complicated as
    you want, but its parameters can only be named from the following list:

    - year
    - month: 1-12
    - day: 1-31
    - weekday: 0-6, 0 for Monday
    - hour: 0-23
    - minute: 0-59
    - second: 0-59

    The current local time is sampled roughly every 0.5 seconds. The polling
    thread is started as soon as the decorator is applied, is created without
    ``daemon=True``, and has no exception handling around ``f`` or the
    decorated function, so an exception from either ends the loop.

    :param f: timing predicate, called with the time fields it names
    :return: decorator that starts the polling thread and returns the decorated function unchanged
    :raises Exception: if ``f`` declares a parameter outside of the list above"""
    def inner(func):
        args = list(inspect.signature(f).parameters.keys())
        allowed = ("year", "month", "day", "weekday", "hour", "minute", "second")
        for arg in args:
            if arg not in allowed: raise Exception(f"Unknown argument {arg}. Only ({', '.join(allowed)}) are allowed")
        def startLoop():
            last = False
            while True:
                a = datetime.datetime.now()
                now = {"year": a.year, "month": a.month, "day": a.day, "weekday": a.weekday(), "hour": a.hour, "minute": a.minute, "second": a.second}
                this = f(*[now[e] for e in args])
                if not last and this: func() # only fire on the False -> True edge
                last = this; time.sleep(0.5)
        threading.Thread(target=startLoop).start()
        return func
    return inner
class wrapMod:
    def __init__(self, m, moduleName=None):
        """Wraps around a module, and only suggest symbols in __all__ list
        defined inside the module. Example::

            from . import randomModule
            randomModule = wrapMod(randomModule)

        Every entry of the module's ``__dict__`` is copied onto the wrapper,
        so attribute access works like on the module itself, while ``dir()``
        only reports the module's ``__all__`` (plus ``_wrapMod_extraDirs``).

        :param m: the imported module
        :param moduleName: optional new module name for elements (their __module__ attr)"""
        if moduleName is not None:
            for v in m.__dict__.values():
                # a module dict also holds plain values (its name string, None, builtin
                # types, ...) that refuse attribute assignment; leave those untouched
                try: v.__module__ = moduleName
                except (AttributeError, TypeError): pass
        self._wrapMod_m = m
        self.__dict__.update(m.__dict__)
        self._wrapMod_extraDirs = []
    def __dir__(self): return self._wrapMod_m.__all__ + self._wrapMod_extraDirs
    def __str__(self): return str(self._wrapMod_m)
    def __repr__(self): return str(self)
def wraps(ogF):
    """Kinda like :meth:`functools.wraps`, but don't update __annotations__.

    Copies ``__doc__``, ``__name__``, ``__qualname__`` and ``__module__``
    (in that order) from ``ogF`` onto the decorated function.

    :param ogF: function to take the metadata from
    :return: decorator that returns the same function object it was given"""
    copied = ("__doc__", "__name__", "__qualname__", "__module__")
    def inner(f):
        for attr in copied: setattr(f, attr, getattr(ogF, attr))
        return f
    return inner
def squeeze(_list:Union[list, tuple, Any], hard=False):
    """If list only has 1 element, returns that element, else returns original
    list. Torch tensors (if torch is installed) get their ``.squeeze()`` result,
    and anything else is returned as-is.

    :param _list: list/tuple to squeeze, or any other object
    :param hard: If True, then if list/tuple, filters out None and empty
        strings, and takes the first element out even if that list/tuple
        has more than 1 element
    :raises IndexError: in hard mode, if nothing is left after filtering"""
    if isinstance(_list, (tuple, list)):
        if hard:
            for e in _list:
                # identity/type checks instead of `!=`, so that array-like elements
                # (whose comparisons are elementwise) don't blow up on truthiness
                if e is None or (isinstance(e, str) and e == ""): continue
                return e
            raise IndexError("squeeze(hard=True) found no element that is not None or an empty string")
        elif len(_list) == 1: return _list[0]
    if hasTorch and isinstance(_list, torch.Tensor): return _list.squeeze()
    return _list
def raiseEx(ex:Exception):
    """Raises the exception passed in. Handy in places where a ``raise``
    statement is not allowed, like inside lambdas.

    :param ex: exception instance (or class) to raise"""
    raise ex
def numDigits(num) -> int:
    """Get the number of digits/characters of this number/object, meaning
    the length of its default string formatting. A minus sign and a
    decimal point count as characters too.

    :param num: number (or any object) to measure"""
    return len(format(num))
def limitLines(s:str, limit:int=10) -> str:
    """If input string has more than ``limit`` lines, keeps the first
    ``limit`` lines and appends an extra line with an ellipsis (``.....``).
    Otherwise returns the string untouched.

    :param s: possibly multi-line string
    :param limit: maximum number of lines to keep"""
    lines = s.split("\n")
    if len(lines) <= limit: return s
    kept = lines[:limit]
    return "\n".join(kept) + "\n....."
def limitChars(s:str, limit:int=50) -> str:
    """If input string is too long, truncates to first `limit` characters of the first line.

    Only the first line of ``s`` is considered. If it is longer than ``limit``,
    it is cut so that the result, ending in ``...``, is exactly ``limit``
    characters long. The result is never longer than ``limit``: when ``limit``
    is below 3 there is no room for content, so only (part of) the ellipsis is
    returned.

    :param s: object to display. Non-strings are formatted first, None gives ""
    :param limit: maximum length of the returned string"""
    if s is None: return ""
    s = f"{s}".split("\n")[0]
    if len(s) <= limit: return s
    # `s[:limit-3]` would turn into a negative slice for tiny limits and return too much
    if limit < 3: return "..."[:max(limit, 0)]
    return s[:limit-3] + "..."
def showLog(loggerName:str="", level:int=logging.DEBUG):
    """Prints out logs of a particular logger at a particular level.

    Sets the logger's level and attaches a fresh :class:`logging.StreamHandler`
    (handler level DEBUG) to it. Note that every call attaches another handler.

    :param loggerName: name of the logger, "" for the root logger
    :param level: level to set on the logger"""
    target = logging.getLogger(loggerName)
    target.setLevel(level)
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    target.addHandler(handler)
def cleanDiv(_list:List[float], total:int) -> List[int]:
    """Cleanly divides :total: into int segments with weights specified by
    :_list:. Looks like this: ``(_list / _list.sum()) * total``, but
    everything is an ``int``.

    Each share is truncated to an int, then the last segment is overwritten
    with whatever is left so that the segments add up to ``total``.

    :param _list: weights of each segment
    :param total: number to split up
    :return: numpy int array, same length as ``_list``"""
    weights = np.array(_list)
    segments = (weights*total/weights.sum()).astype(int)
    segments[-1] = total - segments[:-1].sum()
    return segments
def beep(seconds=0.3):
    """Plays a beeping sound, may be useful as notification for long-running tasks.

    First tries to display a small html/javascript audio snippet through
    IPython. If anything goes wrong with that (IPython missing, for
    example), falls back to printing the terminal bell character via the shell.

    :param seconds: value spliced into the javascript that sizes the generated audio"""
    try:
        import IPython
        snippet = f"""<script>(new Audio('data:audio/wav;base64,UklGRl9vT19XQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YU'+Array(Math.round(3.3333e3*{seconds})).join(123))).play();</script>"""
        IPython.core.display.display_html(IPython.display.HTML(snippet))
    except:
        os.system("printf '\a'")
def beepOnAvailable(url:str, timeout=5, **kwargs):
    """Tries to connect with a url repeatedly, and if successful, plays
    a beep sound.

    Waits 1 second before each attempt. An attempt counts as successful when
    the response's ``ok`` flag is set; any error during the request is
    swallowed and just counts as a failed attempt. Interrupting the wait with
    Ctrl+C prints "Still not available" and returns.

    :param url: url to poll
    :param timeout: timeout of each request, passed to ``requests.get``
    :param kwargs: extra keyword arguments passed to ``requests.get``"""
    import requests
    def reachable():
        try: return requests.get(url, timeout=timeout, **kwargs).ok
        except: return False
    try:
        while True:
            time.sleep(1)
            if reachable():
                beep()
                break
    except KeyboardInterrupt:
        print("Still not available")
def dontWrap():
    """Don't wrap horizontally when in a notebook. Normally, if you're
    displaying something long, like the output of ``print('a'*1000)`` in a
    notebook, it will display it in multiple lines. This may be undesirable,
    so this solves that by displaying some HTML with css styles so that the
    notebook doesn't wrap.

    Does nothing if the IPython display machinery can't be used."""
    css = """<style>
    div.jp-OutputArea-output pre {white-space: pre;}
    div.output_area pre {white-space: pre;}
    div.CodeMirror > div.highlight {overflow-y: auto;}
</style>"""
    try:
        from IPython.core.display import display, HTML
        display(HTML(css))
    except:
        pass
import asyncio, functools                                                        # dontWrap
from threading import Timer as ThreadingTimer                                    # dontWrap
class AsyncTimer: # rename if want to use
    """Asyncio-based one-shot timer exposing the ``start()``/``cancel()``
    pair that :func:`debounce` calls on its timers. Needs an event loop to
    actually fire.

    :param timeout: seconds to wait before firing
    :param callback: zero-argument callable to run once the wait is over"""
    def __init__(self, timeout, callback):
        self._timeout = timeout
        self._callback = callback
    async def _job(self):
        # wait first, then fire. Cancelling the task during the wait skips the callback
        await asyncio.sleep(self._timeout)
        self._callback()
    def start(self):
        """Schedules the wait + callback on the event loop"""
        self._task = asyncio.ensure_future(self._job())
    def cancel(self):
        """Cancels the scheduled task. Only valid after :meth:`start`"""
        self._task.cancel()
def debounce(wait, threading=False):
    """Decorator that will postpone a function's execution until after
    ``wait`` seconds have elapsed since the last time it was invoked. Taken
    from `ipywidgets <https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Events.html>`_.
    Example::

        import k1lib, time; value = 0

        @k1lib.debounce(0.5, True)
        def f(x): global value; value = x**2

        f(2); time.sleep(0.3); f(3)
        print(value) # prints "0"
        time.sleep(0.7)
        print(value) # prints "9"

    The decorated function returns None right away; the original function's
    return value is discarded.

    :param wait: wait time in seconds
    :param threading: if True, use multiple threads, else just use async stuff"""
    Timer = ThreadingTimer if threading else AsyncTimer
    def decorator(fn):
        pending = [None] # single slot holding the timer of the latest call
        @functools.wraps(fn)
        def debounced(*args, **kwargs):
            previous = pending[0]
            if previous is not None: previous.cancel()
            fresh = Timer(wait, lambda: fn(*args, **kwargs))
            pending[0] = fresh
            fresh.start()
        return debounced
    return decorator
def scaleSvg(svg:str, scale:float=None) -> str:
    """Scales an svg xml string by some amount.

    Looks for the first ``width="...pt"`` and ``height="...pt"`` attributes,
    multiplies their values by ``scale`` (truncated to an int) and rewrites
    every occurrence of those exact attribute strings. Both integer and
    decimal point sizes are understood.

    :param svg: svg xml string
    :param scale: scaling factor. If None, uses ``k1lib.settings.svgScale``
    :return: the svg string with scaled width and height
    :raises IndexError: if the svg has no width or height attribute given in pt"""
    if scale is None: scale = k1lib.settings.svgScale
    # locate both attributes before touching anything, so a bad svg fails up front
    found = {}
    for attr in ("width", "height"):
        m = re.search(attr + r'="(\d+(?:\.\d+)?)pt"', svg)
        if m is None: raise IndexError(f'svg has no `{attr}="...pt"` attribute to scale')
        found[attr] = m
    for attr, m in found.items():
        raw = m.group(1)
        size = float(raw) if "." in raw else int(raw) # keep exact int math for integer sizes
        svg = svg.replace(m.group(0), f'{attr}="{int(size*scale)}pt"')
    return svg
import datetime                                                                  # scaleSvg
def now():
    """Convenience function for returning a simple time
    string, with timezone and whatnot.

    :return: current local time as an ISO 8601 string, utc offset included"""
    localNow = datetime.datetime.now().astimezone()
    return localNow.isoformat()
now() # module-level call whose result is discarded, presumably left over from the source notebook
def pushNotification(title="Some title", content="Some content", url="https://k1lib.com"):
    """Sends push notification to your device.

    Setting things up:

    - Download this app: https://play.google.com/store/apps/details?id=net.xdroid.pn
    - Set the `settings.pushNotificationKey` key obtained from the app. Example key: `k-967fe9...`
    - Alternatively, set the environment variable `k1lib_pushNotificationKey` instead
    - Run the function as usual

    The request is a plain http GET with everything in the query string, and
    "Pushed!" is printed without looking at the response.

    :param title: title of the notification
    :param content: body of the notification
    :param url: url sent along with the notification"""
    import requests
    params = {'k': k1lib.settings.pushNotificationKey, 't': title, 'c': content, 'u': url}
    query = urllib.parse.urlencode(params)
    requests.get("http://xdroid.net/api/message?" + query)
    print("Pushed!")
class Dependency:
    """Stand-in for a package that could not be imported. Accessing any
    attribute that is not set on the instance raises an exception telling
    the user which package to install.

    :param s: name of the package
    :param alt: (optional) name to show in the error message instead of ``s``
    :param url: (optional) url to point the user to in the error message"""
    def __init__(self, s, alt:str=None, url:str=None):
        self.s = s
        self.alt = alt
        self.url = url
    def __getattr__(self, attr):
        # only reached for attributes that are not found the normal way
        if self.url: after = f"More information is available on {self.url}"
        else: after = ""
        raise Exception(f"Python package `{self.alt or self.s}` not found. Please install it. {after}")
def dep(s, alt:str=None, url:str=None):
    """Imports a potentially unavailable package
    Example::

        graphviz = k1.dep("graphviz")
        # executes as normal, if graphviz is available, else throws an error
        g = graphviz.Digraph()

    I don't imagine this would be useful for everyday use though.
    This is mainly for writing this library, so that it can use
    optional dependencies.

    :param s: name of the package. Can be nested, like `matplotlib.pyplot`
    :param alt: (optional) name of the package to display in error message if package is not found
    :param url: (optional) url of the package's website, so that they know where to get official docs
    :return: the imported module, or a :class:`Dependency` placeholder if
        importing it failed for any reason"""
    try:
        module = importlib.import_module(s)
    except:
        module = Dependency(s, alt, url)
    return module
tickCheckpoints = np.array([1, 2, 2.5, 5, 10])/10 # candidate tick spacings, as fractions of a power of 10
def ticks(x:float, y:float, rounding:int=6):
    """Get tick locations in a plot that look reasonable.
    Example (values shown as plain lists, the actual return is a numpy array)::

        ticks(-5, 40)     # returns [-10.0, -5.0, 0.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0]
        ticks(0.05, 0.07) # returns [0.05, 0.0525, 0.055, 0.0575, 0.06, 0.0625, 0.065, 0.0675, 0.07, 0.0725]
        ticks(-5, 5)      # returns [-6.0, -5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

    So essentially, whenever you try to plot something, you want both the x and y axis
    to not have too many lines, and that the tick values snap to a nice number.

    Normally you don't have to do this, as matplotlib does this automatically behind
    the scenes, but sometimes you need to implement plotting again, in strange
    situations, so this function comes in handy

    :param x: start of interval
    :param y: end of interval, must be strictly bigger than ``x``
    :param rounding: internally, it rounds tick values to this number of
        digits (passed as ``ndigits`` to :func:`round`), to fix tiny float
        overflows that make numbers ugly. Raise it if you're working with
        really small numbers
    :return: numpy array of sorted, unique tick values
    :raises ValueError: if the interval is empty or reversed"""
    # log10 below needs a positive width; fail with a readable message instead of "math domain error"
    if not y > x: raise ValueError(f"ticks() needs an interval with x < y, but got x={x}, y={y}")
    cli = k1lib.cli; ld = math.log10(y-x); scale = ld-ld%1 # log delta, and its floor (the power of 10 of the interval's width)
    sel = ((ld%1 - tickCheckpoints)**2).argmin() # selecting the best fit tick checkpoint
    interval = 10**scale*tickCheckpoints[sel] # interval between ticks
    seed = int(y/10**scale)*10**scale # seed tick
    # NOTE(review): presumably f1 walks down from the seed and f2 walks up from it, one
    # interval at a time, stopping one interval past x and y -- confirm against k1lib.cli
    f1 = cli.applySerial(cli.op()-interval) | cli.breakIf(cli.op()<x-interval)
    f2 = cli.applySerial(cli.op()+interval) | cli.breakIf(cli.op()>y+interval)
    # finally, use the seed to expand in both directions to get all the ticks
    return seed | f1 & f2 | cli.joinStreams() | cli.aS(set) | cli.sort(None) | cli.apply(round, ndigits=rounding) | cli.deref() | cli.aS(np.array)
try:
    import graphviz
    def digraph():
        """Convenience method for creating a new graphviz Digraph, laid
        out top to bottom. Example::

            g = k1lib.digraph()
            g("a", "b", "c")
            g # displays arrows from "a" to "b" and "a" to "c"

        NOTE(review): calling the graph object like above is not set up in
        this function, presumably it's patched onto graphviz objects elsewhere.
        """
        attrs = dict(rankdir="TB")
        return graphviz.Digraph(graph_attr=attrs)
    def graph():
        """Convenience method for creating a new graphviz Graph, laid out
        top to bottom. See also: :meth:`digraph`"""
        attrs = dict(rankdir="TB")
        return graphviz.Graph(graph_attr=attrs)
except ImportError:
    # graphviz missing: both factories only print a hint and return None
    digraph = graph = lambda: print("Module `graphviz` not found! Please install it first, something like `pip install graphviz`")
def encode(obj:object) -> str:
    """Serialize random objects into bytes (using dill), then turn those
    bytes to normal strings using base64. Example::

        a = {"b": 3}
        encode(a) # returns "gASVCgAAAAAAAAB9lIwBYpRLA3Mu"
        decode(encode(a)) # returns {"b": 3}

    See also: :meth:`decode`

    :param obj: object to serialize
    :return: base64 text of the serialized object
    """
    pickled = dill.dumps(obj)
    return base64.b64encode(pickled).decode()
def decode(s:str) -> object:
    """Turns a string produced by :meth:`encode` back into a random object.

    This deserializes with dill, which can execute arbitrary code while
    loading, so only feed it strings that come from a source you trust.

    :param s: base64 string, as returned by :meth:`encode`
    :return: the reconstructed object"""
    pickled = base64.b64decode(s.encode())
    return dill.loads(pickled)
import hashlib                                                                   # decode
def hash(msg:str) -> int:
    """A universal hash function. Why not just use the builtin hash function?
    Because it actually changes for different interpreter instances, it won't be
    good for code that runs on multiple computers, so this is sort of like a
    drop-in replacement. Although it does output an integer, don't rely on it having
    the same numeric properties as a normal hash function.

    The value is the sha256 digest of the message's string form, read as a
    big-endian integer.

    :param msg: message to hash. Non-strings are formatted to a string first"""
    digest = hashlib.sha256(format(msg).encode()).digest()
    return int.from_bytes(digest, "big")
hash(34) # module-level call whose result is discarded, presumably left over from the source notebook
import traceback                                                                 # hash
try:
    import asyncio, threading, time; from collections import deque
    _coroutineQueue = deque() # deque of (idx, coroutine)
    _coroutineAns = dict() # Dict[idx -> coroutine ans]
    _coroutineAutoIdx = 0
    _coroutineIdxLock = threading.Lock() # guards _coroutineAutoIdx, as resolve() can be called from many threads
    def _coroutineResolvingThread():
        """Worker loop: runs queued coroutines one by one on its own event
        loop and publishes each outcome under the coroutine's index."""
        loop = asyncio.new_event_loop()
        while True:
            if len(_coroutineQueue) == 0: time.sleep(0.01)
            else:
                idx, coroutine = _coroutineQueue.popleft()
                # important to recover from exceptions. This has to be BaseException, as
                # asyncio.CancelledError is not an Exception on python 3.8+, and letting it
                # escape kills this thread, leaving every later resolve() waiting forever
                try: ans = loop.run_until_complete(coroutine); _coroutineAns[idx] = {"type": "success", "ans": ans}
                except BaseException as e: _coroutineAns[idx] = {"type": "failure", "e": f"{e}", "tb": traceback.format_exc()}
    threading.Thread(target=_coroutineResolvingThread, daemon=True).start()
    _resolve_err = None
except Exception as e: _resolve_err = f"{e}"; _resolve_tb = traceback.format_exc()
def resolve(coroutine):
    """Resolves coroutines without having to use await.
    Example::

        async def f(x):
            await asyncio.sleep(1) # simulates heavy processing
            return x + 4
        k1.resolve(f(5))

    This kinda feels just like ``asyncio.run(f(5))``, so why does this exist?
    Here's the docstring of that method:

    .. code-block:: text

        This function cannot be called when another asyncio event loop is running in the same thread.
        This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.

    This has more limitations that I found annoying to deal with day-to-day. I want
    a function that always work, no matter my setup. So, how this function work
    internally is that it spins up a new (permanent, daemon) thread, creates a new
    event loop in that thread, then whenever a coroutine comes in, it runs it in
    that event loop, returns, then pass control back to whatever thread that called
    :meth:`resolve`. I'm sure this can still be messed up in some way, but seems
    more useful than the builtin method.

    This is just meant as a quick and dirty way to force resolving coroutines. Use
    this sparingly, as performance won't be as good as a proper async application.
    If you find yourself using this way too often, then I'd suggest reviewing how
    :mod:`asyncio` works

    :param coroutine: coroutine to run to completion
    :return: whatever the coroutine returns
    :raises Exception: if the worker thread could not be started, or if the
        coroutine failed. The original error's message and traceback are
        embedded in the message, the original exception type is not kept"""
    global _coroutineAutoIdx
    if _resolve_err: raise Exception(f"k1lib.resolve() not available, encoutered this error while starting up: {_resolve_err}. Traceback:\n{_resolve_tb}")
    with _coroutineIdxLock: # hand out each index exactly once, even with concurrent callers
        idx = _coroutineAutoIdx; _coroutineAutoIdx += 1
    _coroutineQueue.append([idx, coroutine])
    while idx not in _coroutineAns: time.sleep(0.01) # poll until the worker publishes the outcome
    ans = _coroutineAns[idx]; del _coroutineAns[idx]
    if ans["type"] == "success": return ans["ans"]
    else: raise Exception(f"Exception occured while trying to k1lib.resolve(): {ans['e']}. Traceback:\n{ans['tb']}")