Source code for k1lib.cli.utils

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for all short and random quality-of-life utilities."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, yieldT
import k1lib.cli as cli, k1lib.cli.init as init, numbers, numpy as np, dis
from k1lib.cli.typehint import *
from typing import overload, Iterator, Any, List, Set, Union, Callable
import k1lib, time, math, os, json
from collections import defaultdict
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = False
try: import PIL; hasPIL = True
except: hasPIL = False
try: plt = k1lib.dep("matplotlib.pyplot")
except: pass
__all__ = ["size", "shape", "item", "rItem", "iden", "join", "wrapList",
           "equals", "reverse", "ignore", "rateLimit", "timeLimit", "tab", "indent",
           "clipboard", "deref", "bindec", "smooth", "disassemble",
           "tree", "lookup", "dictFields", "backup", "sketch", "syncStepper", "zeroes", "normalize"]
settings = k1lib.settings.cli
def exploreSize(it):                                                             # exploreSize
    """Returns first element and length of array. Returns [first item, length]""" # exploreSize
    if isinstance(it, str): return None, len(it)                                 # exploreSize
    try: return it[0], len(it)                                                   # exploreSize
    except: pass                                                                 # exploreSize
    sentinel = object(); it = iter(it)                                           # exploreSize
    o = next(it, sentinel); count = 1                                            # exploreSize
    if o is sentinel: return None, 0                                             # exploreSize
    try:                                                                         # exploreSize
        while True: next(it); count += 1                                         # exploreSize
    except StopIteration: pass                                                   # exploreSize
    return o, count                                                              # exploreSize
[docs]class size(BaseCli): # size
[docs] def __init__(self, idx=None): # size """Returns number of rows and columns in the input. Example:: # returns (3, 2) [[2, 3], [4, 5, 6], [3]] | shape() # returns 3 [[2, 3], [4, 5, 6], [3]] | shape(0) # returns 2 [[2, 3], [4, 5, 6], [3]] | shape(1) # returns (2, 0) [[], [2, 3]] | shape() # returns (3,) [2, 3, 5] | shape() # returns 3 [2, 3, 5] | shape(0) # returns (3, 2, 2) [[[2, 1], [0, 6, 7]], 3, 5] | shape() # returns (1, 3) ["abc"] | shape() # returns (1, 2, 3) [torch.randn(2, 3)] | shape() # returns (2, 3, 5) shape()(np.random.randn(2, 3, 5)) :class:`shape` is an alias of this cli. Use whichever is more intuitive for you. There's also :class:`lengths`, which is sort of a simplified/faster version of this, but only use it if you are sure that ``len(it)`` can be called. :param idx: if not specified, returns a tuple of ints. If specified, then returns the specific index of the tuple""" # size super().__init__(); self.idx = idx; # size if idx is not None: self._f = cli.item(idx) # size
def _all_array_opt(self, it, level): return np.array(it.shape[level:])[tuple([None]*level)] + np.zeros(it.shape[:level], dtype=int)[(*[slice(None)]*level, None)] # size def _typehint(self, inp): # size if self.idx is not None: return int # size return tList(int) # size
[docs] def __ror__(self, it:Iterator[str]): # size idx = self.idx # size if idx == 0: # size try: return len(it) # size except: # size try: return exploreSize(it)[1] # size except: pass # size if hasPIL and isinstance(it, PIL.Image.Image): # size return it.size if idx is None else it.size[idx] # size if idx is None: # size answer = [] # size try: # size while True: # size if isinstance(it, settings.arrayTypes): # size return tuple(answer + list(it.shape)) # size it, s = exploreSize(it); answer.append(s) # size except TypeError: pass # size return tuple(answer) # size return exploreSize(it | self._f)[1] # size
def _jsF(self, meta): # size fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # size post = "" if self.idx is None else f"[{cli.kjs.v(self.idx)}]" # size return f"const {fIdx} = ({dataIdx}) => {dataIdx}.shape(){post}", fIdx # size
shape = size # size noFill = object() # size
[docs]class item(BaseCli): # item
[docs] def __init__(self, amt:int=1, fill=noFill): # item """Returns the first element of the input iterator. Example:: # returns 0 range(5) | item() # returns torch.Size([5]) torch.randn(3,4,5) | item(2) | shape() # returns 3 [] | item(fill=3) :param amt: how many times do you want to call item() back to back? :param fill: if iterator length is 0, return this""" # item self.amt = amt; self.fill = fill # item self.fillP = [fill] if fill != noFill else [] # preprocessed, to be faster # item if self.amt != 1: self._f = cli.serial(*(item(fill=self.fill) for _ in range(self.amt))) # item
def _all_array_opt(self, it, level): return it[(*[slice(None, None, None) for i in range(level)], 0)] # item def _typehint(self, inp): # item if isinstance(inp, tListIterSet): return inp.child # item if isinstance(inp, tCollection): return inp.children[0] # item if isinstance(inp, tArrayTypes): # item if inp.rank is None: return inp.__class__(inp.child, None) # item if inp.rank - self.amt >= 1: return inp.__class__(inp.child, inp.rank-self.amt) # item return inp.child # item return tAny() # item
[docs] def __ror__(self, it:Iterator[str]): # item if self.amt != 1: return it | self._f # item return next(iter(it), *self.fillP) # item
def _jsF(self, meta): # item fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # item return f"const {fIdx} = ({dataIdx}) => {dataIdx}[0]", fIdx # item
[docs]class rItem(BaseCli): # rItem
[docs] def __init__(self, idx:int): # rItem """Combines ``rows(idx) | item()``, as this is a pretty common pattern. Example:: iter(range(10)) | rItem(4) # returns 4 """ # rItem self.idx = idx; self.arrayTypes = (*settings.arrayTypes, list, tuple) # rItem
def _all_array_opt(self, it, level:int): return it[(*[slice(None, None, None) for i in range(level)], self.idx)] # rItem
[docs] def __ror__(self, it): # rItem if isinstance(it, self.arrayTypes): return it[self.idx] # rItem for i, e in zip(range(self.idx+1), it): pass # rItem return e # rItem
def _jsF(self, meta): # rItem fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # rItem return f"const {fIdx} = ({dataIdx}) => {dataIdx}[{cli.kjs.v(self.idx)}]", fIdx # rItem
[docs]class iden(BaseCli): # iden
[docs] def __init__(self): # iden """Yields whatever the input is. Useful for multiple streams. Example:: # returns range(5) range(5) | iden()""" # iden super().__init__() # iden
def _all_array_opt(self, it, level): return it # iden def _typehint(self, inp): return inp # iden
[docs] def __ror__(self, it:Iterator[Any]): return it # iden
def _jsF(self, meta): # iden fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # iden return f"const {fIdx} = ({dataIdx}) => {dataIdx}", fIdx # iden
[docs]class join(BaseCli): # join
[docs] def __init__(self, delim:str=None): # join r"""Merges all strings into 1, with `delim` in the middle. Basically :meth:`str.join`. Example:: # returns '2\na' [2, "a"] | join("\n")""" # join super().__init__(); self.delim = patchDefaultDelim(delim) # join
def _typehint(self, inp): return str # join
[docs] def __ror__(self, it:Iterator[str]): # join return self.delim.join(it | cli.apply(str)) # join
def _jsF(self, meta): # join fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # join return f"const {fIdx} = ({dataIdx}) => {dataIdx}.join({json.dumps(self.delim)})", fIdx # join
[docs]class wrapList(BaseCli): # wrapList
[docs] def __init__(self): # wrapList """Wraps inputs inside a list. There's a more advanced cli tool built from this, which is :meth:`~k1lib.cli.structural.unsqueeze`. Example:: # returns [5] 5 | wrapList()""" # wrapList super().__init__() # wrapList
def _all_array_opt(self, it, level): return it[(*[slice(None)]*level, None)] # wrapList def _typehint(self, inp): return tList(inp) # wrapList
[docs] def __ror__(self, it) -> List[Any]: # wrapList if isinstance(it, settings.arrayTypes): return it[None] # wrapList return [it] # wrapList
def _jsF(self, meta): # wrapList fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # wrapList return f"const {fIdx} = ({dataIdx}) => [{dataIdx}]", fIdx # wrapList
class _EarlyExp(Exception): pass # _EarlyExp
[docs]class equals: # equals
[docs] def __init__(self): # equals """Checks if all incoming columns/streams are identical""" # equals super().__init__() # equals
[docs] def __ror__(self, streams:Iterator[Iterator[str]]): # equals streams = list(streams) # equals for row in zip(*streams): # equals sampleElem = row[0] # equals try: # equals for elem in row: # equals if sampleElem != elem: yield False; raise _EarlyExp() # equals yield True # equals except _EarlyExp: pass # equals
[docs]class reverse(BaseCli): # reverse
[docs] def __init__(self): # reverse """Reverses incoming list. Example:: # returns [3, 5, 2] [2, 5, 3] | reverse() | deref()""" # reverse super().__init__() # reverse
def _all_array_opt(self, it, level): return it[(*[slice(None)]*level, slice(None, None, -1))] # reverse def _typehint(self, inp): # reverse if isinstance(inp, tListIterSet): return tIter(inp.child) # reverse return tAny() # reverse
[docs] def __ror__(self, it:Iterator[str]) -> List[str]: # reverse if isinstance(it, settings.arrayTypes): return it[::-1] # reverse return reversed(list(it)) # reverse
def _jsF(self, meta): # reverse fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # reverse return f"const {fIdx} = ({dataIdx}) => [...{dataIdx}].reverse()", fIdx # reverse
[docs]class ignore(BaseCli): # ignore
[docs] def __init__(self): # ignore r"""Just loops through everything, ignoring the output. Example:: # will just return an iterator, and not print anything [2, 3] | apply(lambda x: print(x)) # will prints "2\n3" [2, 3] | apply(lambda x: print(x)) | ignore()""" # ignore super().__init__() # ignore
def _all_array_opt(self, it, level): return it # ignore def _typehint(self, inp): return type(None) # ignore
[docs] def __ror__(self, it:Iterator[Any]): # ignore if isinstance(it, settings.arrayTypes): return # ignore for _ in it: pass # ignore
def _jsF(self, meta): # ignore fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # ignore return f"const {fIdx} = ({dataIdx}) => {dataIdx}", fIdx # ignore
[docs]class rateLimit(BaseCli): # rateLimit
[docs] def __init__(self, f, delay=0.1): # rateLimit """Limits the execution flow rate upon a condition. Example:: s = 0; semaphore = 0 def heavyAsyncOperation(i): global semaphore, s semaphore += 1 s += i; time.sleep(1) semaphore -= 1; return i**2 # returns (20,), takes 1s to run range(20) | applyTh(heavyAsyncOperation, 100) | shape() # returns (20,), takes 4s to run (20/5 = 4) range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape() The first test case is not rate-limited, so it will run all 20 threads at the same time, and all of them will finish after 1 second. The second test case is rate-limited, so that there can only be 5 concurrently executing threads because of the semaphore count check. Therefore this takes around 4 seconds to run. :param f: checking function. Should return true if execution is allowed :param delay: delay in seconds between calling ``f()``""" # rateLimit self.f = f; self.delay = delay # rateLimit
def _typehint(self, inp): # rateLimit if isinstance(inp, tListIterSet): return tIter(inp.child) # rateLimit if isinstance(inp, tArrayTypes): # rateLimit if inp.rank is None: return tIter(inp) # rateLimit if inp.rank >= 2: return tIter(inp.__class__(inp.child, inp.rank - 1)) # rateLimit return tIter(inp.child) # rateLimit if isinstance(inp, tCollection): return inp # rateLimit return tAny() # rateLimit
[docs] def __ror__(self, it): # rateLimit f = self.f; delay = self.delay # rateLimit for e in it: # rateLimit while not f(): time.sleep(delay) # rateLimit yield e # rateLimit
[docs] @staticmethod # rateLimit def cpu(maxUtilization=90): # rateLimit """Limits flow rate when cpu utilization is more than a specified percentage amount. Needs to install the package ``psutil`` to actually work. Example:: # returns [0, 1, 4, 9, 16] range(5) | rateLimit.cpu() | apply(op()**2) | deref()""" # rateLimit import psutil # rateLimit return rateLimit(lambda: psutil.cpu_percent() < maxUtilization) # rateLimit
[docs]class timeLimit(BaseCli): # timeLimit
[docs] def __init__(self, t): # timeLimit """Caps the flow after a specified amount of time has passed. Example:: # returns 20, or roughly close to that repeatF(lambda: time.sleep(0.1)) | timeLimit(2) | shape(0)""" # timeLimit self.t = t # timeLimit
def _typehint(self, inp): # timeLimit if isinstance(inp, tListIterSet): return tIter(inp.child) # timeLimit if isinstance(inp, tArrayTypes): # timeLimit if inp.rank is None: return tIter(inp) # timeLimit if inp.rank >= 2: return tIter(inp.__class__(inp.child, inp.rank - 1)) # timeLimit return tIter(inp.child) # timeLimit if isinstance(inp, tCollection): return inp # timeLimit return tAny() # timeLimit
[docs] def __ror__(self, it): # timeLimit _time = time.time; endTime = _time() + self.t # timeLimit for e in it: # timeLimit yield e # timeLimit if _time() > endTime: break # timeLimit
[docs]def tab(pad:str=" "*4): # tab """Indents incoming string iterator. Example:: # prints out indented 0 to 9 range(10) | tab() | headOut()""" # tab return cli.apply(lambda x: f"{pad}{x}") # tab
indent = tab # tab
[docs]class clipboard(BaseCli): # clipboard
[docs] def __init__(self): # clipboard """Saves the input to clipboard. Example:: # copies "abc" into the clipboard. Just use Ctrl+V to paste as usual "abc" | clipboard()""" # clipboard import pyperclip; self.pyperclip = pyperclip # clipboard
def _typehint(self, inp): return type(None) # clipboard
[docs] def __ror__(self, s): self.pyperclip.copy(s) # clipboard
a = [numbers.Number, np.number, str, bool, bytes, k1lib.UValue, cli.conv.Audio] # clipboard if hasTorch: a.append(torch.nn.Module) # clipboard settings.atomic.add("deref", tuple(a), "used by deref") # clipboard Tensor = torch.Tensor; atomic = settings.atomic # clipboard class inv_dereference(BaseCli): # inv_dereference def __init__(self, igT=False): # inv_dereference """Kinda the inverse to :class:`dereference`""" # inv_dereference super().__init__(); self.igT = igT # inv_dereference def __ror__(self, it:Iterator[Any]) -> List[Any]: # inv_dereference for e in it: # inv_dereference if e is None or isinstance(e, atomic.deref): yield e # inv_dereference elif isinstance(e, settings.arrayTypes): # inv_dereference if not self.igT and len(e.shape) == 0: yield e.item() # inv_dereference else: yield e # inv_dereference else: # inv_dereference try: yield e | self # inv_dereference except: yield e # inv_dereference
[docs]class deref(BaseCli): # deref
[docs] def __init__(self, maxDepth=float("inf"), igT=True): # deref """Recursively converts any iterator into a list. Example:: # returns something like "<range_iterator at 0x7fa8c52ca870>" iter(range(5)) # returns [0, 1, 2, 3, 4] iter(range(5)) | deref() # returns [2, 3], yieldT stops things early [2, 3, yieldT, 6] | deref() You can also specify a ``maxDepth``:: # returns something like "<list_iterator at 0x7f810cf0fdc0>" iter([range(3)]) | deref(0) # returns [range(3)] iter([range(3)]) | deref(1) # returns [[0, 1, 2]] iter([range(3)]) | deref(2) There are a few classes/types that are considered atomic, and :class:`deref` will never try to iterate over it. If you wish to change it, do something like:: settings.cli.atomic.deref = (int, float, ...) :param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything at all :param igT: short for "ignore tensor". If True, then don't loop over :class:`torch.Tensor` and :class:`numpy.ndarray` internals""" # deref super().__init__(); self.igT = igT # deref self.maxDepth = maxDepth; self.depth = 0 # deref if hasTorch: # deref self.arrayType = (torch.Tensor, np.ndarray) if k1lib.settings.startup.or_patch.numpy else torch.Tensor # deref else: self.arrayType = (np.ndarray,) if k1lib.settings.startup.or_patch.numpy else () # deref
def _typehint(self, inp, depth=float("inf")): # deref if depth == 0: return inp # deref if depth == float("inf"): depth = self.maxDepth # deref if isinstance(inp, type) and issubclass(inp, atomic.deref): return inp # deref if isinstance(inp, tArrayTypes): # deref if self.igT: return inp # deref if inp.rank is None: return tList(tAny()) # deref if inp.rank == 1: # deref if isinstance(inp, tTensor): # deref return tList(type(torch.tensor(3, dtype=inp.child).item())) # deref if isinstance(inp, tNpArray): # deref return tList(type(np.array(3, dtype=inp.child).item())) # deref return tList(self._typehint(inp.item(), depth-1)) # deref if isinstance(inp, tListIterSet): # deref return tList(self._typehint(inp.child, depth-1)) # deref if isinstance(inp, tCollection): # deref return tCollection(*(self._typehint(e, depth-1) for e in inp.children)) # deref return tAny() # deref
[docs] def __ror__(self, it:Iterator[Any]) -> List[Any]: # deref if self.depth >= self.maxDepth: return it # deref elif isinstance(it, np.number): return it.item() # deref elif isinstance(it, atomic.deref): return it # deref elif isinstance(it, self.arrayType): # deref if self.igT: return it # deref if len(it.shape) == 0: return it.item() # deref elif isinstance(it, dict): self.depth += 1; _d = {k: self.__ror__(v) for k, v in it.items()}; self.depth -= 1; return _d # deref elif isinstance(it, tuple): self.depth += 1; _t = tuple(self.__ror__(k) for k in it); self.depth -= 1; return _t # deref elif isinstance(it, set): self.depth += 1; _s = set (self.__ror__(k) for k in it); self.depth -= 1; return _s # deref try: iter(it) # deref except: return it # deref self.depth += 1; answer = [] # deref for e in it: # deref if e is cli.yieldT: return answer # deref answer.append(self.__ror__(e)) # deref self.depth -= 1; return answer # deref
[docs] def __invert__(self) -> BaseCli: # deref """Returns a :class:`~k1lib.cli.init.BaseCli` that makes everything an iterator. Not entirely sure when this comes in handy, but it's there.""" # deref return inv_dereference(self.igT) # deref
def _jsF(self, meta): # deref fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # deref return f"const {fIdx} = ({dataIdx}) => {dataIdx}", fIdx # deref
[docs] @staticmethod # deref def js(): # deref """Deref incoming object and turn them into a js object (NOT json string!). Example:: # returns "[...Array(10).keys()]" range(10) | deref.json() How does it know to transpile it? Based on the dictionary at `settings.cli.kjs.jsonF` and the object's "._jsonF" function. Say you have a custom list object, you can do something like this:: class CustomList: def __init__(self): ... def _jsonF(self): return "your js string here" Or, you can do something like this:: class CustomList: ... settings.cli.kjs.jsonF[CustomList] = lambda obj: "your js string here" A variety of data types are included out of the box already for common types, view the source code of this method to check them out.""" # deref jsonF = settings.kjs.jsonF # deref jsonF[list] = lambda x: "[" + ", ".join([deref_js(e) for e in x]) + "]" # deref jsonF[str] = lambda x: json.dumps(x) # deref jsonF[tuple] = jsonF[list]; jsonF[set] = lambda x: "new Set(" + jsonF[list](x) + ")" # deref jsonF[type(None)] = lambda x: "null" # deref jsonF[np.ndarray] = lambda x: json.dumps(x | deref(igT=False)) # deref if hasTorch: jsonF[torch.Tensor] = lambda x: json.dumps(x | deref(igT=False)) # deref jsonF[type(iter(range(10)))] = lambda x: "[" + ", ".join([str(e) for e in x]) + "]" # deref jsonF[type((x for x in range(0)))] = jsonF[list] # deref jsonF[type({}.keys())] = jsonF[list]; jsonF[type({}.values())] = jsonF[list] # deref jsonF[dict] = lambda x: "{" + ", ".join([f"{json.dumps(k)}: {deref_js(v)}" for k,v in x.items()]) + "}" # deref jsonF[defaultdict] = jsonF[dict] # deref deref.js = lambda: cli.aS(deref_js); return deref.js() # initializes at runtime, then patches deref.json() to get a faster path! # deref
def deref_js(obj): # deref_js # only 2 special cases, perf considerations, everything else is pluggable # deref_js if isinstance(obj, bool): return "true" if obj else "false" # deref_js if isinstance(obj, (numbers.Number, np.number)): return str(obj) # deref_js fn = settings.kjs.jsonF.get(type(obj), None) # deref_js if fn: return fn(obj) # deref_js if hasattr(obj, "_jsonF"): return obj._jsonF() # deref_js raise Exception(f"Don't know how to transcribe object with class {type(obj)}. Either add the serialization function to `settings.cli.kjs.jsonF`, or implement the function `._jsonF()` to your custom class") # deref_js
[docs]class bindec(BaseCli): # bindec
[docs] def __init__(self, cats:List[Any], f=None): # bindec """Binary decodes the input. Example:: # returns ['a', 'c'] 5 | bindec("abcdef") # returns 'a,c' 5 | bindec("abcdef", join(",")) :param cats: categories :param f: transformation function of the selected elements. Defaulted to :class:`toList`, but others like :class:`join` is useful too""" # bindec self.cats = cats; self.f = f or cli.toList() # bindec
[docs] def __ror__(self, it): # bindec it = bin(int(it))[2:][::-1] # bindec return (e for i, e in zip(it, self.cats) if i == '1') | self.f # bindec
settings.add("smooth", 10, "default smooth amount, used in utils.smooth") # bindec
[docs]class smooth(BaseCli): # smooth
[docs] def __init__(self, consecutives=None): # smooth """Smoothes out the input stream. Literally just a shortcut for:: batched(consecutives) | toMean().all() Example:: # returns [4.5, 14.5, 24.5] range(30) | smooth(10) | deref() Smoothing over :class:`torch.Tensor` or :class:`numpy.ndarray` will be much faster, and produce high dimensional results:: # returns torch.Tensor with shape (2, 3, 4) torch.randn(10, 3, 4) | smooth(4) The default consecutive value is in ``settings.cli.smooth``. This is useful if you are smoothing over multiple lists at the same time, like this:: # can change a single smooth value temporarily here, and all sequences will be smoothed in the same way with settings.cli.context(smooth=5): x = list(np.linspace(-2, 2, 50)) y = x | apply(op()**2) | deref() plt.plot(x | smooth() | deref(), y | smooth() | deref()) :param consecutives: if not defined, then used the value inside ``settings.cli.smooth``""" # smooth self.b = cli.batched(consecutives or settings.smooth); self.consecutives = consecutives # smooth
[docs] def __ror__(self, it): # smooth it = it | self.b # smooth if isinstance(it, settings.arrayTypes): return it.mean(1) # smooth return it | cli.toMean().all() # smooth
def _jsF(self, meta): # smooth fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # smooth return f"const {fIdx} = ({dataIdx}) => {dataIdx}.smooth({cli.kjs.v(self.consecutives)})", fIdx # smooth
def _f(): pass # _f _code = type(_f.__code__) # _f
[docs]def disassemble(f=None): # disassemble """Disassembles anything piped into it. Normal usage:: def f(a, b): return a**2 + b # both of these print out disassembled info f | disassemble() disassemble(f) # you can pass in lambdas disassemble(lambda x: x + 3) # or even raw code "lambda x: x + 3" | disassemble()""" # disassemble c = f # disassemble if c is None: return cli.aS(disassemble) # disassemble if isinstance(c, str): c = compile(c, "", "exec") # disassemble try: c = c.__code__ # disassemble except: pass # disassemble if not isinstance(c, _code): raise RuntimeError(f"`{c}` is not a code object/function/class method/string code") # disassemble print(f"co_argcount: {c.co_argcount}") # disassemble print(f"co_cellvars: {c.co_cellvars}") # disassemble print(f"co_consts: {c.co_consts}") # disassemble print(f"co_filename: {c.co_filename}") # disassemble print(f"co_firstlineno: {c.co_firstlineno}") # disassemble print(f"co_flags: {c.co_flags}") # disassemble print(f"co_freevars: {c.co_freevars}") # disassemble print(f"co_kwonlyargcount: {c.co_kwonlyargcount}") # disassemble print(f"co_lnotab: {c.co_lnotab | cli.apply(str) | join(' ')}") # disassemble print(f"co_name: {c.co_name}") # disassemble print(f"co_names: {c.co_names}") # disassemble print(f"co_nlocals: {c.co_nlocals}") # disassemble print(f"co_posonlyargcount: {c.co_posonlyargcount}") # disassemble print(f"co_stacksize: {c.co_stacksize}") # disassemble print(f"co_varnames: {c.co_varnames}") # disassemble print(f"Disassembly:"); dis.disassemble(c) # disassemble with k1lib.captureStdout() as out: # disassemble c.co_consts | cli.filt(lambda x: "code" in str(type(x))) | cli.tee(lambda _: "----------------------- inner code object -----------------------\n") | cli.apply(disassemble) | cli.ignore() # disassemble out() | cli.filt(cli.op().strip() != "") | cli.apply("|" + cli.op()) | cli.indent() | cli.stdout() # disassemble
shortName = lambda s: s.split(os.sep)[-1] # disassemble
[docs]def tree(fL=10, dL=10, depth=float("inf"), ff:Callable[[str], bool]=(lambda s: True), df:Callable[[str], bool]=(lambda s: True)): # tree """Recursively gets all files and folders. Output format might be a bit strange, so this is mainly for visualization. Example:: "." | tree() | deref() :param fL: max number of file per directory included in output :param dL: max number of child directories per directory included in output :param depth: explore depth :param ff: optional file filter function :param df: optional directory filter function""" # tree processFolders = cli.apply(lambda x: [shortName(x), x]) | cli.apply(lambda x: x | tree(fL, dL, depth-1, ff, df) if depth > 0 else [], 1) | cli.toDict() # tree a = cli.filt(os.path.isfile) | cli.filt(ff) | cli.head(fL) | cli.apply(shortName) | cli.aS(set) # tree b = ~cli.filt(os.path.isfile) | cli.filt(df) | cli.head(dL) | processFolders # tree return cli.ls() | ~cli.sortF(os.path.isfile) | (a & b) # tree
[docs]class lookup(BaseCli): # lookup
[docs] def __init__(self, d:dict, col:int=None, fill=None): # lookup """Looks up items from a dictionary/object. Example:: d = {"a": 3, "b": 5, "c": 52} # returns [3, 5, 52, 52, 3] "abcca" | lookup(d) | deref() # raises Exception, as "d" does not exist "abccad" | lookup(d) | deref() # returns [3, 5, 52, 52, 3, '(not found)'] "abccad" | lookup(d, fill="(not found)") | deref() # returns [3, 5, 52, 52, 3, 'd'] "abccad" | lookup(d, fill=input) | deref() # returns [[0, 3], [1, 5], [2, 52], [3, 52], [4, 3]] [range(5), "abcca"] | transpose() | lookup(d, 1) | deref() :param d: any object that can be sliced with the inputs :param col: if None, lookup on each row, else lookup a specific column only :param fill: if None, throws error if looked up element is not available, else returns the fill value (or the input value if ``fill == input``)""" # lookup self.d = d; self.col = col; self.fill = fill # lookup if fill is None: self.f = lambda e: d[e] # lookup elif fill == input: self.f = lambda e: d.get(e, e) # lookup else: self.f = lambda e: d.get(e, fill) # lookup
def _typehint(self, inp): # lookup t = inferType(list(self.d.values())) # lookup if isinstance(t, tListIterSet): return tIter(t.child) # lookup if isinstance(t, tCollection): return tIter(tLowest(*t.children)) # lookup return tIter(tAny()) # lookup
[docs] def __ror__(self, it): return it | cli.apply(self.f, self.col) # lookup
def _jsF(self, meta): # lookup fIdx = init._jsFAuto(); dictIdx = init._jsDAuto(); dataIdx = init._jsDAuto() # lookup if not self.col is None: raise Exception("lookup._jsF() doesn't support custom .col yet") # lookup return f"const {dictIdx} = {json.dumps(self.d)}\nconst {fIdx} = ({dataIdx}) => {dataIdx}.lookup({dictIdx}, {cli.kjs.v(self.col)}, {cli.kjs.v(self.fill)})", fIdx # lookup
[docs]class dictFields(BaseCli): # dictFields
[docs] def __init__(self, *fields, default=""): # dictFields """Grab a bunch of dictionary fields. Example:: # returns [3, 1, ''] {"a": 1, "b": 2, "c": 3} | dictFields("c", "a", "d") """ # dictFields self.fields = fields; self.default = default # dictFields
[docs] def __ror__(self, d): # dictFields return [d.get(f, self.default) for f in self.fields] # dictFields
[docs]class backup(BaseCli): # backup
[docs] def __init__(self): # backup """Backs up a file/folder. Example:: "some/folderOrFile" | backup() "some/folderOrFile" | backup.restore() Really straightforward. Uses bash internally to copy files recursively, so not available on Windows.""" # backup pass # backup
[docs] def __ror__(self, it): # backup it = os.path.expanduser(it) # backup None | cli.cmd(f"rm -rf '{it}.backup'") | cli.ignore() # backup None | cli.cmd(f"cp -r '{it}' '{it}.backup'") | cli.ignore() # backup
[docs] @staticmethod # backup def restore(): # backup def inner(it): # backup it = os.path.expanduser(it) # backup None | cli.cmd(f"rm -rf '{it}'") | cli.ignore() # backup None | cli.cmd(f"cp -r '{it}.backup' '{it}'") | cli.ignore() # backup return cli.aS(inner) # backup
sketch_interceptor = {} # backup
[docs]class sketch(BaseCli): # sketch _jsF_ctxIdx = None # sketch
[docs] def __init__(self, transforms:List[callable]=[], titles:List[str]=None, im:bool=False, ncols:int=None, n:int=None, axes:int=None): # sketch """Convenience tool to plot multiple matplotlib plots at the same time, while still keeping everything short and in 1 line. For this example, we're trying to plot x^1, x^2, ..., x^8 on 2 separate plots, one left one right. The left will have x^1 till x^4, the right will have x^5 to x^8. How you would do this normally:: x = np.linspace(-2, 2); exps = range(1, 9) fig, axes = plt.subplots(1, 2, figsize=(10, 4)) # simplest solution plt.sca(axes[0]); plt.plot(x, x**1); plt.plot(x, x**2); plt.plot(x, x**3); plt.plot(x, x**4); plt.legend([1, 2, 3, 4]); plt.xlabel("x axis") # solution using a little bit of cli plt.sca(axes[1]); range(5, 9) | apply(lambda a: [x, x**a]) | ~apply(plt.plot) | ignore(); plt.legend([5, 6, 7, 8]); plt.xlabel("x axis") But this is long, and I'm incredibly lazy to write it all out. So here's how it's going to work using this cli:: # plotting the first 4 lines only, in a single plot. Should be familiar and make sense to you before moving on exps | apply(lambda a: [x, x**a]) | batched(4) | item() | ~apply(plt.plot) | ignore() # plotting 8 lines across 2 plots. Simplest example using sketch(). It kinda captures clis after it and use it to plot each plot exps | apply(lambda a: [x, x**a]) | batched(4) | (sketch() | ~apply(plt.plot)) # same as above, but adding a grid and x axis label to all plots. Transformation functions can be anything you would # put inside a normal cli (`plt` will be passed as argument): string code, op()-capture, lambda functions, other cli tools transforms = ["x.grid(True)", op().xlabel("x axis"), lambda x: x.ylabel("y axis")] exps | apply(lambda a: [x, x**a]) | batched(4) | (sketch(transforms) | ~apply(plt.plot)) # same as above, but adding legends. [x, x**a] will eventually be directed to ~apply(plt.plot), while f"x^{a}" will be directed to aS(plt.legend) exps | apply(lambda a: [[x, x**a], f"x^{a}"]) | batched(4) | (sketch() | transpose() | ~apply(plt.plot) + iden() | deref() | rItem(1) | aS(plt.legend)) | deref() Last line will generate this plot: .. image:: ../images/probScale.png Is it worth the extra confusion? Afterall, it just saves you 2-3 lines of code. To me, it is worth it, because you can quickly change styles (add a grid, make y axis log) See also: :class:`~k1lib.cli.output.plotImgs` Check out a gallery of more examples at `kapi/9-mpl <https://mlexps.com/kapi/9-mpl/>`_. :param transforms: transform functions to be run when drawing every plot. ``plt`` (aka ``matplotlib.pyplot``) will be passed in :param titles: if specified, use these titles for each plot. Kinda hacky I have to admit :param im: if True, returns a PIL image and closes the sketch, else return nothing but still have the sketch open :param ncols: if specified, will sketch with this number of columns :param n: if specified, use this number of sketch instead of figuring out automatically :param axes: if specified, forgo calculating #axes and initialization altogether and just use the provided axes""" # sketch super().__init__(capture=True); self.titles = titles; self.im = im # sketch self.transforms = [cli.fastF(t) for t in transforms]; self.ncols = ncols; self.n = n; self.axes = axes # sketch
[docs] def __ror__(self, it): # sketch it = list(it); n = self.n or len(it); s = self.capturedSerial; transforms = self.transforms # sketch ncols = self.ncols or math.ceil(n**0.5); nrows = math.ceil(n/ncols) # sketch if self.axes: axes = self.axes # sketch else: # sketch fig, axes = plt.subplots(nrows, ncols, figsize=(ncols*5, nrows*4)) # sketch if nrows*ncols == 1: axes = [axes] # sketch if axes | cli.shape() | cli.shape(0) > 1: axes = axes.flatten() # sketch for i, [ax, e, title] in enumerate(zip(axes, it, self.titles or ("" | cli.repeat()))): # sketch plt.sca(ax); e | s | cli.deref() # sketch if title: plt.title(title) # sketch for trans in transforms: trans(plt) # sketch if self.n is None: axes[i+1:] | cli.op().remove().all() | cli.deref(); plt.tight_layout() # sketch if self.im: return plt.gcf() | cli.toImg() # sketch if self.n: return axes[i+1:] # sketch
def _jsF(self, meta): # sketch if self.n: raise Exception("sketch()._jsF() doesn't support .n parameter yet") # sketch if self.axes: raise Exception("sketch()._jsF() doesn't support .axes parameter yet") # sketch fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); ctxIdx = init._jsDAuto() # sketch # generate all child functions here # sketch sketch._jsF_ctxIdx = ctxIdx # sketch header, _fIdx, _async = k1lib.kast.asyncGuard(self.capturedSerial._jsF(meta)) # sketch # then generate all transforms here, using a tracing compiler # sketch tfStmts = "" # sketch if len(self.transforms) > 0: # sketch class Interceptor: # sketch def __getattr__(self, attr): # sketch if getattr(plt, attr) not in sketch_interceptor: raise Exception(f"Transpiling function `plt.{attr}` is not supported at the moment") # sketch return lambda *args, **kwargs: sketch_interceptor[getattr(plt, attr)](*args, **kwargs) # sketch tfStmts = tfs = self.transforms | cli.apply(cli.init.fastF) | cli.op()(Interceptor()).all() | cli.join("; ") # sketch sketch._jsF_ctxIdx = None # sketch return f"""\ let {ctxIdx} = null;\n{header} const {fIdx} = async ({dataIdx}) => {{ // dataIdx should have const ctx = []; // this is the object that will be sent to the rendering server! const titles = {json.dumps(self.titles)} ?? Array({dataIdx}.length); for (const i of [...Array({dataIdx}.length).keys()]) {{ {ctxIdx} = []; // actually executing function and plotting function downstream {'await ' if _async else ''}{_fIdx}({dataIdx}[i]); if (titles[i]) {ctxIdx}.push(["title", titles[i]]); // inject all transforms here {tfStmts}; ctx.push({ctxIdx}); {ctxIdx} = null; }} // console.log(ctx); // console.log(JSON.stringify(ctx)); const res = await (await fetch("https://local.mlexps.com/routeServer/kapi_9-mpl", {{ method: "POST", body: JSON.stringify({{ "ctx": JSON.stringify(ctx) }}), headers: {{ "Content-Type": "application/json" }} }})).json() if (res.success) {{ const base64 = res.data; console.log("mpl fetched"); return `<img src="data:image/jpg;base64, ${{base64}}" />` }} else {{ throw new Error(res.reason); }} // return ctx; }}""", fIdx # sketch return f"const {fIdx} = ({dataIdx}) => {dataIdx}.repeatFrom({cli.kjs.v(self.limit)})", fIdx # sketch
def _jsF_plt_ctxGuard(): # _jsF_plt_ctxGuard if sketch._jsF_ctxIdx is None: raise Exception("Have to wrap any plotting operations around sketch(). So, transform your code from `data | (toJsFunc() | ~aS(plt.plot))` into `[data] | (toJsFunc() | (sketch() | ~aS(plt.plot)))`") # _jsF_plt_ctxGuard return sketch._jsF_ctxIdx # _jsF_plt_ctxGuard try: import matplotlib.pyplot as plt; hasMpl = True # _jsF_plt_ctxGuard except: hasMpl = False # _jsF_plt_ctxGuard if hasMpl: # _jsF_plt_ctxGuard def _jsF_plt_plot(meta, c=None): # _jsF_plt_ctxGuard fIdx = init._jsFAuto(); xIdx = init._jsDAuto(); yIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard() # _jsF_plt_ctxGuard return f"""\ const {fIdx} = ({xIdx}, {yIdx}=null) => {{ if (!{yIdx}) {{ // handle only xIdx is available case {yIdx} = {xIdx}; {xIdx} = [...Array({yIdx}.length).keys()]; }} {ctxIdx}.push(["plot", {xIdx}, {yIdx}]); }}""", fIdx # _jsF_plt_ctxGuard settings.kjs.jsF[plt.plot] = _jsF_plt_plot # _jsF_plt_ctxGuard def _jsF_plt_title(meta): # version that passes args in js side # _jsF_plt_ctxGuard fIdx = init._jsFAuto(); titleIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard() # _jsF_plt_ctxGuard return f"""const {fIdx} = ({titleIdx}) => {{ {ctxIdx}.push(["title", {titleIdx}]); }}""", fIdx # _jsF_plt_ctxGuard settings.kjs.jsF[plt.title] = _jsF_plt_title # below is version that passes args in python side, returns statement, instead of (header, fIdx) like usual # _jsF_plt_ctxGuard sketch_interceptor[plt.title] = lambda title: f"""{_jsF_plt_ctxGuard()}.push(["title", `{title}`])""" # _jsF_plt_ctxGuard def _jsF_plt_grid(meta): # _jsF_plt_ctxGuard fIdx = init._jsFAuto(); tfIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard() # _jsF_plt_ctxGuard return f"""const {fIdx} = ({tfIdx}) => {{ {ctxIdx}.push(["grid", {tfIdx}]); }}""", fIdx # _jsF_plt_ctxGuard settings.kjs.jsF[plt.grid] = _jsF_plt_grid; sketch_interceptor[plt.grid] = lambda tf=True: f"""{_jsF_plt_ctxGuard()}.push(["grid", {cli.kjs.v(tf)}])""" # _jsF_plt_ctxGuard def _jsF_plt_legend(meta): # _jsF_plt_ctxGuard fIdx = init._jsFAuto(); legendIdx = init._jsDAuto(); ctxIdx = _jsF_plt_ctxGuard() # _jsF_plt_ctxGuard return f"""const {fIdx} = ({legendIdx}) => {{ {ctxIdx}.push(["legend", {legendIdx}]); }}""", fIdx # _jsF_plt_ctxGuard settings.kjs.jsF[plt.legend] = _jsF_plt_legend; sketch_interceptor[plt.legend] = lambda legend=None: f"""{_jsF_plt_ctxGuard()}.push(["legend", {cli.kjs.v(legend)}])""" # _jsF_plt_ctxGuard # _jsF_plt_ctxGuard sketch_interceptor[plt.xlim] = lambda left=None, right=None: f"""{_jsF_plt_ctxGuard()}.push(["xlim", {cli.kjs.v(left)}, {cli.kjs.v(right)}])""" # _jsF_plt_ctxGuard sketch_interceptor[plt.ylim] = lambda bottom=None, top=None: f"""{_jsF_plt_ctxGuard()}.push(["ylim", {cli.kjs.v(bottom)}, {cli.kjs.v(top)}])""" # _jsF_plt_ctxGuard sketch_interceptor[plt.xscale] = lambda scale: f"""{_jsF_plt_ctxGuard()}.push(["xscale", {cli.kjs.v(scale)}])""" # _jsF_plt_ctxGuard import numbers, sys; from collections import deque # _jsF_plt_ctxGuard
[docs]class syncStepper(BaseCli): # syncStepper
[docs] def __init__(self, col=0, sort=False): # syncStepper """Steps forward all streams at a time, yielding same results from min to max. That's a bit vague, so let's see an example:: a = [["a", 1], ["b", 7 ], ["c", 4], ["e", 6]] b = [["b", 5], ["c", 1 ], ["d", 3], ["f", 5]] c = [["a", 2], ["c", -4], ["d", 9], ["e", 4]] [a, b, c] | syncStepper() | deref() # sync-step by the 1st column [a, b, c] | syncStepper(1, True) | deref() # sync-step by the 2nd column. Have to sort it explicitly The first line returns this:: [[['a', 1], None, ['a', 2]], [['b', 7], ['b', 5], None], [['c', 4], ['c', 1], ['c', -4]], [None, ['d', 3], ['d', 9]], [['e', 6], None, ['e', 4]], [None, ['f', 5], None]] The second line returns this:: [[None, None, ['c', -4]], [['a', 1], ['c', 1], None], [None, None, ['a', 2]], [None, ['d', 3], None], [['c', 4], None, ['e', 4]], [None, ['b', 5], None], [['e', 6], None, None], [['b', 7], None, None], [None, None, ['d', 9]]] ``col`` can be None, but it's quite a strange use case:: [['a', 'b', 'c', 'e'], ['b', 'c', 'd', 'f'], ['a', 'c', 'd', 'e']] | syncStepper(None) | deref() It returns this:: [[['a'], None, ['a']], [['b'], ['b'], None], [['c'], ['c'], ['c']], [None, ['d'], ['d']], [['e'], None, ['e']], [None, ['f'], None]] As you can see, for each line, it kinda yields elements with the same column. If that element doesn't exist, it'll just put None there. This expects the input streams are sorted at the column of interest. If they are not, specify ``sort=True``. It has roughly the same vibe as :class:`~k1lib.cli.structural.groupBy`, in that it groups everything by a specific column. The main difference here is that you can sync-step them line-by-line, loading very little into memory, so you can run this on giant datasets and not have to worry about running out of memory. With k streams each having n elements, you should expect memory complexity to be O(k), and the time complexity to be O(n*k^2/2). That k^2 term is kinda worrying, but in most use cases, k is small and so k^2 can be treated as a constant See also: :class:`~k1lib.cli.structural.latch` :param col: column where it should compare values and merge them together. Can be None, but that would be quite a weird use case :param sort: whether to sort the streams or not. This cli requires it, but it's not turned on by default because it's an intensive operation""" # syncStepper if col is None: self.col = 0; self.colPreprocess = cli.wrapList().all() # syncStepper else: self.col = col; self.colPreprocess = cli.iden() # syncStepper self.bank = deque(); self.sentinel = object(); self._sort = sort # syncStepper
def _append(self, stIdx1, val1, elem1): # append to bank in the correct position # syncStepper i = 0; val2 = self.minObj # syncStepper for i, [stIdx2, val2, elem2] in enumerate(self.bank): # syncStepper if val1 <= val2: break # syncStepper if val1 <= val2: self.bank.insert(i, [stIdx1, val1, elem1]) # syncStepper else: self.bank.append([stIdx1, val1, elem1]) # syncStepper def _yieldNext(self): # yield the next set of values # syncStepper n = len(self.sts); res = [None]*n; last = None; hasInit = False; changed = False; bank = self.bank; sentinel = self.sentinel # syncStepper for i, [stIdx, val, elem] in enumerate(bank): # syncStepper if not hasInit and elem is sentinel: return res, changed # syncStepper if last == val or not hasInit: changed = True; res[stIdx] = elem # syncStepper elif hasInit: break # syncStepper hasInit = True; last = val # syncStepper while bank[0][1] == last: # popping the values off # syncStepper stIdx, val1, elem1 = bank.popleft(); val2, elem2 = next(self.sts[stIdx]) # syncStepper if val1 > val2: raise Exception(f"Stream {stIdx} has not been sorted yet! Please sort all streams before passing it into syncStepper") # syncStepper self._append(stIdx, val2, elem2) # syncStepper return res, changed # syncStepper
[docs] def __ror__(self, sts): # sts = "streams" # syncStepper col = self.col # syncStepper # --------------------- All of this is just to figure out the type of the column dynamically. So painful --------------------- # syncStepper samples, sts = sts | self.colPreprocess.all() | cli.apply(cli.peek()) | cli.transpose() | cli.cut(col) + cli.iden() | cli.apply(list) # syncStepper if len([e for e in sts if e != []]) == 0: return # no elements to yield at all! # syncStepper n_nums = sum([1 if isinstance(e, numbers.Number) else 0 for e in samples]) # syncStepper n_strs = sum([1 if isinstance(e, str) else 0 for e in samples]); n = len(samples) # syncStepper if n_nums*(n-n_nums) + n_strs*(n-n_strs) > 0: raise Exception("The requested column in some of the streams is not purely of numeric or string type, a requirement of syncStepper(). Please fix your data structure and try again.") # syncStepper if n_nums + n_strs == 0: raise Exception("The requested column in some of the streams is not of numeric or string type, so can't compare them to sync-step them") # syncStepper # n = 3; n_strs = 1 # syncStepper text = n_strs > 0; self.minObj = "" if text else float("-inf"); self.maxObj = chr(sys.maxunicode) if text else float("inf"); senObj = [self.maxObj, self.sentinel] # syncStepper # --------------------- And here's the meat of the cli --------------------- # syncStepper sts = sts | (cli.sort(col, not text).all() if self._sort else cli.iden()) | cli.apply(lambda st: [st | cli.apply(lambda elem: [elem[col], elem]), senObj | cli.repeat()] | cli.joinStreams()) | cli.aS(list) # syncStepper sts | cli.apply(next) | cli.insertIdColumn() | ~cli.apply(lambda idx,e: self._append(idx, *e)) | cli.ignore(); self.sts = sts # syncStepper while True: # syncStepper res, changed = self._yieldNext() # syncStepper if not changed: break # syncStepper yield res # syncStepper
[docs]class zeroes(BaseCli): # zeroes
[docs] def __init__(self, col:int=None, log=False, offset:float=0): # zeroes """Shift the specified column so that the first element is zero Example:: range(13, 20) | zeroes() | deref() # returns [0, 1, 2, 3, 4, 5, 6] range(13, 20) | zeroes(offset=5) | deref() # returns [5, 6, 7, 8, 9, 10, 11] [2, 3, 1, 4, 7] | zeroes() | deref() # returns [0, 1, -1, 2, 5] Assumes the first element is going to be transformed to zero, thus the last example. This cli also has log mode, where the natural log of the values will be shifted to zero:: # returns [1.0, 1.5, 0.5, 2.0, 3.5] [2, 3, 1, 4, 7] | zeroes(log=True) | aS(round, 2).all() | deref() # returns [2.72, 4.08, 1.36, 5.44, 9.51] [2, 3, 1, 4, 7] | zeroes(offset=1, log=True) | aS(round, 2).all() | deref() This is essentially the same as dividing everything by 2, so that the first element turns into 1. Super neat. The 2nd example is equivalent to multiplying everything by e/2. This cli can function in a table (.col != None):: # returns [[0, 'a'], [1, 'b'], [2, 'c'], [3, 'd'], [4, 'e'], [5, 'f'], [6, 'g']] [[13, 'a'], [14, 'b'], [15, 'c'], [16, 'd'], [17, 'e'], [18, 'f'], [19, 'g']] | zeroes(0) | deref() This cli can also act across multiple list of numbers:: data = [[2, 3, 1, 4, 7], [1, 4, 3, 6, 9]] data2 = [[[2, 'b'], [3, 'c'], [1, 'a'], [4, 'd'], [7, 'g']], [[1, 'a'], [4, 'd'], [3, 'c'], [6, 'f'], [9, 'i']]] # returns [[0, 1, -1, 2, 5], [5, 8, 7, 10, 13]] data | ~zeroes() | deref() # returns [[1, 2, 0, 3, 6], [6, 9, 8, 11, 14]] data | ~zeroes(offset=1) | deref() # returns [[1.0, 1.5, 0.5, 2.0, 3.5], [3.5, 14.0, 10.5, 21.0, 31.5]] data | ~zeroes(log=True) | aS(round, 2).all(2) | deref() # returns [[[0, 'b'], [1, 'c'], [-1, 'a'], [2, 'd'], [5, 'g']], [[5, 'a'], [8, 'd'], [7, 'c'], [10, 'f'], [13, 'i']]] data2 | ~zeroes(0) | deref() So as you can see, the offsets are adjusted so that the first element of each list starts from the last element of the previous list :param col: column to shift values :param offset: custom offset of the minimum value, defaulted to zero :param log: whether to zero it linearly or zero it logarithmically""" # zeroes self.col = col; self.log = log; self.offset = offset; self.inverted = False # zeroes
[docs] def __invert__(self): res = zeroes(self.col, self.log, self.offset); res.inverted = True; return res # zeroes
[docs] def __ror__(self, it): # zeroes col = self.col; log = self.log; offset = self.offset # zeroes if self.inverted: # zeroes def gen(): # zeroes currentOffset = offset # zeroes for arr in it: # zeroes arr = arr | zeroes(col, log, currentOffset) # zeroes if isinstance(arr, settings.arrayTypes): # zeroes bm = np if isinstance(arr, np.ndarray) else (torch if hasTorch and isinstance(arr, torch.Tensor) else None) # zeroes if bm: # zeroes if col is None: currentOffset = bm.log(arr[-1]) if log else arr[-1] # zeroes else: currentOffset = bm.log(arr[-1][col]) if log else arr[-1][col] # zeroes yield arr; continue # zeroes # yes, we have to deref() them, even though perf will suffer, because let's say # zeroes # that the user then does rItem(3), and discards elements 0, 1 and 2. Then 0, 1, 2 # zeroes # won't be run, so element 3 won't know its offset! # zeroes if col is None: arr = list(arr); currentOffset = math.log(arr[-1]) if log else arr[-1] # zeroes else: arr = [list(row) for row in arr]; currentOffset = math.log(arr[-1][col]) if log else arr[-1][col] # zeroes yield arr # zeroes return gen() # zeroes if isinstance(it, settings.arrayTypes): # zeroes bm = np if isinstance(it, np.ndarray) else (torch if hasTorch and isinstance(it, torch.Tensor) else None) # zeroes if bm: # zeroes cloneF = np.copy if isinstance(it, np.ndarray) else torch.clone # zeroes if log: # zeroes if col is None: minValue = bm.log(it[0]) - offset; return bm.exp(bm.log(it) - minValue) # zeroes else: minValue = bm.log(it[0, col]) - offset; it = cloneF(it); it[:,col] = bm.exp(bm.log(it[:,col]) - minValue); return it # zeroes else: # zeroes if col is None: minValue = it[0] - offset; return it - minValue # zeroes else: minValue = it[0, col] - offset; it = cloneF(it); it[:,col] = it[:,col] - minValue; return it # zeroes row, it = it | cli.peek() # zeroes if it == []: return [] # zeroes if log: # zeroes mlog = math.log; mexp = math.exp # zeroes if col is None: minValue = mlog(row) - offset; return (mexp(mlog(row) - minValue) for row in it) # zeroes else: minValue = mlog(row[col]) - offset; return ([*row[:col], mexp(mlog(row[col]) - minValue), *row[col+1:]] for row in it) # zeroes else: # zeroes if col is None: minValue = row - offset; return (row - minValue for row in it) # zeroes else: minValue = row[col] - offset; return ([*row[:col], row[col] - minValue, *row[col+1:]] for row in it) # zeroes
[docs]class normalize(BaseCli): # normalize
[docs] def __init__(self, col:int=None, mode:int=0): # normalize """Normalize the data going in. Example:: arr = np.random.randn(100)+10 arr | normalize() # returns array with mean around 0 arr | normalize(mode=1) # returns array with mean around 0.5, min 0, max 1 arr = np.random.randn(100, 20)+10 arr | normalize(2) # returns array with 2nd (0-indexing!) column have mean around 0. Other columns not touched arr | normalize(2, mode=1) # returns array with 2nd (0-indexing!) column have mean around 0.5 :param col: column to apply the normalization to :param mode: 0 for ``(x - x.mean())/s.std()``, 1 for ``(x - x.min())/(x.max() - x.min())``""" # normalize self.col = col; self.mode = mode # normalize
[docs] def __ror__(self, x): # normalize col = self.col; mode = self.mode # normalize if isinstance(x, k1lib.settings.cli.arrayTypes): # normalize dims = len(x.shape) # normalize if col is None: return (x - x.mean())/x.std() if mode == 0 else (x - x.min())/(x.max() - x.min()) # normalize else: # normalize if mode == 0: xc = x[:,col]; x[:,col] = (xc - xc.mean())/xc.std(); return x # normalize else: xc = x[:,col]; x[:,col] = (xc - xc.min())/(xc.max() - xc.min()); return x # normalize if col is None: return np.array(list(x)) | self # normalize else: # normalize it = x; ans = []; it = it | cli.deref(2) # normalize if mode == 0: # normalize mean = [row[col] for row in it] | cli.toMean() # normalize std = [row[col] for row in it] | cli.toStd() # normalize for row in it: row[col] = (row[col]-mean)/std; ans.append(row) # normalize else: # normalize _min = min([row[col] for row in it]) # normalize _max = max([row[col] for row in it]) # normalize for row in it: row[col] = (row[col]-_min)/(_max-_min); ans.append(row) # normalize return ans # normalize