# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for all short utilities that have a boilerplate feeling. Conversion clis
might feel they have different styles, as :class:`toFloat` converts object iterator to
float iterator, while :class:`toPIL` converts single image url to single PIL image,
whereas :class:`toSum` converts float iterator into a single float value.
The general convention is, if the intended operation sounds simple (convert to floats,
strings, types, ...), then most likely it will convert iterator to iterator, as you
can always use the function directly if you only want to apply it on 1 object.
If it sounds complicated (convert to PIL image, tensor, ...) then most likely it will
convert object to object. Lastly, there are some where it just feels right to input
an iterator and output a single object (like getting max, min, std, mean values)."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, Table, T
import k1lib.cli as cli, numbers, torch, numpy as np, dis
from typing import overload, Iterator, Any, List, Set, Union, Callable
import k1lib, time, math, os
# Names exported by a star-import of this module.
__all__ = ["size", "shape", "item", "iden", "join", "wrapList",
"equals", "reverse", "ignore", "rateLimit", "timeLimit", "tab", "indent",
"clipboard", "headerIdx", "deref", "bindec", "smooth", "disassemble",
"tree", "lookup"]
# Shortcut to the cli settings object; code below reads ``settings.arrayTypes``,
# ``settings.atomic`` and ``settings.smooth`` from it.
settings = k1lib.settings.cli
def exploreSize(it):
    """Peek at an iterable: return ``(first element, number of elements)``.

    Strings are not iterated; they give ``(None, len(it))``. An empty
    iterable gives ``(None, 0)``. Anything else is consumed completely in
    order to count it. Non-iterables raise :class:`TypeError` from ``iter``.
    """
    if isinstance(it, str):
        return None, len(it)
    stream = iter(it)
    missing = object()  # unique marker, so that a legitimate ``None`` element is not mistaken for "empty"
    first = next(stream, missing)
    if first is missing:
        return None, 0
    total = 1  # the element already pulled out above
    for _ in stream:
        total += 1
    return first, total
class size(BaseCli):
    def __init__(self, idx=None):
        """Returns number of rows and columns in the input.
        Example::

            # returns (3, 2)
            [[2, 3], [4, 5, 6], [3]] | size()
            # returns 3
            [[2, 3], [4, 5, 6], [3]] | size(0)
            # returns 2
            [[2, 3], [4, 5, 6], [3]] | size(1)
            # returns (2, 0)
            [[], [2, 3]] | size()
            # returns (3,)
            [2, 3, 5] | size()
            # returns 3
            [2, 3, 5] | size(0)
            # returns (3, 2, 2)
            [[[2, 1], [0, 6, 7]], 3, 5] | size()
            # returns (1, 3)
            ["abc"] | size()
            # returns (1, 2, 3)
            [torch.randn(2, 3)] | size()
            # returns (2, 3, 5)
            size()(np.random.randn(2, 3, 5))

        There's also :class:`lengths`, which is sort of a simplified/faster version of
        this, but only use it if you are sure that ``len(it)`` can be called.

        Only the first element of every level is explored, so ragged inputs
        report the size of their first row. If encounter PyTorch tensors or
        Numpy arrays (anything in ``settings.arrayTypes``), then this will just
        get the shape instead of actually looping over them.

        :param idx: if idx is None return (rows, columns, ...). If 0 or 1, then
            rows or columns"""
        super().__init__(); self.idx = idx

    def __ror__(self, it:Iterator[str]):
        idx = self.idx
        if idx is not None:
            # Array inputs already know their dimensions, so read the shape
            # directly instead of iterating, as the docstring promises. Only
            # taken for in-range, non-negative integer indexes; everything
            # else goes through the generic path below.
            if isinstance(it, settings.arrayTypes) and isinstance(idx, int) \
                    and 0 <= idx < len(it.shape):
                return it.shape[idx]
            return exploreSize(it | cli.item(idx))[1]
        dims = []
        try:
            while True:
                if isinstance(it, settings.arrayTypes):
                    return tuple(dims + list(it.shape))
                # descend into the first element of the current level
                it, length = exploreSize(it); dims.append(length)
        except TypeError: pass  # reached something that is not iterable: no more dimensions
        return tuple(dims)
# ``shape`` is a second name for the :class:`size` cli.
shape = size
# Unique marker used as the default of ``item(fill=...)``, so that ``None`` (or any
# other value) can still be passed as a real fill value.
noFill = object()
class item(BaseCli):
    def __init__(self, amt:int=1, fill=noFill):
        """Returns the first row.
        Example::

            # returns 0
            iter(range(5)) | item()
            # returns (5,)
            torch.randn(3,4,5) | item(2) | shape()
            # returns 3
            [] | item(fill=3)

        If the input is empty and no ``fill`` is given, the underlying
        ``next()`` call raises :class:`StopIteration`.

        :param amt: how many times do you want to call item() back to back?
        :param fill: if iterator length is 0, return this"""
        super().__init__()
        self.amt = amt; self.fill = fill
        # Preprocessed so that __ror__ can splat it straight into next().
        # Identity check (not ``!=``): fill values such as numpy arrays or
        # tensors overload ``!=`` element-wise and have no single truth value.
        self.fillP = [fill] if fill is not noFill else []

    def __ror__(self, it:Iterator[str]):
        if self.amt != 1:
            # chain `amt` single-step items, each with the same fill value
            return it | cli.serial(*(item(fill=self.fill) for _ in range(self.amt)))
        return next(iter(it), *self.fillP)
class iden(BaseCli):
    def __init__(self):
        """Identity cli: hands the piped-in object back untouched. Handy as a
        placeholder branch when working with multiple streams.
        Example::

            # returns range(5)
            range(5) | iden()"""
        super().__init__()

    def __ror__(self, it:Iterator[Any]):
        # nothing to do: the very same object is passed through
        return it
class join(BaseCli):
    def __init__(self, delim:str=None):
        r"""Concatenates every incoming element into one string, placing
        `delim` between consecutive elements. Elements are converted with
        :class:`toStr` first, so this is basically :meth:`str.join`.
        Example::

            # returns '2\na'
            [2, "a"] | join("\n")

        :param delim: separator; passed through ``patchDefaultDelim``, which
            decides what ``None`` means"""
        super().__init__()
        self.delim = patchDefaultDelim(delim)

    def __ror__(self, it:Iterator[str]):
        pieces = it | cli.toStr()
        return self.delim.join(pieces)
class wrapList(BaseCli):
    def __init__(self):
        """Puts the piped-in object inside a new single-element list. A more
        advanced cli built on this idea is
        :meth:`~k1lib.cli.structural.unsqueeze`."""
        super().__init__()

    def __ror__(self, it:T) -> List[T]:
        wrapped = [it]
        return wrapped
class _EarlyExp(Exception):
    """Internal early-exit signal. Retained so the name stays defined for any
    existing references; :class:`equals` does not raise it."""
    pass


class equals(BaseCli):
    def __init__(self):
        """Checks if all incoming columns/streams are identical.

        The streams are zipped together, and for every position a single
        boolean is yielded: ``True`` if no element at that position differs
        from the first stream's element, ``False`` otherwise. Output stops at
        the end of the shortest stream."""
        super().__init__()

    def __ror__(self, streams:Iterator[Iterator[str]]):
        streams = list(streams)
        for row in zip(*streams):
            sampleElem = row[0]
            # any() short-circuits on the first mismatch, so no exception is
            # needed to leave the inner loop early
            yield not any(sampleElem != elem for elem in row)
class reverse(BaseCli):
    def __init__(self):
        """Reverses incoming list.
        Example::

            # returns [3, 5, 2]
            [2, 5, 3] | reverse() | deref()

        The whole input is materialized into a list first; the result is the
        lazy iterator produced by :func:`reversed`, not a list."""
        super().__init__()

    def __ror__(self, it:Iterator[str]) -> Iterator[str]:
        return reversed(list(it))
class ignore(BaseCli):
    def __init__(self):
        r"""Drains the incoming iterator and throws every element away, which
        forces any lazy upstream work to actually run. Returns ``None``.
        Example::

            # will just return an iterator, and not print anything
            [2, 3] | apply(lambda x: print(x))
            # will prints "2\n3"
            [2, 3] | apply(lambda x: print(x)) | ignore()"""
        super().__init__()

    def __ror__(self, it:Iterator[Any]):
        stream = iter(it)
        exhausted = object()  # unique end marker, cannot collide with real elements
        while next(stream, exhausted) is not exhausted:
            pass
class rateLimit(BaseCli):
    def __init__(self, f, delay=0.1):
        """Limits the execution flow rate upon a condition.
        Example::

            s = 0; semaphore = 0
            def heavyAsyncOperation(i):
                global semaphore, s
                semaphore += 1
                s += i; time.sleep(1)
                semaphore -= 1; return i**2

            # returns (20,), takes 1s to run
            range(20) | applyTh(heavyAsyncOperation, 100) | shape()
            # returns (20,), takes 4s to run (20/5 = 4)
            range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape()

        The first test case is not rate-limited, so it will run all 20 threads at the
        same time, and all of them will finish after 1 second.

        The second test case is rate-limited, so that there can only be 5 concurrently
        executing threads because of the semaphore count check. Therefore this takes
        around 4 seconds to run.

        :param f: checking function. Should return true if execution is allowed
        :param delay: delay in seconds between calling ``f()``"""
        super().__init__()
        self.f = f; self.delay = delay

    def __ror__(self, it):
        f = self.f; delay = self.delay
        for e in it:
            # hold this element back until the condition allows it through
            while not f(): time.sleep(delay)
            yield e

    @staticmethod
    def cpu(maxUtilization=90):
        """Limits flow rate when cpu utilization is more than a specified
        percentage amount. Needs to install the package ``psutil`` to actually work.
        Example::

            # returns [0, 1, 4, 9, 16]
            range(5) | rateLimit.cpu() | apply(op()**2) | deref()

        :param maxUtilization: elements pass only while
            ``psutil.cpu_percent()`` is below this value"""
        import psutil  # optional dependency, imported only when this helper is used
        return rateLimit(lambda: psutil.cpu_percent() < maxUtilization)
class timeLimit(BaseCli):
    def __init__(self, t):
        """Caps the flow after a specified amount of time has
        passed. Example::

            # returns 20, or roughly close to that
            repeatF(lambda: time.sleep(0.1)) | timeLimit(2) | shape(0)

        The clock is checked after each element has been yielded, so at least
        one element always gets through (if the input has any).

        :param t: time budget in seconds, counted from when iteration starts"""
        super().__init__()
        self.t = t

    def __ror__(self, it):
        # monotonic clock: elapsed-time measurement must not be affected by
        # system clock adjustments
        now = time.monotonic; deadline = now() + self.t
        for e in it:
            yield e
            if now() > deadline: break
def tab(pad:str=" "*4):
    """Builds a cli that prefixes every incoming element with ``pad``
    (elements are formatted into a string in the process).
    Example::

        # prints out indented 0 to 9
        range(10) | tab() | headOut()

    :param pad: text put in front of every element, 4 spaces by default"""
    def addPad(line):
        return f"{pad}{line}"
    return cli.apply(addPad)
# ``indent`` is a second name for :func:`tab`
indent = tab
class clipboard(BaseCli):
    def __init__(self):
        """Saves the input to clipboard.
        Example::

            # copies "abc" into the clipboard. Just use Ctrl+V to paste as usual
            "abc" | clipboard()

        Requires the ``pyperclip`` package, which is imported when the cli is
        created."""
        super().__init__()
        import pyperclip; self.pyperclip = pyperclip  # optional dependency, loaded lazily

    def __ror__(self, s): self.pyperclip.copy(s)
# Registers, under ``settings.atomic.deref``, the types that the deref-related clis
# below check with ``isinstance`` and pass through as-is instead of iterating over.
settings.atomic.add("deref", (numbers.Number, np.number, str, bool, bytes, torch.nn.Module, k1lib.UValue), "used by deref")
# Module-level shortcuts; ``atomic`` is read by :class:`inv_dereference` and :class:`deref`.
Tensor = torch.Tensor; atomic = settings.atomic
class inv_dereference(BaseCli):
    def __init__(self, igT=False):
        """Kinda the inverse to :class:`deref`: turns nested iterables into
        nested lazy generators.

        :param igT: short for "ignore tensor". If False, zero-dimensional
            arrays/tensors are unwrapped with ``.item()``; other arrays are
            always passed through unchanged"""
        super().__init__(); self.igT = igT

    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:
        for e in it:
            if e is None or isinstance(e, atomic.deref): yield e
            elif isinstance(e, settings.arrayTypes):
                if not self.igT and len(e.shape) == 0: yield e.item()
                else: yield e
            else:
                # Decide what to emit *before* yielding. Keeping the yield
                # outside the try block means GeneratorExit (raised at a
                # suspended yield when the generator is closed) is never
                # swallowed. Probing with iter() is needed because `e | self`
                # only builds a lazy generator and would not fail here for a
                # non-iterable element.
                try:
                    iter(e)
                    nested = e | self
                except Exception:
                    nested = e  # not iterable (or piping failed): emit as-is
                yield nested
class deref(BaseCli):
    def __init__(self, maxDepth=float("inf"), igT=True):
        """Recursively converts any iterator into a list. Types registered in
        ``settings.cli.atomic.deref`` (:class:`str`, :class:`numbers.Number`,
        :class:`~torch.nn.Module`, ...) are not converted. Dictionaries keep
        their keys and have their values dereferenced. Example::

            # returns something like "<range_iterator at 0x7fa8c52ca870>"
            iter(range(5))
            # returns [0, 1, 2, 3, 4]
            iter(range(5)) | deref()
            # returns [2, 3], yieldSentinel stops things early
            [2, 3, yieldSentinel, 6] | deref()

        You can also specify a ``maxDepth``::

            # returns something like "<list_iterator at 0x7f810cf0fdc0>"
            iter([range(3)]) | deref(0)
            # returns [range(3)]
            iter([range(3)]) | deref(1)
            # returns [[0, 1, 2]]
            iter([range(3)]) | deref(2)

        There are a few classes/types that are considered atomic, and :class:`deref`
        will never try to iterate over it. If you wish to change it, do something like::

            settings.cli.atomic.deref = (int, float, ...)

        :param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything
            at all
        :param igT: short for "ignore tensor". If True, then don't loop over :class:`torch.Tensor`
            and :class:`numpy.ndarray` internals"""
        super().__init__(); self.igT = igT
        # `depth` tracks the current recursion level while __ror__ is running
        self.maxDepth = maxDepth; self.depth = 0
        self.arrayType = (torch.Tensor, np.ndarray) if k1lib.settings.startup.or_patch else torch.Tensor

    def __ror__(self, it:Iterator[T]) -> List[T]:
        if self.depth >= self.maxDepth: return it
        elif isinstance(it, atomic.deref): return it
        elif isinstance(it, self.arrayType):
            if self.igT: return it
            if len(it.shape) == 0: return it.item()
            # otherwise fall through and iterate over the array like any other iterable
        elif isinstance(it, dict):
            self.depth += 1
            try: return {k: self.__ror__(v) for k, v in it.items()}
            finally: self.depth -= 1
        try: iter(it)
        except Exception: return it  # not iterable: treat as a leaf value
        self.depth += 1
        # try/finally so the depth counter is restored on every exit path,
        # including the early stop on yieldSentinel and exceptions raised by
        # the input; otherwise a reused instance would start at a stale depth
        try:
            answer = []
            for e in it:
                if e is cli.yieldSentinel: break
                answer.append(self.__ror__(e))
            return answer
        finally: self.depth -= 1

    def __invert__(self) -> BaseCli:
        """Returns a :class:`~k1lib.cli.init.BaseCli` that makes
        everything an iterator. Not entirely sure when this comes in handy, but it's
        there."""
        return inv_dereference(self.igT)
class bindec(BaseCli):
    def __init__(self, cats:List[Any], f=None):
        """Binary decodes the input.
        Example::

            # returns ['a', 'c']
            5 | bindec("abcdef")
            # returns 'a,c'
            5 | bindec("abcdef", join(","))

        Bit ``i`` of the input (least significant bit first) selects
        ``cats[i]``.

        :param cats: categories
        :param f: transformation function of the selected elements. Defaulted to :class:`toList`, but others like :class:`join` is useful too"""
        super().__init__()
        # explicit None check, so a supplied cli is never replaced because of
        # how it happens to evaluate in a boolean context
        self.cats = cats; self.f = cli.toList() if f is None else f

    def __ror__(self, it):
        bits = bin(int(it))[2:][::-1]  # drop the "0b" prefix, least significant bit first
        return (cat for bit, cat in zip(bits, self.cats) if bit == '1') | self.f
settings.add("smooth", 10, "default smooth amount, used in utils.smooth")
class smooth(BaseCli):
    def __init__(self, consecutives=None):
        """Smoothes out the input stream.
        Literally just a shortcut for::

            batched(consecutives) | toMean().all()

        Example::

            # returns [4.5, 14.5, 24.5]
            range(30) | smooth(10) | deref()

        Smoothing over :class:`torch.Tensor` or :class:`numpy.ndarray` will
        be much faster, and produce high dimensional results::

            # returns torch.Tensor with shape (2, 3, 4)
            torch.randn(10, 3, 4) | smooth(4)

        The default consecutive value is in ``settings.cli.smooth``. This
        is useful if you are smoothing over multiple lists at the same
        time, like this::

            # can change a single smooth value temporarily here, and all sequences will be smoothed in the same way
            with settings.cli.context(smooth=5):
                x = list(np.linspace(-2, 2, 50))
                y = x | apply(op()**2) | deref()
                plt.plot(x | smooth() | deref(), y | smooth() | deref())

        :param consecutives: if not defined, then used the value inside ``settings.cli.smooth``"""
        super().__init__()
        # the settings value is read here, at construction time
        self.b = cli.batched(consecutives or settings.smooth)

    def __ror__(self, it):
        it = it | self.b
        # batched array input stays an array: average over the batch axis directly
        if isinstance(it, settings.arrayTypes): return it.mean(1)
        return it | cli.toMean().all()
def _f(): pass
# the code-object type, obtained from a throwaway function
_code = type(_f.__code__)
def disassemble(f=None):
    """Disassembles anything piped into it.
    Normal usage::

        def f(a, b):
            return a**2 + b
        # both of these print out disassembled info
        f | disassemble()
        disassemble(f)
        # you can pass in lambdas
        disassemble(lambda x: x + 3)
        # or even raw code
        "lambda x: x + 3" | disassemble()

    Prints the ``co_*`` attributes of the code object, its bytecode, and then
    (indented and prefixed with ``|``) the same report for every code object
    found in its constants.

    :param f: function, code object or source string. If None, returns a cli
        that disassembles whatever is piped into it
    :raises RuntimeError: if no code object can be obtained from ``f``"""
    c = f
    if c is None: return cli.aS(disassemble)
    if isinstance(c, str): c = compile(c, "", "exec")
    # functions/methods carry their code object in __code__; anything without
    # that attribute (e.g. a raw code object) is left as it is
    c = getattr(c, "__code__", c)
    if not isinstance(c, _code): raise RuntimeError(f"`{c}` is not a code object/function/class method/string code")
    fields = ("co_argcount", "co_cellvars", "co_consts", "co_filename",
              "co_firstlineno", "co_flags", "co_freevars", "co_kwonlyargcount",
              "co_lnotab", "co_name", "co_names", "co_nlocals",
              "co_posonlyargcount", "co_stacksize", "co_varnames")
    for name in fields:
        if name == "co_lnotab":
            # bytes object: show the individual byte values separated by spaces
            value = c.co_lnotab | cli.toStr() | join(' ')
        else: value = getattr(c, name)
        print(f"{name}: {value}")
    print("Disassembly:"); dis.disassemble(c)
    # Recurse into nested code objects (inner functions, lambdas, ...) while
    # capturing what they print, so it can be re-emitted indented below.
    with k1lib.captureStdout() as out:
        c.co_consts | cli.filt(lambda x: isinstance(x, _code)) | cli.tee(lambda _: "----------------------- inner code object -----------------------\n") | cli.apply(disassemble) | cli.ignore()
    out() | cli.filt(cli.op().strip() != "") | cli.apply("|" + cli.op()) | cli.indent() | cli.stdout()
def shortName(s):
    """Last component of path ``s``, split on ``os.sep``."""
    return s.split(os.sep)[-1]
def tree(fL=10, dL=10, depth=float("inf"), ff:Callable[[str], bool]=(lambda s: True), df:Callable[[str], bool]=(lambda s: True)):
    """Builds a cli that recursively lists files and folders of the directory
    piped into it. The output format might be a bit strange, so this is mainly
    meant for visualization. Example::

        "." | tree() | deref()

    :param fL: max number of file per directory included in output
    :param dL: max number of child directories per directory included in output
    :param depth: explore depth
    :param ff: optional file filter function
    :param df: optional directory filter function"""
    def explore(path):
        # only recurse while there is depth budget left
        if depth > 0:
            return path | tree(fL, dL, depth-1, ff, df)
        return []
    # directory path -> [short name, subtree], then everything collected into a dict
    processFolders = cli.apply(lambda path: [shortName(path), path]) | cli.apply(explore, 1) | cli.transpose() | cli.toDict()
    fileBranch = cli.filt(os.path.isfile) | cli.filt(ff) | cli.head(fL) | cli.apply(shortName)
    folderBranch = ~cli.filt(os.path.isfile) | cli.filt(df) | cli.head(dL) | processFolders
    return cli.ls() | ~cli.sortF(os.path.isfile) | (fileBranch & folderBranch)
class lookup(BaseCli):
    def __init__(self, d:dict, col:int=None):
        """Looks up items from a dictionary/object. Example::

            d = {"a": 3, "b": 5, "c": 52}
            # returns [3, 5, 52, 52, 3]
            "abcca" | lookup(d) | deref()
            # returns [[0, 3], [1, 5], [2, 52], [3, 52], [4, 3]]
            [range(5), "abcca"] | transpose() | lookup(d, 1) | deref()

        Every element is replaced by ``d[element]``; a missing key raises
        whatever ``d[...]`` raises.

        :param d: any object that can be sliced with the inputs
        :param col: if None, lookup on each row, else lookup a specific
            column only"""
        super().__init__()
        self.d = d; self.col = col

    def __ror__(self, it):
        d = self.d  # bound locally so the lambda does not go through self on every element
        return it | cli.apply(lambda e: d[e], self.col)