# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for all short utilities that has the boilerplate feeling. Conversion clis
might feel they have different styles, as :class:`toFloat` converts object iterator to
float iterator, while :class:`toPIL` converts single image url to single PIL image,
whereas :class:`toSum` converts float iterator into a single float value.
The general convention is, if the intended operation sounds simple (convert to floats,
strings, types, ...), then most likely it will convert iterator to iterator, as you
can always use the function directly if you only want to apply it on 1 object.
If it sounds complicated (convert to PIL image, tensor, ...) then most likely it will
convert object to object. Lastly, there are some that just feels right to input
an iterator and output a single object (like getting max, min, std, mean values)."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, Table, T
import k1lib.cli as cli, numbers, torch, numpy as np, dis
from typing import overload, Iterator, Any, List, Set, Union
import k1lib, time, math
# Public API of this module: the names exported by a star-import.
# NOTE(review): "headerIdx" is listed below but no definition of it is visible in
# this part of the file - confirm it is defined elsewhere, otherwise a star-import fails.
__all__ = ["size", "shape", "item", "identity", "iden",
"toStr", "join", "toNumpy", "toTensor",
"toList", "wrapList", "toSet", "toIter", "toRange", "toType",
"equals", "reverse", "ignore", "rateLimit", "tab", "indent",
"toSum", "toProd", "toAvg", "toMean", "toMax", "toMin", "toPIL",
"toImg", "toRgb", "toRgba", "toBin", "toIdx", "clipboard",
"lengths", "headerIdx", "deref", "bindec", "smooth", "disassemble"]
# Shorthand for the cli settings object; read and extended throughout this module.
settings = k1lib.settings.cli
def exploreSize(it):
    """Returns the first element and the length of the input.

    Strings are not looped over: they report ``(None, len(it))``. An empty
    input reports ``(None, 0)``. Anything else is consumed completely in
    order to count its elements.

    :param it: string or iterable to measure
    :return: tuple of (first element, number of elements)"""
    if isinstance(it, str):
        return None, len(it)
    iterator = iter(it)
    missing = object()
    first = next(iterator, missing)
    if first is missing:
        return None, 0
    # the first element is already taken, so count whatever is left
    remaining = 0
    for _ in iterator:
        remaining += 1
    return first, remaining + 1
class size(BaseCli):
    def __init__(self, idx=None):
        """Returns number of rows and columns in the input.
        Example::

            # returns (3, 2)
            [[2, 3], [4, 5, 6], [3]] | size()
            # returns 3
            [[2, 3], [4, 5, 6], [3]] | size(0)
            # returns 2
            [[2, 3], [4, 5, 6], [3]] | size(1)
            # returns (2, 0)
            [[], [2, 3]] | size()
            # returns (3,)
            [2, 3, 5] | size()
            # returns 3
            [2, 3, 5] | size(0)
            # returns (3, 2, 2)
            [[[2, 1], [0, 6, 7]], 3, 5] | size()
            # returns (1, 3)
            ["abc"] | size()
            # returns (1, 2, 3)
            [torch.randn(2, 3)] | size()
            # returns (2, 3, 5)
            size()(np.random.randn(2, 3, 5))

        Every dimension is measured by following the first element only, so
        ragged inputs report the lengths found along that path.

        There's also :class:`lengths`, which is sort of a simplified/faster version of
        this, but only use it if you are sure that ``len(it)`` can be called.

        If PyTorch tensors or Numpy arrays are encountered, then this will just
        get their shape instead of actually looping over them.

        :param idx: if idx is None return (rows, columns). If 0 or 1, then rows or
            columns"""
        super().__init__()
        self.idx = idx
    def __ror__(self, it:Iterator[str]):
        if self.idx is not None:
            # single dimension requested: dig `idx` levels down, then count
            return exploreSize(it | cli.item(self.idx))[1]
        dims = []
        try:
            while True:
                if isinstance(it, settings.arrayTypes):
                    # arrays know their own shape, no need to loop over them
                    return tuple(dims + list(it.shape))
                it, length = exploreSize(it)
                dims.append(length)
        except TypeError:
            pass  # reached something that can't be iterated over
        return tuple(dims)
shape = size
# Sentinel meaning "no fill value given"; a plain None can't be used as None is a valid fill
noFill = object()
class item(BaseCli):
    def __init__(self, amt:int=1, fill=noFill):
        """Returns the first row.
        Example::

            # returns 0
            iter(range(5)) | item()
            # returns torch.Size([5])
            torch.randn(3,4,5) | item(2) | shape()
            # returns 3
            [] | item(fill=3)

        :param amt: how many times do you want to call item() back to back?
        :param fill: if iterator length is 0, return this. If not specified, then
            an empty input raises :class:`StopIteration`"""
        self.amt = amt; self.fill = fill
        # Preprocessed, to be faster. The sentinel is compared by identity, as `!=`
        # would invoke the fill's own comparison operator, which breaks for fills
        # like numpy arrays (elementwise result can't be used as a boolean)
        self.fillP = [fill] if fill is not noFill else []
    def __ror__(self, it:Iterator[str]):
        if self.amt != 1:
            # chain `amt` single-step items together
            return it | cli.serial(*(item(fill=self.fill) for _ in range(self.amt)))
        return next(iter(it), *self.fillP)
class identity(BaseCli):
    def __init__(self):
        """Passes the input through untouched. Useful for multiple streams.
        Example::

            # returns range(5)
            range(5) | identity()"""
        super().__init__()
    def __ror__(self, it:Iterator[Any]):
        # nothing to do, the very same object comes back out
        return it
iden = identity
class toStr(BaseCli):
    def __init__(self, column:int=None):
        """Converts every line to a string.
        Example::

            # returns ['2', 'a']
            [2, "a"] | toStr() | deref()
            # returns [[2, 'a'], [3, '5']]
            [[2, "a"], [3, 5]] | toStr(1) | deref()

        :param column: if None, converts each element of the input. Else treats
            each element as a row, and converts only that column of every row"""
        super().__init__()
        self.column = column
    def __ror__(self, it:Iterator[str]):
        target = self.column
        if target is None:
            yield from map(str, it)
            return
        for row in it:
            # only the targeted column is converted, the others pass through
            yield [str(cell) if idx == target else cell for idx, cell in enumerate(row)]
class join(BaseCli):
    def __init__(self, delim:str=None):
        r"""Merges all strings into 1, with `delim` in the middle. Basically
        :meth:`str.join`, but elements are converted to strings first. Example::

            # returns '2\na'
            [2, "a"] | join("\n")

        :param delim: delimiter, passed through ``patchDefaultDelim`` first"""
        super().__init__()
        self.delim = patchDefaultDelim(delim)
    def __ror__(self, it:Iterator[str]):
        strings = it | toStr()
        return self.delim.join(strings)
class toNumpy(BaseCli):
    def __init__(self):
        """Converts generator to numpy array. Essentially ``np.array(list(it))``"""
        super().__init__()
    def __ror__(self, it:Iterator[float]) -> np.array:
        # materialize first, as the input may be a one-shot iterator
        elements = list(it)
        return np.array(elements)
class toTensor(BaseCli):
    def __init__(self, dtype=torch.float32):
        """Converts generator to :class:`torch.Tensor`. Essentially
        ``torch.tensor(list(it))``.

        Also checks if input is a PIL Image. If yes, turn it into a :class:`torch.Tensor`
        in CHW format and return.

        :param dtype: data type of the resulting tensor"""
        self.dtype = dtype
    def __ror__(self, it:Iterator[float]) -> torch.Tensor:
        try:
            # `import PIL` alone doesn't load the Image submodule, so import it explicitly
            import PIL.Image
        except ImportError:
            pass  # Pillow is optional, without it the input can't be a PIL image anyway
        else:
            pic = it
            if isinstance(pic, PIL.Image.Image): # stolen from torchvision ToTensor transform
                mode_to_nptype = {'I': np.int32, 'I;16': np.int16, 'F': np.float32}
                img = torch.from_numpy(np.array(pic, mode_to_nptype.get(pic.mode, np.uint8), copy=True))
                if pic.mode == '1': img = 255 * img
                img = img.view(pic.size[1], pic.size[0], len(pic.getbands()))
                return img.permute((2, 0, 1)).contiguous().to(self.dtype) # put it from HWC to CHW format
        return torch.tensor(list(it)).to(self.dtype)
class toList(BaseCli):
    def __init__(self):
        """Converts generator to list. :class:`list` would do the
        same, but this is just to maintain the style"""
        super().__init__()
    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        return [*it]
class wrapList(BaseCli):
    def __init__(self):
        """Wraps the input inside a single-element list. There's a more advanced cli
        tool built from this, which is :meth:`~k1lib.cli.structural.unsqueeze`."""
        super().__init__()
    def __ror__(self, it:T) -> List[T]:
        wrapped = [it]
        return wrapped
class toSet(BaseCli):
    def __init__(self):
        """Converts generator to set. :class:`set` would do the
        same, but this is just to maintain the style"""
        super().__init__()
    def __ror__(self, it:Iterator[T]) -> Set[T]:
        return {*it}
class toIter(BaseCli):
    def __init__(self):
        """Converts object to iterator. `iter()` would do the
        same, but this is just to maintain the style"""
        super().__init__()
    def __ror__(self, it:List[T]) -> Iterator[T]:
        iterator = iter(it)
        return iterator
class toRange(BaseCli):
    def __init__(self):
        """Returns iter(range(len(it))), effectively. Works on inputs that
        don't have a length, as the input is iterated over to count"""
        super().__init__()
    def __ror__(self, it:Iterator[Any]) -> Iterator[int]:
        position = 0
        for _ in it:
            yield position
            position += 1
class toType(BaseCli):
    def __init__(self):
        """Converts every element to its type.
        Example::

            # returns [int, float, str, torch.Tensor]
            [2, 3.5, "ah", torch.randn(2, 3)] | toType() | deref()"""
        super().__init__()
    def __ror__(self, it:Iterator[T]) -> Iterator[type]:
        yield from map(type, it)
# Private exception carrying no data; meant as a control-flow signal for leaving a loop early, not as an error.
class _EarlyExp(Exception): pass
class equals:
    def __init__(self):
        """Checks if all incoming columns/streams are identical.
        Example::

            # returns [True, False, True]
            [[1, 2, 3], [1, 5, 3]] | equals() | deref()

        The streams are walked in lockstep, and for every position, yields whether
        all streams agree with the first one. Stops at the shortest stream."""
        super().__init__()
    def __ror__(self, streams:Iterator[Iterator[str]]):
        streams = list(streams)
        for row in zip(*streams):
            reference = row[0]
            # any() short-circuits on the first mismatch, no exception needed to bail out
            yield not any(reference != elem for elem in row)
class reverse(BaseCli):
    def __init__(self):
        """Reverses incoming list. The whole input is loaded into memory
        first, then a reversed iterator over it is returned.
        Example::

            # returns [3, 5, 2]
            [2, 5, 3] | reverse() | deref()"""
        super().__init__()
    def __ror__(self, it:Iterator[str]) -> List[str]:
        materialized = list(it)
        return reversed(materialized)
class ignore(BaseCli):
    def __init__(self):
        r"""Just loops through everything, ignoring the output.
        Example::

            # will just return an iterator, and not print anything
            [2, 3] | apply(lambda x: print(x))
            # will prints "2\n3"
            [2, 3] | apply(lambda x: print(x)) | ignore()"""
        super().__init__()
    def __ror__(self, it:Iterator[Any]):
        # drain the input purely for its side effects, nothing is returned
        for _unused in it:
            continue
class rateLimit(BaseCli):
    def __init__(self, f, delay=0.1):
        """Limits the execution flow rate upon a condition.
        Example::

            s = 0; semaphore = 0
            def heavyAsyncOperation(i):
                global semaphore, s
                semaphore += 1
                s += i; time.sleep(1)
                semaphore -= 1; return i**2

            # returns (20,), takes 1s to run
            range(20) | applyTh(heavyAsyncOperation, 100) | shape()
            # returns (20,), takes 4s to run (20/5 = 4)
            range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape()

        The first test case is not rate-limited, so it will run all 20 threads at the
        same time, and all of them will finish after 1 second.

        The second test case is rate-limited, so that there can only be 5 concurrently
        executing threads because of the semaphore count check. Therefore this takes
        around 4 seconds to run.

        :param f: checking function. Should return true if execution is allowed
        :param delay: delay in seconds between calling ``f()``"""
        self.f = f; self.delay = delay
    def __ror__(self, it):
        allowed, pause = self.f, self.delay
        for elem in it:
            # hold the element back until the checking function gives the green light
            while True:
                if allowed(): break
                time.sleep(pause)
            yield elem
    @staticmethod
    def cpu(maxUtilization=90):
        """Limits flow rate when cpu utilization is more than a specified
        percentage amount. Needs to install the package ``psutil`` to actually work.
        Example::

            # returns [0, 1, 4, 9, 16]
            range(5) | rateLimit.cpu() | apply(op()**2) | deref()

        :param maxUtilization: cpu percentage below which elements are let through"""
        import psutil
        def belowLimit(): return psutil.cpu_percent() < maxUtilization
        return rateLimit(belowLimit)
def tab(pad:str=" "*4):
    """Indents incoming string iterator, by prepending ``pad`` to every element.
    Example::

        # prints out indented 0 to 9
        range(10) | tab() | headOut()

    :param pad: string to put in front of every element"""
    return cli.apply(lambda line: f"{pad}{line}")
indent = tab
# Registers the "arrayTypes" cli setting. Clis in this module isinstance-check their
# input against it, to use the array's own methods (.shape, .sum(), ...) instead of looping.
settings.add("arrayTypes", (torch.Tensor, np.ndarray), "default array types used to accelerate clis")
class toSum(BaseCli):
    def __init__(self):
        """Calculates the sum of list of numbers. Can pipe in :class:`torch.Tensor` or :class:`np.ndarray`.
        Example::

            # returns 45
            range(10) | toSum()"""
        super().__init__()
    def __ror__(self, it:Iterator[float]):
        # arrays can sum themselves, everything else goes through the builtin
        return it.sum() if isinstance(it, settings.arrayTypes) else sum(it)
class toProd(BaseCli):
    def __init__(self):
        """Calculates the product of a list of numbers. Can pipe in :class:`torch.Tensor` or :class:`np.ndarray`.
        Example::

            # returns 362880
            range(1,10) | toProd()"""
        super().__init__()
    def __ror__(self, it):
        # arrays can multiply themselves out, everything else goes through math.prod
        return it.prod() if isinstance(it, settings.arrayTypes) else math.prod(it)
class toAvg(BaseCli):
    def __init__(self):
        """Calculates average of list of numbers. Can pipe in :class:`torch.Tensor` or :class:`np.ndarray`.
        Example::

            # returns 4.5
            range(10) | toAvg()
            # returns nan
            [] | toAvg()

        The nan result for empty inputs only applies when ``settings.cli.strict``
        is off. If it's on, then the division by zero is left to raise."""
        super().__init__()
    def __ror__(self, it:Iterator[float]):
        if isinstance(it, settings.arrayTypes): return it.mean()
        total = 0; count = 0
        for count, value in enumerate(it, 1): total += value
        if count == 0 and not k1lib.settings.cli.strict: return float("nan")
        return total / count
toMean = toAvg
class toMax(BaseCli):
    def __init__(self):
        """Calculates the max of a bunch of numbers. Can pipe in :class:`torch.Tensor` or :class:`np.ndarray`.
        Example::

            # returns 6
            [2, 5, 6, 1, 2] | toMax()"""
        super().__init__()
    def __ror__(self, it:Iterator[float]) -> float:
        return it.max() if isinstance(it, settings.arrayTypes) else max(it)
class toMin(BaseCli):
    def __init__(self):
        """Calculates the min of a bunch of numbers. Can pipe in :class:`torch.Tensor` or :class:`np.ndarray`.
        Example::

            # returns 1
            [2, 5, 6, 1, 2] | toMin()"""
        super().__init__()
    def __ror__(self, it:Iterator[float]) -> float:
        return it.min() if isinstance(it, settings.arrayTypes) else min(it)
class toPIL(BaseCli):
    def __init__(self):
        """Converts a path to a PIL image.
        Example::

            ls(".") | toPIL().all() | item() # get first image

        Needs the package ``Pillow`` installed, which is imported lazily here so
        that it's not required just to import this module."""
        # `import PIL` alone doesn't load the Image submodule, so `PIL.Image` is only
        # guaranteed to exist if the submodule is imported explicitly
        import PIL.Image; self.PIL = PIL
    def __ror__(self, path) -> "PIL.Image.Image":
        return self.PIL.Image.open(path)
toImg = toPIL
class toRgb(BaseCli):
    def __init__(self):
        """Converts greyscale/rgb PIL image to rgb image.
        Example::

            # reads image file and converts it to rgb
            "a.png" | toPIL() | toRgb()"""
        # `import PIL` alone doesn't load the Image submodule, so `PIL.Image` is only
        # guaranteed to exist if the submodule is imported explicitly
        import PIL.Image; self.PIL = PIL
    def __ror__(self, i):
        # paste the input onto a blank rgb canvas of the same size
        rgbI = self.PIL.Image.new("RGB", i.size)
        rgbI.paste(i); return rgbI
class toRgba(BaseCli):
    def __init__(self):
        """Converts random PIL image to rgba image.
        Example::

            # reads image file and converts it to rgba
            "a.png" | toPIL() | toRgba()"""
        # `import PIL` alone doesn't load the Image submodule, so `PIL.Image` is only
        # guaranteed to exist if the submodule is imported explicitly
        import PIL.Image; self.PIL = PIL
    def __ror__(self, i):
        # paste the input onto a blank rgba canvas of the same size
        rgbI = self.PIL.Image.new("RGBA", i.size)
        rgbI.paste(i); return rgbI
class toBin(BaseCli):
    def __init__(self):
        """Converts integer to binary string.
        Example::

            # returns "101"
            5 | toBin()
            # returns "-101"
            -5 | toBin()

        The input is passed through :class:`int` first, so anything
        convertible to an integer works."""
        super().__init__()
    def __ror__(self, it):
        # format() instead of bin(x)[2:], as slicing the "0b" prefix off mangles
        # negative numbers ("-0b101"[2:] is "b101")
        return format(int(it), "b")
class toIdx(BaseCli):
    def __init__(self, chars:str):
        """Get index of characters according to a reference.
        Example::

            # returns [1, 4, 4, 8]
            "#&&*" | toIdx("!#$%&'()*+") | deref()

        :param chars: reference characters. If a character appears multiple
            times, its last position is used"""
        # precomputed character -> position lookup table
        self.chars = dict((char, position) for position, char in enumerate(chars))
    def __ror__(self, it):
        lookup = self.chars
        yield from (lookup[char] for char in it)
class clipboard(BaseCli):
    def __init__(self):
        """Saves the input to clipboard. Needs the package ``pyperclip``,
        which is imported lazily when this cli is created.
        Example::

            # copies "abc" into the clipboard. Just use Ctrl+V to paste as usual
            "abc" | clipboard()"""
        import pyperclip
        self.pyperclip = pyperclip
    def __ror__(self, s):
        copy = self.pyperclip.copy
        copy(s)
class lengths(BaseCli):
    def __init__(self):
        """Returns the lengths of each element.
        Example::

            # returns [5, 10]
            [range(5), range(10)] | lengths() | deref()

        This is a simpler (and faster!) version of :class:`shape`. It assumes each element
        can be called with ``len(x)``, while :class:`shape` iterates through every elements
        to get the length, and thus is much slower."""
        super().__init__()
    def __ror__(self, it:Iterator[List[Any]]) -> Iterator[int]:
        yield from map(len, it)
# Registers the "deref" entry of the atomic settings: instances of these types are
# passed through as is (never looped over) by deref and inv_dereference below.
settings.atomic.add("deref", (numbers.Number, np.number, str, dict, bool, bytes, torch.nn.Module, k1lib.UValue), "used by deref")
# Module-level shorthands
Tensor = torch.Tensor; atomic = settings.atomic
class inv_dereference(BaseCli):
    def __init__(self, igT=False):
        """Kinda the inverse to :class:`dereference`. Lazily walks over the
        input, and pipes every non-atomic element into this cli again, so that
        nested structures come out as nested generators.

        :param igT: short for "ignore tensor". If False, then 0-dimensional
            arrays/tensors are unpacked using ``.item()``"""
        super().__init__(); self.igT = igT
    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        for e in it:
            if e is None or isinstance(e, atomic.deref): yield e
            elif isinstance(e, settings.arrayTypes):
                if not self.igT and len(e.shape) == 0: yield e.item()
                else: yield e
            else:
                # The yield is kept outside of the try block on purpose: a bare except
                # around a yield also catches the GeneratorExit thrown in when the
                # generator is closed, and yielding again after that is an error
                try: sub = e | self
                except Exception: sub = e
                yield sub
class deref(BaseCli):
    def __init__(self, maxDepth=float("inf"), igT=True):
        """Recursively converts any iterator into a list. Only :class:`str`,
        :class:`numbers.Number` and :class:`~torch.nn.Module` are not converted. Example::

            # returns something like "<range_iterator at 0x7fa8c52ca870>"
            iter(range(5))
            # returns [0, 1, 2, 3, 4]
            iter(range(5)) | deref()
            # returns [2, 3], yieldSentinel stops things early
            [2, 3, yieldSentinel, 6] | deref()

        You can also specify a ``maxDepth``::

            # returns something like "<list_iterator at 0x7f810cf0fdc0>"
            iter([range(3)]) | deref(0)
            # returns [range(3)]
            iter([range(3)]) | deref(1)
            # returns [[0, 1, 2]]
            iter([range(3)]) | deref(2)

        There are a few classes/types that are considered atomic, and :class:`deref`
        will never try to iterate over it. If you wish to change it, do something like::

            settings.cli.atomic.deref = (int, float, ...)

        .. warning::

            Can work well with PyTorch Tensors, but not Numpy arrays as they screw things up
            with the __ror__ operator, so do torch.from_numpy(...) first. Don't worry about
            unnecessary copying, as numpy and torch both utilizes the buffer protocol.

        :param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything
            at all
        :param igT: short for "ignore tensor". If True, then don't loop over :class:`torch.Tensor`
            and :class:`numpy.ndarray` internals"""
        super().__init__(); self.igT = igT
        # `depth` tracks the current recursion level, compared against `maxDepth`
        self.maxDepth = maxDepth; self.depth = 0
        self.arrayType = (torch.Tensor, np.ndarray) if k1lib.settings.startup.or_patch else torch.Tensor
    def __ror__(self, it:Iterator[T]) -> List[T]:
        if self.depth >= self.maxDepth: return it
        elif isinstance(it, atomic.deref): return it
        elif isinstance(it, self.arrayType):
            if self.igT: return it
            if len(it.shape) == 0: return it.item()
        try: iter(it)
        except Exception: return it  # not iterable, so return as is
        self.depth += 1
        try:
            answer = []
            for e in it:
                if e is cli.yieldSentinel: break
                answer.append(self.__ror__(e))
            return answer
        finally:
            # Always restore the depth, including when stopping early at a
            # yieldSentinel or when an element raises. Otherwise the counter leaks
            # and later uses of this same instance hit maxDepth too early
            self.depth -= 1
    def __invert__(self) -> BaseCli:
        """Returns a :class:`~k1lib.cli.init.BaseCli` that makes
        everything an iterator. Not entirely sure when this comes in handy, but it's
        there."""
        return inv_dereference(self.igT)
class bindec(BaseCli):
    def __init__(self, cats:List[Any], f=None):
        """Binary decodes the input: selects the categories whose bit is
        set in the input number, with the first category being the lowest bit.
        Example::

            # returns ['a', 'c']
            5 | bindec("abcdef")
            # returns 'a,c'
            5 | bindec("abcdef", join(","))

        :param cats: categories
        :param f: transformation function of the selected elements. Defaulted to :class:`toList`, but others like :class:`join` is useful too"""
        self.cats = cats
        self.f = f or toList()
    def __ror__(self, it):
        # binary digits, lowest bit first, so that they line up with the categories
        bits = bin(int(it))[2:][::-1]
        selected = (cat for bit, cat in zip(bits, self.cats) if bit == '1')
        return selected | self.f
settings.add("smooth", 10, "default smooth amount, used in utils.smooth")
class smooth(BaseCli):
    def __init__(self, consecutives=None):
        """Smoothes out the input stream.
        Literally just a shortcut for::

            batched(consecutives) | toMean().all()

        Example::

            # returns [4.5, 14.5, 24.5]
            range(30) | smooth(10) | deref()

        Smoothing over :class:`torch.Tensor` or :class:`np.ndarray` will
        be much faster, and produce high dimensional results::

            # returns torch.Tensor with shape (2, 3, 4)
            torch.randn(10, 3, 4) | smooth(4)

        The default consecutive value is in ``settings.cli.smooth``. This
        is useful if you are smoothing over multiple lists at the same
        time, like this::

            # can change a single smooth value temporarily here, and all sequences will be smoothed in the same way
            with settings.cli.context(smooth=5):
                x = list(np.linspace(-2, 2, 50))
                y = x | apply(op()**2) | deref()
                plt.plot(x | smooth() | deref(), y | smooth() | deref())

        :param consecutives: if not defined, then used the value inside ``settings.cli.smooth``"""
        # the setting is read here, when the cli is created
        windowSize = consecutives or settings.smooth
        self.b = cli.batched(windowSize)
    def __ror__(self, it):
        batches = it | self.b
        if not isinstance(batches, settings.arrayTypes):
            return batches | toMean().all()
        # arrays stay arrays after batching, so average over the batch dimension directly
        return batches.mean(1)
# Throwaway function, defined only so that the type of code objects can be taken from it.
def _f(): pass
# The type of code objects; disassemble() checks its input against this.
_code = type(_f.__code__)
def disassemble(f=None):
    """Disassembles anything piped into it.
    Normal usage::

        def f(a, b):
            return a**2 + b
        # both of these print out disassembled info
        f | disassemble()
        disassemble(f)
        # you can pass in lambdas
        disassemble(lambda x: x + 3)
        # or even raw code
        "lambda x: x + 3" | disassemble()

    Prints out the code object's attributes, then its bytecode, then does
    the same for all code objects nested in its constants, indented.

    :param f: function, code object, or string containing source code. If not
        specified, returns a cli that disassembles whatever is piped into it
    :raises RuntimeError: if no code object can be extracted from the input"""
    c = f
    if c is None: return cli.aS(disassemble)
    if isinstance(c, str): c = compile(c, "", "exec")
    # functions and methods expose their code object through `__code__`, anything
    # else (like an actual code object) is used as is
    c = getattr(c, "__code__", c)
    if not isinstance(c, _code): raise RuntimeError(f"`{c}` is not a code object/function/class method/string code")
    attributes = ("co_argcount", "co_cellvars", "co_consts", "co_filename",
                  "co_firstlineno", "co_flags", "co_freevars", "co_kwonlyargcount",
                  "co_lnotab", "co_name", "co_names", "co_nlocals",
                  "co_posonlyargcount", "co_stacksize", "co_varnames")
    for attribute in attributes:
        value = getattr(c, attribute)
        # the line number table is raw bytes, so display it as space-separated numbers
        if attribute == "co_lnotab": value = value | toStr() | join(' ')
        print(f"{attribute}: {value}")
    print("Disassembly:"); dis.disassemble(c)
    # nested code objects are disassembled recursively, with their output captured...
    with k1lib.captureStdout() as out:
        c.co_consts | cli.filt(lambda x: "code" in str(type(x))) | cli.tee(lambda _: "----------------------- inner code object -----------------------\n") | cli.apply(disassemble) | cli.ignore()
    # ...so that it can be printed out indented, showing the nesting
    out() | cli.filt(cli.op().strip() != "") | cli.apply("|" + cli.op()) | cli.indent() | cli.stdout()