Source code for k1lib.cli.utils

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for all short and random quality-of-life utilities."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, Table, T, yieldT
import k1lib.cli as cli, numbers, torch, numpy as np, dis
from k1lib.cli.typehint import *
from typing import overload, Iterator, Any, List, Set, Union, Callable
import k1lib, time, math, os
from collections import defaultdict
__all__ = ["size", "shape", "item", "iden", "join", "wrapList",
           "equals", "reverse", "ignore", "rateLimit", "timeLimit", "tab", "indent",
           "clipboard", "deref", "bindec", "smooth", "disassemble",
           "tree", "lookup", "dictFields"]
settings = k1lib.settings.cli
def exploreSize(it):
    """Returns first element and length of array. Returns [first item, length]"""
    if isinstance(it, str): return None, len(it)
    try: return it[0], len(it)
    except: pass
    sentinel = object(); it = iter(it)
    o = next(it, sentinel); count = 1
    if o is sentinel: return None, 0
    try:
        while True: next(it); count += 1
    except StopIteration: pass
    return o, count
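# A quick sketch of how exploreSize behaves (not part of the original module;
# derived from the logic above). Note that it consumes plain iterators:
#   exploreSize([4, 5, 6])       -> (4, 3)    indexable: uses it[0] and len()
#   exploreSize(iter(range(3)))  -> (0, 3)    falls back to exhausting the iterator
#   exploreSize(iter([]))        -> (None, 0)
#   exploreSize("abc")           -> (None, 3) strings only report their length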
class size(BaseCli):
    def __init__(self, idx=None):
        """Returns the number of rows and columns in the input. Example::

            # returns (3, 2)
            [[2, 3], [4, 5, 6], [3]] | size()
            # returns 3
            [[2, 3], [4, 5, 6], [3]] | size(0)
            # returns 2
            [[2, 3], [4, 5, 6], [3]] | size(1)
            # returns (2, 0)
            [[], [2, 3]] | size()
            # returns (3,)
            [2, 3, 5] | size()
            # returns 3
            [2, 3, 5] | size(0)
            # returns (3, 2, 2)
            [[[2, 1], [0, 6, 7]], 3, 5] | size()
            # returns (1, 3)
            ["abc"] | size()
            # returns (1, 2, 3)
            [torch.randn(2, 3)] | size()
            # returns (2, 3, 5)
            size()(np.random.randn(2, 3, 5))

        There's also :class:`lengths`, which is sort of a simplified/faster
        version of this, but only use it if you are sure that ``len(it)``
        can be called.

        If this encounters PyTorch tensors or NumPy arrays, it will just
        read off their shape instead of actually looping over them.

        :param idx: if idx is None, returns (rows, columns). If 0 or 1, returns the number of rows or columns respectively"""
        super().__init__(); self.idx = idx
        if idx is not None: self._f = cli.item(idx)
    def _typehint(self, inp):
        if self.idx is not None: return int
        return tList(int)
    def __ror__(self, it:Iterator[str]):
        if self.idx == 0:
            try: return len(it)
            except: return exploreSize(it)[1]
        if self.idx is None:
            answer = []
            try:
                while True:
                    if isinstance(it, settings.arrayTypes):
                        return tuple(answer + list(it.shape))
                    it, s = exploreSize(it); answer.append(s)
            except TypeError: pass
            return tuple(answer)
        return exploreSize(it | self._f)[1]
shape = size
noFill = object() # sentinel: "no fill value was passed" to item()
class item(BaseCli):
    def __init__(self, amt:int=1, fill=noFill):
        """Returns the first row. Example::

            # returns 0
            iter(range(5)) | item()
            # returns torch.Size([5])
            torch.randn(3,4,5) | item(2) | shape()
            # returns 3
            [] | item(fill=3)

        :param amt: how many times do you want to call item() back to back?
        :param fill: if iterator length is 0, return this"""
        self.amt = amt; self.fill = fill
        self.fillP = [fill] if fill != noFill else [] # preprocessed, to be faster
        if self.amt != 1:
            self._f = cli.serial(*(item(fill=self.fill) for _ in range(self.amt)))
    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return inp.child
        if isinstance(inp, tCollection): return inp.children[0]
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: return inp.__class__(inp.child, None)
            if inp.rank - self.amt >= 1: return inp.__class__(inp.child, inp.rank-self.amt)
            return inp.child
        return tAny()
    def __ror__(self, it:Iterator[str]):
        if self.amt != 1: return it | self._f
        return next(iter(it), *self.fillP)
class iden(BaseCli):
    def __init__(self):
        """Yields whatever the input is. Useful for multiple streams. Example::

            # returns range(5)
            range(5) | iden()"""
        super().__init__()
    def _typehint(self, inp): return inp
    def __ror__(self, it:Iterator[Any]): return it
class join(BaseCli):
    def __init__(self, delim:str=None):
        r"""Merges all strings into 1, with `delim` in the middle. Basically :meth:`str.join`. Example::

            # returns '2\na'
            [2, "a"] | join("\n")"""
        super().__init__(); self.delim = patchDefaultDelim(delim)
    def _typehint(self, inp): return str
    def __ror__(self, it:Iterator[str]):
        return self.delim.join(it | cli.toStr())
class wrapList(BaseCli):
    def __init__(self):
        """Wraps inputs inside a list. There's a more advanced cli tool built
        from this, which is :meth:`~k1lib.cli.structural.unsqueeze`."""
        super().__init__()
    def _typehint(self, inp): return tList(inp)
    def __ror__(self, it:T) -> List[T]: return [it]
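# A minimal usage sketch for wrapList (not part of the original source; the
# values are illustrative only, derived from __ror__ above):
#   # returns [5]
#   5 | wrapList()
#   # returns [[2, 3]] -- the whole list becomes a single element
#   [2, 3] | wrapList()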
class _EarlyExp(Exception): pass
class equals:
    def __init__(self):
        """Checks if all incoming columns/streams are identical"""
        super().__init__()
    def __ror__(self, streams:Iterator[Iterator[str]]):
        streams = list(streams)
        for row in zip(*streams):
            sampleElem = row[0]
            try:
                for elem in row:
                    if sampleElem != elem:
                        yield False; raise _EarlyExp()
                yield True
            except _EarlyExp: pass
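# A hedged usage sketch for equals (not part of the original source): it zips
# the streams and yields one boolean per position, per __ror__ above:
#   # returns [True, True, True]
#   [[1, 2, 3], [1, 2, 3]] | equals() | deref()
#   # returns [True, False, True] -- only the middle position differs
#   [[1, 2, 3], [1, 5, 3]] | equals() | deref()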
class reverse(BaseCli):
    def __init__(self):
        """Reverses incoming list. Example::

            # returns [3, 5, 2]
            [2, 5, 3] | reverse() | deref()"""
        super().__init__()
    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return tIter(inp.child)
        return tAny()
    def __ror__(self, it:Iterator[str]) -> List[str]:
        return reversed(list(it))
class ignore(BaseCli):
    def __init__(self):
        r"""Just loops through everything, ignoring the output. Example::

            # will just return an iterator, and not print anything
            [2, 3] | apply(lambda x: print(x))
            # will print "2\n3"
            [2, 3] | apply(lambda x: print(x)) | ignore()"""
        super().__init__()
    def _typehint(self, inp): return type(None)
    def __ror__(self, it:Iterator[Any]):
        for _ in it: pass
class rateLimit(BaseCli):
    def __init__(self, f, delay=0.1):
        """Limits the execution flow rate upon a condition. Example::

            s = 0; semaphore = 0
            def heavyAsyncOperation(i):
                global semaphore, s
                semaphore += 1
                s += i; time.sleep(1)
                semaphore -= 1; return i**2
            # returns (20,), takes 1s to run
            range(20) | applyTh(heavyAsyncOperation, 100) | shape()
            # returns (20,), takes 4s to run (20/5 = 4)
            range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape()

        The first test case is not rate-limited, so it runs all 20 threads at
        the same time, and all of them finish after 1 second. The second test
        case is rate-limited, so only 5 threads can execute concurrently
        because of the semaphore count check. Therefore it takes around 4
        seconds to run.

        :param f: checking function. Should return True if execution is allowed
        :param delay: delay in seconds between calls to ``f()``"""
        self.f = f; self.delay = delay
    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return tIter(inp.child)
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: return tIter(inp)
            if inp.rank >= 2: return tIter(inp.__class__(inp.child, inp.rank - 1))
            return tIter(inp.child)
        if isinstance(inp, tCollection): return inp
        return tAny()
    def __ror__(self, it):
        f = self.f; delay = self.delay
        for e in it:
            while not f(): time.sleep(delay)
            yield e
    @staticmethod
    def cpu(maxUtilization=90):
        """Limits flow rate when cpu utilization is higher than a specified
        percentage. Needs the ``psutil`` package installed to actually work. Example::

            # returns [0, 1, 4, 9, 16]
            range(5) | rateLimit.cpu() | apply(op()**2) | deref()"""
        import psutil
        return rateLimit(lambda: psutil.cpu_percent() < maxUtilization)
class timeLimit(BaseCli):
    def __init__(self, t):
        """Caps the flow after a specified amount of time has passed. Example::

            # returns 20, or roughly close to that
            repeatF(lambda: time.sleep(0.1)) | timeLimit(2) | shape(0)"""
        self.t = t
    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return tIter(inp.child)
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: return tIter(inp)
            if inp.rank >= 2: return tIter(inp.__class__(inp.child, inp.rank - 1))
            return tIter(inp.child)
        if isinstance(inp, tCollection): return inp
        return tAny()
    def __ror__(self, it):
        _time = time.time; endTime = _time() + self.t
        for e in it:
            yield e
            if _time() > endTime: break
def tab(pad:str=" "*4):
    """Indents incoming string iterator. Example::

        # prints out indented 0 to 9
        range(10) | tab() | headOut()"""
    return cli.apply(lambda x: f"{pad}{x}")
indent = tab
class clipboard(BaseCli):
    def __init__(self):
        """Saves the input to clipboard. Example::

            # copies "abc" into the clipboard. Just use Ctrl+V to paste as usual
            "abc" | clipboard()"""
        import pyperclip; self.pyperclip = pyperclip
    def _typehint(self, inp): return type(None)
    def __ror__(self, s): self.pyperclip.copy(s)
settings.atomic.add("deref", (numbers.Number, np.number, str, bool, bytes, torch.nn.Module, k1lib.UValue), "used by deref") Tensor = torch.Tensor; atomic = settings.atomic class inv_dereference(BaseCli): def __init__(self, igT=False): """Kinda the inverse to :class:`dereference`""" super().__init__(); self.igT = igT def __ror__(self, it:Iterator[Any]) -> List[Any]: for e in it: if e is None or isinstance(e, atomic.deref): yield e elif isinstance(e, settings.arrayTypes): if not self.igT and len(e.shape) == 0: yield e.item() else: yield e else: try: yield e | self except: yield e
class deref(BaseCli):
    def __init__(self, maxDepth=float("inf"), igT=True):
        """Recursively converts any iterator into a list. Example::

            # returns something like "<range_iterator at 0x7fa8c52ca870>"
            iter(range(5))
            # returns [0, 1, 2, 3, 4]
            iter(range(5)) | deref()
            # returns [2, 3], yieldT stops things early
            [2, 3, yieldT, 6] | deref()

        You can also specify a ``maxDepth``::

            # returns something like "<list_iterator at 0x7f810cf0fdc0>"
            iter([range(3)]) | deref(0)
            # returns [range(3)]
            iter([range(3)]) | deref(1)
            # returns [[0, 1, 2]]
            iter([range(3)]) | deref(2)

        There are a few classes/types that are considered atomic, and
        :class:`deref` will never try to iterate over them. If you wish to
        change that, do something like::

            settings.cli.atomic.deref = (int, float, ...)

        :param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything at all
        :param igT: short for "ignore tensor". If True, then don't loop over :class:`torch.Tensor` and :class:`numpy.ndarray` internals"""
        super().__init__(); self.igT = igT
        self.maxDepth = maxDepth; self.depth = 0
        self.arrayType = (torch.Tensor, np.ndarray) if k1lib.settings.startup.or_patch else torch.Tensor
    def _typehint(self, inp, depth=float("inf")):
        if depth == 0: return inp
        if depth == float("inf"): depth = self.maxDepth
        if isinstance(inp, type) and issubclass(inp, atomic.deref): return inp
        if isinstance(inp, tArrayTypes):
            if self.igT: return inp
            if inp.rank is None: return tList(tAny())
            if inp.rank == 1:
                if isinstance(inp, tTensor): return tList(type(torch.tensor(3, dtype=inp.child).item()))
                if isinstance(inp, tNpArray): return tList(type(np.array(3, dtype=inp.child).item()))
            return tList(self._typehint(inp.item(), depth-1))
        if isinstance(inp, tListIterSet):
            return tList(self._typehint(inp.child, depth-1))
        if isinstance(inp, tCollection):
            return tCollection(*(self._typehint(e, depth-1) for e in inp.children))
        return tAny()
    def __ror__(self, it:Iterator[T]) -> List[T]:
        if self.depth >= self.maxDepth: return it
        elif isinstance(it, np.number): return it.item()
        elif isinstance(it, atomic.deref): return it
        elif isinstance(it, self.arrayType):
            if self.igT: return it
            if len(it.shape) == 0: return it.item()
        elif isinstance(it, dict):
            self.depth += 1
            _d = {k: self.__ror__(v) for k, v in it.items()}
            self.depth -= 1; return _d
        try: iter(it)
        except: return it
        self.depth += 1; answer = []
        for e in it:
            if e is cli.yieldT: return answer
            answer.append(self.__ror__(e))
        self.depth -= 1; return answer
    def __invert__(self) -> BaseCli:
        """Returns a :class:`~k1lib.cli.init.BaseCli` that makes everything
        an iterator. Not entirely sure when this comes in handy, but it's there."""
        return inv_dereference(self.igT)
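# A hedged sketch of ~deref() (not part of the original source): based on
# inv_dereference above, the inverted form walks a nested structure and turns
# each non-atomic level back into a lazy generator, roughly undoing deref():
#   nested = [[1, 2], [3, 4]]
#   it = nested | ~deref()   # a generator yielding generators
#   it | deref()             # back to [[1, 2], [3, 4]]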
class bindec(BaseCli):
    def __init__(self, cats:List[Any], f=None):
        """Binary decodes the input. Example::

            # returns ['a', 'c']
            5 | bindec("abcdef")
            # returns 'a,c'
            5 | bindec("abcdef", join(","))

        :param cats: categories
        :param f: transformation function for the selected elements. Defaults to :class:`toList`, but others like :class:`join` are useful too"""
        self.cats = cats; self.f = f or cli.toList()
    def __ror__(self, it):
        it = bin(int(it))[2:][::-1]
        return (e for i, e in zip(it, self.cats) if i == '1') | self.f
settings.add("smooth", 10, "default smooth amount, used in utils.smooth")
class smooth(BaseCli):
    def __init__(self, consecutives=None):
        """Smoothes out the input stream. Literally just a shortcut for::

            batched(consecutives) | toMean().all()

        Example::

            # returns [4.5, 14.5, 24.5]
            range(30) | smooth(10) | deref()

        Smoothing over :class:`torch.Tensor` or :class:`numpy.ndarray` will be
        much faster, and produces high dimensional results::

            # returns torch.Tensor with shape (2, 3, 4)
            torch.randn(10, 3, 4) | smooth(4)

        The default consecutive value is in ``settings.cli.smooth``. This is
        useful if you are smoothing over multiple lists at the same time, like
        this::

            # can change a single smooth value temporarily here, and all sequences will be smoothed in the same way
            with settings.cli.context(smooth=5):
                x = list(np.linspace(-2, 2, 50))
                y = x | apply(op()**2) | deref()
                plt.plot(x | smooth() | deref(), y | smooth() | deref())

        :param consecutives: if not defined, then uses the value inside ``settings.cli.smooth``"""
        self.b = cli.batched(consecutives or settings.smooth)
    def __ror__(self, it):
        it = it | self.b
        if isinstance(it, settings.arrayTypes): return it.mean(1)
        return it | cli.toMean().all()
def _f(): pass
_code = type(_f.__code__) # the code object type, used by disassemble() to validate its input
def disassemble(f=None):
    """Disassembles anything piped into it. Normal usage::

        def f(a, b): return a**2 + b
        # both of these print out disassembled info
        f | disassemble()
        disassemble(f)
        # you can pass in lambdas
        disassemble(lambda x: x + 3)
        # or even raw code
        "lambda x: x + 3" | disassemble()"""
    c = f
    if c is None: return cli.aS(disassemble)
    if isinstance(c, str): c = compile(c, "", "exec")
    try: c = c.__code__
    except: pass
    if not isinstance(c, _code): raise RuntimeError(f"`{c}` is not a code object/function/class method/string code")
    print(f"co_argcount: {c.co_argcount}")
    print(f"co_cellvars: {c.co_cellvars}")
    print(f"co_consts: {c.co_consts}")
    print(f"co_filename: {c.co_filename}")
    print(f"co_firstlineno: {c.co_firstlineno}")
    print(f"co_flags: {c.co_flags}")
    print(f"co_freevars: {c.co_freevars}")
    print(f"co_kwonlyargcount: {c.co_kwonlyargcount}")
    print(f"co_lnotab: {c.co_lnotab | cli.toStr() | join(' ')}")
    print(f"co_name: {c.co_name}")
    print(f"co_names: {c.co_names}")
    print(f"co_nlocals: {c.co_nlocals}")
    print(f"co_posonlyargcount: {c.co_posonlyargcount}")
    print(f"co_stacksize: {c.co_stacksize}")
    print(f"co_varnames: {c.co_varnames}")
    print(f"Disassembly:"); dis.disassemble(c)
    with k1lib.captureStdout() as out:
        c.co_consts | cli.filt(lambda x: "code" in str(type(x))) | cli.tee(lambda _: "----------------------- inner code object -----------------------\n") | cli.apply(disassemble) | cli.ignore()
    out() | cli.filt(cli.op().strip() != "") | cli.apply("|" + cli.op()) | cli.indent() | cli.stdout()
shortName = lambda s: s.split(os.sep)[-1]
def tree(fL=10, dL=10, depth=float("inf"), ff:Callable[[str], bool]=(lambda s: True), df:Callable[[str], bool]=(lambda s: True)):
    """Recursively gets all files and folders. Output format might be a bit
    strange, so this is mainly for visualization. Example::

        "." | tree() | deref()

    :param fL: max number of files per directory included in output
    :param dL: max number of child directories per directory included in output
    :param depth: explore depth
    :param ff: optional file filter function
    :param df: optional directory filter function"""
    processFolders = cli.apply(lambda x: [shortName(x), x]) | cli.apply(lambda x: x | tree(fL, dL, depth-1, ff, df) if depth > 0 else [], 1) | cli.transpose() | cli.toDict()
    a = cli.filt(os.path.isfile) | cli.filt(ff) | cli.head(fL) | cli.apply(shortName) | cli.toSet()
    b = ~cli.filt(os.path.isfile) | cli.filt(df) | cli.head(dL) | processFolders
    return cli.ls() | ~cli.sortF(os.path.isfile) | (a & b)
class lookup(BaseCli):
    def __init__(self, d:dict, col:int=None, fill=None):
        """Looks up items from a dictionary/object. Example::

            d = {"a": 3, "b": 5, "c": 52}
            # returns [3, 5, 52, 52, 3]
            "abcca" | lookup(d) | deref()
            # returns [[0, 3], [1, 5], [2, 52], [3, 52], [4, 3]]
            [range(5), "abcca"] | transpose() | lookup(d, 1) | deref()

        :param d: any object that can be sliced with the inputs
        :param col: if None, looks up each row directly, else looks up that specific column only
        :param fill: if None, throws an error when a looked up element is not available, else returns this fill value"""
        self.d = d; self.col = col
        if fill is not None: self.d = defaultdict(lambda: fill, self.d)
    def _typehint(self, inp):
        t = inferType(list(self.d.values()))
        if isinstance(t, tListIterSet): return tIter(t.child)
        if isinstance(t, tCollection): return tIter(tLowest(*t.children))
        return tIter(tAny())
    def __ror__(self, it):
        d = self.d
        return it | cli.apply(lambda e: d[e], self.col)
class dictFields(BaseCli):
    def __init__(self, *fields, default=""):
        """Grabs a bunch of dictionary fields. Example::

            # returns [3, 1, '']
            {"a": 1, "b": 2, "c": 3} | dictFields("c", "a", "d")

        :param fields: field names to look up
        :param default: value returned for fields that are missing from the dictionary"""
        self.fields = fields; self.default = default
    def __ror__(self, d):
        return [d.get(f, self.default) for f in self.fields]