# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for all short and random quality-of-life utilities."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, Table, T, yieldT
import k1lib.cli as cli, numbers, numpy as np, dis
from k1lib.cli.typehint import *
from typing import overload, Iterator, Any, List, Set, Union, Callable
import k1lib, time, math, os
from collections import defaultdict
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = False
try: import PIL; hasPIL = True
except: hasPIL = False
# Names exported by a star-import of this module.
__all__ = ["size", "shape", "item", "rItem", "iden", "join", "wrapList",
"equals", "reverse", "ignore", "rateLimit", "timeLimit", "tab", "indent",
"clipboard", "deref", "bindec", "smooth", "disassemble",
"tree", "lookup", "dictFields", "backup"]
# Shortcut to the cli settings object; read further down in this module
# (`settings.arrayTypes`, `settings.atomic`, `settings.smooth`).
settings = k1lib.settings.cli
def exploreSize(it):
    """Returns the first element and the length of ``it`` as ``(first, length)``.

    Strings are treated as leaves: the result is ``(None, len(it))`` so that
    callers do not descend into individual characters. For anything else the
    fast path ``it[0], len(it)`` is tried first; if that fails for any ordinary
    reason (not subscriptable, empty, no ``len``, ...), the input is iterated
    and counted instead, which consumes one-shot iterators.

    :param it: a string, a sequence, or any iterable
    :return: ``(first element, number of elements)``; the first element is
        ``None`` for strings and for empty inputs
    :raises TypeError: if ``it`` is neither subscriptable/sized nor iterable"""
    if isinstance(it, str):
        return None, len(it)
    try:
        return it[0], len(it)
    except Exception:
        # Only ordinary failures trigger the slow path; KeyboardInterrupt and
        # SystemExit must propagate instead of being silently swallowed.
        pass
    sentinel = object()
    it = iter(it)
    first = next(it, sentinel)
    if first is sentinel:
        return None, 0
    count = 1
    for _ in it:
        count += 1
    return first, count
class size(BaseCli):
    def __init__(self, idx=None):
        """Returns number of rows and columns in the input.
        Example::

            # returns (3, 2)
            [[2, 3], [4, 5, 6], [3]] | shape()
            # returns 3
            [[2, 3], [4, 5, 6], [3]] | shape(0)
            # returns 2
            [[2, 3], [4, 5, 6], [3]] | shape(1)
            # returns (2, 0)
            [[], [2, 3]] | shape()
            # returns (3,)
            [2, 3, 5] | shape()
            # returns 3
            [2, 3, 5] | shape(0)
            # returns (3, 2, 2)
            [[[2, 1], [0, 6, 7]], 3, 5] | shape()
            # returns (1, 3)
            ["abc"] | shape()
            # returns (1, 2, 3)
            [torch.randn(2, 3)] | shape()
            # returns (2, 3, 5)
            shape()(np.random.randn(2, 3, 5))

        :class:`shape` is an alias of this cli. Use whichever is more intuitive for you.

        There's also :class:`lengths`, which is sort of a simplified/faster version of
        this, but only use it if you are sure that ``len(it)`` can be called.

        :param idx: if not specified, returns a tuple of ints. If specified,
            then returns the specific index of the tuple"""
        super().__init__()
        self.idx = idx
        if idx is not None:
            # cli that takes the first element `idx` times in a row, so that
            # the requested dimension can be measured directly
            self._f = cli.item(idx)

    def _typehint(self, inp):
        return tList(int) if self.idx is None else int

    def __ror__(self, it:Iterator[str]):
        idx = self.idx
        if idx == 0:
            # outermost dimension: prefer len(), fall back to counting
            try:
                return len(it)
            except:
                return exploreSize(it)[1]
        if hasPIL and isinstance(it, PIL.Image.Image):
            return it.size if idx is None else it.size[idx]
        if idx is not None:
            return exploreSize(it | self._f)[1]
        dims = []
        try:
            # keep descending into the first element until an array type is hit
            # (its own shape finishes the answer) or a leaf raises TypeError
            while not isinstance(it, settings.arrayTypes):
                it, length = exploreSize(it)
                dims.append(length)
            return tuple(dims + list(it.shape))
        except TypeError:
            pass
        return tuple(dims)
shape = size
# Sentinel meaning "no fill value supplied"; always compared by identity.
noFill = object()
class item(BaseCli):
    def __init__(self, amt:int=1, fill=noFill):
        """Returns the first element of the input iterator.
        Example::

            # returns 0
            range(5) | item()
            # returns (5,)
            torch.randn(3,4,5) | item(2) | shape()
            # returns 3
            [] | item(fill=3)

        :param amt: how many times do you want to call item() back to back?
        :param fill: if iterator length is 0, return this. If not given, an
            empty input raises :class:`StopIteration`"""
        self.amt = amt; self.fill = fill
        # Preprocessed so that __ror__ can splat it straight into next().
        # Identity check on purpose: `fill != noFill` would call the fill's own
        # comparison operator, which breaks for objects that overload it
        # (a numpy array returns an array whose truth value is ambiguous).
        self.fillP = [fill] if fill is not noFill else []
        if self.amt != 1:
            # chain `amt` single-step items together
            self._f = cli.serial(*(item(fill=self.fill) for _ in range(self.amt)))

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet): return inp.child
        if isinstance(inp, tCollection): return inp.children[0]
        if isinstance(inp, tArrayTypes):
            if inp.rank is None: return inp.__class__(inp.child, None)
            if inp.rank - self.amt >= 1: return inp.__class__(inp.child, inp.rank-self.amt)
            return inp.child
        return tAny()

    def __ror__(self, it:Iterator[str]):
        if self.amt != 1: return it | self._f
        return next(iter(it), *self.fillP)
def rItem(idx: int):
    """Shortcut for the common ``rows(idx) | item()`` combination.
    Example::

        iter(range(10)) | rItem(4) # returns 4

    :param idx: passed straight to ``rows``"""
    pickRow = cli.rows(idx)
    return pickRow | item()
class iden(BaseCli):
    def __init__(self):
        """Identity cli: hands back its input untouched. Handy as a
        placeholder when working with multiple streams.
        Example::

            # returns range(5)
            range(5) | iden()"""
        super().__init__()

    def _typehint(self, inp):
        return inp

    def __ror__(self, it:Iterator[Any]):
        return it
class join(BaseCli):
    def __init__(self, delim:str=None):
        r"""Concatenates every incoming element into a single string, placing
        `delim` between them. Elements are converted with ``str`` first, so
        this is :meth:`str.join` that also accepts non-strings. Example::

            # returns '2\na'
            [2, "a"] | join("\n")

        :param delim: separator; run through ``patchDefaultDelim`` first"""
        super().__init__()
        self.delim = patchDefaultDelim(delim)

    def _typehint(self, inp):
        return str

    def __ror__(self, it:Iterator[str]):
        pieces = it | cli.apply(str)
        return self.delim.join(pieces)
class wrapList(BaseCli):
    def __init__(self):
        """Puts the input inside a fresh single-element list. A more advanced
        cli built on the same idea is :meth:`~k1lib.cli.structural.unsqueeze`.
        Example::

            # returns [5]
            5 | wrapList()"""
        super().__init__()

    def _typehint(self, inp):
        return tList(inp)

    def __ror__(self, it:T) -> List[T]:
        wrapped = [it]
        return wrapped
class _EarlyExp(Exception):
    """Private marker exception, kept defined at module level."""
    pass
class equals:
    def __init__(self):
        """Checks if all incoming columns/streams are identical.

        Piping a collection of streams in yields one boolean per position:
        ``True`` when every stream has the same element there, ``False``
        otherwise. Output stops at the shortest stream."""
        super().__init__()

    def __ror__(self, streams:Iterator[Iterator[str]]):
        streams = list(streams)
        for row in zip(*streams):
            reference = row[0]
            for candidate in row:
                if reference != candidate:
                    # first mismatch settles this position
                    yield False
                    break
            else:
                yield True
class reverse(BaseCli):
    def __init__(self):
        """Reverses the incoming sequence. The whole input is read into
        memory first, and a reversed iterator over it is returned.
        Example::

            # returns [3, 5, 2]
            [2, 5, 3] | reverse() | deref()"""
        super().__init__()

    def _typehint(self, inp):
        if not isinstance(inp, tListIterSet):
            return tAny()
        return tIter(inp.child)

    def __ror__(self, it:Iterator[str]) -> List[str]:
        materialized = list(it)
        return reversed(materialized)
class ignore(BaseCli):
    def __init__(self):
        r"""Drains the input iterator and throws every element away. Useful
        for forcing a lazy pipeline to actually run.
        Example::

            # will just return an iterator, and not print anything
            [2, 3] | apply(lambda x: print(x))
            # will prints "2\n3"
            [2, 3] | apply(lambda x: print(x)) | ignore()"""
        super().__init__()

    def _typehint(self, inp):
        return type(None)

    def __ror__(self, it:Iterator[Any]):
        # consume everything; nothing is returned
        for _unused in it:
            pass
class rateLimit(BaseCli):
    def __init__(self, f, delay=0.1):
        """Limits the execution flow rate upon a condition.
        Example::

            s = 0; semaphore = 0
            def heavyAsyncOperation(i):
                global semaphore, s
                semaphore += 1
                s += i; time.sleep(1)
                semaphore -= 1; return i**2

            # returns (20,), takes 1s to run
            range(20) | applyTh(heavyAsyncOperation, 100) | shape()
            # returns (20,), takes 4s to run (20/5 = 4)
            range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape()

        The first test case is not rate-limited, so it will run all 20 threads at the
        same time, and all of them will finish after 1 second.

        The second test case is rate-limited, so that there can only be 5 concurrently
        executing threads because of the semaphore count check. Therefore this takes
        around 4 seconds to run.

        :param f: checking function. Should return true if execution is allowed
        :param delay: delay in seconds between calling ``f()``"""
        self.f = f
        self.delay = delay

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet):
            return tIter(inp.child)
        if isinstance(inp, tArrayTypes):
            if inp.rank is None:
                return tIter(inp)
            elif inp.rank >= 2:
                return tIter(inp.__class__(inp.child, inp.rank - 1))
            else:
                return tIter(inp.child)
        if isinstance(inp, tCollection):
            return inp
        return tAny()

    def __ror__(self, it):
        allowed, pause = self.f, self.delay
        for elem in it:
            # hold each element back until the gate function gives the go-ahead
            while not allowed():
                time.sleep(pause)
            yield elem

    @staticmethod
    def cpu(maxUtilization=90):
        """Limits flow rate when cpu utilization is more than a specified
        percentage amount. Needs to install the package ``psutil`` to actually work.
        Example::

            # returns [0, 1, 4, 9, 16]
            range(5) | rateLimit.cpu() | apply(op()**2) | deref()

        :param maxUtilization: elements only flow while
            ``psutil.cpu_percent()`` is below this value"""
        import psutil
        def belowThreshold():
            return psutil.cpu_percent() < maxUtilization
        return rateLimit(belowThreshold)
class timeLimit(BaseCli):
    def __init__(self, t):
        """Stops the stream once a given amount of time has elapsed. The
        clock starts when iteration begins, and is checked after each element
        has been yielded. Example::

            # returns 20, or roughly close to that
            repeatF(lambda: time.sleep(0.1)) | timeLimit(2) | shape(0)

        :param t: time budget in seconds"""
        self.t = t

    def _typehint(self, inp):
        if isinstance(inp, tListIterSet):
            return tIter(inp.child)
        if isinstance(inp, tArrayTypes):
            if inp.rank is None:
                return tIter(inp)
            elif inp.rank >= 2:
                return tIter(inp.__class__(inp.child, inp.rank - 1))
            else:
                return tIter(inp.child)
        if isinstance(inp, tCollection):
            return inp
        return tAny()

    def __ror__(self, it):
        now = time.time
        deadline = now() + self.t
        for elem in it:
            yield elem
            if now() > deadline:
                return
def tab(pad:str=" "*4):
    """Prefixes every incoming element with ``pad``, i.e. indents it.
    Example::

        # prints out indented 0 to 9
        range(10) | tab() | headOut()

    :param pad: string placed in front of each element, 4 spaces by default"""
    addPad = lambda x: f"{pad}{x}"
    return cli.apply(addPad)
indent = tab
class clipboard(BaseCli):
    def __init__(self):
        """Copies whatever is piped in to the system clipboard. Requires the
        ``pyperclip`` package, which is imported when the cli is created.
        Example::

            # copies "abc" into the clipboard. Just use Ctrl+V to paste as usual
            "abc" | clipboard()"""
        import pyperclip
        self.pyperclip = pyperclip

    def _typehint(self, inp):
        return type(None)

    def __ror__(self, s):
        self.pyperclip.copy(s)
# Types that deref treats as atomic, i.e. returns as-is instead of iterating over them.
a = [numbers.Number, np.number, str, bool, bytes, k1lib.UValue]
# Only reference torch.nn.Module when torch was actually imported.
if hasTorch: a.append(torch.nn.Module)
# Register the tuple in the settings; it is read back below as `atomic.deref`.
settings.atomic.add("deref", tuple(a), "used by deref")
# Module-level shortcuts. When torch is missing, `torch` is the auto-declaring
# placeholder object created by the guarded import at the top of the file.
Tensor = torch.Tensor; atomic = settings.atomic
class inv_dereference(BaseCli):
    def __init__(self, igT=False):
        """Kinda the inverse to :class:`dereference`: lazily walks the input
        and turns nested containers into nested generators.

        :param igT: short for "ignore tensor". If False, zero-dimensional
            arrays/tensors are unwrapped with ``.item()``; otherwise array types
            are passed through untouched"""
        super().__init__(); self.igT = igT

    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        for e in it:
            if e is None or isinstance(e, atomic.deref):
                yield e
            elif isinstance(e, settings.arrayTypes):
                if not self.igT and len(e.shape) == 0: yield e.item()
                else: yield e
            else:
                # Build the nested (lazy) stream inside the try, but yield
                # outside of it. Yielding inside a catch-all handler would
                # intercept the GeneratorExit raised when this generator is
                # closed early, and then yield again, which Python reports as
                # "generator ignored GeneratorExit".
                try: nested = e | self
                except Exception: nested = e
                yield nested
class deref(BaseCli):
    def __init__(self, maxDepth=float("inf"), igT=True):
        """Recursively converts any iterator into a list.
        Example::

            # returns something like "<range_iterator at 0x7fa8c52ca870>"
            iter(range(5))
            # returns [0, 1, 2, 3, 4]
            iter(range(5)) | deref()
            # returns [2, 3], yieldT stops things early
            [2, 3, yieldT, 6] | deref()

        You can also specify a ``maxDepth``::

            # returns something like "<list_iterator at 0x7f810cf0fdc0>"
            iter([range(3)]) | deref(0)
            # returns [range(3)]
            iter([range(3)]) | deref(1)
            # returns [[0, 1, 2]]
            iter([range(3)]) | deref(2)

        There are a few classes/types that are considered atomic, and :class:`deref`
        will never try to iterate over it. If you wish to change it, do something like::

            settings.cli.atomic.deref = (int, float, ...)

        Dictionaries are dereferenced value by value and stay dictionaries.

        :param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything
            at all
        :param igT: short for "ignore tensor". If True, then don't loop over :class:`torch.Tensor`
            and :class:`numpy.ndarray` internals"""
        super().__init__(); self.igT = igT
        # `depth` tracks the current recursion level while __ror__ is running
        self.maxDepth = maxDepth; self.depth = 0
        if hasTorch:
            self.arrayType = (torch.Tensor, np.ndarray) if k1lib.settings.startup.or_patch.numpy else torch.Tensor
        else: self.arrayType = (np.ndarray,) if k1lib.settings.startup.or_patch.numpy else ()

    def _typehint(self, inp, depth=float("inf")):
        if depth == 0: return inp
        if depth == float("inf"): depth = self.maxDepth
        if isinstance(inp, type) and issubclass(inp, atomic.deref): return inp
        if isinstance(inp, tArrayTypes):
            if self.igT: return inp
            if inp.rank is None: return tList(tAny())
            if inp.rank == 1:
                if isinstance(inp, tTensor):
                    return tList(type(torch.tensor(3, dtype=inp.child).item()))
                if isinstance(inp, tNpArray):
                    return tList(type(np.array(3, dtype=inp.child).item()))
            return tList(self._typehint(inp.item(), depth-1))
        if isinstance(inp, tListIterSet):
            return tList(self._typehint(inp.child, depth-1))
        if isinstance(inp, tCollection):
            return tCollection(*(self._typehint(e, depth-1) for e in inp.children))
        return tAny()

    def __ror__(self, it:Iterator[T]) -> List[T]:
        if self.depth >= self.maxDepth: return it
        elif isinstance(it, np.number): return it.item()
        elif isinstance(it, atomic.deref): return it
        elif isinstance(it, self.arrayType):
            if self.igT: return it
            if len(it.shape) == 0: return it.item()
        elif isinstance(it, dict):
            self.depth += 1
            try: return {k: self.__ror__(v) for k, v in it.items()}
            finally: self.depth -= 1
        try: iter(it)
        except Exception: return it # not iterable, so treat as a leaf
        self.depth += 1
        try:
            answer = []
            for e in it:
                if e is cli.yieldT: return answer
                answer.append(self.__ror__(e))
            return answer
        finally:
            # Always undo the depth bump, including on the early yieldT return
            # and when an element raises. Otherwise the counter stays inflated
            # and a later call on this instance with a finite maxDepth would
            # stop dereferencing too early.
            self.depth -= 1

    def __invert__(self) -> BaseCli:
        """Returns a :class:`~k1lib.cli.init.BaseCli` that makes
        everything an iterator. Not entirely sure when this comes in handy, but it's
        there."""
        return inv_dereference(self.igT)
class bindec(BaseCli):
    def __init__(self, cats:List[Any], f=None):
        """Binary decodes the input: bit ``i`` of the incoming integer (least
        significant bit first) decides whether category ``i`` is selected.
        Example::

            # returns ['a', 'c']
            5 | bindec("abcdef")
            # returns 'a,c'
            5 | bindec("abcdef", join(","))

        :param cats: categories
        :param f: transformation function of the selected elements. Defaulted to :class:`toList`, but others like :class:`join` is useful too"""
        self.cats = cats
        self.f = f or cli.toList()

    def __ror__(self, it):
        # binary digits without the "0b" prefix, least significant bit first
        bits = bin(int(it))[2:][::-1]
        return (cat for bit, cat in zip(bits, self.cats) if bit == '1') | self.f
settings.add("smooth", 10, "default smooth amount, used in utils.smooth")
class smooth(BaseCli):
    def __init__(self, consecutives=None):
        """Smoothes out the input stream by averaging consecutive groups of
        elements. Literally just a shortcut for::

            batched(consecutives) | toMean().all()

        Example::

            # returns [4.5, 14.5, 24.5]
            range(30) | smooth(10) | deref()

        Smoothing over :class:`torch.Tensor` or :class:`numpy.ndarray` will
        be much faster, and produce high dimensional results::

            # returns torch.Tensor with shape (2, 3, 4)
            torch.randn(10, 3, 4) | smooth(4)

        The default consecutive value is in ``settings.cli.smooth``. This
        is useful if you are smoothing over multiple lists at the same
        time, like this::

            # can change a single smooth value temporarily here, and all sequences will be smoothed in the same way
            with settings.cli.context(smooth=5):
                x = list(np.linspace(-2, 2, 50))
                y = x | apply(op()**2) | deref()
                plt.plot(x | smooth() | deref(), y | smooth() | deref())

        :param consecutives: if not defined, then used the value inside ``settings.cli.smooth``"""
        groupSize = consecutives or settings.smooth
        self.b = cli.batched(groupSize)

    def __ror__(self, it):
        batches = it | self.b
        if isinstance(batches, settings.arrayTypes):
            # array input: average over the within-batch axis in one call
            return batches.mean(1)
        return batches | cli.toMean().all()
def _f(): pass
_code = type(_f.__code__)
def disassemble(f=None):
    """Prints code-object metadata and the bytecode of anything piped into it,
    then does the same (indented) for every nested code object it contains.
    Normal usage::

        def f(a, b):
            return a**2 + b
        # both of these print out disassembled info
        f | disassemble()
        disassemble(f)
        # you can pass in lambdas
        disassemble(lambda x: x + 3)
        # or even raw code
        "lambda x: x + 3" | disassemble()

    :param f: function, code object or source string. If omitted, returns a
        cli that disassembles whatever is piped into it
    :raises RuntimeError: if no code object can be obtained from ``f``"""
    if f is None:
        return cli.aS(disassemble)
    c = f
    if isinstance(c, str):
        c = compile(c, "", "exec")
    try:
        c = c.__code__
    except:
        pass
    if not isinstance(c, _code):
        raise RuntimeError(f"`{c}` is not a code object/function/class method/string code")
    for attr in ("co_argcount", "co_cellvars", "co_consts", "co_filename",
                 "co_firstlineno", "co_flags", "co_freevars", "co_kwonlyargcount"):
        print(f"{attr}: {getattr(c, attr)}")
    print(f"co_lnotab: {c.co_lnotab | cli.apply(str) | join(' ')}")
    for attr in ("co_name", "co_names", "co_nlocals", "co_posonlyargcount",
                 "co_stacksize", "co_varnames"):
        print(f"{attr}: {getattr(c, attr)}")
    print("Disassembly:")
    dis.disassemble(c)
    # Recurse into nested code objects with stdout captured, so their output
    # can be prefixed and indented before it is really printed.
    isCode = cli.filt(lambda x: "code" in str(type(x)))
    banner = cli.tee(lambda _: "----------------------- inner code object -----------------------\n")
    with k1lib.captureStdout() as out:
        c.co_consts | isCode | banner | cli.apply(disassemble) | cli.ignore()
    out() | cli.filt(cli.op().strip() != "") | cli.apply("|" + cli.op()) | cli.indent() | cli.stdout()
shortName = lambda s: s.split(os.sep)[-1]
def tree(fL=10, dL=10, depth=float("inf"), ff:Callable[[str], bool]=(lambda s: True), df:Callable[[str], bool]=(lambda s: True)):
    """Recursively lists files and folders under the piped-in path. For each
    directory the result pairs a set of file names with a dict that maps child
    directory names to their own sub-results. The format is a bit unusual, so
    this is mainly for visualization. Example::

        "." | tree() | deref()

    :param fL: max number of file per directory included in output
    :param dL: max number of child directories per directory included in output
    :param depth: explore depth
    :param ff: optional file filter function
    :param df: optional directory filter function"""
    # directory path -> [short name, path], then recurse on the path column
    # (or stop with [] when the depth budget is used up), then build a dict
    toNameAndPath = cli.apply(lambda x: [shortName(x), x])
    descend = cli.apply(lambda x: x | tree(fL, dL, depth-1, ff, df) if depth > 0 else [], 1)
    processFolders = toNameAndPath | descend | cli.toDict()
    filesBranch = cli.filt(os.path.isfile) | cli.filt(ff) | cli.head(fL) | cli.apply(shortName) | cli.aS(set)
    foldersBranch = ~cli.filt(os.path.isfile) | cli.filt(df) | cli.head(dL) | processFolders
    return cli.ls() | ~cli.sortF(os.path.isfile) | (filesBranch & foldersBranch)
class lookup(BaseCli):
    def __init__(self, d:dict, col:int=None, fill=None):
        """Replaces each incoming item with ``d[item]``. Example::

            d = {"a": 3, "b": 5, "c": 52}
            # returns [3, 5, 52, 52, 3]
            "abcca" | lookup(d) | deref()
            # returns [[0, 3], [1, 5], [2, 52], [3, 52], [4, 3]]
            [range(5), "abcca"] | transpose() | lookup(d, 1) | deref()

        :param d: any object that can be sliced with the inputs
        :param col: if None, lookup on each row, else lookup a specific
            column only
        :param fill: if None, throws error if looked up element is not
            available, else returns the fill value"""
        self.col = col
        # with a fill value, wrap the mapping so that missing keys resolve to it
        self.d = d if fill is None else defaultdict(lambda: fill, d)

    def _typehint(self, inp):
        valueType = inferType(list(self.d.values()))
        if isinstance(valueType, tListIterSet):
            return tIter(valueType.child)
        if isinstance(valueType, tCollection):
            return tIter(tLowest(*valueType.children))
        return tIter(tAny())

    def __ror__(self, it):
        table = self.d
        return it | cli.apply(lambda key: table[key], self.col)
class dictFields(BaseCli):
    def __init__(self, *fields, default=""):
        """Pulls several fields out of a dictionary at once, in the order
        requested.
        Example::

            # returns [3, 1, '']
            {"a": 1, "b": 2, "c": 3} | dictFields("c", "a", "d")

        :param fields: keys to fetch
        :param default: value used for keys that are missing"""
        self.fields = fields
        self.default = default

    def __ror__(self, d):
        fallback = self.default
        return [d.get(name, fallback) for name in self.fields]
class backup(BaseCli):
    def __init__(self):
        """Backs up a file/folder to a sibling path with ``.backup`` appended.
        Example::

            "some/folderOrFile" | backup()
            "some/folderOrFile" | backup.restore()

        Really straightforward. Uses bash internally to copy files recursively, so
        not available on Windows.

        Both directions check that their source exists before deleting anything,
        and raise :class:`FileNotFoundError` otherwise, so a typo in the path can
        never wipe the only remaining copy."""
        pass

    def __ror__(self, it):
        import shlex
        src = os.path.expanduser(it); dst = f"{src}.backup"
        # refuse to drop the previous backup when there is nothing to replace it with
        if not os.path.lexists(src):
            raise FileNotFoundError(f"Can't back up `{src}`: path does not exist")
        # shlex.quote keeps paths containing quotes/spaces intact; `--` stops
        # option parsing so that paths starting with "-" are not read as flags
        None | cli.cmd(f"rm -rf -- {shlex.quote(dst)}") | cli.ignore()
        None | cli.cmd(f"cp -r -- {shlex.quote(src)} {shlex.quote(dst)}") | cli.ignore()

    @staticmethod
    def restore():
        """Returns a cli that replaces the piped-in path with the content of
        its ``.backup`` sibling.

        :raises FileNotFoundError: (when the cli runs) if no backup exists"""
        def inner(it):
            import shlex
            dst = os.path.expanduser(it); src = f"{dst}.backup"
            # never delete the live copy unless a backup is there to restore from
            if not os.path.lexists(src):
                raise FileNotFoundError(f"Can't restore `{dst}`: backup `{src}` does not exist")
            None | cli.cmd(f"rm -rf -- {shlex.quote(dst)}") | cli.ignore()
            None | cli.cmd(f"cp -r -- {shlex.quote(src)} {shlex.quote(dst)}") | cli.ignore()
        return cli.aS(inner)