# Source code for k1lib.cli.utils

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for all short and random quality-of-life utilities."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, yieldT
import k1lib.cli as cli, numbers, numpy as np, dis
from k1lib.cli.typehint import *
from typing import overload, Iterator, Any, List, Set, Union, Callable
import k1lib, time, math, os
from collections import defaultdict
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = False
try: import PIL; hasPIL = True
except: hasPIL = False
try: plt = k1lib.dep("matplotlib.pyplot")
except: pass
__all__ = ["size", "shape", "item", "rItem", "iden", "join", "wrapList",
           "equals", "reverse", "ignore", "rateLimit", "timeLimit", "tab", "indent",
           "clipboard", "deref", "bindec", "smooth", "disassemble",
           "tree", "lookup", "dictFields", "backup", "sketch", "syncStepper"]
settings = k1lib.settings.cli
def exploreSize(it):
    """Returns first element and length of ``it``, as ``(first item, length)``.

    Sized, indexable inputs are answered directly using ``it[0]`` and ``len(it)``.
    Anything else is iterated through to be counted, so one-shot iterators get
    consumed by this function.

    :param it: a string, a sized container or any iterable
    :return: tuple ``(first item, length)``. The first item is None for strings and
        for inputs that turn out to be empty"""
    if isinstance(it, str): return None, len(it)
    try: return it[0], len(it)
    except Exception: pass # not indexable/sized (or empty): count by iterating. BaseException (Ctrl+C, ...) still propagates
    sentinel = object(); it = iter(it)
    first = next(it, sentinel)
    if first is sentinel: return None, 0
    count = 1
    for _ in it: count += 1
    return first, count
class size(BaseCli):
    def __init__(self, idx=None):
        """Returns number of rows and columns in the input.
Example::

    # returns (3, 2)
    [[2, 3], [4, 5, 6], [3]] | shape()
    # returns 3
    [[2, 3], [4, 5, 6], [3]] | shape(0)
    # returns 2
    [[2, 3], [4, 5, 6], [3]] | shape(1)
    # returns (2, 0)
    [[], [2, 3]] | shape()
    # returns (3,)
    [2, 3, 5] | shape()
    # returns 3
    [2, 3, 5] | shape(0)
    # returns (3, 2, 2)
    [[[2, 1], [0, 6, 7]], 3, 5] | shape()
    # returns (1, 3)
    ["abc"] | shape()
    # returns (1, 2, 3)
    [torch.randn(2, 3)] | shape()
    # returns (2, 3, 5)
    shape()(np.random.randn(2, 3, 5))

:class:`shape` is an alias of this cli. Use whichever is more intuitive for you.

There's also :class:`lengths`, which is sort of a simplified/faster version of
this, but only use it if you are sure that ``len(it)`` can be called.

:param idx: if not specified, returns a tuple of ints. If specified,
    then returns the specific index of the tuple"""
        super().__init__(); self.idx = idx
        if idx is not None: self._f = cli.item(idx)
    def _all_array_opt(self, it, level): return np.array(it.shape[level:])[tuple([None]*level)] + np.zeros(it.shape[:level], dtype=int)[(*[slice(None)]*level, None)]
    def _typehint(self, inp):
        if self.idx is not None: return int
        return tList(int)
    def __ror__(self, it:Iterator[str]):
        """Measures ``it``. Returns an int if ``idx`` was given, else a tuple of ints."""
        idx = self.idx
        # PIL images have neither len() nor iteration, so they must be handled
        # before the generic `idx == 0` shortcut, otherwise `img | shape(0)` throws
        if hasPIL and isinstance(it, PIL.Image.Image):
            return it.size if idx is None else it.size[idx]
        if idx == 0:
            try: return len(it)
            except Exception: return exploreSize(it)[1] # no len(), count by iterating
        if idx is None:
            answer = []
            try:
                while True:
                    if isinstance(it, settings.arrayTypes):
                        return tuple(answer + list(it.shape))
                    it, s = exploreSize(it); answer.append(s) # descend into the first element
            except TypeError: pass # reached something that can't be measured any deeper
            return tuple(answer)
        return exploreSize(it | self._f)[1]
shape = size
noFill = object()
class item(BaseCli):
    def __init__(self, amt:int=1, fill=noFill):
        """Returns the first element of the input iterator.
Example::

    # returns 0
    range(5) | item()
    # returns torch.Size([5])
    torch.randn(3,4,5) | item(2) | shape()
    # returns 3
    [] | item(fill=3)

:param amt: how many times do you want to call item() back to back?
:param fill: if iterator length is 0, return this"""
        self.amt = amt; self.fill = fill
        # identity check against the sentinel: `!=` would invoke the fill value's own
        # __ne__, which is element-wise (and not a plain bool) for arrays/tensors
        self.fillP = [fill] if fill is not noFill else [] # preprocessed, to be faster
        if self.amt != 1: self._f = cli.serial(*(item(fill=self.fill) for _ in range(self.amt)))
    def _all_array_opt(self, it, level): return it[(*[slice(None, None, None) for i in range(level)], 0)]
    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return inp.child
        if isinstance(inp, tCollection): return inp.children[0]
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: return inp.__class__(inp.child, None)
            if inp.rank - self.amt >= 1: return inp.__class__(inp.child, inp.rank-self.amt)
            return inp.child
        return tAny()
    def __ror__(self, it:Iterator[str]):
        """Takes the first element, ``amt`` times in a row. Falls back to ``fill`` (if given) on empty input."""
        if self.amt != 1: return it | self._f
        return next(iter(it), *self.fillP)
class rItem(BaseCli):
    def __init__(self, idx:int):
        """Combines ``rows(idx) | item()``, as this is a pretty common pattern.
Example::

    iter(range(10)) | rItem(4) # returns 4

:param idx: position of the element to grab"""
        self.idx = idx; self.arrayTypes = (*settings.arrayTypes, list, tuple)
    def _all_array_opt(self, it, level:int): return it[(*[slice(None, None, None) for i in range(level)], self.idx)]
    def __ror__(self, it):
        """Returns element number ``idx`` of ``it``.

Indexable inputs (array types, lists, tuples) are indexed directly. Other iterables
are stepped through, consuming at most ``idx + 1`` elements.

:raises IndexError: if a non-indexable stream has fewer than ``idx + 1``
    elements, or if ``idx`` is negative for such a stream"""
        if isinstance(it, self.arrayTypes): return it[self.idx]
        need = self.idx + 1
        if need <= 0: raise IndexError(f"rItem({self.idx}): negative indexes only work on indexable inputs")
        taken = 0
        for taken, e in zip(range(1, need + 1), it): pass
        # used to silently hand back the last element seen when the stream was too short
        if taken < need: raise IndexError(f"rItem({self.idx}): stream only has {taken} element(s)")
        return e
class iden(BaseCli):
    def __init__(self):
        """Identity cli: hands back the input untouched. Useful for multiple streams.
Example::

    # returns range(5)
    range(5) | iden()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        return it

    def _typehint(self, inp):
        return inp

    def __ror__(self, it:Iterator[Any]):
        """Returns ``it`` itself, no copy, no iteration."""
        return it
class join(BaseCli):
    def __init__(self, delim:str=None):
        r"""Glues every incoming element into a single string, placing `delim`
between them. Basically :meth:`str.join`.
Example::

    # returns '2\na'
    [2, "a"] | join("\n")

:param delim: separator, passed through ``patchDefaultDelim`` first"""
        super().__init__()
        self.delim = patchDefaultDelim(delim)

    def _typehint(self, inp):
        return str

    def __ror__(self, it:Iterator[str]):
        """Converts each element with ``str`` and joins them."""
        pieces = it | cli.apply(str)
        return self.delim.join(pieces)
class wrapList(BaseCli):
    def __init__(self):
        """Puts the input inside a list. There's a more advanced cli tool
built from this, which is :meth:`~k1lib.cli.structural.unsqueeze`.
Example::

    # returns [5]
    5 | wrapList()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        idx = (*([slice(None)]*level), None) # keep the first `level` axes, add a new one after them
        return it[idx]

    def _typehint(self, inp):
        return tList(inp)

    def __ror__(self, it) -> List[Any]:
        """Array types get a new leading axis (``it[None]``), everything else becomes ``[it]``."""
        if not isinstance(it, settings.arrayTypes): return [it]
        return it[None]
class _EarlyExp(Exception): pass # kept for backward compatibility, no longer needed by equals
class equals:
    def __init__(self):
        """Checks if all incoming columns/streams are identical.
Example::

    # returns [True, True, False]
    list([[1, 2, 3], [1, 2, 4]] | equals())"""
        super().__init__()
    def __ror__(self, streams:Iterator[Iterator[str]]):
        """Steps through all streams in lockstep and yields one bool per row:
True if every stream has the same element at that position, else False.
Stops at the end of the shortest stream.

:param streams: iterable of the streams to compare"""
        streams = list(streams)
        for row in zip(*streams):
            sampleElem = row[0]
            # any() stops at the first mismatch, same as the old exception-based early exit
            yield not any(sampleElem != elem for elem in row)
class reverse(BaseCli):
    def __init__(self):
        """Flips the order of the incoming list.
Example::

    # returns [3, 5, 2]
    [2, 5, 3] | reverse() | deref()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        flipped = slice(None, None, -1)
        return it[(*([slice(None)]*level), flipped)]

    def _typehint(self, inp):
        if not isinstance(inp, tListIterSet): return tAny()
        return tIter(inp.child)

    def __ror__(self, it:Iterator[str]) -> List[str]:
        """Array types are sliced with ``[::-1]``. Anything else is loaded into a
list first, then handed to :func:`reversed`."""
        if not isinstance(it, settings.arrayTypes):
            return reversed(list(it))
        return it[::-1]
class ignore(BaseCli):
    def __init__(self):
        r"""Drains the input, throwing away every element.
Example::

    # will just return an iterator, and not print anything
    [2, 3] | apply(lambda x: print(x))
    # will prints "2\n3"
    [2, 3] | apply(lambda x: print(x)) | ignore()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        return it

    def _typehint(self, inp):
        return type(None)

    def __ror__(self, it:Iterator[Any]):
        """Loops over ``it`` and returns None. Array types are not looped over at all."""
        if not isinstance(it, settings.arrayTypes):
            for _ in it: pass
class rateLimit(BaseCli):
    def __init__(self, f, delay=0.1):
        """Holds back the flow of elements until a condition allows it.
Example::

    s = 0; semaphore = 0
    def heavyAsyncOperation(i):
        global semaphore, s
        semaphore += 1
        s += i; time.sleep(1)
        semaphore -= 1; return i**2

    # returns (20,), takes 1s to run
    range(20) | applyTh(heavyAsyncOperation, 100) | shape()
    # returns (20,), takes 4s to run (20/5 = 4)
    range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape()

The first test case is not rate-limited, so it will run all 20 threads at the same time,
and all of them will finish after 1 second.

The second test case is rate-limited, so that there can only be 5 concurrently executing
threads because of the semaphore count check. Therefore this takes around 4 seconds to run.

:param f: checking function. Should return true if execution is allowed
:param delay: delay in seconds between calling ``f()``"""
        self.f = f
        self.delay = delay

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return tIter(inp.child)
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: inner = inp
            elif inp.rank >= 2: inner = inp.__class__(inp.child, inp.rank - 1)
            else: inner = inp.child
            return tIter(inner)
        if isinstance(inp, tCollection): return inp
        return tAny()

    def __ror__(self, it):
        """Yields the elements of ``it`` one by one. Before each one, sleeps in steps of
``delay`` seconds for as long as ``f()`` is falsy."""
        allowed = self.f; pause = self.delay
        for e in it:
            while True:
                if allowed(): break
                time.sleep(pause)
            yield e

    @staticmethod
    def cpu(maxUtilization=90):
        """Limits flow rate when cpu utilization is more than a specified
percentage amount. Needs to install the package ``psutil`` to actually work.
Example::

    # returns [0, 1, 4, 9, 16]
    range(5) | rateLimit.cpu() | apply(op()**2) | deref()

:param maxUtilization: elements only pass while ``psutil.cpu_percent()`` is below this"""
        import psutil
        def belowCap(): return psutil.cpu_percent() < maxUtilization
        return rateLimit(belowCap)
class timeLimit(BaseCli):
    def __init__(self, t):
        """Cuts the flow off once a given amount of time has gone by.
Example::

    # returns 20, or roughly close to that
    repeatF(lambda: time.sleep(0.1)) | timeLimit(2) | shape(0)

:param t: time budget in seconds"""
        self.t = t

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return tIter(inp.child)
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: inner = inp
            elif inp.rank >= 2: inner = inp.__class__(inp.child, inp.rank - 1)
            else: inner = inp.child
            return tIter(inner)
        if isinstance(inp, tCollection): return inp
        return tAny()

    def __ror__(self, it):
        """Yields elements of ``it``. The clock is checked after each element has been
yielded, so at least one element gets through if the input is not empty."""
        now = time.time
        deadline = now() + self.t
        for e in it:
            yield e
            if now() > deadline: return
def tab(pad:str=" "*4):
    """Prefixes every element of the incoming string iterator with ``pad``.
Example::

    # prints out indented 0 to 9
    range(10) | tab() | headOut()

:param pad: text to put in front of every element. Defaults to 4 spaces"""
    def prefixed(x): return f"{pad}{x}"
    return cli.apply(prefixed)
indent = tab
class clipboard(BaseCli):
    def __init__(self):
        """Copies whatever is piped in to the clipboard, using ``pyperclip``.
Example::

    # copies "abc" into the clipboard. Just use Ctrl+V to paste as usual
    "abc" | clipboard()"""
        import pyperclip # imported lazily, only when this cli is actually constructed
        self.pyperclip = pyperclip

    def _typehint(self, inp):
        return type(None)

    def __ror__(self, s):
        """Passes ``s`` to ``pyperclip.copy``. Returns None."""
        copier = self.pyperclip.copy
        copier(s)
a = [numbers.Number, np.number, str, bool, bytes, k1lib.UValue]
if hasTorch: a.append(torch.nn.Module)
settings.atomic.add("deref", tuple(a), "used by deref")
Tensor = torch.Tensor; atomic = settings.atomic
class inv_dereference(BaseCli):
    def __init__(self, igT=False):
        """Kinda the inverse to :class:`dereference`

:param igT: if False, 0-dimensional arrays/tensors are unpacked with ``.item()``"""
        super().__init__(); self.igT = igT
    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        """Lazily walks ``it``. None and atomic elements (``settings.cli.atomic.deref``)
are yielded as-is, array types are yielded as-is (or as a scalar, see ``igT``),
and everything else is piped into this cli again before being yielded."""
        for e in it:
            if e is None or isinstance(e, atomic.deref): yield e
            elif isinstance(e, settings.arrayTypes):
                if not self.igT and len(e.shape) == 0: yield e.item()
                else: yield e
            else:
                # The yield must stay outside the try block: a bare `except:` around a
                # yield also catches GeneratorExit when the generator is closed, and
                # yielding again at that point raises RuntimeError
                try: sub = e | self
                except Exception: sub = e
                yield sub
class deref(BaseCli):
    def __init__(self, maxDepth=float("inf"), igT=True):
        """Recursively converts any iterator into a list.
Example::

    # returns something like "<range_iterator at 0x7fa8c52ca870>"
    iter(range(5))
    # returns [0, 1, 2, 3, 4]
    iter(range(5)) | deref()
    # returns [2, 3], yieldT stops things early
    [2, 3, yieldT, 6] | deref()

You can also specify a ``maxDepth``::

    # returns something like "<list_iterator at 0x7f810cf0fdc0>"
    iter([range(3)]) | deref(0)
    # returns [range(3)]
    iter([range(3)]) | deref(1)
    # returns [[0, 1, 2]]
    iter([range(3)]) | deref(2)

There are a few classes/types that are considered atomic, and :class:`deref`
will never try to iterate over it. If you wish to change it, do something like::

    settings.cli.atomic.deref = (int, float, ...)

:param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything at all
:param igT: short for "ignore tensor". If True, then don't loop over :class:`torch.Tensor`
    and :class:`numpy.ndarray` internals"""
        super().__init__(); self.igT = igT
        self.maxDepth = maxDepth; self.depth = 0
        if hasTorch:
            self.arrayType = (torch.Tensor, np.ndarray) if k1lib.settings.startup.or_patch.numpy else torch.Tensor
        else: self.arrayType = (np.ndarray,) if k1lib.settings.startup.or_patch.numpy else ()
    def _typehint(self, inp, depth=float("inf")):
        if depth == 0: return inp
        if depth == float("inf"): depth = self.maxDepth
        if isinstance(inp, type) and issubclass(inp, atomic.deref): return inp
        if isinstance(inp, tArrayTypes):
            if self.igT: return inp
            if inp.rank is None: return tList(tAny())
            if inp.rank == 1:
                if isinstance(inp, tTensor):
                    return tList(type(torch.tensor(3, dtype=inp.child).item()))
                if isinstance(inp, tNpArray):
                    return tList(type(np.array(3, dtype=inp.child).item()))
            return tList(self._typehint(inp.item(), depth-1))
        if isinstance(inp, tListIterSet):
            return tList(self._typehint(inp.child, depth-1))
        if isinstance(inp, tCollection):
            return tCollection(*(self._typehint(e, depth-1) for e in inp.children))
        return tAny()
    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        """Dereferences ``it``, recursing into nested iterables up to ``maxDepth``.

Dicts, tuples and sets keep their container type, every other iterable becomes a
list. Atomic values, non-iterables and (with ``igT``) arrays are returned as-is.
A ``yieldT`` element cuts the current list short.

The current recursion level lives in ``self.depth``. It is restored in a
``finally`` block, so that a ``yieldT`` early exit or an exception inside a
nested iterable doesn't leave the counter raised for the next run."""
        if self.depth >= self.maxDepth: return it
        if isinstance(it, np.number): return it.item()
        if isinstance(it, atomic.deref): return it
        if isinstance(it, self.arrayType):
            if self.igT: return it
            if len(it.shape) == 0: return it.item()
        if not isinstance(it, (dict, tuple, set)):
            try: iter(it)
            except Exception: return it # not iterable, nothing to unpack
        self.depth += 1
        try:
            if isinstance(it, dict): return {k: self.__ror__(v) for k, v in it.items()}
            if isinstance(it, tuple): return tuple(self.__ror__(k) for k in it)
            if isinstance(it, set): return set(self.__ror__(k) for k in it)
            answer = []
            for e in it:
                if e is cli.yieldT: break # stop early, keep what has been collected so far
                answer.append(self.__ror__(e))
            return answer
        finally: self.depth -= 1
    def __invert__(self) -> BaseCli:
        """Returns a :class:`~k1lib.cli.init.BaseCli` that makes
everything an iterator. Not entirely sure when this comes in handy, but it's there."""
        return inv_dereference(self.igT)
class bindec(BaseCli):
    def __init__(self, cats:List[Any], f=None):
        """Decodes an integer bit by bit, picking the categories whose bit is set.
Example::

    # returns ['a', 'c']
    5 | bindec("abcdef")
    # returns 'a,c'
    5 | bindec("abcdef", join(","))

:param cats: categories
:param f: transformation function of the selected elements. Defaulted to :class:`toList`, but others like :class:`join` is useful too"""
        self.cats = cats
        self.f = f if f else cli.toList()

    def __ror__(self, it):
        """Pairs the binary digits of ``int(it)``, lowest bit first, with ``cats`` and
pipes the categories whose digit is '1' into ``f``."""
        bits = bin(int(it))[2:][::-1] # strip the "0b" prefix, then put the lowest bit first
        chosen = (cat for bit, cat in zip(bits, self.cats) if bit == '1')
        return chosen | self.f
settings.add("smooth", 10, "default smooth amount, used in utils.smooth")
class smooth(BaseCli):
    def __init__(self, consecutives=None):
        """Averages consecutive chunks of the input stream. Literally just a shortcut for::

    batched(consecutives) | toMean().all()

Example::

    # returns [4.5, 14.5, 24.5]
    range(30) | smooth(10) | deref()

Smoothing over :class:`torch.Tensor` or :class:`numpy.ndarray` will
be much faster, and produce high dimensional results::

    # returns torch.Tensor with shape (2, 3, 4)
    torch.randn(10, 3, 4) | smooth(4)

The default consecutive value is in ``settings.cli.smooth``. This
is useful if you are smoothing over multiple lists at the same
time, like this::

    # can change a single smooth value temporarily here, and all sequences will be smoothed in the same way
    with settings.cli.context(smooth=5):
        x = list(np.linspace(-2, 2, 50))
        y = x | apply(op()**2) | deref()
        plt.plot(x | smooth() | deref(), y | smooth() | deref())

:param consecutives: if not defined, then used the value inside ``settings.cli.smooth``"""
        amount = consecutives or settings.smooth # the setting is read once, at construction time
        self.b = cli.batched(amount)

    def __ror__(self, it):
        """Batches ``it``, then takes the mean of every batch (``.mean(1)`` if the
batched result is an array type)."""
        batches = it | self.b
        if not isinstance(batches, settings.arrayTypes):
            return batches | cli.toMean().all()
        return batches.mean(1)
def _f(): pass
_code = type(_f.__code__)
def disassemble(f=None):
    """Prints out the code object fields and bytecode of anything piped into it.
Normal usage::

    def f(a, b):
        return a**2 + b
    # both of these print out disassembled info
    f | disassemble()
    disassemble(f)

    # you can pass in lambdas
    disassemble(lambda x: x + 3)

    # or even raw code
    "lambda x: x + 3" | disassemble()

Code objects found inside ``co_consts`` are disassembled too, with their output indented.

:param f: function, code object or source string. If None, returns a cli
    that disassembles whatever is piped into it
:raises RuntimeError: if no code object can be obtained from ``f``"""
    if f is None: return cli.aS(disassemble)
    c = compile(f, "", "exec") if isinstance(f, str) else f
    try: c = c.__code__
    except: pass
    if not isinstance(c, _code):
        raise RuntimeError(f"`{c}` is not a code object/function/class method/string code")
    print(f"co_argcount: {c.co_argcount}")
    print(f"co_cellvars: {c.co_cellvars}")
    print(f"co_consts: {c.co_consts}")
    print(f"co_filename: {c.co_filename}")
    print(f"co_firstlineno: {c.co_firstlineno}")
    print(f"co_flags: {c.co_flags}")
    print(f"co_freevars: {c.co_freevars}")
    print(f"co_kwonlyargcount: {c.co_kwonlyargcount}")
    print(f"co_lnotab: {c.co_lnotab | cli.apply(str) | join(' ')}")
    print(f"co_name: {c.co_name}")
    print(f"co_names: {c.co_names}")
    print(f"co_nlocals: {c.co_nlocals}")
    print(f"co_posonlyargcount: {c.co_posonlyargcount}")
    print(f"co_stacksize: {c.co_stacksize}")
    print(f"co_varnames: {c.co_varnames}")
    print(f"Disassembly:")
    dis.disassemble(c)
    # nested code objects: capture what the recursive calls print, then re-emit it indented
    with k1lib.captureStdout() as out:
        nested = c.co_consts | cli.filt(lambda x: "code" in str(type(x)))
        nested | cli.tee(lambda _: "----------------------- inner code object -----------------------\n") | cli.apply(disassemble) | cli.ignore()
    captured = out() | cli.filt(cli.op().strip() != "")
    captured | cli.apply("|" + cli.op()) | cli.indent() | cli.stdout()
shortName = lambda s: s.split(os.sep)[-1]
def tree(fL=10, dL=10, depth=float("inf"), ff:Callable[[str], bool]=(lambda s: True), df:Callable[[str], bool]=(lambda s: True)):
    """Recursively lists all files and folders. Output format might be a bit strange,
so this is mainly for visualization.
Example::

    "." | tree() | deref()

:param fL: max number of file per directory included in output
:param dL: max number of child directories per directory included in output
:param depth: explore depth
:param ff: optional file filter function
:param df: optional directory filter function"""
    # each folder becomes a [short name, subtree] pair, the subtree being empty once depth runs out
    recurse = lambda path: (path | tree(fL, dL, depth-1, ff, df)) if depth > 0 else []
    folderPipe = cli.apply(lambda path: [shortName(path), path]) | cli.apply(recurse, 1) | cli.toDict()
    filePipe = cli.filt(os.path.isfile) | cli.filt(ff) | cli.head(fL) | cli.apply(shortName) | cli.aS(set)
    dirPipe = ~cli.filt(os.path.isfile) | cli.filt(df) | cli.head(dL) | folderPipe
    return cli.ls() | ~cli.sortF(os.path.isfile) | (filePipe & dirPipe)
class lookup(BaseCli):
    def __init__(self, d:dict, col:int=None, fill=None):
        """Translates incoming items through a dictionary/object.
Example::

    d = {"a": 3, "b": 5, "c": 52}
    # returns [3, 5, 52, 52, 3]
    "abcca" | lookup(d) | deref()
    # returns [[0, 3], [1, 5], [2, 52], [3, 52], [4, 3]]
    [range(5), "abcca"] | transpose() | lookup(d, 1) | deref()

:param d: any object that can be sliced with the inputs
:param col: if None, lookup on each row, else lookup a specific column only
:param fill: if None, throws error if looked up element is not available, else returns the fill value"""
        self.col = col
        # a fill value is implemented by wrapping `d` in a defaultdict
        self.d = d if fill is None else defaultdict(lambda: fill, d)

    def _typehint(self, inp):
        valueType = inferType(list(self.d.values()))
        if isinstance(valueType, tListIterSet): return tIter(valueType.child)
        if isinstance(valueType, tCollection): return tIter(tLowest(*valueType.children))
        return tIter(tAny())

    def __ror__(self, it):
        """Replaces each element (or column ``col`` of each row) by ``d[element]``."""
        table = self.d
        return it | cli.apply(lambda e: table[e], self.col)
class dictFields(BaseCli):
    def __init__(self, *fields, default=""):
        """Pulls several fields out of a dictionary at once.
Example::

    # returns [3, 1, '']
    {"a": 1, "b": 2, "c": 3} | dictFields("c", "a", "d")

:param fields: keys to grab, in output order
:param default: value used for keys that are missing"""
        self.fields = fields
        self.default = default

    def __ror__(self, d):
        """Returns a list of ``d.get(field, default)``, one entry per requested field."""
        fallback = self.default
        return [d.get(name, fallback) for name in self.fields]
class backup(BaseCli):
    def __init__(self):
        """Backs up a file/folder.
Example::

    "some/folderOrFile" | backup()
    "some/folderOrFile" | backup.restore()

Really straightforward. Uses bash internally to copy files recursively, so
not available on Windows."""
        pass
    def __ror__(self, it):
        """Copies the path ``it`` to ``it + ".backup"``, deleting any previous backup first.

:param it: path of the file/folder. ``~`` is expanded"""
        import shlex
        src = os.path.expanduser(it); dst = f"{src}.backup"
        # shlex.quote instead of hand-written '...': a path containing a single quote
        # used to break out of the quoting and change what `rm -rf` operates on
        None | cli.cmd(f"rm -rf {shlex.quote(dst)}") | cli.ignore()
        None | cli.cmd(f"cp -r {shlex.quote(src)} {shlex.quote(dst)}") | cli.ignore()
    @staticmethod
    def restore():
        """Returns a cli that deletes the piped-in path and replaces it by a copy of
its ``.backup`` counterpart."""
        def inner(it):
            import shlex
            dst = os.path.expanduser(it); src = f"{dst}.backup"
            None | cli.cmd(f"rm -rf {shlex.quote(dst)}") | cli.ignore()
            None | cli.cmd(f"cp -r {shlex.quote(src)} {shlex.quote(dst)}") | cli.ignore()
        return cli.aS(inner)
class sketch(BaseCli):
    def __init__(self, transforms:List[callable]=[], titles:List[str]=None, im:bool=False, ncols:int=None):
        """Convenience tool to plot multiple matplotlib plots at the same
time, while still keeping everything short and in 1 line. For this example,
we're trying to plot x^1, x^2, ..., x^8 on 2 separate plots, one left one
right. The left will have x^1 till x^4, the right will have x^5 to x^8.

How you would do this normally::

    x = np.linspace(-2, 2); exps = range(1, 9)
    fig, axes = plt.subplots(1, 2, figsize=(10, 4))
    # simplest solution
    plt.sca(axes[0]); plt.plot(x, x**1); plt.plot(x, x**2); plt.plot(x, x**3); plt.plot(x, x**4); plt.legend([1, 2, 3, 4]); plt.xlabel("x axis")
    # solution using a little bit of cli
    plt.sca(axes[1]); range(5, 9) | apply(lambda a: [x, x**a]) | ~apply(plt.plot) | ignore(); plt.legend([5, 6, 7, 8]); plt.xlabel("x axis")

But this is long, and I'm incredibly lazy to write it all out. So here's how
it's going to work using this cli::

    # plotting the first 4 lines only, in a single plot. Should be familiar and make sense to you before moving on
    exps | apply(lambda a: [x, x**a]) | batched(4) | item() | ~apply(plt.plot) | ignore()
    # plotting 8 lines across 2 plots. Simplest example using sketch(). It kinda captures clis after it and use it to plot each plot
    exps | apply(lambda a: [x, x**a]) | batched(4) | (sketch() | ~apply(plt.plot))
    # same as above, but adding a grid and x axis label to all plots. Transformation functions can be anything you would
    # put inside a normal cli (`plt` will be passed as argument): string code, op()-capture, lambda functions, other cli tools
    transforms = ["x.grid(True)", op().xlabel("x axis"), lambda x: x.ylabel("y axis")]
    exps | apply(lambda a: [x, x**a]) | batched(4) | (sketch(transforms) | ~apply(plt.plot))
    # same as above, but adding legends. [x, x**a] will eventually be directed to ~apply(plt.plot), while f"x^{a}" will be directed to aS(plt.legend)
    exps | apply(lambda a: [[x, x**a], f"x^{a}"]) | batched(4) | (sketch() | transpose() | ~apply(plt.plot) + iden() | deref() | rItem(1) | aS(plt.legend)) | deref()

Last line will generate this plot:

.. image:: ../images/probScale.png

Is it worth the extra confusion? Afterall, it just saves you 2-3 lines of
code. To me, it is worth it, because you can quickly change styles (add a
grid, make y axis log)

See also: :class:`~k1lib.cli.output.plotImgs`

:param transforms: transform functions to be run when drawing every plot. ``plt`` (aka ``matplotlib.pyplot``) will be passed in
:param titles: if specified, use these titles for each plot. Kinda hacky I have to admit
:param im: if True, returns a PIL image and closes the sketch, else return nothing but still have the sketch open
:param ncols: if specified, will sketch with this number of columns"""
        super().__init__(capture=True); self.titles = titles; self.im = im
        # the default list is only ever read here, never mutated, so sharing it across instances is harmless
        self.transforms = [cli.fastF(t) for t in transforms]; self.ncols = ncols
    def __ror__(self, it):
        """Draws one subplot per element of ``it``: each element is piped through the
captured clis with that subplot selected, then the title (if any) and all
transforms are applied. Leftover grid cells are removed.

:return: the figure converted by ``cli.toImg()`` if ``im`` is True, else None"""
        it = list(it); n = len(it); s = self.capturedSerial; transforms = self.transforms
        ncols = self.ncols or math.ceil(n**0.5); nrows = math.ceil(n/ncols)
        # squeeze=False always hands back a 2d array of axes. By default a 1x1 grid
        # returns a bare Axes object, which made sketching a single plot crash in zip()
        fig, axes = plt.subplots(nrows, ncols, figsize=(ncols*5, nrows*4), squeeze=False)
        axes = axes.flatten()
        for i, [ax, e, title] in enumerate(zip(axes, it, self.titles or ("" | cli.repeat()))):
            plt.sca(ax); e | s | cli.deref()
            if title: plt.title(title)
            for trans in transforms: trans(plt)
        axes[i+1:] | cli.op().remove().all() | cli.deref(); plt.tight_layout() # drop the unused cells
        if self.im: return plt.gcf() | cli.toImg()
import numbers, sys; from collections import deque
class syncStepper(BaseCli):
    def __init__(self, col=0, sort=False):
        """Steps forward all streams at a time, yielding same results from min to max.
That's a bit vague, so let's see an example::

    a = [["a", 1], ["b", 7 ], ["c", 4], ["e", 6]]
    b = [["b", 5], ["c", 1 ], ["d", 3], ["f", 5]]
    c = [["a", 2], ["c", -4], ["d", 9], ["e", 4]]

    [a, b, c] | syncStepper() | deref()        # sync-step by the 1st column
    [a, b, c] | syncStepper(1, True) | deref() # sync-step by the 2nd column. Have to sort it explicitly

The first line returns this::

    [[['a', 1], None, ['a', 2]],
     [['b', 7], ['b', 5], None],
     [['c', 4], ['c', 1], ['c', -4]],
     [None, ['d', 3], ['d', 9]],
     [['e', 6], None, ['e', 4]],
     [None, ['f', 5], None]]

The second line returns this::

    [[None, None, ['c', -4]],
     [['a', 1], ['c', 1], None],
     [None, None, ['a', 2]],
     [None, ['d', 3], None],
     [['c', 4], None, ['e', 4]],
     [None, ['b', 5], None],
     [['e', 6], None, None],
     [['b', 7], None, None],
     [None, None, ['d', 9]]]

``col`` can be None, but it's quite a strange use case::

    [['a', 'b', 'c', 'e'], ['b', 'c', 'd', 'f'], ['a', 'c', 'd', 'e']] | syncStepper(None) | deref()

It returns this::

    [[['a'], None, ['a']],
     [['b'], ['b'], None],
     [['c'], ['c'], ['c']],
     [None, ['d'], ['d']],
     [['e'], None, ['e']],
     [None, ['f'], None]]

As you can see, for each line, it kinda yields elements with the same column. If
that element doesn't exist, it'll just put None there. This expects the input
streams are sorted at the column of interest. If they are not, specify ``sort=True``.

It has roughly the same vibe as :class:`~k1lib.cli.structural.groupBy`, in that
it groups everything by a specific column. The main difference here is that you
can sync-step them line-by-line, loading very little into memory, so you can run
this on giant datasets and not have to worry about running out of memory.

With k streams each having n elements, you should expect memory complexity to be
O(k), and the time complexity to be O(n*k^2/2). That k^2 term is kinda worrying,
but in most use cases, k is small and so k^2 can be treated as a constant

:param col: column where it should compare values and merge them together. Can be None, but that would be quite a weird use case
:param sort: whether to sort the streams or not. This cli requires it, but it's
    not turned on by default because it's an intensive operation"""
        if col is None: self.col = 0; self.colPreprocess = cli.wrapList().all()
        else: self.col = col; self.colPreprocess = cli.iden()
        self.bank = deque(); self.sentinel = object(); self._sort = sort
    def _append(self, stIdx1, val1, elem1): # append to bank in the correct position
        """Inserts ``[stIdx1, val1, elem1]`` into the bank, keeping it ordered by value."""
        i = 0; val2 = self.minObj
        for i, [stIdx2, val2, elem2] in enumerate(self.bank):
            if val1 <= val2: break
        if val1 <= val2: self.bank.insert(i, [stIdx1, val1, elem1])
        else: self.bank.append([stIdx1, val1, elem1])
    def _yieldNext(self): # yield the next set of values
        """Builds the next output row from the bank entries sharing the smallest
value, then replaces those entries by the next element of their streams.

:return: ``(row, changed)``. ``changed`` is False once only sentinels are left
:raises Exception: if a stream produces a value smaller than its previous one"""
        n = len(self.sts); res = [None]*n; last = None; hasInit = False; changed = False; bank = self.bank; sentinel = self.sentinel
        for i, [stIdx, val, elem] in enumerate(bank):
            if not hasInit and elem is sentinel: return res, changed
            if last == val or not hasInit: changed = True; res[stIdx] = elem
            elif hasInit: break
            hasInit = True; last = val
        while bank[0][1] == last: # popping the values off
            stIdx, val1, elem1 = bank.popleft(); val2, elem2 = next(self.sts[stIdx])
            if val1 > val2: raise Exception(f"Stream {stIdx} has not been sorted yet! Please sort all streams before passing it into syncStepper")
            self._append(stIdx, val2, elem2)
        return res, changed
    def __ror__(self, sts): # sts = "streams"
        """Lazily yields rows, one per distinct value of the sync column, each row
holding the matching element of every stream (or None).

:param sts: iterable of streams
:raises Exception: if the sync column isn't purely numeric or purely string"""
        col = self.col
        # start every run from an empty bank: it used to be created once in __init__,
        # so leftovers from a previous (possibly abandoned) run leaked into the next one
        self.bank = deque()
        # --------------------- All of this is just to figure out the type of the column dynamically. So painful ---------------------
        samples, sts = sts | self.colPreprocess.all() | cli.apply(cli.peek()) | cli.transpose() | cli.cut(col) + cli.iden() | cli.apply(list)
        if len([e for e in sts if e != []]) == 0: return # no elements to yield at all!
        n_nums = sum([1 if isinstance(e, numbers.Number) else 0 for e in samples])
        n_strs = sum([1 if isinstance(e, str) else 0 for e in samples]); n = len(samples)
        if n_nums*(n-n_nums) + n_strs*(n-n_strs) > 0: raise Exception("The requested column in some of the streams is not purely of numeric or string type, a requirement of syncStepper(). Please fix your data structure and try again.")
        if n_nums + n_strs == 0: raise Exception("The requested column in some of the streams is not of numeric or string type, so can't compare them to sync-step them")
        # n = 3; n_strs = 1
        text = n_strs > 0; self.minObj = "" if text else float("-inf"); self.maxObj = chr(sys.maxunicode) if text else float("inf"); senObj = [self.maxObj, self.sentinel]
        # --------------------- And here's the meat of the cli ---------------------
        # every stream turns into [value, elem] pairs, followed by an endless run of sentinels
        sts = sts | (cli.sort(col, not text).all() if self._sort else cli.iden()) | cli.apply(lambda st: [st | cli.apply(lambda elem: [elem[col], elem]), senObj | cli.repeat()] | cli.joinStreams()) | cli.aS(list)
        sts | cli.apply(next) | cli.insertIdColumn() | ~cli.apply(lambda idx,e: self._append(idx, *e)) | cli.ignore(); self.sts = sts
        while True:
            res, changed = self._yieldNext()
            if not changed: break
            yield res