# Source code for k1lib._basics

# AUTOGENERATED FILE! PLEASE DON'T EDIT
import torch, math, logging
import torch.nn as nn, numpy as np
from typing import Dict, Any, List, Union, Callable, overload, Tuple, ContextManager
# Names exported by a star-import of this module.
__all__ = ["_docsUrl", "Object", "Range",
           "CancelRunException", "CancelEpochException", "CancelBatchException",
           "textToHtml", "clearLine", "tab", "isNumeric", "close", "stats",
           "patch", "squeeze", "raiseEx", "smooth", "numDigits", "limitLines",
           "limitChars", "showLog", "cleanDiv", "beep", "executeNb", "dontWrap",
           "Func", "polyfit", "derivative", "optimize", "inverse", "integrate"]
# Base URL kept under the name ``_docsUrl`` (presumably the project's documentation site).
_docsUrl = "https://k1lib.github.io"
class Object:
    """Convenience class that acts like ``defaultdict``. You can use it like a
    normal object::

        a = Object()
        a.b = 3
        print(a.b) # outputs "3"

    ``__repr__()`` output is pretty nice too:

    .. code-block:: text

        <class '__main__.Object'>, with attrs:
        - b

    You can instantiate it from a dict::

        a = Object.fromDict({"b": 3, "c": 4})
        print(a.c) # outputs "4"

    And you can specify a default value, just like defaultdict::

        a = Object().withAutoDeclare(lambda: [])
        a.texts.extend(["factorio", "world of warcraft"])
        print(a.texts[0]) # outputs "factorio"

    .. warning::

        Default values only work with variables that don't start with an
        underscore "_".

    Treating it like defaultdict is okay too::

        a = Object().withAutoDeclare(lambda: [])
        a["movies"].append("dune")
        print(a.movies[0]) # outputs "dune"
    """
    def __init__(self):
        # Both bookkeeping fields live in ``__dict__`` and are hidden by :attr:`state`.
        self._defaultValueGenerator = None
        self.repr = None

    @staticmethod
    def fromDict(_dict:Dict[str, Any]):
        """Creates an object with attributes from a dictionary"""
        answer = Object()
        answer.__dict__.update(_dict)
        return answer

    @property
    def state(self) -> dict:
        """Essentially ``__dict__``, but only outputs the fields you defined.
        If your framework intentionally set some attributes, those will be
        reported too, so beware"""
        answer = dict(self.__dict__)
        del answer["_defaultValueGenerator"]
        del answer["repr"]
        # "getdoc" is dropped if present; presumably it gets auto-declared by
        # interactive introspection tooling - TODO confirm.
        answer.pop("getdoc", None)
        return answer

    def withAutoDeclare(self, defaultValueGenerator):
        """Sets this Object up so that if a field doesn't exist, it will
        automatically create it with a default value.

        :param defaultValueGenerator: zero-argument callable producing the default
        :return: this object, to allow chaining"""
        self._defaultValueGenerator = defaultValueGenerator
        return self

    def __getitem__(self, idx): return getattr(self, idx)
    def __setitem__(self, idx, value): setattr(self, idx, value)
    def __contains__(self, item:str): return item in self.__dict__

    def __getattr__(self, attr):
        """Called only when normal lookup fails. Auto-declares the attribute if a
        default generator is configured, otherwise raises :class:`AttributeError`."""
        message = f"'{type(self).__name__}' object has no attribute '{attr}'"
        # Private/dunder names are never auto-declared. This also keeps probes such
        # as ``hasattr(obj, "__foo__")`` from creating junk fields.
        if attr.startswith("_"): raise AttributeError(message)
        if self._defaultValueGenerator is not None:
            self.__dict__[attr] = self._defaultValueGenerator()
            return self.__dict__[attr]
        raise AttributeError(message)

    def withRepr(self, _repr:str):
        """Specify output of ``__repr__()``. Legacy code. You can just
        monkey patch it instead.

        :return: this object, to allow chaining"""
        self.repr = _repr
        return self

    def __repr__(self):
        _dict = "\n".join([f"- {k}" for k in self.state.keys()])
        return self.repr or f"{type(self)}, with attrs:\n{_dict}"
class CancelRunException(Exception):
    """Signal for the core training loop: abandon the entire run."""
class CancelEpochException(Exception):
    """Signal for the core training loop: skip ahead to the next epoch."""
class CancelBatchException(Exception):
    """Signal for the core training loop: skip ahead to the next batch."""
def textToHtml(text:str) -> str:
    """Converts plain text so a browser lays it out the way `print()` would.

    Newlines become ``<br>`` and every space becomes ``&nbsp;``. No other
    character is touched (in particular nothing is HTML-escaped).

    :param text: the text to convert
    :return: the converted string"""
    withBreaks = text.replace("\n", "<br>")
    return withBreaks.replace(" ", "&nbsp;")
def clearLine():
    """Wipes the current terminal line: carriage return, 80 spaces, then
    carriage return again, written without a trailing newline."""
    blank = " " * 80
    print(f"\r{blank}\r", end="")
def tab(text:Union[list, str]) -> Union[list, str]:
    """Prefixes every line with the indent string. str in, str out. List in, list out.

    :param text: either a (possibly multi-line) string, or a list of lines
    :return: same kind of object as the input, with each line indented"""
    def indent(line): return " " + line
    if not isinstance(text, str):
        return [indent(line) for line in text]
    return "\n".join(map(indent, text.split("\n")))
def isNumeric(x) -> bool:
    """Returns whether object is actually a number.

    Accepts Python ``int``/``float`` and numpy integer/floating scalars.

    :param x: any object
    :return: True if ``x`` is a real scalar number, False otherwise"""
    # ``np.float`` and ``np.int`` were mere aliases of the builtins and no longer
    # exist in recent numpy, so the abstract numpy scalar types are used instead.
    return isinstance(x, (int, float, np.integer, np.floating))
def close(a, b):
    """Tells whether 2 values are really close to each other.

    Plain numbers are compared with an absolute tolerance of 1e-6. Anything
    else is converted with :func:`torch.tensor` and handed to
    :func:`torch.allclose`.

    :return: truthy if the 2 values are close"""
    bothScalars = isNumeric(a) and isNumeric(b)
    if not bothScalars:
        return torch.allclose(torch.tensor(a), torch.tensor(b))
    return abs(a - b) < 1e-6
def stats(x) -> Tuple[float, float]:
    """Computes the mean and standard deviation of the input.

    :param x: any object exposing ``.mean()`` and ``.std()``
    :return: tuple ``(mean, std)``"""
    mean = x.mean()
    std = x.std()
    return mean, std
def patch(_class:type, name:str=None, docs:Union[str, Any]=None, static=False):
    """Patches a function to a class/object.

    :param _class: object to patch function. Can also be a type
    :param name: name of patched function, if different from current
    :param docs: docs of patched function. Can be object with __doc__ attr
    :param static: whether to wrap this inside :class:`staticmethod` or not
    :return: a decorator. It returns the object that was attached to ``_class``
        (the function itself, or its :class:`staticmethod` wrapper)

    Intended to be used like this::

        class A:
            def methA(self):
                return "inside methA"

        @patch(A)
        def methB(self):
            return "inside methB"

        a = A()
        print(a.methB()) # will print "inside methB"
    """
    def inner(function):
        _docs = docs
        if _docs is not None and not isinstance(_docs, str): _docs = _docs.__doc__
        # Fallback chain: explicit docs -> the function's own -> the class's.
        _docs = _docs or function.__doc__ or _class.__doc__
        _name = name or function.__qualname__.split(".")[-1]
        # Set the docstring on the function itself, before any wrapping. Assigning
        # to a staticmethod wrapper's __doc__ fails on older Pythons, and on newer
        # ones it is not what ``Class.method.__doc__`` reports.
        function.__doc__ = _docs
        _function = staticmethod(function) if static else function
        setattr(_class, _name, _function)
        return _function
    return inner
def squeeze(_list:Union[list, tuple, torch.Tensor, Any], hard=False):
    """If list only has 1 element, return that element, else return original list.

    :param _list: list, tuple, :class:`torch.Tensor`, or anything else. Tensors
        get ``.squeeze()`` applied, other objects are returned untouched
    :param hard: If True, then if list/tuple, filters out None and empty
        strings, and takes the first element out even if that list/tuple has
        more than 1 element
    :raises IndexError: if ``hard`` is True and nothing survives the filter"""
    def keep(e):
        # Identity/type checks rather than ``!=``, so that array-like elements
        # (whose comparison is elementwise) can pass through the filter.
        return e is not None and not (isinstance(e, str) and e == "")
    if isinstance(_list, (tuple, list)):
        if hard: return [e for e in _list if keep(e)][0]
        elif len(_list) == 1: return _list[0]
    if isinstance(_list, torch.Tensor): return _list.squeeze()
    return _list
def raiseEx(ex:Exception):
    """Raises the exception passed in. Handy where only an expression is
    allowed, such as inside a lambda.

    :param ex: the exception to raise"""
    raise ex
def smooth(arr:List[float], consecutives:int=5) -> List[float]:
    """Smoothes out an array by averaging each run of ``consecutives`` values.

    The output has one value per complete run. A trailing, incomplete run is
    dropped.

    :param arr: values to smooth
    :param consecutives: how many neighboring values go into each average
    :return: list of averages"""
    averages = []
    running = 0
    for position, value in enumerate(arr, start=1):
        running += value
        if position % consecutives != 0: continue
        averages.append(running / consecutives)
        running = 0
    return averages
def numDigits(num) -> int:
    """Counts the characters in the default formatted form of a number/object.

    :param num: number, or any other object
    :return: length of the formatted string (sign and decimal point included)"""
    rendered = format(num)
    return len(rendered)
def limitLines(s:str, limit:int=10) -> str:
    """Keeps at most ``limit`` lines of the input. If anything was cut off, a
    final line consisting of 5 dots is appended.

    :param s: input string
    :param limit: maximum number of lines to keep
    :return: the original string if short enough, else the truncated one"""
    lines = s.split("\n")
    if len(lines) <= limit:
        return s
    kept = "\n".join(lines[:limit])
    return kept + "\n....."
def limitChars(s:str, limit:int=50):
    """If input string is too long, truncates to first `limit` characters of
    the first line, and adds an ellipsis.

    :param s: anything. ``None`` gives an empty string, other objects are
        formatted to a string first
    :param limit: maximum number of characters kept from the first line
    :return: the first line if it fits within ``limit`` characters, else its
        first ``limit`` characters followed by ``"..."``"""
    if s is None: return ""
    s = f"{s}".split("\n")[0]
    # Only add the ellipsis when characters are actually dropped. The previous
    # ``limit-3`` threshold tagged "..." onto strings that were left intact.
    return s[:limit] + "..." if len(s) > limit else s
def showLog(loggerName:str="", level:int=logging.DEBUG):
    """Makes a logger print its records, by attaching a new stream handler.

    :param loggerName: name of the logger, empty string for the root logger
    :param level: level to set on the logger itself. The handler's own level
        is always ``logging.DEBUG``

    Every call attaches one more handler to the logger."""
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    target = logging.getLogger(loggerName)
    target.setLevel(level)
    target.addHandler(handler)
def cleanDiv(_list:List[float], total:int) -> List[int]:
    """Splits :total: into integer segments, proportional to the weights in
    :_list:. Conceptually ``(_list / _list.sum()) * total``, but with every
    segment an ``int``.

    Each segment is truncated to an integer, then the last segment is
    overwritten with whatever is left so that everything adds up to :total:.

    :return: numpy integer array with the same length as :_list:"""
    weights = np.array(_list)
    segments = (weights*total/weights.sum()).astype(int)
    segments[-1] = total - segments[:-1].sum()
    return segments
class Range:
    """A range of numbers. It's just 2 numbers really: start and stop

    This is essentially a convenience class to provide a nice, clean
    abstraction and to eliminate errors. You can transform values::

        Range(10, 20).toUnit(13) # returns 0.3
        Range(10, 20).fromUnit(0.3) # returns 13
        Range(10, 20).toRange(Range(20, 10), 13) # returns 17

    You can also do random math operations on it::

        (Range(10, 20) * 2 + 3) == Range(23, 43) # returns True
        Range(10, 20) == ~Range(20, 10) # returns True
    """
    def __init__(self, start=0, stop=None):
        """Creates a new Range.

        There are different ``__init__`` functions for many situations:

        - Range(2, 11.1): create range [2, 11.1]
        - Range(15.2): creates range [0, 15.2]
        - Range(Range(2, 3)): create range [2, 3]. This serves as sort of a catch-all
        - Range(slice(2, 5, 2)): creates range [2, 5]. Can also be a :class:`range`
        - Range(slice(2, -1), 10): creates range [2, 9]
        - Range([1, 2, 7, 5]): creates range [1, 5]. Can also be a tuple

        :raises AttributeError: if the arguments match none of the forms above
        """
        if isNumeric(start) and isNumeric(stop):
            self.start, self.stop = start, stop
        elif isNumeric(start) and stop is None:
            self.start, self.stop = 0, start
        elif stop is None and isinstance(start, (range, slice, Range)):
            self.start, self.stop = start.start, start.stop
        elif isNumeric(stop) and isinstance(start, slice):
            # Let the builtin range resolve negative/missing slice bounds.
            r = range(stop)[start]; self.start, self.stop = r.start, r.stop
        elif isinstance(start, (list, tuple)):
            self.start, self.stop = start[0], start[-1]
        else: raise AttributeError(f"Don't understand {start} and {stop}")
        self.delta = self.stop - self.start

    def __getitem__(self, index):
        """0 for start, 1 for stop

        You can also pass in a :class:`slice` object, in which case, a range
        subset will be returned. Code kinda looks like this::

            range(start, stop)[index]
        """
        if index == 0: return self.start
        if index == 1: return self.stop
        if isinstance(index, slice): return Range(range(self.start, self.stop)[index])
        raise Exception(f"Can't get index {index} of range [{self.start}, {self.stop}]")

    def _common(self, x, f:Callable[[float], float]):
        """Applies ``f`` to a number, to each element of a list/tuple, or to both
        ends of a range/slice/Range (missing ends default to 0 and 1)."""
        if isNumeric(x): return f(x)
        if isinstance(x, (list, tuple)): return [self._common(elem, f) for elem in x]
        if isinstance(x, (range, slice, Range)):
            return Range(self._common(x.start if x.start is not None else 0, f),
                         self._common(x.stop if x.stop is not None else 1, f))
        raise AttributeError(f"Doesn't understand {x}")

    def __iter__(self): yield self.start; yield self.stop

    def toUnit(self, x):
        """Converts x from current range to [0, 1] range. Example::

            r = Range(2, 10)
            r.toUnit(5) # will return 0.375, as that is (5-2)/(10-2)

        You can actually pass in a lot in place of x::

            r = Range(0, 10)
            r.toUnit([5, 3, 6]) # will be [0.5, 0.3, 0.6]. Can also be a tuple
            r.toUnit(slice(5, 6)) # will be Range(0.5, 0.6). Can also be a range, or Range

        .. note::

            In the last case, if ``start`` is None, it gets defaulted to 0, and
            if ``end`` is None, it gets defaulted to 1

        If this range has zero width, every converted value is ``nan``.
        """
        def f(value):
            if self.delta == 0: return float("nan")
            return (value - self.start) / self.delta
        return self._common(x, f)

    def fromUnit(self, x):
        """Converts x from [0, 1] range to this range. Example::

            r = Range(0, 10)
            r.fromUnit(0.3) # will return 3

        x can be a lot of things, see :meth:`toUnit` for more"""
        return self._common(x, lambda x: x * self.delta + self.start)

    def toRange(self, _range:"Range", x):
        """Converts x from current range to another range. Example::

            r = Range(0, 10)
            r.toRange(Range(0, 100), 6) # will return 60

        x can be a lot of things, see :meth:`toUnit` for more."""
        return self._common(x, lambda x: Range(_range).fromUnit(self.toUnit(x)))

    @property
    def range_(self):
        """Returns a :class:`range` object with start and stop values rounded off"""
        return range(math.floor(self.start+0.001), math.floor(self.stop+0.001))

    @property
    def slice_(self):
        """Returns a :class:`slice` object with start and stop values rounded off"""
        return slice(math.floor(self.start+0.001), math.floor(self.stop+0.001))

    @staticmethod
    def proportionalSlice(r1, r2, r1Slice:slice) -> Tuple["Range", "Range"]:
        """Slices r1 and r2 proportionally. Best to explain using an example.
        Let's say you have 2 arrays created from a time-dependent procedure
        like this::

            a = []; b = []
            for t in range(100):
                if t % 3 == 0: a.append(t)
                if t % 5 == 0: b.append(1 - t)
            len(a), len(b) # returns (34, 20)

        a and b are of different lengths, but you want to plot both from 30%
        mark to 50% mark (for a, it's elements 10 -> 17, for b it's 6 -> 10),
        as they are time-dependent. As you can probably tell, to get the
        indicies 10, 17, 6, 10 is messy. So, you can do something like this
        instead::

            r1, r2 = Range.proportionalSlice(Range(len(a)), Range(len(b)), slice(10, 17))

        This will return the Ranges [10, 17] and [5.88, 10]

        Then, you can plot both of them side by side like this::

            fig, axes = plt.subplots(ncols=2)
            axes[0].plot(r1.range_, a[r1.slice_])
            axes[1].plot(r2.range_, b[r2.slice_])
        """
        r1, r2 = Range(r1), Range(r2)
        ar1 = r1[r1Slice]
        ar2 = r1.toRange(r2, ar1)
        return ar1, ar2

    def bound(self, rs:Union[range, slice]) -> Union[range, slice]:
        """If input range|slice's stop and start is missing, then use this
        range's start and stop instead."""
        # NOTE(review): ``or`` also replaces an explicit 0 with this range's value.
        start = rs.start or self.start
        stop = rs.stop or self.stop
        return type(rs)(start, stop)

    def copy(self):
        """Returns a new Range with the same start and stop"""
        return Range(self.start, self.stop)

    def __str__(self): return f"[{self.start}, {self.stop}]"

    def __eq__(self, _range):
        _range = Range(_range)
        return abs(_range.start - self.start) < 1e-9 and abs(_range.stop - self.stop) < 1e-9

    def __contains__(self, x:float): return x >= self.start and x < self.stop
    def __neg__(self): return Range(-self.start, -self.stop)

    def __invert__(self):
        """Returns the reversed range: start and stop swapped"""
        return Range(self.stop, self.start)

    def __add__(self, num): return Range(self.start + num, self.stop + num)
    def __radd__(self, num): return self + num
    def __mul__(self, num): return Range(self.start * num, self.stop * num)
    def __rmul__(self, num): return self * num
    # Dividing scales both ends, mirroring __mul__.
    def __truediv__(self, num): return self * (1/num)
    # ``number / Range`` is not supported. Raising a real exception class keeps
    # the TypeError that callers already got from the invalid ``raise "<str>"``.
    def __rtruediv__(self, num): raise TypeError("Doesn't make sense to do this!")
    def __round__(self): return Range(round(self.start), round(self.stop))
    def __ceil__(self): return Range(math.ceil(self.start), math.ceil(self.stop))
    def __floor__(self): return Range(math.floor(self.start), math.floor(self.stop))

    def __repr__(self):
        return (f"A range of numbers: [{self.start}, {self.stop}]. Can do: "
                f"- r.toUnit(x): will convert x from range [{self.start}, {self.stop}] to [0, 1] "
                f"- r.fromUnit(x): will convert x from range [0, 1] to range [{self.start}, {self.stop}] "
                f"- r.toRange([a, b], x): will convert x from range [{self.start}, {self.stop}] to range [a, b] "
                "- r[0], r[1], r.start, r.stop: get start and stop values of range "
                "Note: for conversion methods, you can pass in")
# Type alias: a callable that takes one float and returns one float.
Func = Callable[[float], float]
def polyfit(x:List[float], y:List[float], deg:int=6) -> Func:
    """Returns a function that approximate :math:`f(x) = y`.

    :param x: sample x values
    :param y: sample y values
    :param deg: degree of the polynomial of the returned function
    :return: function evaluating the fitted polynomial. It accepts a number
        or a numpy array, and returns a float numpy array of the same shape
    """
    params = np.polyfit(x, y, deg)  # coefficients, highest power first
    def _inner(_x):
        # Builtin ``float`` replaces ``np.float``, which was only an alias of it
        # and no longer exists in recent numpy.
        answer = np.zeros_like(_x, dtype=float)
        for expo, param in enumerate(params):
            answer += param * _x**(len(params)-expo-1)
        return answer
    return _inner
def derivative(f:Func, delta:float=1e-6) -> Func:
    """Builds a numerical derivative of a function, using the forward
    difference ``(f(x + delta) - f(x)) / delta``.

    :param f: function to differentiate
    :param delta: step size of the finite difference
    :return: function that approximates the derivative of ``f``"""
    def slope(x):
        return (f(x + delta) - f(x)) / delta
    return slope
def optimize(f:Func, v:float=1, threshold:float=1e-6) -> float:
    """Solves :math:`f(x) = 0` for x, starting the search at `v`.

    Repeats the update ``x <- x - f(x)/f'(x)``, with the slope taken from
    :func:`derivative`, until ``abs(f(x))`` is no greater than ``threshold``.
    There is no cap on the number of iterations.

    :param f: function to find a root of
    :param v: initial guess
    :param threshold: how close to 0 ``f(x)`` has to get
    :return: the x value found"""
    slope = derivative(f)
    guess = v
    while abs(f(guess)) > threshold:
        step = f(guess)/slope(guess)
        guess = guess - step
    return guess
def inverse(f:Func) -> Func:
    """Builds the inverse of a function. Each call of the inverse solves
    ``f(x) - y = 0`` using :func:`optimize`, which takes a long time to run,
    so don't use this where you need lots of speed.

    :param f: function to invert
    :return: function that maps y back to x"""
    def solveFor(y):
        def residual(x): return f(x) - y
        return optimize(residual)
    return solveFor
def integrate(f:Func, _range:Range) -> float:
    """Numerically integrates a function over a range.

    Samples ``f`` at 1000 evenly spaced points from the range's start to its
    stop (both included), and sums ``f(x) * _range.delta / 1000``.

    :param f: function to integrate
    :param _range: object that unpacks into (start, stop) and has a ``delta``
    :return: approximate value of the integral"""
    samples = 1000
    points = np.linspace(*_range, samples)
    return sum(f(point)*_range.delta/samples for point in points)
@patch(nn.Module, name="importParams")
def _importParams(self:nn.Module, params:List[nn.Parameter]):
    """Overwrites this :class:`torch.nn.Module`'s parameters with clones of the
    data in a list of :class:`torch.nn.parameter.Parameter`/:class:`torch.Tensor`.
    Parameters are paired up in ``self.parameters()`` order."""
    for current, incoming in zip(self.parameters(), params):
        current.data = incoming.data.clone()


@patch(nn.Module, name="exportParams")
def _exportParams(self:nn.Module) -> List[torch.Tensor]:
    """Returns a list holding a clone of every parameter's data"""
    snapshot = []
    for param in self.parameters():
        snapshot.append(param.data.clone())
    return snapshot


@patch(nn.Module, name="getParamsVector")
def _getParamsVector(model:nn.Module) -> List[torch.Tensor]:
    """For each parameter, returns a normal distributed random tensor with the
    same standard deviation as the original parameter. Parameters with a
    single element get a scale of 1 instead."""
    vectors = []
    for param in model.parameters():
        noise = torch.randn(param.shape).to(param.device)
        scale = param.std() if param.numel() > 1 else 1
        vectors.append(noise * scale)
    return vectors


class _NnModuleDeviceContext:
    """Context manager that records the device of each parameter on entry, and
    moves each parameter's data back to its recorded device on exit."""
    def __init__(self, nnModule):
        self.nnModule = nnModule

    def __enter__(self):
        self.devices = [param.device for param in self.nnModule.parameters()]

    def __exit__(self, *ignored):
        for param, device in zip(self.nnModule.parameters(), self.devices):
            param.data = param.to(device=device)


@patch(nn.Module, name="preserveDevice")
def _preserveDevice(self:nn.Module) -> ContextManager:
    """Preserves the device of whatever operation is inside this. Example::

        import torch.nn as nn
        m = nn.Linear(3, 4)
        with m.preserveDevice():
            m.cuda() # moves whole model to cuda
        # automatically moves model to cpu

    This will work even if the model has many tensors that live on 10
    different devices."""
    return _NnModuleDeviceContext(self)
def beep():
    """Plays a beeping sound, may be useful as notification for long-running
    tasks.

    Tries to display an HTML ``<script>`` snippet through IPython first. If
    anything goes wrong there (IPython not installed, for instance), falls
    back to running ``printf`` with the BEL character through the shell."""
    try:
        import IPython
        IPython.core.display.display_html(IPython.display.HTML("""<script>(new Audio('data:audio/wav;base64,UklGRl9vT19XQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YU'+Array(1e3).join(123))).play();</script>"""))
    except Exception:
        # ``Exception`` rather than a bare ``except``, so Ctrl-C and interpreter
        # shutdown are not swallowed by the fallback.
        import os
        os.system("printf '\a'")
def executeNb(fileName:str, _globals:dict=None):
    """Executes a specified IPython notebook. Can make all variables defined
    in the notebook appear in the __main__ context by passing `globals()` in.

    :param fileName: path to the notebook (JSON) file
    :param _globals: namespace to execute the code in. A fresh, empty dict is
        used for each call if not specified

    The source of all code cells is concatenated and run in one go. Any
    failure is printed out rather than raised.

    .. warning::

        This runs the notebook's code with ``exec``, so only use it on
        notebooks that you trust."""
    import json
    # A ``{}`` default would be shared, and polluted, across calls.
    if _globals is None: _globals = {}
    try:
        with open(fileName) as f: cells = json.loads(f.read())["cells"]
        lines = []
        for cell in cells:
            if cell["cell_type"] != "code": continue
            source = cell["source"]
            # The notebook format allows "source" to be 1 string or a list of lines.
            if isinstance(source, str): source = source.split("\n")
            lines.extend(line.rstrip() for line in source)
        exec("\n".join(lines), _globals)
    except SyntaxError as e:
        print(f"Msg {e.msg}, line {e.lineno}:\n\n{e.text}")
    except Exception as e:
        # Other exceptions don't carry msg/lineno/text, so reading those here
        # would raise a misleading AttributeError.
        print(f"{type(e).__name__}: {e}")
def dontWrap():
    """Don't wrap horizontally when in a notebook.

    Displays 2 ``<style>`` snippets through IPython that set
    ``white-space: pre`` on output ``<pre>`` elements. If that fails for any
    reason (IPython not installed, for instance), prints a message instead."""
    try:
        from IPython.core.display import display, HTML
        display(HTML("<style>div.jp-OutputArea-output pre {white-space: pre;}</style>"))
        display(HTML("<style>div.output_area pre {white-space: pre;}</style>"))
    except Exception:
        # ``Exception`` rather than a bare ``except``, so Ctrl-C is not swallowed.
        print("Can't run dontWrap()")