Source code for k1lib.cli.utils

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for all short and random quality-of-life utilities."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, Table, T
import k1lib.cli as cli, numbers, torch, numpy as np, dis
from typing import overload, Iterator, Any, List, Set, Union, Callable
import k1lib, time, math, os
# Names exported by ``from k1lib.cli.utils import *``
__all__ = ["size", "shape", "item", "iden", "join", "wrapList",
           "equals", "reverse", "ignore", "rateLimit", "timeLimit", "tab", "indent",
           "clipboard", "headerIdx", "deref", "bindec", "smooth", "disassemble",
           "tree", "lookup", "dictFields"]
# Shorthand for the cli settings object; this module reads ``arrayTypes``,
# ``atomic`` and ``smooth`` from it
settings = k1lib.settings.cli
def exploreSize(it):
    """Returns the first element and the length of ``it``.

    Strings are treated as atomic: their length is reported, but the first
    element is ``None``. Anything that supports both ``it[0]`` and ``len(it)``
    is answered directly; everything else is consumed as an iterator and its
    elements are counted.

    :param it: string, sequence or any other iterable. Iterators get exhausted
    :return: ``(first item, length)``, or ``(None, 0)`` if there are no elements
    :raises TypeError: if ``it`` can neither be indexed nor iterated over"""
    if isinstance(it, str): return None, len(it)
    # Fast path for indexable, sized containers. Only ordinary errors mean "not
    # indexable"; KeyboardInterrupt/SystemExit must not be silently swallowed
    try: return it[0], len(it)
    except Exception: pass
    sentinel = object(); it = iter(it)
    first = next(it, sentinel)
    if first is sentinel: return None, 0
    # first element is already consumed, so count it, then the rest
    return first, 1 + sum(1 for _ in it)
class size(BaseCli):
    def __init__(self, idx=None):
        """Measures how many rows, columns (and deeper dimensions) the input has.
        Example::

            # returns (3, 2)
            [[2, 3], [4, 5, 6], [3]] | size()
            # returns 3
            [[2, 3], [4, 5, 6], [3]] | size(0)
            # returns 2
            [[2, 3], [4, 5, 6], [3]] | size(1)
            # returns (2, 0)
            [[], [2, 3]] | size()
            # returns (3,)
            [2, 3, 5] | size()
            # returns 3
            [2, 3, 5] | size(0)
            # returns (3, 2, 2)
            [[[2, 1], [0, 6, 7]], 3, 5] | size()
            # returns (1, 3)
            ["abc"] | size()
            # returns (1, 2, 3)
            [torch.randn(2, 3)] | size()
            # returns (2, 3, 5)
            size()(np.random.randn(2, 3, 5))

        There's also :class:`lengths`, which is sort of a simplified/faster version of
        this, but only use it if you are sure that ``len(it)`` can be called.

        When an object of one of the types in ``settings.arrayTypes`` is encountered,
        its ``.shape`` is used instead of looping over it.

        :param idx: if None, return the whole shape tuple. If 0 or 1, return the
            number of rows or columns only"""
        super().__init__()
        self.idx = idx
        # for idx > 0, first descend `idx` levels into the input, then measure that
        if idx is not None: self._f = cli.item(idx)
    def __ror__(self, it:Iterator[str]):
        idx = self.idx
        if idx is None:
            dims = []
            try:
                # descend through first elements, recording the length at each level
                while True:
                    if isinstance(it, settings.arrayTypes):
                        return tuple(dims + list(it.shape))
                    it, length = exploreSize(it)
                    dims.append(length)
            except TypeError: pass # reached something that's not iterable
            return tuple(dims)
        if idx == 0:
            try: return len(it)
            except: return exploreSize(it)[1]
        return exploreSize(it | self._f)[1]
# alias of `size`
shape = size
# sentinel for "no fill value given", so that None stays a usable fill value
noFill = object()
class item(BaseCli):
    def __init__(self, amt:int=1, fill=noFill):
        """Returns the first row. Example::

            # returns 0
            iter(range(5)) | item()
            # returns torch.Size([5])
            torch.randn(3,4,5) | item(2) | shape()
            # returns 3
            [] | item(fill=3)

        :param amt: how many times do you want to call item() back to back?
        :param fill: if iterator length is 0, return this. If not specified, an
            empty input raises :class:`StopIteration`"""
        super().__init__()
        self.amt = amt; self.fill = fill
        # Preprocessed, to be faster. Must be an identity check: `!=` on array-like
        # fill values compares elementwise and can't be used as a condition
        self.fillP = [fill] if fill is not noFill else []
        if self.amt != 1: self._f = cli.serial(*(item(fill=self.fill) for _ in range(self.amt)))
    def __ror__(self, it:Iterator[Any]):
        if self.amt != 1: return it | self._f
        # next() only receives a default if a fill value was actually given
        return next(iter(it), *self.fillP)
class iden(BaseCli):
    def __init__(self):
        """Identity cli: hands back the input untouched. Useful for multiple
        streams. Example::

            # returns range(5)
            range(5) | iden()"""
        super().__init__()
    def __ror__(self, it:Iterator[Any]):
        # same object, no copying or iterating
        return it
class join(BaseCli):
    def __init__(self, delim:str=None):
        r"""Concatenates every incoming element into a single string, placing
        `delim` between them. Basically :meth:`str.join`. Example::

            # returns '2\na'
            [2, "a"] | join("\n")

        :param delim: separator; passed through ``patchDefaultDelim`` first"""
        super().__init__()
        self.delim = patchDefaultDelim(delim)
    def __ror__(self, it:Iterator[str]):
        # elements are piped through toStr() first, so non-string elements are fine
        strings = it | cli.toStr()
        return self.delim.join(strings)
class wrapList(BaseCli):
    def __init__(self):
        """Puts the whole input inside a new single-element list. There's a more
        advanced cli tool built from this, which is
        :meth:`~k1lib.cli.structural.unsqueeze`."""
        super().__init__()
    def __ror__(self, it:T) -> List[T]:
        wrapped = [it]
        return wrapped
class _EarlyExp(Exception):
    """Private exception type for signalling an early exit out of a loop."""
class equals(BaseCli):
    def __init__(self):
        """Checks if all incoming columns/streams are identical. The streams are
        walked in lockstep, and for each position a single bool is yielded, which
        is True only if every stream has the same element there. Stops as soon as
        the shortest stream runs out. Example::

            # returns [True, True, False]
            [[1, 2, 3], [1, 2, 4]] | equals() | deref()"""
        super().__init__()
    def __ror__(self, streams:Iterator[Iterator[Any]]):
        for row in zip(*streams):
            first = row[0]
            # any() stops at the first mismatch, no exception needed to bail out early
            yield not any(first != elem for elem in row)
class reverse(BaseCli):
    def __init__(self):
        """Yields the incoming elements in the opposite order. Example::

            # returns [3, 5, 2]
            [2, 5, 3] | reverse() | deref()"""
        super().__init__()
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        # has to load everything into memory first, then hands back a reversed iterator
        materialized = list(it)
        return reversed(materialized)
class ignore(BaseCli):
    def __init__(self):
        r"""Exhausts the input and throws every element away. Handy for forcing
        lazy pipelines to actually execute. Example::

            # will just return an iterator, and not print anything
            [2, 3] | apply(lambda x: print(x))
            # will prints "2\n3"
            [2, 3] | apply(lambda x: print(x)) | ignore()"""
        super().__init__()
    def __ror__(self, it:Iterator[Any]):
        # returns None on purpose; only the side effects of iterating matter
        for _unused in it:
            continue
class rateLimit(BaseCli):
    def __init__(self, f, delay=0.1):
        """Holds back elements for as long as a condition is not met. Example::

            s = 0; semaphore = 0
            def heavyAsyncOperation(i):
                global semaphore, s
                semaphore += 1
                s += i; time.sleep(1)
                semaphore -= 1; return i**2

            # returns (20,), takes 1s to run
            range(20) | applyTh(heavyAsyncOperation, 100) | shape()
            # returns (20,), takes 4s to run (20/5 = 4)
            range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape()

        The first test case is not rate-limited, so it will run all 20 threads at the
        same time, and all of them will finish after 1 second.

        The second test case is rate-limited, so that there can only be 5 concurrently
        executing threads because of the semaphore count check. Therefore this takes
        around 4 seconds to run.

        :param f: checking function. Should return true if execution is allowed
        :param delay: delay in seconds between calling ``f()``"""
        self.f = f
        self.delay = delay
    def __ror__(self, it):
        allowed = self.f; pause = self.delay
        for elem in it:
            # poll the condition before releasing each element
            while True:
                if allowed(): break
                time.sleep(pause)
            yield elem
    @staticmethod
    def cpu(maxUtilization=90):
        """Creates a :class:`rateLimit` that holds back elements while cpu
        utilization is at or above a specified percentage. Needs the package
        ``psutil`` to be installed. Example::

            # returns [0, 1, 4, 9, 16]
            range(5) | rateLimit.cpu() | apply(op()**2) | deref()

        :param maxUtilization: utilization percentage that has to be undercut"""
        import psutil
        def belowThreshold(): return psutil.cpu_percent() < maxUtilization
        return rateLimit(belowThreshold)
class timeLimit(BaseCli):
    def __init__(self, t):
        """Cuts the stream off once a given amount of time has passed. Example::

            # returns 20, or roughly close to that
            repeatF(lambda: time.sleep(0.1)) | timeLimit(2) | shape(0)

        :param t: time budget in seconds, counted from when iteration starts"""
        self.t = t
    def __ror__(self, it):
        now = time.time
        deadline = now() + self.t
        for elem in it:
            yield elem
            # checked after yielding, so at least one element always gets through
            if now() > deadline: return
def tab(pad:str=" "*4):
    """Prefixes every incoming element with some padding. Example::

        # prints out indented 0 to 9
        range(10) | tab() | headOut()

    :param pad: string to put in front of each element"""
    def addPad(x): return f"{pad}{x}"
    return cli.apply(addPad)
# alias of `tab`
indent = tab
class clipboard(BaseCli):
    def __init__(self):
        """Copies whatever is piped in to the system clipboard. Needs the package
        ``pyperclip`` to be installed. Example::

            # copies "abc" into the clipboard. Just use Ctrl+V to paste as usual
            "abc" | clipboard()"""
        # imported lazily, so the rest of this module works without pyperclip
        import pyperclip
        self.pyperclip = pyperclip
    def __ror__(self, s):
        copy = self.pyperclip.copy
        copy(s)
def headerIdx():
    """Takes the first row, lays it out as a column, and puts an index column
    next to it. Useful when you want to know what your column's index is to cut
    it out. Example::

        # returns [[0, 'a'], [1, 'b'], [2, 'c']]
        ["abc"] | headerIdx() | deref()"""
    firstRow = item()
    asList = firstRow | cli.wrapList()
    asColumn = asList | cli.transpose()
    return asColumn | cli.insertIdColumn(True)
# types that deref-like tools never try to iterate over
settings.atomic.add("deref", (numbers.Number, np.number, str, bool, bytes, torch.nn.Module, k1lib.UValue), "used by deref")
Tensor = torch.Tensor; atomic = settings.atomic
class inv_dereference(BaseCli):
    def __init__(self, igT=False):
        """Kinda the inverse to :class:`deref`: lazily yields elements instead of
        building lists, and recursively does the same for nested elements.

        :param igT: short for "ignore tensor". If False, zero-dimensional arrays
            are unwrapped using ``.item()``"""
        super().__init__(); self.igT = igT
    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:
        for e in it:
            if e is None or isinstance(e, atomic.deref):
                yield e; continue
            if isinstance(e, settings.arrayTypes):
                unwrap = not self.igT and len(e.shape) == 0
                yield e.item() if unwrap else e
                continue
            # The pipe is evaluated outside of the yield. A bare except wrapped
            # around a yield also catches GeneratorExit when the generator gets
            # closed, and yielding again after that is a RuntimeError
            try: nested = e | self
            except Exception: nested = e
            yield nested
class deref(BaseCli):
    def __init__(self, maxDepth=float("inf"), igT=True):
        """Recursively converts any iterator into a list. Dictionaries are kept as
        dictionaries, with their values dereferenced. Example::

            # returns something like "<range_iterator at 0x7fa8c52ca870>"
            iter(range(5))
            # returns [0, 1, 2, 3, 4]
            iter(range(5)) | deref()
            # returns [2, 3], yieldSentinel stops things early
            [2, 3, yieldSentinel, 6] | deref()

        You can also specify a ``maxDepth``::

            # returns something like "<list_iterator at 0x7f810cf0fdc0>"
            iter([range(3)]) | deref(0)
            # returns [range(3)]
            iter([range(3)]) | deref(1)
            # returns [[0, 1, 2]]
            iter([range(3)]) | deref(2)

        There are a few classes/types that are considered atomic, and :class:`deref`
        will never try to iterate over it. If you wish to change it, do something like::

            settings.cli.atomic.deref = (int, float, ...)

        :param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything at all
        :param igT: short for "ignore tensor". If True, then don't loop over
            :class:`torch.Tensor` and :class:`numpy.ndarray` internals"""
        super().__init__(); self.igT = igT
        self.maxDepth = maxDepth; self.depth = 0 # depth tracks the current recursion level
        self.arrayType = (torch.Tensor, np.ndarray) if k1lib.settings.startup.or_patch else torch.Tensor
    def __ror__(self, it:Iterator[T]) -> List[T]:
        if self.depth >= self.maxDepth: return it
        if isinstance(it, atomic.deref): return it
        if isinstance(it, self.arrayType):
            if self.igT: return it
            if len(it.shape) == 0: return it.item()
            # arrays with dimensions fall through, and get looped over below
        elif isinstance(it, dict):
            self.depth += 1
            try: return {k: self.__ror__(v) for k, v in it.items()}
            finally: self.depth -= 1
        try: iter(it)
        except Exception: return it # not iterable, so hand it back as is
        self.depth += 1
        # depth has to be restored on every way out, including the early exit on
        # yieldSentinel and exceptions, else later calls see a wrong depth
        try:
            answer = []
            for e in it:
                if e is cli.yieldSentinel: break
                answer.append(self.__ror__(e))
            return answer
        finally: self.depth -= 1
    def __invert__(self) -> BaseCli:
        """Returns a :class:`~k1lib.cli.init.BaseCli` that makes everything an
        iterator. Not entirely sure when this comes in handy, but it's there."""
        return inv_dereference(self.igT)
class bindec(BaseCli):
    def __init__(self, cats:List[Any], f=None):
        """Picks categories according to the set bits of the incoming number,
        with the lowest bit corresponding to the first category. Example::

            # returns ['a', 'c']
            5 | bindec("abcdef")
            # returns 'a,c'
            5 | bindec("abcdef", join(","))

        :param cats: categories
        :param f: transformation function of the selected elements. Defaulted to
            :class:`toList`, but others like :class:`join` is useful too"""
        self.cats = cats
        self.f = f or cli.toList()
    def __ror__(self, it):
        # binary digits with "0b" stripped, lowest bit first
        bits = bin(int(it))[2:][::-1]
        selected = (cat for bit, cat in zip(bits, self.cats) if bit == '1')
        return selected | self.f
settings.add("smooth", 10, "default smooth amount, used in utils.smooth")
class smooth(BaseCli):
    def __init__(self, consecutives=None):
        """Smoothes out the input stream, by averaging over batches of
        consecutive elements. Literally just a shortcut for::

            batched(consecutives) | toMean().all()

        Example::

            # returns [4.5, 14.5, 24.5]
            range(30) | smooth(10) | deref()

        Smoothing over :class:`torch.Tensor` or :class:`numpy.ndarray` will be much
        faster, and produce high dimensional results::

            # returns torch.Tensor with shape (2, 3, 4)
            torch.randn(10, 3, 4) | smooth(4)

        The default consecutive value is in ``settings.cli.smooth``. This is useful if
        you are smoothing over multiple lists at the same time, like this::

            # can change a single smooth value temporarily here, and all sequences will be smoothed in the same way
            with settings.cli.context(smooth=5):
                x = list(np.linspace(-2, 2, 50))
                y = x | apply(op()**2) | deref()
                plt.plot(x | smooth() | deref(), y | smooth() | deref())

        :param consecutives: if not defined, then used the value inside ``settings.cli.smooth``"""
        # the setting is read here, at construction time
        self.b = cli.batched(consecutives or settings.smooth)
    def __ror__(self, it):
        batches = it | self.b
        if not isinstance(batches, settings.arrayTypes):
            return batches | cli.toMean().all()
        # batched array: average within each batch, which is dimension 1
        return batches.mean(1)
def _f():
    pass
# class of code objects, grabbed from a throwaway function. Used for isinstance checks
_code = _f.__code__.__class__
def disassemble(f=None):
    """Prints out the code object attributes and the bytecode of anything piped
    into it, followed by the same for every nested code object. Normal usage::

        def f(a, b):
            return a**2 + b
        # both of these print out disassembled info
        f | disassemble()
        disassemble(f)

        # you can pass in lambdas
        disassemble(lambda x: x + 3)

        # or even raw code
        "lambda x: x + 3" | disassemble()

    :param f: function, code object, or string of source code. If not specified,
        returns a cli that disassembles whatever is piped into it
    :raises RuntimeError: if no code object can be obtained from ``f``"""
    if f is None: return cli.aS(disassemble)
    code = f
    if isinstance(code, str): code = compile(code, "", "exec")
    # functions and methods carry their code object in __code__
    try: code = code.__code__
    except: pass
    if not isinstance(code, _code):
        raise RuntimeError(f"`{code}` is not a code object/function/class method/string code")
    c = code # short name, so that the report lines below read nicely
    print(f"co_argcount: {c.co_argcount}")
    print(f"co_cellvars: {c.co_cellvars}")
    print(f"co_consts: {c.co_consts}")
    print(f"co_filename: {c.co_filename}")
    print(f"co_firstlineno: {c.co_firstlineno}")
    print(f"co_flags: {c.co_flags}")
    print(f"co_freevars: {c.co_freevars}")
    print(f"co_kwonlyargcount: {c.co_kwonlyargcount}")
    print(f"co_lnotab: {c.co_lnotab | cli.toStr() | join(' ')}")
    print(f"co_name: {c.co_name}")
    print(f"co_names: {c.co_names}")
    print(f"co_nlocals: {c.co_nlocals}")
    print(f"co_posonlyargcount: {c.co_posonlyargcount}")
    print(f"co_stacksize: {c.co_stacksize}")
    print(f"co_varnames: {c.co_varnames}")
    print(f"Disassembly:")
    dis.disassemble(c)
    # Nested code objects are disassembled recursively with stdout captured...
    with k1lib.captureStdout() as out:
        inner = c.co_consts | cli.filt(lambda x: "code" in str(type(x)))
        inner = inner | cli.tee(lambda _: "----------------------- inner code object -----------------------\n")
        inner | cli.apply(disassemble) | cli.ignore()
    # ...so that their non-blank lines can be printed prefixed with "|" and indented.
    # NOTE(review): this is assumed to run after the capture has ended - confirm
    captured = out() | cli.filt(cli.op().strip() != "")
    captured | cli.apply("|" + cli.op()) | cli.indent() | cli.stdout()
def shortName(s):
    """Returns the last component of path ``s``, which is everything after the
    final ``os.sep``. The whole string is returned if there's no separator."""
    return s.rsplit(os.sep, 1)[-1]
def tree(fL=10, dL=10, depth=float("inf"), ff:Callable[[str], bool]=(lambda s: True), df:Callable[[str], bool]=(lambda s: True)):
    """Recursively gets all files and folders. Output format might be a bit
    strange, so this is mainly for visualization. Example::

        "." | tree() | deref()

    :param fL: max number of file per directory included in output
    :param dL: max number of child directories per directory included in output
    :param depth: explore depth
    :param ff: optional file filter function
    :param df: optional directory filter function"""
    def explore(path):
        # recurse with one level less, or cut things off once depth is used up
        if depth > 0: return path | tree(fL, dL, depth-1, ff, df)
        return []
    # directory path -> [short name, contents], then all of those collected into a dict
    processFolders = cli.apply(lambda x: [shortName(x), x]) | cli.apply(explore, 1) | cli.transpose() | cli.toDict()
    files = cli.filt(os.path.isfile) | cli.filt(ff) | cli.head(fL) | cli.apply(shortName)
    folders = ~cli.filt(os.path.isfile) | cli.filt(df) | cli.head(dL) | processFolders
    return cli.ls() | ~cli.sortF(os.path.isfile) | (files & folders)
class lookup(BaseCli):
    def __init__(self, d:dict, col:int=None):
        """Replaces items with what they map to in a dictionary/object. Example::

            d = {"a": 3, "b": 5, "c": 52}
            # returns [3, 5, 52, 52, 3]
            "abcca" | lookup(d) | deref()
            # returns [[0, 3], [1, 5], [2, 52], [3, 52], [4, 3]]
            [range(5), "abcca"] | transpose() | lookup(d, 1) | deref()

        :param d: any object that can be sliced with the inputs
        :param col: if None, lookup on each row, else lookup a specific column only"""
        self.d = d
        self.col = col
    def __ror__(self, it):
        table = self.d
        def fetch(key): return table[key]
        return it | cli.apply(fetch, self.col)
class dictFields(BaseCli):
    def __init__(self, *fields, default=""):
        """Extracts the values of several keys out of a dictionary, in the order
        the keys are given. Example::

            # returns [3, 1, '']
            {"a": 1, "b": 2, "c": 3} | dictFields("c", "a", "d")

        :param fields: keys to grab
        :param default: value to use for keys that are not in the dictionary"""
        self.fields = fields
        self.default = default
    def __ror__(self, d):
        fallback = self.default
        return [d.get(field, fallback) for field in self.fields]