# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for all short and random quality-of-life utilities."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, yieldT
import k1lib.cli as cli, numbers, numpy as np, dis
from k1lib.cli.typehint import *
from typing import overload, Iterator, Any, List, Set, Union, Callable
import k1lib, time, math, os
from collections import defaultdict
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = False
try: import PIL; hasPIL = True
except: hasPIL = False
try: plt = k1lib.dep("matplotlib.pyplot")
except: pass
__all__ = ["size", "shape", "item", "rItem", "iden", "join", "wrapList",
"equals", "reverse", "ignore", "rateLimit", "timeLimit", "tab", "indent",
"clipboard", "deref", "bindec", "smooth", "disassemble",
"tree", "lookup", "dictFields", "backup", "sketch", "syncStepper"]
settings = k1lib.settings.cli
def exploreSize(it):
    """Returns the first element and the length of the input, as ``(first, length)``.

    Strings are treated as opaque, so their first element is reported as None.
    Objects supporting both ``it[0]`` and ``len(it)`` are answered directly.
    Everything else is iterated until exhaustion, so one-shot iterators are
    consumed by this call.

    :param it: string, sequence or any other iterable
    :return: ``(first element, length)``, or ``(None, 0)`` if there are no elements
    :raises TypeError: if ``it`` is neither indexable nor iterable"""
    if isinstance(it, str):
        return None, len(it)
    try:
        return it[0], len(it)
    except Exception:  # not indexable/sized (or empty): fall back to plain iteration
        pass
    sentinel = object()
    it = iter(it)
    first = next(it, sentinel)
    if first is sentinel:
        return None, 0
    return first, 1 + sum(1 for _ in it)
class size(BaseCli):
    def __init__(self, idx=None):
        """Returns number of rows and columns in the input.
        Example::

            # returns (3, 2)
            [[2, 3], [4, 5, 6], [3]] | shape()
            # returns 3
            [[2, 3], [4, 5, 6], [3]] | shape(0)
            # returns 2
            [[2, 3], [4, 5, 6], [3]] | shape(1)
            # returns (2, 0)
            [[], [2, 3]] | shape()
            # returns (3,)
            [2, 3, 5] | shape()
            # returns 3
            [2, 3, 5] | shape(0)
            # returns (3, 2, 2)
            [[[2, 1], [0, 6, 7]], 3, 5] | shape()
            # returns (1, 3)
            ["abc"] | shape()
            # returns (1, 2, 3)
            [torch.randn(2, 3)] | shape()
            # returns (2, 3, 5)
            shape()(np.random.randn(2, 3, 5))

        :class:`shape` is an alias of this cli. Use whichever is more intuitive for you.

        There's also :class:`lengths`, which is sort of a simplified/faster version of
        this, but only use it if you are sure that ``len(it)`` can be called.

        :param idx: if not specified, returns a tuple of ints. If specified,
            then returns the specific index of the tuple"""
        super().__init__()
        self.idx = idx
        if idx is not None:
            self._f = cli.item(idx)  # descends `idx` levels before measuring

    def _all_array_opt(self, it, level):
        # trailing shape of every element, broadcast over the leading `level` dimensions
        trailing = np.array(it.shape[level:])[tuple([None]*level)]
        leading = np.zeros(it.shape[:level], dtype=int)[(*[slice(None)]*level, None)]
        return trailing + leading

    def _typehint(self, inp):
        return tList(int) if self.idx is None else int

    def __ror__(self, it:Iterator[str]):
        idx = self.idx
        if idx == 0:  # fast path: just the outermost length
            try:
                return len(it)
            except:
                return exploreSize(it)[1]
        if hasPIL and isinstance(it, PIL.Image.Image):
            return it.size if idx is None else it.size[idx]
        if idx is not None:
            return exploreSize(it | self._f)[1]
        # full shape: keep descending into the first element until it can't be explored
        dims = []
        try:
            while not isinstance(it, settings.arrayTypes):
                it, length = exploreSize(it)
                dims.append(length)
            return tuple(dims + list(it.shape))
        except TypeError:
            return tuple(dims)
shape = size
noFill = object()  # sentinel meaning "no fill value was given"
class item(BaseCli):
    def __init__(self, amt:int=1, fill=noFill):
        """Returns the first element of the input iterator.
        Example::

            # returns 0
            range(5) | item()
            # returns torch.Size([5])
            torch.randn(3,4,5) | item(2) | shape()
            # returns 3
            [] | item(fill=3)

        :param amt: how many times do you want to call item() back to back?
        :param fill: if iterator length is 0, return this"""
        self.amt = amt
        self.fill = fill
        # Identity check on purpose: `fill != noFill` would call the fill value's own
        # __ne__, which is elementwise for arrays/tensors and fails when truth-tested
        self.fillP = [fill] if fill is not noFill else []  # preprocessed, to be faster
        if self.amt != 1:
            self._f = cli.serial(*(item(fill=self.fill) for _ in range(self.amt)))

    def _all_array_opt(self, it, level):
        return it[(*[slice(None, None, None) for i in range(level)], 0)]

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return inp.child
        if isinstance(inp, tCollection): return inp.children[0]
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: return inp.__class__(inp.child, None)
            if inp.rank - self.amt >= 1: return inp.__class__(inp.child, inp.rank-self.amt)
            return inp.child
        return tAny()

    def __ror__(self, it:Iterator[str]):
        if self.amt != 1:
            return it | self._f
        # with an empty fillP this raises StopIteration on empty input
        return next(iter(it), *self.fillP)
class rItem(BaseCli):
    def __init__(self, idx:int):
        """Combines ``rows(idx) | item()``, as this is a pretty common pattern.
        Example::

            iter(range(10)) | rItem(4) # returns 4

        :param idx: index of the element to grab
        """
        self.idx = idx
        # inputs of these types are indexed directly instead of being iterated
        self.arrayTypes = (*settings.arrayTypes, list, tuple)

    def _all_array_opt(self, it, level:int):
        return it[(*[slice(None, None, None) for i in range(level)], self.idx)]

    def __ror__(self, it):
        """Returns the element at position ``idx``.

        :raises IndexError: if the input has fewer than ``idx + 1`` elements, or if
            ``idx`` is negative and the input can't be indexed directly"""
        idx = self.idx
        if isinstance(it, self.arrayTypes):
            return it[idx]
        if idx >= 0:
            # consumes exactly idx + 1 elements, like the indexable branch would read
            for i, e in enumerate(it):
                if i == idx:
                    return e
        raise IndexError(f"rItem({idx}): index out of range for the given iterator")
class iden(BaseCli):
    def __init__(self):
        """Identity cli: hands back the input untouched. Useful for multiple streams.
        Example::

            # returns range(5)
            range(5) | iden()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        return it

    def _typehint(self, inp):
        return inp

    def __ror__(self, it:Iterator[Any]):
        return it
class join(BaseCli):
    def __init__(self, delim:str=None):
        r"""Converts every element to a string and glues them together with
        `delim` in between. Basically :meth:`str.join`. Example::

            # returns '2\na'
            [2, "a"] | join("\n")

        :param delim: delimiter, passed through ``patchDefaultDelim``"""
        super().__init__()
        self.delim = patchDefaultDelim(delim)

    def _typehint(self, inp):
        return str

    def __ror__(self, it:Iterator[str]):
        pieces = it | cli.apply(str)
        return self.delim.join(pieces)
class wrapList(BaseCli):
    def __init__(self):
        """Puts the input inside a single-element list. There's a more advanced cli
        tool built from this, which is :meth:`~k1lib.cli.structural.unsqueeze`. Example::

            # returns [5]
            5 | wrapList()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        index = (*[slice(None)]*level, None)  # new axis right after the first `level` dims
        return it[index]

    def _typehint(self, inp):
        return tList(inp)

    def __ror__(self, it) -> List[Any]:
        # array types get a new leading axis instead of being boxed in a python list
        if not isinstance(it, settings.arrayTypes):
            return [it]
        return it[None]
class _EarlyExp(Exception):
    pass


class equals:
    def __init__(self):
        """Checks if all incoming columns/streams are identical"""
        super().__init__()

    def __ror__(self, streams:Iterator[Iterator[str]]):
        """Walks all streams in lockstep and yields, for every row, True if all
        elements compare equal to the first stream's element and False otherwise.
        Stops as soon as the shortest stream runs out."""
        columns = list(streams)
        for row in zip(*columns):
            reference = row[0]
            yield not any(reference != other for other in row)
class reverse(BaseCli):
    def __init__(self):
        """Flips the order of the incoming list.
        Example::

            # returns [3, 5, 2]
            [2, 5, 3] | reverse() | deref()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        flipped = (*[slice(None)]*level, slice(None, None, -1))
        return it[flipped]

    def _typehint(self, inp):
        if not isinstance(inp, tListIterSet):
            return tAny()
        return tIter(inp.child)

    def __ror__(self, it:Iterator[str]) -> List[str]:
        if isinstance(it, settings.arrayTypes):
            return it[::-1]
        materialized = list(it)  # has to load everything before it can go backwards
        return reversed(materialized)
class ignore(BaseCli):
    def __init__(self):
        r"""Drains the input, throwing away every element.
        Example::

            # will just return an iterator, and not print anything
            [2, 3] | apply(lambda x: print(x))
            # will prints "2\n3"
            [2, 3] | apply(lambda x: print(x)) | ignore()"""
        super().__init__()

    def _all_array_opt(self, it, level):
        return it

    def _typehint(self, inp):
        return type(None)

    def __ror__(self, it:Iterator[Any]):
        # array types are not looped over at all
        if not isinstance(it, settings.arrayTypes):
            for _ in it:
                pass
class rateLimit(BaseCli):
    def __init__(self, f, delay=0.1):
        """Holds back the flow of elements until a condition allows it.
        Example::

            s = 0; semaphore = 0
            def heavyAsyncOperation(i):
                global semaphore, s
                semaphore += 1
                s += i; time.sleep(1)
                semaphore -= 1; return i**2

            # returns (20,), takes 1s to run
            range(20) | applyTh(heavyAsyncOperation, 100) | shape()
            # returns (20,), takes 4s to run (20/5 = 4)
            range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape()

        The first test case is not rate-limited, so it will run all 20 threads at the
        same time, and all of them will finish after 1 second.

        The second test case is rate-limited, so that there can only be 5 concurrently
        executing threads because of the semaphore count check. Therefore this takes
        around 4 seconds to run.

        :param f: checking function. Should return true if execution is allowed
        :param delay: delay in seconds between calling ``f()``"""
        self.f = f
        self.delay = delay

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet):
            return tIter(inp.child)
        if isinstance(inp, tArrayTypes):
            if inp.rank is None:
                child = inp
            elif inp.rank >= 2:
                child = inp.__class__(inp.child, inp.rank - 1)
            else:
                child = inp.child
            return tIter(child)
        if isinstance(inp, tCollection):
            return inp
        return tAny()

    def __ror__(self, it):
        allowed, pause = self.f, self.delay
        for e in it:
            # poll the condition, sleeping in between, before releasing each element
            while not allowed():
                time.sleep(pause)
            yield e

    @staticmethod
    def cpu(maxUtilization=90):
        """Limits flow rate when cpu utilization is more than a specified
        percentage amount. Needs to install the package ``psutil`` to actually work.
        Example::

            # returns [0, 1, 4, 9, 16]
            range(5) | rateLimit.cpu() | apply(op()**2) | deref()

        :param maxUtilization: elements only pass while ``psutil.cpu_percent()`` is below this"""
        import psutil
        def belowThreshold():
            return psutil.cpu_percent() < maxUtilization
        return rateLimit(belowThreshold)
class timeLimit(BaseCli):
    def __init__(self, t):
        """Caps the flow after a specified amount of time has
        passed. Example::

            # returns 20, or roughly close to that
            repeatF(lambda: time.sleep(0.1)) | timeLimit(2) | shape(0)

        :param t: time budget in seconds, counted from when iteration starts"""
        self.t = t

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return tIter(inp.child)
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: return tIter(inp)
            if inp.rank >= 2: return tIter(inp.__class__(inp.child, inp.rank - 1))
            return tIter(inp.child)
        if isinstance(inp, tCollection): return inp
        return tAny()

    def __ror__(self, it):
        # monotonic clock: the deadline must not move when the system clock is adjusted
        now = time.monotonic
        deadline = now() + self.t
        for e in it:
            yield e
            # checked after yielding, so the first element always goes through
            if now() > deadline:
                break
def tab(pad:str=" "*4):
    """Prefixes every incoming element with some padding.
    Example::

        # prints out indented 0 to 9
        range(10) | tab() | headOut()

    :param pad: string placed in front of each element, 4 spaces by default"""
    def addPad(x):
        return f"{pad}{x}"
    return cli.apply(addPad)
indent = tab
class clipboard(BaseCli):
    def __init__(self):
        """Copies whatever is piped in to the clipboard.
        Example::

            # copies "abc" into the clipboard. Just use Ctrl+V to paste as usual
            "abc" | clipboard()"""
        # imported lazily, so pyperclip is only needed when this cli is actually used
        import pyperclip
        self.pyperclip = pyperclip

    def _typehint(self, inp):
        return type(None)

    def __ror__(self, s):
        self.pyperclip.copy(s)
# Types that deref and inv_dereference hand back as-is instead of iterating into them
a = [numbers.Number, np.number, str, bool, bytes, k1lib.UValue]
if hasTorch: a.append(torch.nn.Module) # only added when the torch import at the top succeeded
settings.atomic.add("deref", tuple(a), "used by deref") # exposed as settings.atomic.deref
Tensor = torch.Tensor; atomic = settings.atomic # module-level shorthands used by the classes below
class inv_dereference(BaseCli):
    def __init__(self, igT=False):
        """Kinda the inverse to :class:`dereference`

        :param igT: if True, array types are always passed through untouched. If
            False, zero-dimensional arrays are converted using ``.item()``"""
        super().__init__()
        self.igT = igT

    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        """Lazily yields every element, turning nested iterables into (lazy)
        generators of the same kind. None, atomic types and non-iterables are
        yielded unchanged."""
        for e in it:
            if e is None or isinstance(e, atomic.deref):
                yield e
            elif isinstance(e, settings.arrayTypes):
                if not self.igT and len(e.shape) == 0: yield e.item()
                else: yield e
            else:
                # Test iterability up front. `e | self` is lazy and never raises by
                # itself, and wrapping the yield in a bare except would swallow the
                # GeneratorExit sent when this generator is closed
                try:
                    iter(e)
                except TypeError:
                    nested = e
                else:
                    nested = e | self
                yield nested
class deref(BaseCli):
    def __init__(self, maxDepth=float("inf"), igT=True):
        """Recursively converts any iterator into a list.
        Example::

            # returns something like "<range_iterator at 0x7fa8c52ca870>"
            iter(range(5))
            # returns [0, 1, 2, 3, 4]
            iter(range(5)) | deref()
            # returns [2, 3], yieldT stops things early
            [2, 3, yieldT, 6] | deref()

        You can also specify a ``maxDepth``::

            # returns something like "<list_iterator at 0x7f810cf0fdc0>"
            iter([range(3)]) | deref(0)
            # returns [range(3)]
            iter([range(3)]) | deref(1)
            # returns [[0, 1, 2]]
            iter([range(3)]) | deref(2)

        There are a few classes/types that are considered atomic, and :class:`deref`
        will never try to iterate over it. If you wish to change it, do something like::

            settings.cli.atomic.deref = (int, float, ...)

        :param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything
            at all
        :param igT: short for "ignore tensor". If True, then don't loop over :class:`torch.Tensor`
            and :class:`numpy.ndarray` internals"""
        super().__init__()
        self.igT = igT
        self.maxDepth = maxDepth
        self.depth = 0  # current recursion depth, only non-zero while __ror__ is running
        if hasTorch:
            self.arrayType = (torch.Tensor, np.ndarray) if k1lib.settings.startup.or_patch.numpy else torch.Tensor
        else:
            self.arrayType = (np.ndarray,) if k1lib.settings.startup.or_patch.numpy else ()

    def _typehint(self, inp, depth=float("inf")):
        if depth == 0: return inp
        if depth == float("inf"): depth = self.maxDepth
        if isinstance(inp, type) and issubclass(inp, atomic.deref): return inp
        if isinstance(inp, tArrayTypes):
            if self.igT: return inp
            if inp.rank is None: return tList(tAny())
            if inp.rank == 1:
                if isinstance(inp, tTensor):
                    return tList(type(torch.tensor(3, dtype=inp.child).item()))
                if isinstance(inp, tNpArray):
                    return tList(type(np.array(3, dtype=inp.child).item()))
            return tList(self._typehint(inp.item(), depth-1))
        if isinstance(inp, tListIterSet):
            return tList(self._typehint(inp.child, depth-1))
        if isinstance(inp, tCollection):
            return tCollection(*(self._typehint(e, depth-1) for e in inp.children))
        return tAny()

    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        """Dereferences ``it``. dicts, tuples and sets keep their container type,
        every other iterable becomes a list, and non-iterables are returned as-is."""
        if self.depth >= self.maxDepth: return it
        if isinstance(it, np.number): return it.item()
        if isinstance(it, atomic.deref): return it
        if isinstance(it, self.arrayType):
            if self.igT: return it
            if len(it.shape) == 0: return it.item()
            # arrays with dimensions fall through and get iterated like anything else
        elif isinstance(it, (dict, tuple, set)):
            self.depth += 1
            try:
                if isinstance(it, dict): return {k: self.__ror__(v) for k, v in it.items()}
                if isinstance(it, tuple): return tuple(self.__ror__(e) for e in it)
                return set(self.__ror__(e) for e in it)
            finally:
                self.depth -= 1
        try:
            iter(it)
        except Exception:
            return it
        # try/finally so the depth counter is restored on every exit path, including
        # the early stop at yieldT and exceptions raised by the iterator. Otherwise
        # later calls on this instance would see a wrong depth
        self.depth += 1
        try:
            answer = []
            for e in it:
                if e is cli.yieldT: break
                answer.append(self.__ror__(e))
            return answer
        finally:
            self.depth -= 1

    def __invert__(self) -> BaseCli:
        """Returns a :class:`~k1lib.cli.init.BaseCli` that makes
        everything an iterator. Not entirely sure when this comes in handy, but it's
        there."""
        return inv_dereference(self.igT)
class bindec(BaseCli):
    def __init__(self, cats:List[Any], f=None):
        """Binary decodes the input: picks the categories whose bit is set.
        Example::

            # returns ['a', 'c']
            5 | bindec("abcdef")
            # returns 'a,c'
            5 | bindec("abcdef", join(","))

        :param cats: categories
        :param f: transformation function of the selected elements. Defaulted to :class:`toList`, but others like :class:`join` is useful too"""
        self.cats = cats
        self.f = f if f else cli.toList()

    def __ror__(self, it):
        # binary digits, least significant first, so that bit i lines up with cats[i]
        bits = bin(int(it))[2:][::-1]
        selected = (cat for bit, cat in zip(bits, self.cats) if bit == '1')
        return selected | self.f
settings.add("smooth", 10, "default smooth amount, used in utils.smooth")
class smooth(BaseCli):
    def __init__(self, consecutives=None):
        """Smoothes out the input stream by averaging consecutive elements.
        Literally just a shortcut for::

            batched(consecutives) | toMean().all()

        Example::

            # returns [4.5, 14.5, 24.5]
            range(30) | smooth(10) | deref()

        Smoothing over :class:`torch.Tensor` or :class:`numpy.ndarray` will
        be much faster, and produce high dimensional results::

            # returns torch.Tensor with shape (2, 3, 4)
            torch.randn(10, 3, 4) | smooth(4)

        The default consecutive value is in ``settings.cli.smooth``. This
        is useful if you are smoothing over multiple lists at the same
        time, like this::

            # can change a single smooth value temporarily here, and all sequences will be smoothed in the same way
            with settings.cli.context(smooth=5):
                x = list(np.linspace(-2, 2, 50))
                y = x | apply(op()**2) | deref()
                plt.plot(x | smooth() | deref(), y | smooth() | deref())

        :param consecutives: if not defined, then used the value inside ``settings.cli.smooth``"""
        window = consecutives or settings.smooth  # the setting is read at construction time
        self.b = cli.batched(window)

    def __ror__(self, it):
        batches = it | self.b
        if isinstance(batches, settings.arrayTypes):
            return batches.mean(1)
        return batches | cli.toMean().all()
def _f(): pass
_code = type(_f.__code__)  # the code object type, used for the isinstance check below
def disassemble(f=None):
    """Prints the code object's attributes and bytecode of anything piped into it,
    followed by the same information for every nested code object.
    Normal usage::

        def f(a, b):
            return a**2 + b
        # both of these print out disassembled info
        f | disassemble()
        disassemble(f)
        # you can pass in lambdas
        disassemble(lambda x: x + 3)
        # or even raw code
        "lambda x: x + 3" | disassemble()

    :param f: function, code object or source string. If None, returns a cli
        that disassembles whatever is piped into it"""
    if f is None:
        return cli.aS(disassemble)
    c = compile(f, "", "exec") if isinstance(f, str) else f
    try: c = c.__code__
    except: pass
    if not isinstance(c, _code):
        raise RuntimeError(f"`{c}` is not a code object/function/class method/string code")
    for attr in ("co_argcount", "co_cellvars", "co_consts", "co_filename",
                 "co_firstlineno", "co_flags", "co_freevars", "co_kwonlyargcount"):
        print(f"{attr}: {getattr(c, attr)}")
    print(f"co_lnotab: {c.co_lnotab | cli.apply(str) | join(' ')}")
    for attr in ("co_name", "co_names", "co_nlocals", "co_posonlyargcount",
                 "co_stacksize", "co_varnames"):
        print(f"{attr}: {getattr(c, attr)}")
    print("Disassembly:")
    dis.disassemble(c)
    # nested code objects are disassembled recursively into a captured buffer...
    with k1lib.captureStdout() as out:
        nested = c.co_consts | cli.filt(lambda x: "code" in str(type(x)))
        nested | cli.tee(lambda _: "----------------------- inner code object -----------------------\n") | cli.apply(disassemble) | cli.ignore()
    # ...and then printed with a "|" gutter and one extra level of indentation
    out() | cli.filt(cli.op().strip() != "") | cli.apply("|" + cli.op()) | cli.indent() | cli.stdout()
def shortName(s):
    """Last component of a path, split on ``os.sep``."""
    return s.split(os.sep)[-1]
def tree(fL=10, dL=10, depth=float("inf"), ff:Callable[[str], bool]=(lambda s: True), df:Callable[[str], bool]=(lambda s: True)):
    """Recursively gets all files and folders. Output format might be a bit
    strange, so this is mainly for visualization. Example::

        "." | tree() | deref()

    :param fL: max number of file per directory included in output
    :param dL: max number of child directories per directory included in output
    :param depth: explore depth
    :param ff: optional file filter function
    :param df: optional directory filter function"""
    def expand(path):
        # recurse one level down, or stop with an empty listing once depth runs out
        return path | tree(fL, dL, depth-1, ff, df) if depth > 0 else []
    namePathPairs = cli.apply(lambda x: [shortName(x), x])
    processFolders = namePathPairs | cli.apply(expand, 1) | cli.toDict()
    files = cli.filt(os.path.isfile) | cli.filt(ff) | cli.head(fL) | cli.apply(shortName) | cli.aS(set)
    folders = ~cli.filt(os.path.isfile) | cli.filt(df) | cli.head(dL) | processFolders
    return cli.ls() | ~cli.sortF(os.path.isfile) | (files & folders)
class lookup(BaseCli):
    def __init__(self, d:dict, col:int=None, fill=None):
        """Translates incoming items through a dictionary/object. Example::

            d = {"a": 3, "b": 5, "c": 52}
            # returns [3, 5, 52, 52, 3]
            "abcca" | lookup(d) | deref()
            # returns [[0, 3], [1, 5], [2, 52], [3, 52], [4, 3]]
            [range(5), "abcca"] | transpose() | lookup(d, 1) | deref()

        :param d: any object that can be sliced with the inputs
        :param col: if None, lookup on each row, else lookup a specific
            column only
        :param fill: if None, throws error if looked up element is not
            available, else returns the fill value"""
        self.col = col
        # with a fill value, lookups go through a defaultdict copy built from `d`
        self.d = d if fill is None else defaultdict(lambda: fill, d)

    def _typehint(self, inp):
        inferred = inferType(list(self.d.values()))
        if isinstance(inferred, tListIterSet):
            child = inferred.child
        elif isinstance(inferred, tCollection):
            child = tLowest(*inferred.children)
        else:
            child = tAny()
        return tIter(child)

    def __ror__(self, it):
        table = self.d
        def fetch(e):
            return table[e]
        return it | cli.apply(fetch, self.col)
class dictFields(BaseCli):
    def __init__(self, *fields, default=""):
        """Pulls several fields out of a dictionary, in the order requested.
        Example::

            # returns [3, 1, '']
            {"a": 1, "b": 2, "c": 3} | dictFields("c", "a", "d")

        :param fields: keys to grab
        :param default: value used for keys that are not in the dictionary
        """
        self.fields = fields
        self.default = default

    def __ror__(self, d):
        fallback = self.default
        values = []
        for name in self.fields:
            values.append(d.get(name, fallback))
        return values
class backup(BaseCli):
    def __init__(self):
        """Backs up a file/folder.
        Example::

            "some/folderOrFile" | backup()
            "some/folderOrFile" | backup.restore()

        Really straightforward. Uses bash internally to copy files recursively, so
        not available on Windows. The copy lives next to the original, with
        ``.backup`` appended to its name, and replaces any previous backup."""
        pass

    def __ror__(self, it):
        """Copies the path ``it`` to ``it + ".backup"``, deleting any old backup first."""
        import shlex
        src = os.path.expanduser(it)
        # shlex.quote + "--": paths with quotes, spaces or a leading dash must reach
        # rm/cp as one literal operand, not as extra shell syntax or options
        qSrc = shlex.quote(src); qDst = shlex.quote(src + ".backup")
        None | cli.cmd(f"rm -rf -- {qDst}") | cli.ignore()
        None | cli.cmd(f"cp -r -- {qSrc} {qDst}") | cli.ignore()

    @staticmethod
    def restore():
        """Returns a cli that replaces the piped-in path with its ``.backup`` copy."""
        def inner(it):
            import shlex
            dst = os.path.expanduser(it)
            qDst = shlex.quote(dst); qSrc = shlex.quote(dst + ".backup")
            None | cli.cmd(f"rm -rf -- {qDst}") | cli.ignore()
            None | cli.cmd(f"cp -r -- {qSrc} {qDst}") | cli.ignore()
        return cli.aS(inner)
class sketch(BaseCli):
    def __init__(self, transforms:List[callable]=None, titles:List[str]=None, im:bool=False, ncols:int=None):
        """Convenience tool to plot multiple matplotlib plots at the same
        time, while still keeping everything short and in 1 line. For this example,
        we're trying to plot x^1, x^2, ..., x^8 on 2 separate plots, one left one
        right. The left will have x^1 till x^4, the right will have x^5 to x^8.

        How you would do this normally::

            x = np.linspace(-2, 2); exps = range(1, 9)
            fig, axes = plt.subplots(1, 2, figsize=(10, 4))
            # simplest solution
            plt.sca(axes[0]); plt.plot(x, x**1); plt.plot(x, x**2); plt.plot(x, x**3); plt.plot(x, x**4); plt.legend([1, 2, 3, 4]); plt.xlabel("x axis")
            # solution using a little bit of cli
            plt.sca(axes[1]); range(5, 9) | apply(lambda a: [x, x**a]) | ~apply(plt.plot) | ignore(); plt.legend([5, 6, 7, 8]); plt.xlabel("x axis")

        But this is long, and I'm incredibly lazy to write it all out. So here's how
        it's going to work using this cli::

            # plotting the first 4 lines only, in a single plot. Should be familiar and make sense to you before moving on
            exps | apply(lambda a: [x, x**a]) | batched(4) | item() | ~apply(plt.plot) | ignore()
            # plotting 8 lines across 2 plots. Simplest example using sketch(). It kinda captures clis after it and use it to plot each plot
            exps | apply(lambda a: [x, x**a]) | batched(4) | (sketch() | ~apply(plt.plot))
            # same as above, but adding a grid and x axis label to all plots. Transformation functions can be anything you would
            # put inside a normal cli (`plt` will be passed as argument): string code, op()-capture, lambda functions, other cli tools
            transforms = ["x.grid(True)", op().xlabel("x axis"), lambda x: x.ylabel("y axis")]
            exps | apply(lambda a: [x, x**a]) | batched(4) | (sketch(transforms) | ~apply(plt.plot))
            # same as above, but adding legends. [x, x**a] will eventually be directed to ~apply(plt.plot), while f"x^{a}" will be directed to aS(plt.legend)
            exps | apply(lambda a: [[x, x**a], f"x^{a}"]) | batched(4) | (sketch() | transpose() | ~apply(plt.plot) + iden() | deref() | rItem(1) | aS(plt.legend)) | deref()

        Last line will generate this plot:

        .. image:: ../images/probScale.png

        Is it worth the extra confusion? Afterall, it just saves you 2-3 lines of
        code. To me, it is worth it, because you can quickly change styles (add
        a grid, make y axis log)

        See also: :class:`~k1lib.cli.output.plotImgs`

        :param transforms: transform functions to be run when drawing every plot. ``plt`` (aka ``matplotlib.pyplot``) will be passed in
        :param titles: if specified, use these titles for each plot. Kinda hacky I have to admit
        :param im: if True, returns a PIL image and closes the sketch, else return nothing but still have the sketch open
        :param ncols: if specified, will sketch with this number of columns"""
        super().__init__(capture=True)
        self.titles = titles
        self.im = im
        self.transforms = [cli.fastF(t) for t in (transforms or [])]
        self.ncols = ncols

    def __ror__(self, it):
        """Draws one subplot per incoming element, running the captured clis on each.
        Returns None (and draws nothing) when there are no elements."""
        it = list(it); n = len(it)
        if n == 0: return None  # an empty grid can't be laid out
        s = self.capturedSerial; transforms = self.transforms
        ncols = self.ncols or math.ceil(n**0.5); nrows = math.ceil(n/ncols)
        # squeeze=False: always get a 2d array of axes back, even for a single plot
        # or a single row, so flattening and zipping work for every grid size
        fig, axes = plt.subplots(nrows, ncols, figsize=(ncols*5, nrows*4), squeeze=False)
        axes = axes.flatten()
        titles = self.titles or ("" | cli.repeat())
        drawn = 0
        for drawn, (ax, e, title) in enumerate(zip(axes, it, titles), 1):
            plt.sca(ax); e | s | cli.deref()
            if title: plt.title(title)
            for trans in transforms: trans(plt)
        # drop the grid cells that did not receive a plot
        for ax in axes[drawn:]: ax.remove()
        plt.tight_layout()
        if self.im: return plt.gcf() | cli.toImg()
import numbers, sys; from collections import deque # sketch
class syncStepper(BaseCli):
    def __init__(self, col=0, sort=False):
        """Steps forward all streams at a time, yielding same results from min to max.
        That's a bit vague, so let's see an example::

            a = [["a", 1], ["b", 7 ], ["c", 4], ["e", 6]]
            b = [["b", 5], ["c", 1 ], ["d", 3], ["f", 5]]
            c = [["a", 2], ["c", -4], ["d", 9], ["e", 4]]
            [a, b, c] | syncStepper() | deref() # sync-step by the 1st column
            [a, b, c] | syncStepper(1, True) | deref() # sync-step by the 2nd column. Have to sort it explicitly

        The first line returns this::

            [[['a', 1], None, ['a', 2]],
             [['b', 7], ['b', 5], None],
             [['c', 4], ['c', 1], ['c', -4]],
             [None, ['d', 3], ['d', 9]],
             [['e', 6], None, ['e', 4]],
             [None, ['f', 5], None]]

        The second line returns this::

            [[None, None, ['c', -4]],
             [['a', 1], ['c', 1], None],
             [None, None, ['a', 2]],
             [None, ['d', 3], None],
             [['c', 4], None, ['e', 4]],
             [None, ['b', 5], None],
             [['e', 6], None, None],
             [['b', 7], None, None],
             [None, None, ['d', 9]]]

        ``col`` can be None, but it's quite a strange use case::

            [['a', 'b', 'c', 'e'], ['b', 'c', 'd', 'f'], ['a', 'c', 'd', 'e']] | syncStepper(None) | deref()

        It returns this::

            [[['a'], None, ['a']],
             [['b'], ['b'], None],
             [['c'], ['c'], ['c']],
             [None, ['d'], ['d']],
             [['e'], None, ['e']],
             [None, ['f'], None]]

        As you can see, for each line, it kinda yields elements with the same column. If
        that element doesn't exist, it'll just put None there. This expects the input
        streams are sorted at the column of interest. If they are not, specify ``sort=True``.

        It has roughly the same vibe as :class:`~k1lib.cli.structural.groupBy`, in that
        it groups everything by a specific column. The main difference here is that you
        can sync-step them line-by-line, loading very little into memory, so you can run
        this on giant datasets and not have to worry about running out of memory.

        With k streams each having n elements, you should expect memory complexity to be
        O(k), and the time complexity to be O(n*k^2/2). That k^2 term is kinda worrying,
        but in most use cases, k is small and so k^2 can be treated as a constant

        :param col: column where it should compare values and merge them together. Can be None, but that would be quite a weird use case
        :param sort: whether to sort the streams or not. This cli requires it, but it's
            not turned on by default because it's an intensive operation"""
        if col is None: self.col = 0; self.colPreprocess = cli.wrapList().all()
        else: self.col = col; self.colPreprocess = cli.iden()
        self.bank = deque(); self.sentinel = object(); self._sort = sort

    def _append(self, stIdx1, val1, elem1):
        """Inserts ``[stream index, value, element]`` into the bank, keeping the bank
        ordered by value from smallest to largest."""
        i = 0; val2 = self.minObj
        for i, [stIdx2, val2, elem2] in enumerate(self.bank):
            if val1 <= val2: break
        # val2 is either the first bank value >= val1 (insert in front of it), or the
        # last/lowest possible value (nothing bigger found, so it goes at the end)
        if val1 <= val2: self.bank.insert(i, [stIdx1, val1, elem1])
        else: self.bank.append([stIdx1, val1, elem1])

    def _yieldNext(self):
        """Builds the next output row from all bank entries sharing the smallest value,
        then refills the bank from the streams those entries came from.

        :return: ``(row, changed)``. ``changed`` is False once only sentinels are left"""
        n = len(self.sts); res = [None]*n; last = None; hasInit = False; changed = False; bank = self.bank; sentinel = self.sentinel
        for i, [stIdx, val, elem] in enumerate(bank):
            if not hasInit and elem is sentinel: return res, changed
            if last == val or not hasInit: changed = True; res[stIdx] = elem
            elif hasInit: break
            hasInit = True; last = val
        # NOTE(review): a refilled entry that has the same value again is popped by this
        # loop too without being put into a row, so repeated keys inside one stream
        # look like they get skipped - confirm whether that is intended
        while bank[0][1] == last: # popping the values off
            stIdx, val1, elem1 = bank.popleft(); val2, elem2 = next(self.sts[stIdx])
            if val1 > val2: raise Exception(f"Stream {stIdx} has not been sorted yet! Please sort all streams before passing it into syncStepper")
            self._append(stIdx, val2, elem2)
        return res, changed

    def __ror__(self, sts): # sts = "streams"
        """Yields one row per distinct column value, each row holding the matching
        element of every stream (or None for streams without that value).

        :raises Exception: if the compared column mixes types across streams, is neither
            numeric nor string, or if a stream turns out to be unsorted"""
        col = self.col
        # Start every run from an empty bank: entries left over from an earlier (possibly
        # partially consumed) run on this instance would get mixed into this one
        self.bank = deque()
        # --------------------- All of this is just to figure out the type of the column dynamically. So painful ---------------------
        samples, sts = sts | self.colPreprocess.all() | cli.apply(cli.peek()) | cli.transpose() | cli.cut(col) + cli.iden() | cli.apply(list)
        if len([e for e in sts if e != []]) == 0: return # no elements to yield at all!
        n = len(samples)
        n_nums = sum(1 for e in samples if isinstance(e, numbers.Number))
        n_strs = sum(1 for e in samples if isinstance(e, str))
        if n_nums*(n-n_nums) + n_strs*(n-n_strs) > 0: raise Exception("The requested column in some of the streams is not purely of numeric or string type, a requirement of syncStepper(). Please fix your data structure and try again.")
        if n_nums + n_strs == 0: raise Exception("The requested column in some of the streams is not of numeric or string type, so can't compare them to sync-step them")
        # lowest/highest possible values for the detected column type. Every stream gets
        # an endless tail of [maxObj, sentinel], so exhausted streams sort last
        text = n_strs > 0
        self.minObj = "" if text else float("-inf")
        self.maxObj = chr(sys.maxunicode) if text else float("inf")
        senObj = [self.maxObj, self.sentinel]
        # --------------------- And here's the meat of the cli ---------------------
        sts = sts | (cli.sort(col, not text).all() if self._sort else cli.iden()) | cli.apply(lambda st: [st | cli.apply(lambda elem: [elem[col], elem]), senObj | cli.repeat()] | cli.joinStreams()) | cli.aS(list)
        # seed the bank with the first [value, element] pair of every stream
        sts | cli.apply(next) | cli.insertIdColumn() | ~cli.apply(lambda idx,e: self._append(idx, *e)) | cli.ignore(); self.sts = sts
        while True:
            res, changed = self._yieldNext()
            if not changed: break
            yield res