Source code for k1lib._basics

# AUTOGENERATED FILE! PLEASE DON'T EDIT
import torch, math, logging, warnings, random
import torch.nn as nn, numpy as np, matplotlib.pyplot as plt
from typing import Dict, Any, List, Union, Callable, overload, Tuple, Iterator\
, ContextManager
# Public names exported by ``from <this module> import *``.
__all__ = ["_docsUrl", "Object", "Range", "Domain", "AutoIncrement", "Wrapper",
           "CancelRunException", "CancelEpochException", "CancelBatchException",
           "textToHtml", "clearLine", "tab", "isNumeric", "close", "stats",
           "patch", "squeeze", "raiseEx", "smooth", "numDigits", "limitLines",
           "limitChars", "showLog", "cleanDiv", "beep", "executeNb", "dontWrap",
           "Func", "polyfit", "derivative", "optimize", "inverse", "integrate",
           "ignoreWarnings", "positionalEncode", "debounce"]
# Documentation site URL (going by the name; it is not used within this section).
_docsUrl = "https://k1lib.github.io"
class Object:
    """Convenience class that acts like ``defaultdict``. You can use it like a
    normal object::

        a = Object()
        a.b = 3
        print(a.b) # outputs "3"

    ``__repr__()`` output is pretty nice too:

    .. code-block:: text

        <class '__main__.Object'>, with attrs:
        - b

    You can instantiate it from a dict::

        a = Object.fromDict({"b": 3, "c": 4})
        print(a.c) # outputs "4"

    And you can specify a default value, just like defaultdict::

        a = Object().withAutoDeclare(lambda: [])
        a.texts.extend(["factorio", "world of warcraft"])
        print(a.texts[0]) # outputs "factorio"

    .. warning::

        Default values only work with variables that don't start with an
        underscore "_".

    Treating it like defaultdict is okay too::

        a = Object().withAutoDeclare(lambda: [])
        a["movies"].append("dune")
        print(a.movies[0]) # outputs "dune"
    """

    def __init__(self):
        # Both fields are bookkeeping and are hidden from :attr:`state`.
        self._defaultValueGenerator = None
        self.repr = None

    @staticmethod
    def fromDict(_dict:Dict[str, Any]):
        """Creates an object with attributes from a dictionary"""
        answer = Object()
        answer.__dict__.update(_dict)
        return answer

    @property
    def state(self) -> dict:
        """Essentially ``__dict__``, but only outputs the fields you defined.
        If your framework intentionally set some attributes, those will be
        reported too, so beware"""
        answer = dict(self.__dict__)
        # pop() instead of del, so an instance that lost a bookkeeping field
        # still reports its state instead of raising KeyError
        answer.pop("_defaultValueGenerator", None)
        answer.pop("repr", None)
        return answer

    def withAutoDeclare(self, defaultValueGenerator):
        """Sets this Object up so that if a field doesn't exist, it will
        automatically create it with a default value.

        :param defaultValueGenerator: zero-argument callable producing the default
        :return: this object, to allow chaining"""
        self._defaultValueGenerator = defaultValueGenerator
        return self

    def __getitem__(self, idx):
        return getattr(self, idx)

    def __setitem__(self, idx, value):
        setattr(self, idx, value)

    def __iter__(self):
        yield from self.state.values()

    def __contains__(self, item:str):
        return item in self.__dict__

    def __getattr__(self, attr):
        """Only called when normal lookup fails. Auto-declares the field if a
        default value generator was configured.

        :raises AttributeError: for underscore-prefixed names, for ``getdoc``,
            and for any missing name when no generator is set"""
        # Underscore names are never auto-declared (this also stops infinite
        # recursion when ``_defaultValueGenerator`` itself is missing).
        if attr.startswith("_"):
            raise AttributeError(attr)
        if attr == "getdoc":
            raise AttributeError("This param is used internally in module `IPython.core.oinspect`, so you kinda have to set it specifically yourself instead of relying on auto declare")
        if self._defaultValueGenerator is not None:
            self.__dict__[attr] = self._defaultValueGenerator()
            return self.__dict__[attr]
        raise AttributeError(attr)

    def withRepr(self, _repr:str):
        """Specify output of ``__repr__()``. Legacy code. You can just
        monkey patch it instead."""
        self.repr = _repr
        return self

    def __repr__(self):
        _dict = "\n".join([f"- {k}" for k in self.state.keys()])
        return self.repr or f"{type(self)}, with attrs:\n{_dict}"
class CancelRunException(Exception):
    """Signal used in the core training loop to skip the run entirely."""
class CancelEpochException(Exception):
    """Signal used in the core training loop to skip to the next epoch."""
class CancelBatchException(Exception):
    """Signal used in the core training loop to skip to the next batch."""
def textToHtml(text:str) -> str:
    """Converts plain text to HTML that renders like `print()` output:
    newlines become ``<br>`` and every space becomes ``&nbsp;``."""
    withBreaks = text.replace("\n", "<br>")
    return withBreaks.replace(" ", "&nbsp;")
def clearLine():
    """Blanks out the current terminal line: writes a carriage return, 80
    spaces, and another carriage return, without a trailing newline."""
    blanks = " " * 80
    print(f"\r{blanks}\r", end="")
def tab(text:Union[list, str]) -> Union[list, str]:
    """Indents every line. A str is split on newlines and returned as a str;
    any other input is treated as an iterable of lines and returned as a list."""
    # NOTE(review): the prefix is a single space as it appears in this source;
    # confirm that this is the intended "tab" width.
    if not isinstance(text, str):
        return [" " + line for line in text]
    indented = [" " + line for line in text.split("\n")]
    return "\n".join(indented)
def isNumeric(x) -> bool:
    """Tells whether ``x`` is an ``int``, a ``float`` or a numpy scalar
    (:class:`numpy.number`). ``bool`` counts too, as it subclasses ``int``."""
    numericTypes = (int, float, np.number)
    return isinstance(x, numericTypes)
def close(a, b):
    """Tells whether 2 values are really close to each other.

    Two plain numbers are compared with an absolute tolerance of 1e-6;
    anything else is converted to tensors and handed to :func:`torch.allclose`."""
    bothScalars = isNumeric(a) and isNumeric(b)
    if not bothScalars:
        return torch.allclose(torch.tensor(a), torch.tensor(b))
    return abs(a - b) < 1e-6
def stats(x) -> Tuple[float, float]:
    """Returns ``(x.mean(), x.std())``, so ``x`` must provide both methods."""
    mean = x.mean()
    std = x.std()
    return mean, std
def patch(_class:type, name:str=None, docs:Union[str, Any]=None, static=False):
    """Decorator factory that attaches the decorated function to a class/object.

    :param _class: object to patch function. Can also be a type
    :param name: name of patched function, if different from current
    :param docs: docs of patched function. Can be object with defined __doc__ attr
    :param static: whether to wrap this inside :class:`staticmethod` or not

    Intended to be used like this::

        class A:
            def methA(self):
                return "inside methA"

        @k1lib.patch(A)
        def methB(self):
            return "inside methB"

        a = A()
        a.methB() # returns "inside methB"

    You can do ``@property`` attributes like this::

        class A: pass

        @k1lib.patch(A, "propC")
        @property
        def propC(self): return self._propC

        @k1lib.patch(A, "propC")
        @propC.setter
        def propC(self, value): self._propC = value

        a = A(); a.propC = "abc"
        a.propC # returns "abc"

    The attribute name unfortunately has to be explicitly declared, as I can't
    really find a way to extract the original name. You can also do static
    methods like this::

        class A: pass

        @k1lib.patch(A, static=True)
        def staticD(arg1): return arg1

        A.staticD("def") # returns "def"
    """
    def inner(function):
        # Documentation priority: explicit `docs` (or its __doc__), then the
        # function's own docstring, then the docstring of the patched class.
        if docs is None or isinstance(docs, str):
            docText = docs
        else:
            docText = docs.__doc__
        docText = docText or function.__doc__ or _class.__doc__
        # `name` short-circuits the lookup, so objects without __qualname__
        # work as long as a name is given explicitly.
        attrName = name or function.__qualname__.split(".")[-1]
        target = staticmethod(function) if static else function
        target.__doc__ = docText
        setattr(_class, attrName, target)
        return target
    return inner
def squeeze(_list:Union[list, tuple, torch.Tensor, Any], hard=False):
    """If list only has 1 element, return that element, else return original list.

    :param hard: If True, then if list/tuple, filters out None and empty
        strings, and takes the first element out even if that list/tuple has
        more than 1 element
    :return: the single element, the squeezed tensor (for
        :class:`torch.Tensor` input), or the input itself
    :raises IndexError: if ``hard`` is True and nothing survives the filter"""
    if isinstance(_list, (tuple, list)):
        if hard:
            # Identity/type checks instead of `!=`, so elements with
            # elementwise comparison (e.g. numpy arrays) can't break the filter.
            kept = [e for e in _list
                    if e is not None and not (isinstance(e, str) and e == "")]
            return kept[0]
        if len(_list) == 1:
            return _list[0]
    if isinstance(_list, torch.Tensor):
        return _list.squeeze()
    return _list
def raiseEx(ex:Exception):
    """Raises the given exception. Handy where only an expression is
    allowed, e.g. inside a lambda."""
    raise ex
def smooth(arr:List[float], consecutives:int=5) -> List[float]:
    """Averages each run of ``consecutives`` neighboring values into 1 value.
    A trailing run shorter than ``consecutives`` is dropped."""
    averages = []
    running = 0
    for count, value in enumerate(arr, start=1):
        running += value
        if count % consecutives != 0:
            continue
        averages.append(running / consecutives)
        running = 0
    return averages
def numDigits(num) -> int:
    """Length of the default string formatting of ``num``. The sign and
    decimal point are counted as characters too."""
    return len(format(num))
def limitLines(s:str, limit:int=10) -> str:
    """Keeps at most ``limit`` lines of ``s``. If lines were dropped, a final
    ``.....`` line is appended; otherwise ``s`` is returned unchanged."""
    lines = s.split("\n")
    if len(lines) <= limit:
        return s
    kept = "\n".join(lines[:limit])
    return kept + "\n....."
def limitChars(s:str, limit:int=50):
    """If input string is too long, truncates to first `limit` characters of
    the first line.

    :param s: anything; it is formatted to a string first. ``None`` gives ""
    :param limit: maximum number of characters kept from the first line
    :return: the first line, cut to ``limit`` characters plus "..." only when
        characters were actually dropped"""
    if s is None:
        return ""
    firstLine = f"{s}".split("\n")[0]
    # Only mark as truncated if something was cut; previously strings with
    # length in (limit-3, limit] got "..." appended although nothing was lost.
    if len(firstLine) > limit:
        return firstLine[:limit] + "..."
    return firstLine
def showLog(loggerName:str="", level:int=logging.DEBUG):
    """Makes a logger print its records: sets the logger's level to ``level``
    and attaches a fresh DEBUG-level :class:`logging.StreamHandler` to it.

    Every call adds another handler to the logger."""
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    target = logging.getLogger(loggerName)
    target.setLevel(level)
    target.addHandler(handler)
def cleanDiv(_list:List[float], total:int) -> List[int]:
    """Cleanly divides :total: into int segments with weights specified by
    :_list:. Looks like this: ``(_list / _list.sum()) * total``, but everything
    is an ``int``.

    Each share is truncated to int, then the last one is set to whatever
    remains so the segments add up to ``total``. Returns a numpy array."""
    weights = np.array(_list)
    segments = (weights * total / weights.sum()).astype(int)
    segments[-1] = total - segments[:-1].sum()
    return segments
# Shorthands for negative and positive infinity
ninf = float("-inf"); inf = float("inf")
class Range:
    """A range of numbers. It's just 2 numbers really: start and stop

    This is essentially a convenience class to provide a nice, clean
    abstraction and to eliminate errors. You can transform values::

        Range(10, 20).toUnit(13) # returns 0.3
        Range(10, 20).fromUnit(0.3) # returns 13
        Range(10, 20).toRange(Range(20, 10), 13) # returns 17

    You can also do random math operations on it::

        (Range(10, 20) * 2 + 3) == Range(23, 43) # returns True
        Range(10, 20) == ~Range(20, 10) # returns True"""

    def __init__(self, start=0, stop=None):
        """Creates a new Range.

        There are different ``__init__`` functions for many situations:

        - Range(2, 11.1): create range [2, 11.1]
        - Range(15.2): creates range [0, 15.2]
        - Range(Range(2, 3)): create range [2, 3]. This serves as sort of a catch-all
        - Range(slice(2, 5, 2)): creates range [2, 5]. Can also be a :class:`range`
        - Range(slice(2, -1), 10): creates range [2, 9]
        - Range([1, 2, 7, 5]): creates range [1, 5]. Can also be a tuple

        :raises AttributeError: if the argument combination is not understood
        """
        if isNumeric(start) and isNumeric(stop):
            self.start, self.stop = start, stop
        elif isNumeric(start) and stop is None:
            self.start, self.stop = 0, start
        elif stop is None and isinstance(start, (range, slice, Range)):
            self.start, self.stop = start.start, start.stop
        elif isNumeric(stop) and isinstance(start, slice):
            # resolve negative/missing slice bounds against a length of `stop`
            r = range(stop)[start]
            self.start, self.stop = r.start, r.stop
        elif isinstance(start, (list, tuple)):
            self.start, self.stop = start[0], start[-1]
        else:
            raise AttributeError(f"Don't understand {start} and {stop}")
        self.delta = self.stop - self.start

    def __getitem__(self, index):
        """0 for start, 1 for stop

        You can also pass in a :class:`slice` object, in which case, a range
        subset will be returned. Code kinda looks like this::

            range(start, stop)[index]
        """
        if index == 0: return self.start
        if index == 1: return self.stop
        if isinstance(index, slice):
            return Range(range(self.start, self.stop)[index])
        raise Exception(f"Can't get index {index} of range [{self.start}, {self.stop}]")

    def fixOrder(self) -> "Range":
        """If start greater than stop, switch the 2, else do nothing.
        Mutates and returns this Range."""
        if self.start > self.stop:
            self.start, self.stop = self.stop, self.start
            # keep delta in sync, otherwise toUnit()/fromUnit() use a stale span
            self.delta = self.stop - self.start
        return self

    def _common(self, x, f:Callable[[float], float]):
        """Applies ``f`` to a number, to every element of a list/tuple
        (recursively), or to both ends of a range/slice/Range (missing start
        defaults to 0, missing stop to 1)."""
        if isNumeric(x): return f(x)
        if isinstance(x, (list, tuple)):
            return [self._common(elem, f) for elem in x]
        if isinstance(x, (range, slice, Range)):
            return Range(self._common(x.start if x.start is not None else 0, f),
                         self._common(x.stop if x.stop is not None else 1, f))
        raise AttributeError(f"Doesn't understand {x}")

    def __iter__(self):
        yield self.start; yield self.stop

    def intIter(self, step:int=1) -> Iterator[int]:
        """Returns integers within this Range"""
        return range(int(self.start), int(self.stop), step)

    def toUnit(self, x):
        """Converts x from current range to [0, 1] range. Example::

            r = Range(2, 10)
            r.toUnit(5) # will return 0.375, as that is (5-2)/(10-2)

        You can actually pass in a lot in place of x::

            r = Range(0, 10)
            r.toUnit([5, 3, 6]) # will be [0.5, 0.3, 0.6]. Can also be a tuple
            r.toUnit(slice(5, 6)) # will be Range(0.5, 0.6). Can also be a range, or Range

        .. note::

            In the last case, if ``start`` is None, it gets defaulted to 0, and
            if ``end`` is None, it gets defaulted to 1

        A zero-width range maps everything to ``nan``.
        """
        def f(x):
            if self.delta == 0: return float("nan")
            return (x - self.start) / self.delta
        return self._common(x, f)

    def fromUnit(self, x):
        """Converts x from [0, 1] range to this range. Example::

            r = Range(0, 10)
            r.fromUnit(0.3) # will return 3

        x can be a lot of things, see :meth:`toUnit` for more"""
        return self._common(x, lambda x: x * self.delta + self.start)

    def toRange(self, _range:"Range", x):
        """Converts x from current range to another range. Example::

            r = Range(0, 10)
            r.toRange(Range(0, 100), 6) # will return 60

        x can be a lot of things, see :meth:`toUnit` for more."""
        return self._common(x, lambda x: Range(_range).fromUnit(self.toUnit(x)))

    @property
    def range_(self):
        """Returns a :class:`range` object with start and stop values rounded off"""
        return range(math.floor(self.start+0.001), math.floor(self.stop+0.001))

    @property
    def slice_(self):
        """Returns a :class:`slice` object with start and stop values rounded off"""
        return slice(math.floor(self.start+0.001), math.floor(self.stop+0.001))

    @staticmethod
    def proportionalSlice(r1, r2, r1Slice:slice) -> Tuple["Range", "Range"]:
        """Slices r1 and r2 proportionally. Best to explain using an
        example. Let's say you have 2 arrays created from a time-dependent
        procedure like this::

            a = []; b = []
            for t in range(100):
                if t % 3 == 0: a.append(t)
                if t % 5 == 0: b.append(1 - t)
            len(a), len(b) # returns (34, 20)

        a and b are of different lengths, but you want to plot both from 30%
        mark to 50% mark (for a, it's elements 10 -> 17, for b it's 6 -> 10),
        as they are time-dependent. As you can probably tell, to get the
        indicies 10, 17, 6, 10 is messy. So, you can do something like this
        instead::

            r1, r2 = Range.proportionalSlice(Range(len(a)), Range(len(b)), slice(10, 17))

        This will return the Ranges [10, 17] and [5.88, 10]

        Then, you can plot both of them side by side like this::

            fig, axes = plt.subplots(ncols=2)
            axes[0].plot(r1.range_, a[r1.slice_])
            axes[1].plot(r2.range_, b[r2.slice_])
        """
        r1, r2 = Range(r1), Range(r2)
        ar2 = r1.toRange(r2, (ar1 := r1[r1Slice]))
        return ar1, ar2

    def bound(self, rs:Union[range, slice]) -> Union[range, slice]:
        """If input range|slice's stop and start is missing, then use this
        range's start and stop instead."""
        # `is None` rather than `or`: an explicit bound of 0 is not "missing"
        start = self.start if rs.start is None else rs.start
        stop = self.stop if rs.stop is None else rs.stop
        return type(rs)(start, stop)

    def copy(self):
        """Returns a new Range with the same start and stop"""
        return Range(self.start, self.stop)

    def __str__(self): return f"[{self.start}, {self.stop}]"

    def __eq__(self, _range):
        _range = Range(_range)
        # exact equality is checked first so infinite bounds compare equal
        # (inf - inf is nan, which would fail the tolerance test)
        return (_range.start == self.start or abs(_range.start - self.start) < 1e-9) and\
            (_range.stop == self.stop or abs(_range.stop - self.stop) < 1e-9)

    def __contains__(self, x:float): return x >= self.start and x < self.stop

    def __neg__(self): return Range(-self.start, -self.stop)

    def __invert__(self):
        """Returns a new Range with start and stop swapped"""
        return Range(self.stop, self.start)

    def __add__(self, num): return Range(self.start + num, self.stop + num)
    def __radd__(self, num): return self + num
    def __mul__(self, num): return Range(self.start * num, self.stop * num)
    def __rmul__(self, num): return self * num

    def __truediv__(self, num):
        # scale both ends; this used to evaluate `num * (1/num)` and so
        # returned a bare number instead of a Range
        return self * (1/num)

    def __rtruediv__(self, num):
        # must raise a real exception object; raising a str is itself a TypeError
        raise TypeError("Doesn't make sense to do this!")

    def __round__(self): return Range(round(self.start), round(self.stop))
    def __ceil__(self): return Range(math.ceil(self.start), math.ceil(self.stop))
    def __floor__(self): return Range(math.floor(self.start), math.floor(self.stop))

    def __repr__(self):
        return f"""A range of numbers: [{self.start}, {self.stop}]. Can do:
- r.toUnit(x): will convert x from range [{self.start}, {self.stop}] to [0, 1]
- r.fromUnit(x): will convert x from range [0, 1] to range [{self.start}, {self.stop}]
- r.toRange([a, b], x): will convert x from range [{self.start}, {self.stop}] to range [a, b]
- r[0], r[1], r.start, r.stop: get start and stop values of range
Note: for conversion methods, you can pass in"""
def yieldLowest(r1s:Iterator["Range"], r2s:Iterator["Range"]):
    """Given 2 :class:`Range` generators with lengths a and b, yield every
    object (a + b) so that :class:`Range`s with smaller start point gets
    yielded first. Assumes that each generator:

    - Does not intersect with itself
    - Is sorted by start point already

    .. warning::

        This method will sometimes yield the same objects given by the
        Iterators. Make sure you copy each :class:`Range` if your use case
        requires"""
    r1s = iter(r1s); r2s = iter(r2s)
    # if either side is empty, the other one is passed through untouched
    if (r1 := next(r1s, None)) is None: yield from r2s; return
    if (r2 := next(r2s, None)) is None: yield r1; yield from r1s; return
    while True:
        # drain whichever side currently has the smaller start point
        while r1.start <= r2.start:
            yield r1
            r1 = next(r1s, None)
            if r1 is None: yield r2; yield from r2s; return
        while r2.start <= r1.start:
            yield r2
            r2 = next(r2s, None)
            if r2 is None: yield r1; yield from r1s; return
def join(r1s:Iterator["Range"], r2s:Iterator["Range"]):
    """Joins 2 :class:`Range` generators, so that overlaps gets merged
    together.

    .. warning::

        This method will sometimes yield the same objects given by the
        Iterators. Make sure you copy each :class:`Range` if your use case
        requires"""
    it = yieldLowest(r1s, r2s)
    if (r := next(it, None)) is None: return
    while True:
        nr = next(it, None)
        if nr is None: yield r; return
        if r.stop >= nr.start:
            # overlapping (or touching): extend a copy, so inputs stay intact
            r = r.copy(); r.stop = max(r.stop, nr.stop)
            # keep the cached span consistent with the new stop
            r.delta = r.stop - r.start
        else: yield r; r = nr
def neg(rs:List["Range"]):
    """Returns R - rs, where R is the set of real numbers.

    Yields the gaps before, between and after the given ranges, which are
    expected to be sorted and non-overlapping (TODO confirm with callers)."""
    rs = iter(rs)
    if (r := next(rs, None)) is None: yield Range(ninf, inf); return
    if ninf < r.start: yield Range(ninf, r.start) # check -inf case
    while True:
        start = r.stop
        if (r := next(rs, None)) is None:
            if start < inf: yield Range(start, inf)
            return
        yield Range(start, r.start)
class Domain:
    """A set of intervals on the real number line, stored as a list of
    :class:`Range` in ``self.ranges``."""

    def __init__(self, *ranges, dontCheck:bool=False):
        """Creates a new domain.

        :param ranges: each element is a :class:`Range`, although any format
            will be fine as this selects for that
        :param dontCheck: don't sanitize inputs, intended to boost perf
            internally only

        A domain is just an array of :class:`Range` that represents what
        intervals on the real number line is chosen. Some examples::

            inf = float("inf") # shorthand for infinity
            Domain([5, 7.5], [2, 3]) # represents "[2, 3) U [5, 7.5)"
            Domain([2, 3.2], [3, 8]) # represents "[2, 8)" as overlaps are merged
            -Domain([2, 3]) # represents "(-inf, 2) U [3, inf)", so essentially R - d, with R being the set of real numbers
            -Domain([-inf, 3]) # represents "[3, inf)"
            Domain.fromInts(2, 3, 6) # represents "[2, 4) U [6, 7)"

        You can also do arithmetic on them, and check "in" operator::

            Domain([2, 3]) + Domain([4, 5]) # represents "[2, 3) U [4, 5)"
            Domain([2, 3]) + Domain([2.9, 5]) # represents "[2, 5)", also merges overlaps
            3 in Domain([2, 3]) # returns False
            2 in Domain([2, 3]) # returns True"""
        if dontCheck: self.ranges = list(ranges); return
        # convert all to Range type, fix its order, and sort based on .start
        ranges = [(r if isinstance(r, Range) else Range(r)).fixOrder() for r in ranges]
        ranges = sorted(ranges, key=lambda r: r.start)
        # merges overlapping segments
        self.ranges = list(join(ranges, []))

    @staticmethod
    def fromInts(*ints:List[int]):
        """Returns a new :class:`Domain` which has ranges [i, i+1] for each
        int given."""
        return Domain(*(Range(i, i+1) for i in ints))

    def copy(self):
        """Returns a new Domain built from copies of this domain's ranges"""
        return Domain(*(r.copy() for r in self.ranges))

    def intIter(self, step:int=1, start:int=0):
        r"""Yields ints in all ranges of this domain. If first range's domain
        is :math:`(-\inf, a)`, then starts at the specified integer"""
        for r in self.ranges:
            x = int(start) if r.start == -inf else int(r.start)
            while x < r.stop: yield x; x += step

    def __neg__(self): return Domain(*neg(self.ranges), dontCheck=True)

    def __add__(self, domain):
        return Domain(*(r.copy() for r in join(self.ranges, domain.ranges)), dontCheck=True)

    def __sub__(self, domain): return self + (-domain)

    def __eq__(self, domain): return self.ranges == domain.ranges

    def __str__(self):
        # str() each range first; str.join only accepts strings
        return f"Domain: {', '.join(str(r) for r in self.ranges)}"

    def __contains__(self, x): return any(x in r for r in self.ranges)

    def __repr__(self):
        rs = '\n'.join(f"- {r}" for r in self.ranges)
        return f"""Domain:\n{rs}\n\nCan:
- 3 in d: check whether a number is in this domain or not
- d1 + d2: joins 2 domain
- -d: excludes the domain from R
- d1 - d2: same as d1 + (-d2)"""
class AutoIncrement:
    """Counter whose :attr:`value` advances by 1 on every read."""

    def __init__(self, initialValue:int=0):
        """Creates a new AutoIncrement object. Every time :attr:`value` is
        read, it gets incremented by 1 automatically.

        The first read therefore returns ``initialValue + 1``."""
        self.value = initialValue

    @staticmethod
    def random() -> "AutoIncrement":
        """Creates a new AutoIncrement object that has a random integer
        initial value"""
        # integer bound: newer Python versions reject float arguments (the old 1e9)
        return AutoIncrement(random.randint(0, 10**9))

    @property
    def value(self):
        """Increments the counter, then returns it"""
        self._value += 1
        return self._value

    @value.setter
    def value(self, value):
        self._value = value
class Wrapper:
    """Thin holder around a single value; call the instance to get it back."""

    def __init__(self, value):
        """Creates a wrapper for some value. Get that value using
        :meth:`__call__`. This exists just so that Jupyter Lab's contextual
        help won't automatically display the (possibly humongous) value."""
        self.value = value

    def __call__(self):
        """Returns the wrapped value"""
        return self.value
# Type alias: a function taking one float and returning a float.
Func = Callable[[float], float]
def polyfit(x:List[float], y:List[float], deg:int=6) -> "Func":
    """Returns a function that approximate :math:`f(x) = y`.

    :param x: sample x coordinates
    :param y: sample y coordinates
    :param deg: degree of the polynomial of the returned function
    :return: function evaluating the fitted polynomial. It accepts a number,
        a sequence or a numpy array, and returns a float numpy array of the
        same shape (0-d for a plain number)
    """
    params = np.polyfit(x, y, deg)
    def _inner(_x):
        # asarray lets plain lists/tuples through; the builtin `float` replaces
        # the `np.float` alias, which no longer exists in current NumPy
        _x = np.asarray(_x, dtype=float)
        answer = np.zeros_like(_x, dtype=float)
        # np.polyfit returns coefficients with the highest power first
        for expo, param in enumerate(params):
            answer += param * _x**(len(params)-expo-1)
        return answer
    return _inner
def derivative(f:Func, delta:float=1e-6) -> Func:
    """Returns a function approximating the derivative of ``f`` with a
    forward difference: ``(f(x + delta) - f(x)) / delta``."""
    def slope(x):
        ahead = f(x + delta)
        here = f(x)
        return (ahead - here) / delta
    return slope
def optimize(f:Func, v:float=1, threshold:float=1e-6) -> float:
    """Given :math:`f(x) = 0`, solves for x, using initial value `v`.

    Repeats the step ``v - f(v)/f'(v)``, with the slope taken from
    :func:`derivative`, until ``abs(f(v))`` is no longer above ``threshold``.
    There is no cap on the number of iterations."""
    slope = derivative(f)
    while True:
        # negated ">" rather than "<=", so a nan residual also ends the loop
        if not (abs(f(v)) > threshold):
            return v
        v = v - f(v)/slope(v)
def inverse(f:Func) -> Func:
    """Returns the inverse of a function: for each ``y`` it numerically solves
    ``f(x) - y = 0`` with :func:`optimize`. The inverse function takes a long
    time to run, so don't use this where you need lots of speed."""
    def solveFor(y):
        return optimize(lambda x: f(x) - y)
    return solveFor
def integrate(f:Func, _range:Range) -> float:
    """Integrates a function over a range.

    Approximates the integral by sampling ``f`` at 1000 evenly spaced points
    from the range's start to its stop, weighting each sample by
    ``_range.delta / 1000``."""
    n = 1000
    total = 0
    for x in np.linspace(*_range, n):
        total += f(x)*_range.delta/n
    return total
@patch(nn.Module)
def importParams(self:nn.Module, params:List[nn.Parameter]):
    """Given a list of :class:`torch.nn.parameter.Parameter`/:class:`torch.Tensor`,
    update the current :class:`torch.nn.Module`'s parameters with it.

    Parameters are paired up in ``self.parameters()`` order and each one
    receives a clone of the new data."""
    for current, incoming in zip(self.parameters(), params):
        current.data = incoming.data.clone()
@patch(nn.Module)
def exportParams(self:nn.Module) -> List[torch.Tensor]:
    """Gets the list of :class:`torch.nn.parameter.Parameter` data, each
    cloned, in ``self.parameters()`` order"""
    snapshot = []
    for param in self.parameters():
        snapshot.append(param.data.clone())
    return snapshot
class ParamsContext:
    """Context manager that snapshots a module's parameters on enter and
    writes the snapshot back on exit."""
    def __init__(self, m:nn.Module):
        self.m = m
    def __enter__(self):
        self.params = self.m.exportParams()
        return self.params
    def __exit__(self, *ignored):
        self.m.importParams(self.params)
@patch(nn.Module)
def paramsContext(self:nn.Module) -> ContextManager:
    """A nice context manager for :meth:`importParams` and
    :meth:`exportParams`. Returns the old parameters on enter context."""
    return ParamsContext(self)
@patch(nn.Module)
def getParamsVector(model:nn.Module) -> List[torch.Tensor]:
    """For each parameter, returns a normal distributed random tensor with the
    same standard deviation as the original parameter.

    Single-element parameters use a scale of 1 instead of their std."""
    return [torch.randn(param.shape).to(param.device)
            * (param.std() if param.numel() > 1 else 1)
            for param in model.parameters()]
class _NnModuleDeviceContext:
    """Context manager that records each parameter's device on enter and moves
    each parameter's data back to that device on exit."""
    def __init__(self, nnModule):
        self.nnModule = nnModule
    def __enter__(self):
        self.devices = []
        for param in self.nnModule.parameters():
            self.devices.append(param.device)
    def __exit__(self, *ignored):
        for param, home in zip(self.nnModule.parameters(), self.devices):
            param.data = param.to(device=home)
@patch(nn.Module)
def preserveDevice(self:nn.Module) -> ContextManager:
    """Preserves the device of whatever operation is inside this.
    Example::

        import torch.nn as nn
        m = nn.Linear(3, 4)
        with m.preserveDevice():
            m.cuda() # moves whole model to cuda
        # automatically moves model to cpu

    This will work even if the model has many tensors that live on 10
    different devices."""
    return _NnModuleDeviceContext(self)
def beep():
    """Plays a beeping sound, may be useful as notification for long-running
    tasks.

    Tries to emit an HTML audio snippet through IPython first; if that fails
    for any reason, falls back to printing the terminal bell character."""
    try:
        import IPython
        IPython.core.display.display_html(IPython.display.HTML("""<script>(new Audio('data:audio/wav;base64,UklGRl9vT19XQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YU'+Array(1e3).join(123))).play();</script>"""))
    # best-effort fallback, but let KeyboardInterrupt/SystemExit propagate
    except Exception:
        import os
        os.system("printf '\a'")
def executeNb(fileName:str, _globals:dict=None, preserveDir=False):
    """Executes a specified IPython notebook. Can make all variables defined
    in the notebook appear in the __main__ context by passing `globals()` in

    Lines starting with ``!`` or ``%%`` are skipped. If a cell raises, the
    cell's code is printed and the exception is re-raised.

    :param fileName: path to the notebook, absolute or relative to the current
        working directory
    :param _globals: namespace the cells are executed in. Defaults to a fresh dict
    :param preserveDir: if True, don't change working directory to that of the
        notebook's"""
    import json, os
    if _globals is None: _globals = {}
    # Resolve and read the notebook BEFORE changing directory, so relative
    # paths (including bare file names) work.
    nbPath = os.path.abspath(fileName)
    with open(nbPath, encoding="utf-8") as f:
        cells = json.load(f)["cells"]
    oldPath = os.getcwd()
    if not preserveDir: os.chdir(os.path.dirname(nbPath))
    try:
        for cell in cells:
            if cell["cell_type"] != "code": continue
            source = cell["source"]
            # a cell's source may be stored as one string rather than a list of lines
            if isinstance(source, str): source = source.splitlines()
            lines = []
            for line in source:
                line = line.rstrip()
                if line.startswith("!"): continue
                if line.startswith("%%"): continue
                lines.append(line)
            lines = "\n".join(lines)
            # NOTE: exec() runs arbitrary code; only use this on trusted notebooks
            try: exec(lines, _globals); plt.show() # clears plots
            except Exception:
                print("Problematic code block:\n")
                print(lines); raise
    finally:
        # restore the working directory even if a cell failed
        if not preserveDir: os.chdir(oldPath)
def dontWrap():
    """Don't wrap horizontally when in a notebook.

    Displays 2 ``<style>`` snippets through IPython. If anything goes wrong
    (e.g. IPython is not installed), prints a short notice instead."""
    try:
        from IPython.core.display import display, HTML
        display(HTML("<style>div.jp-OutputArea-output pre {white-space: pre;}</style>"))
        display(HTML("<style>div.output_area pre {white-space: pre;}</style>"))
    # best-effort, but let KeyboardInterrupt/SystemExit propagate
    except Exception:
        print("Can't run dontWrap()")
class ignoreWarnings:
    """Context manager to ignore every warning. Example::

        import warnings
        with k1lib.ignoreWarnings():
            warnings.warn("some random stuff") # will not show anything

    The previous warning filters are restored on exit.
    """
    def __enter__(self):
        # catch_warnings saves the current filter state; it is restored in __exit__
        self.ctx = saved = warnings.catch_warnings()
        saved.__enter__()
        warnings.simplefilter("ignore")

    def __exit__(self, *excInfo):
        self.ctx.__exit__(*excInfo)
def positionalEncode(t:torch.Tensor, richFactor:float=2) -> torch.Tensor:
    r"""Position encode a tensor of shape :math:`(L, F)`, where :math:`L` is
    the sequence length, :math:`F` is the encoded features. The encodings are
    written into the input tensor in place (replacing its contents) and that
    same tensor is returned. This is a bit different from the standard
    implementations that ppl use. This is exactly:

    .. math:: p = \frac{i}{F\cdot richFactor}
    .. math:: w = 1/10000^p
    .. math:: pe = sin(w * L)

    With i from range [0, F). Example::

        import matplotlib.pyplot as plt, torch, k1lib
        plt.figure(dpi=150)
        plt.imshow(k1lib.positionalEncode(torch.zeros(100, 10)).T)

    .. image:: images/positionalEncoding.png

    :param richFactor: the bigger, the richer the features are. A lot of
        times, I observe that the features that are meant to cover huge scales
        are pretty empty and don't really contribute anything useful. So this
        is to dump up the usefulness of those features"""
    seqN, featsN = t.shape
    featIdx = torch.tensor(range(featsN))
    freqs = 1/10000**(featIdx/featsN/richFactor)
    steps = torch.tensor(range(seqN))
    # lay frequencies out along columns and positions along rows, both (L, F)
    freqGrid = freqs[None, :].expand(t.shape)
    stepGrid = steps[:, None].expand(t.shape)
    t[:] = torch.sin(freqGrid * stepGrid)
    return t
import asyncio, functools
from threading import Timer as ThreadingTimer
class AsyncTimer: # rename if want to use
    """asyncio counterpart of :class:`threading.Timer`: calls ``callback`` once,
    ``timeout`` seconds after :meth:`start`, unless cancelled first."""

    def __init__(self, timeout, callback):
        self._timeout = timeout
        self._callback = callback

    async def _job(self):
        """Sleeps for the timeout, then fires the callback"""
        await asyncio.sleep(self._timeout)
        self._callback()

    def start(self):
        """Schedules the job with :func:`asyncio.ensure_future`"""
        self._task = asyncio.ensure_future(self._job())

    def cancel(self):
        """Cancels the scheduled job. :meth:`start` must have been called"""
        self._task.cancel()
def debounce(wait, threading=False):
    """Decorator that will postpone a function's execution until after
    ``wait`` seconds have elapsed since the last time it was invoked. Taken
    from `ipywidgets <https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Events.html>`_.
    Example::

        import k1lib, time; value = 0
        @k1lib.debounce(0.5, True)
        def f(x): global value; value = x**2
        f(2); time.sleep(0.3); f(3)
        print(value) # prints "0"
        time.sleep(0.7)
        print(value) # prints "9"

    The decorated function returns None right away; the wrapped function runs
    later on a timer.

    :param wait: wait time in seconds
    :param threading: if True, use multiple threads, else just use async stuff"""
    timerFactory = ThreadingTimer if threading else AsyncTimer
    def decorator(fn):
        pending = None
        @functools.wraps(fn)
        def debounced(*args, **kwargs):
            nonlocal pending
            # a newer call replaces the one still waiting
            if pending is not None:
                pending.cancel()
            def fire():
                return fn(*args, **kwargs)
            pending = timerFactory(wait, fire)
            pending.start()
        return debounced
    return decorator