Source code for k1lib.cli.utils

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for all short utilities that has the boilerplate feeling. Conversion clis
might feel they have different styles, as :class:`toFloat` converts object iterator to
float iterator, while :class:`toPIL` converts single image url to single PIL image,
whereas :class:`toSum` converts float iterator into a single float value.

The general convention is, if the intended operation sounds simple (convert to floats,
strings, types, ...), then most likely it will convert iterator to iterator, as you
can always use the function directly if you only want to apply it on 1 object.

If it sounds complicated (convert to PIL image, tensor, ...) then most likely it will
convert object to object. Lastly, there are some that just feels right to input
an iterator and output a single object (like getting max, min, std, mean values)."""
from k1lib.cli.init import patchDefaultDelim, BaseCli, Table, T
import k1lib.cli as cli, numbers, torch, numpy as np
from typing import overload, Iterator, Any, List, Set, Union
import k1lib, time
__all__ = ["size", "shape", "item", "identity", "iden",
           "toStr", "join", "toNumpy", "toTensor",
           "toList", "wrapList", "toSet", "toIter", "toRange", "toType",
           "equals", "reverse", "ignore", "rateLimit",
           "toSum", "toAvg", "toMean", "toMax", "toMin", "toPIL",
           "toImg", "toRgb", "toRgba", "toBin", "toIdx", "clipboard",
           "lengths", "headerIdx", "deref", "bindec", "smooth"]
settings = k1lib.settings.cli
def exploreSize(it):
    """Returns first element and length of array"""
    if isinstance(it, str): raise TypeError("Just here to terminate shape()")
    sentinel = object(); it = iter(it)
    o = next(it, sentinel); count = 1
    if o is sentinel: return None, 0
    try:
        while True: next(it); count += 1
    except StopIteration: pass
    return o, count
[docs]class size(BaseCli):
[docs] def __init__(self, idx=None): """Returns number of rows and columns in the input. Example:: # returns (3, 2) [[2, 3], [4, 5, 6], [3]] | size() # returns 3 [[2, 3], [4, 5, 6], [3]] | size(0) # returns 2 [[2, 3], [4, 5, 6], [3]] | size(1) # returns (2, 0) [[], [2, 3]] | size() # returns (3,) [2, 3, 5] | size() # returns 3 [2, 3, 5] | size(0) # returns (3, 2, 2) [[[2, 1], [0, 6, 7]], 3, 5] | size() # returns (1,) and not (1, 3) ["abc"] | size() # returns (1, 2, 3) [torch.randn(2, 3)] | size() # returns (2, 3, 5) size()(np.random.randn(2, 3, 5)) There's also :class:`lengths`, which is sort of a simplified/faster version of this, but only use it if you are sure that ``len(it)`` can be called. If encounter PyTorch tensors or Numpy arrays, then this will just get the shape instead of actually looping over them. :param idx: if idx is None return (rows, columns). If 0 or 1, then rows or columns""" super().__init__(); self.idx = idx
[docs] def __ror__(self, it:Iterator[str]): if self.idx is None: answer = [] try: while True: if isinstance(it, (torch.Tensor, np.ndarray)): return tuple(answer + list(it.shape)) it, s = exploreSize(it); answer.append(s) except TypeError: pass return tuple(answer) else: return exploreSize(it | cli.item(self.idx))[1]
shape = size noFill = object()
[docs]class item(BaseCli):
[docs] def __init__(self, amt:int=1, fill=noFill): """Returns the first row. Example:: # returns 0 iter(range(5)) | item() # returns torch.Size([5]) torch.randn(3,4,5) | item(2) | shape() # returns 3 [] | item(fill=3) :param amt: how many times do you want to call item() back to back? :param fill: if iterator length is 0, return this""" self.amt = amt; self.fill = fill self.fillP = [fill] if fill != noFill else [] # preprocessed, to be faster
[docs] def __ror__(self, it:Iterator[str]): if self.amt != 1: return it | cli.serial(*(item(fill=self.fill) for _ in range(self.amt))) return next(iter(it), *self.fillP)
[docs]class identity(BaseCli):
[docs] def __init__(self): """Yields whatever the input is. Useful for multiple streams. Example:: # returns range(5) range(5) | identity()""" super().__init__()
[docs] def __ror__(self, it:Iterator[Any]): return it
iden = identity
[docs]class toStr(BaseCli):
[docs] def __init__(self, column:int=None): """Converts every line to a string. Example:: # returns ['2', 'a'] [2, "a"] | toStr() | deref() # returns [[2, 'a'], [3, '5']] assert [[2, "a"], [3, 5]] | toStr(1) | deref()""" super().__init__(); self.column = column
[docs] def __ror__(self, it:Iterator[str]): c = self.column if c is None: for line in it: yield str(line) else: for row in it: yield [e if i != c else str(e) for i, e in enumerate(row)]
[docs]class join(BaseCli):
[docs] def __init__(self, delim:str=None): r"""Merges all strings into 1, with `delim` in the middle. Basically :meth:`str.join`. Example:: # returns '2\na' [2, "a"] | join("\n")""" super().__init__(); self.delim = patchDefaultDelim(delim)
[docs] def __ror__(self, it:Iterator[str]): return self.delim.join(it | toStr())
[docs]class toNumpy(BaseCli):
[docs] def __init__(self): """Converts generator to numpy array. Essentially ``np.array(list(it))``""" super().__init__()
[docs] def __ror__(self, it:Iterator[float]) -> np.array: return np.array(list(it))
[docs]class toTensor(BaseCli):
[docs] def __init__(self, dtype=torch.float32): """Converts generator to :class:`torch.Tensor`. Essentially ``torch.tensor(list(it))``. Also checks if input is a PIL Image. If yes, turn it into a :class:`torch.Tensor` and return.""" self.dtype = dtype
[docs] def __ror__(self, it:Iterator[float]) -> torch.Tensor: try: import PIL; pic=it if isinstance(pic, PIL.Image.Image): # stolen from torchvision ToTensor transform mode_to_nptype = {'I': np.int32, 'I;16': np.int16, 'F': np.float32} img = torch.from_numpy(np.array(pic, mode_to_nptype.get(pic.mode, np.uint8), copy=True)) if pic.mode == '1': img = 255 * img img = img.view(pic.size[1], pic.size[0], len(pic.getbands())) return img.permute((2, 0, 1)).contiguous().to(self.dtype) # put it from HWC to CHW format except: pass return torch.tensor(list(it)).to(self.dtype)
[docs]class toList(BaseCli):
[docs] def __init__(self): """Converts generator to list. :class:`list` would do the same, but this is just to maintain the style""" super().__init__()
[docs] def __ror__(self, it:Iterator[Any]) -> List[Any]: return list(it)
[docs]class wrapList(BaseCli):
[docs] def __init__(self): """Wraps inputs inside a list. There's a more advanced cli tool built from this, which is :meth:`~k1lib.cli.structural.unsqueeze`.""" super().__init__()
[docs] def __ror__(self, it:T) -> List[T]: return [it]
[docs]class toSet(BaseCli):
[docs] def __init__(self): """Converts generator to set. :class:`set` would do the same, but this is just to maintain the style""" super().__init__()
[docs] def __ror__(self, it:Iterator[T]) -> Set[T]: return set(it)
[docs]class toIter(BaseCli):
[docs] def __init__(self): """Converts object to iterator. `iter()` would do the same, but this is just to maintain the style""" super().__init__()
[docs] def __ror__(self, it:List[T]) -> Iterator[T]: return iter(it)
[docs]class toRange(BaseCli):
[docs] def __init__(self): """Returns iter(range(len(it))), effectively""" super().__init__()
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[int]: for i, _ in enumerate(it): yield i
[docs]class toType(BaseCli):
[docs] def __init__(self): """Converts object to its type. Example:: # returns [int, float, str, torch.Tensor] [2, 3.5, "ah", torch.randn(2, 3)] | toType() | deref()""" super().__init__()
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[type]: for e in it: yield type(e)
class _EarlyExp(Exception): pass
[docs]class equals:
[docs] def __init__(self): """Checks if all incoming columns/streams are identical""" super().__init__()
[docs] def __ror__(self, streams:Iterator[Iterator[str]]): streams = list(streams) for row in zip(*streams): sampleElem = row[0] try: for elem in row: if sampleElem != elem: yield False; raise _EarlyExp() yield True except _EarlyExp: pass
[docs]class reverse(BaseCli):
[docs] def __init__(self): """Reverses incoming list. Example:: # returns [3, 5, 2] [2, 5, 3] | reverse() | deref()""" super().__init__()
[docs] def __ror__(self, it:Iterator[str]) -> List[str]: return reversed(list(it))
[docs]class ignore(BaseCli):
[docs] def __init__(self): r"""Just loops through everything, ignoring the output. Example:: # will just return an iterator, and not print anything [2, 3] | apply(lambda x: print(x)) # will prints "2\n3" [2, 3] | apply(lambda x: print(x)) | ignore()""" super().__init__()
[docs] def __ror__(self, it:Iterator[Any]): for _ in it: pass
[docs]class rateLimit(BaseCli):
[docs] def __init__(self, f, delay=0.1): """Limits the execution flow rate upon a condition. Example:: s = 0; semaphore = 0 def heavyAsyncOperation(i): global semaphore, s semaphore += 1 s += i; time.sleep(1) semaphore -= 1; return i**2 # returns (20,), takes 1s to run range(20) | applyTh(heavyAsyncOperation, 100) | shape() # returns (20,), takes 4s to run (20/5 = 4) range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape() The first test case is not rate-limited, so it will run all 20 threads at the same time, and all of them will finish after 1 second. The second test case is rate-limited, so that there can only be 5 concurrently executing threads because of the semaphore count check. Therefore this takes around 4 seconds to run. :param f: checking function. Should return true if execution is allowed :param delay: delay in seconds between calling ``f()``""" self.f = f; self.delay = delay
[docs] def __ror__(self, it): f = self.f; delay = self.delay for e in it: while not f(): time.sleep(delay) yield e
[docs] @staticmethod def cpu(maxUtilization=90): """Limits flow rate when cpu utilization is more than a specified percentage amount. Needs to install the package ``psutil`` to actually work. Example:: # returns [0, 1, 4, 9, 16] range(5) | rateLimit.cpu() | apply(op()**2) | deref()""" import psutil return rateLimit(lambda: psutil.cpu_percent() < maxUtilization)
[docs]class toSum(BaseCli):
[docs] def __init__(self): """Calculates the sum of list of numbers. Can pipe in :class:`torch.Tensor`. Example:: # returns 45 range(10) | toSum()""" super().__init__()
[docs] def __ror__(self, it:Iterator[float]): if isinstance(it, torch.Tensor): return it.sum() return sum(it)
[docs]class toAvg(BaseCli):
[docs] def __init__(self): """Calculates average of list of numbers. Can pipe in :class:`torch.Tensor`. Example:: # returns 4.5 range(10) | toAvg() # returns nan [] | toAvg()""" super().__init__()
[docs] def __ror__(self, it:Iterator[float]): if isinstance(it, torch.Tensor): return it.mean() s = 0; i = -1 for i, v in enumerate(it): s += v i += 1 if not k1lib.settings.cli.strict and i == 0: return float("nan") return s / i
toMean = toAvg
[docs]class toMax(BaseCli):
[docs] def __init__(self): """Calculates the max of a bunch of numbers. Can pipe in :class:`torch.Tensor`. Example:: # returns 6 [2, 5, 6, 1, 2] | toMax()""" super().__init__()
[docs] def __ror__(self, it:Iterator[float]) -> float: if isinstance(it, torch.Tensor): return it.max() return max(it)
[docs]class toMin(BaseCli):
[docs] def __init__(self): """Calculates the min of a bunch of numbers. Can pipe in :class:`torch.Tensor`. Example:: # returns 1 [2, 5, 6, 1, 2] | toMin()""" super().__init__()
[docs] def __ror__(self, it:Iterator[float]) -> float: if isinstance(it, torch.Tensor): return it.min() return min(it)
[docs]class toPIL(BaseCli):
[docs] def __init__(self): """Converts a path to a PIL image. Example:: ls(".") | toPIL().all() | item() # get first image""" import PIL; self.PIL = PIL
[docs] def __ror__(self, path) -> "PIL.Image.Image": return self.PIL.Image.open(path)
toImg = toPIL
[docs]class toRgb(BaseCli):
[docs] def __init__(self): """Converts greyscale/rgb PIL image to rgb image. Example:: # reads image file and converts it to rgb "a.png" | toPIL() | toRgb()""" import PIL; self.PIL = PIL
[docs] def __ror__(self, i): rgbI = self.PIL.Image.new("RGB", i.size) rgbI.paste(i); return rgbI
[docs]class toRgba(BaseCli):
[docs] def __init__(self): """Converts random PIL image to rgba image. Example:: # reads image file and converts it to rgba "a.png" | toPIL() | toRgba()""" import PIL; self.PIL = PIL
[docs] def __ror__(self, i): rgbI = self.PIL.Image.new("RGBA", i.size) rgbI.paste(i); return rgbI
[docs]class toBin(BaseCli):
[docs] def __init__(self): """Converts integer to binary string. Example:: # returns "101" 5 | toBin()""" super().__init__()
[docs] def __ror__(self, it): return bin(int(it))[2:]
[docs]class toIdx(BaseCli):
[docs] def __init__(self, chars:str): """Get index of characters according to a reference. Example:: # returns [1, 4, 4, 8] "#&&*" | toIdx("!#$%&'()*+") | deref()""" self.chars = {v:k for k, v in enumerate(chars)}
[docs] def __ror__(self, it): chars = self.chars for e in it: yield chars[e]
[docs]class clipboard(BaseCli):
[docs] def __init__(self): """Saves the input to clipboard. Example:: # copies "abc" into the clipboard. Just use Ctrl+V to paste as usual "abc" | clipboard()""" import pyperclip; self.pyperclip = pyperclip
[docs] def __ror__(self, s): self.pyperclip.copy(s)
[docs]class lengths(BaseCli):
[docs] def __init__(self): """Returns the lengths of each element. Example:: [range(5), range(10)] | lengths() == [5, 10] This is a simpler (and faster!) version of :class:`shape`. It assumes each element can be called with ``len(x)``, while :class:`shape` iterates through every elements to get the length, and thus is much slower.""" super().__init__()
[docs] def __ror__(self, it:Iterator[List[Any]]) -> Iterator[int]: for e in it: yield len(e)
[docs]def headerIdx(): """Cuts out first line, put an index column next to it, and prints it out. Useful when you want to know what your column's index is to cut it out. Also sets the context variable "header", in case you need it later. Example:: # returns [[0, 'a'], [1, 'b'], [2, 'c']] ["abc"] | headerIdx() | deref()""" return item() | cli.wrapList() | cli.transpose() | cli.insertIdColumn(True)
settings.atomic.add("deref", (numbers.Number, np.number, str, dict, bool, bytes, torch.nn.Module), "used by deref") Tensor = torch.Tensor; atomic = settings.atomic class inv_dereference(BaseCli): def __init__(self, ignoreTensors=False): """Kinda the inverse to :class:`dereference`""" super().__init__(); self.ignoreTensors = ignoreTensors def __ror__(self, it:Iterator[Any]) -> List[Any]: for e in it: if e is None or isinstance(e, atomic.deref): yield e elif isinstance(e, Tensor): if not self.ignoreTensors and len(e.shape) == 0: yield e.item() else: yield e else: try: yield e | self except: yield e
[docs]class deref(BaseCli):
[docs] def __init__(self, maxDepth=float("inf"), ignoreTensors=True): """Recursively converts any iterator into a list. Only :class:`str`, :class:`numbers.Number` and :class:`~torch.nn.Module` are not converted. Example:: # returns something like "<range_iterator at 0x7fa8c52ca870>" iter(range(5)) # returns [0, 1, 2, 3, 4] iter(range(5)) | deref() # returns [2, 3], yieldSentinel stops things early [2, 3, yieldSentinel, 6] | deref() You can also specify a ``maxDepth``:: # returns something like "<list_iterator at 0x7f810cf0fdc0>" iter([range(3)]) | deref(0) # returns [range(3)] iter([range(3)]) | deref(1) # returns [[0, 1, 2]] iter([range(3)]) | deref(2) There are a few classes/types that are considered atomic, and :class:`deref` will never try to iterate over it. If you wish to change it, do something like:: settings.cli.atomic.deref = (int, float, ...) .. warning:: Can work well with PyTorch Tensors, but not Numpy arrays as they screw things up with the __ror__ operator, so do torch.from_numpy(...) first. Don't worry about unnecessary copying, as numpy and torch both utilizes the buffer protocol. :param maxDepth: maximum depth to dereference. Starts at 0 for not doing anything at all :param ignoreTensors: if True, then don't loop over :class:`torch.Tensor` internals""" super().__init__(); self.ignoreTensors = ignoreTensors self.maxDepth = maxDepth; self.depth = 0
[docs] def __ror__(self, it:Iterator[T]) -> List[T]: ignoreTensors = self.ignoreTensors if self.depth >= self.maxDepth: return it elif isinstance(it, atomic.deref): return it elif isinstance(it, Tensor): if ignoreTensors: return it if len(it.shape) == 0: return it.item() try: iter(it) except: return it self.depth += 1; answer = [] for e in it: if e is cli.yieldSentinel: return answer answer.append(self.__ror__(e)) self.depth -= 1; return answer
[docs] def __invert__(self) -> BaseCli: """Returns a :class:`~k1lib.cli.init.BaseCli` that makes everything an iterator. Not entirely sure when this comes in handy, but it's there.""" return inv_dereference(self.ignoreTensors)
[docs]class bindec(BaseCli):
[docs] def __init__(self, cats:List[Any], f=None): """Binary decodes the input. Example:: # returns ['a', 'c'] 5 | bindec("abcdef") # returns 'a,c' 5 | bindec("abcdef", join(",")) :param cats: categories :param f: transformation function of the selected elements. Defaulted to :class:`toList`, but others like :class:`join` is useful too""" self.cats = cats; self.f = f or toList()
[docs] def __ror__(self, it): it = bin(int(it))[2:][::-1] return (e for i, e in zip(it, self.cats) if i == '1') | self.f
settings.add("smooth", 10, "default smooth amount, used in utils.smooth")
[docs]def smooth(consecutives=None): """Smoothes out the input stream. Literally just a shortcut for:: batched(consecutives) | toMean().all() Example:: # returns [4.5, 14.5, 24.5] range(30) | smooth(10) | deref() :param consecutives: if not defined, then used the value inside ``settings.cli.smooth``""" return cli.batched(consecutives or settings.smooth) | toMean().all()