Source code for k1lib.cli.modifier

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for quick modifiers; think of them as changing formats
"""
__all__ = ["applyS", "apply", "applyMp", "applyTh", "applySerial",
           "replace", "remove", "toFloat", "toInt",
           "sort", "sortF", "consume", "randomize", "stagger", "op"]
from typing import Callable, Iterator, Any, Union, List
from k1lib.cli.init import patchDefaultDelim, BaseCli, T, fastF
import k1lib.cli as cli, numpy as np, torch, threading
import torch.multiprocessing as mp; from collections import deque
from functools import partial, update_wrapper, lru_cache
import dill, pickle, k1lib, warnings, atexit, signal
class applyS(BaseCli):
    def __init__(self, f:Callable[[T], T]):
        """Like :class:`apply`, but much simpler, just operating on the entire input
        object, essentially. The "S" stands for "single". Example::

            # returns 5
            3 | applyS(lambda x: x+2)

        Like :class:`apply`, you can also use this as a decorator like this::

            @applyS
            def f(x):
                return x+2
            # returns 5
            3 | f

        This also decorates the returned object so that it has the same qualname,
        docstring and whatnot."""
        super().__init__(fs=[f]); self.f = f
        update_wrapper(self, f)
    def __ror__(self, it:T) -> T: return self.f(it)
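# A minimal usage sketch: because applyS calls update_wrapper, a decorated function
# keeps its metadata. `_demoPlusTwo` is a hypothetical name for illustration.
@applyS
def _demoPlusTwo(x):
    """Adds 2 to the input."""
    return x + 2
# 3 | _demoPlusTwo         # returns 5
# _demoPlusTwo.__name__    # returns '_demoPlusTwo', preserved by update_wrapper
# _demoPlusTwo.__doc__     # returns 'Adds 2 to the input.'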
class apply(BaseCli):
    def __init__(self, f:Callable[[str], str], column:int=None, cache:int=0):
        """Applies a function f to every line. Example::

            # returns [0, 1, 4, 9, 16]
            range(5) | apply(lambda x: x**2) | deref()
            # returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]]
            torch.ones(2, 3) | apply(lambda x: x+2, 0) | deref()

        You can also use this as a decorator, like this::

            @apply
            def f(x):
                return x**2
            # returns [0, 1, 4, 9, 16]
            range(5) | f | deref()

        You can also add a cache, like this::

            def calc(i): time.sleep(0.5); return i**2
            # takes 2.5s
            range(5) | repeatFrom(2) | apply(calc, cache=10) | deref()
            # takes 5s
            range(5) | repeatFrom(2) | apply(calc) | deref()

        :param column: if not None, then applies the function to that column only
        :param cache: if specified, caches up to this many values"""
        super().__init__(fs=[f]); self.f = f; self.column = column; self.cache = cache
    def __ror__(self, it:Iterator[str]):
        super().__ror__(it); f = fastF(self.f); c = self.column
        if self.cache > 0: f = lru_cache(self.cache)(f)
        if c is None: return (f(line) for line in it)
        else: return ([(e if i != c else f(e)) for i, e in enumerate(row)] for row in it)
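# A small sketch of apply's two keyword options: `column` restricts f to one field
# of each row, and `cache` wraps f in functools.lru_cache so repeated inputs aren't
# recomputed. `_demoApplyOptions` and `slow` are hypothetical names.
def _demoApplyOptions():
    import time
    def slow(x): time.sleep(0.1); return x * 2    # stand-in for an expensive function
    # returns [['A', 1], ['B', 2]]: only column 0 is transformed
    upper = [["a", 1], ["b", 2]] | apply(lambda s: s.upper(), column=0) | cli.deref()
    # takes ~0.2s instead of ~0.3s: 'a' is only computed once, thanks to the cache
    cached = ["a", "b", "a"] | apply(slow, cache=2) | cli.deref()
    return upper, cached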
def executeFunc(common, line):
    import dill
    f, kwargs = dill.loads(common)
    return f(dill.loads(line), **kwargs)
class applyMp(BaseCli):
    _pools = set()
    def __init__(self, f:Callable[[T], T], prefetch:int=None, timeout:float=8, utilization:float=0.8, bs:int=1, **kwargs):
        """Like :class:`apply`, but executes ``f(row)`` of each row in multiple
        processes. Example::

            # returns [3, 2]
            ["abc", "de"] | applyMp(lambda s: len(s)) | deref()
            # returns [5, 6, 9]
            range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref()

            # returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
            someList = [1, 2, 3]
            ["abc", "de"] | applyMp(lambda s: someList) | deref()

        Internally, this will continuously spawn new jobs up until 80% of all CPU
        cores are utilized. On posix systems, the default multiprocessing start
        method is ``fork()``. This roughly means that all the variables in memory
        will be copied over. This might be expensive (might also not, with
        copy-on-write), so you might have to think about that. On windows and macos,
        the default start method is ``spawn``, meaning each child process is a
        completely new interpreter, so you have to pass in all required variables
        and reimport every dependency. Read more at
        https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

        If you don't wish to schedule all jobs at once, you can specify a
        ``prefetch`` amount, and it will only schedule that many jobs ahead of time.
        Example::

            range(10000) | applyMp(lambda x: x**2)    | head() | deref() # 700ms
            range(10000) | applyMp(lambda x: x**2, 5) | head() | deref() # 300ms

            # demonstrating there're no huge penalties even if we want all results at the same time
            range(10000) | applyMp(lambda x: x**2)    | deref() # 900ms
            range(10000) | applyMp(lambda x: x**2, 5) | deref() # 1000ms

        The first line will schedule all jobs at once, and thus will require more
        RAM and compute power, even though we discard most of the results anyway
        (the :class:`~k1lib.cli.filt.head` cli). The second line only schedules 5
        jobs ahead of time, and thus will be far more efficient if you don't need
        all results right away.

        .. note::
            Remember that every :class:`~k1lib.cli.init.BaseCli` is also a
            function, meaning that you can do stuff like::

                # returns [['ab', 'ac']]
                [["ab", "cd", "ac"]] | applyMp(filt(op().startswith("a")) | deref()) | deref()

            Also remember that the return result of ``f`` should not be a generator.
            That's why in the example above, there's a ``deref()`` inside f.

        Most of the time, you would probably want to specify ``bs`` to something
        bigger than 1 (maybe 32 or so). This will execute ``f`` multiple times in a
        single job, instead of executing ``f`` only once per job, which should
        reduce the overhead of process creation dramatically.

        If you encounter strange errors not seen on :class:`apply`, you can try to
        clear all pools (using :meth:`clearPools`), to terminate all child processes
        and thus free resources. On earlier versions, you had to do this manually
        before exiting, but now :class:`applyMp` is much more robust.

        :param prefetch: if not specified, schedules all jobs at the same time. If
            specified, schedules jobs so that there'll only be a specified amount
            of jobs, and will only schedule more if results are actually being used.
        :param timeout: seconds to wait for job before raising an error
        :param utilization: fraction of cores to use: 0 for no cores, 1 for all the
            cores. Defaults to 0.8
        :param bs: if specified, groups ``bs`` number of transforms into 1 job to
            be more efficient.
        :param kwargs: extra arguments to be passed to the function. ``args`` not
            included as there're a couple of options you can pass for this cli."""
        super().__init__(fs=[f]); self.f = fastF(f)
        self.prefetch = prefetch or 1_000_000
        self.timeout = timeout; self.utilization = utilization
        self.bs = bs; self.kwargs = kwargs
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        super().__ror__(it); it = iter(it) # really make sure it's an iterator, for prefetch
        bs = self.bs; timeout = self.timeout
        if bs > 1:
            return it | cli.batched(bs, True) | applyMp(apply(self.f) | cli.deref(), self.prefetch, timeout, **self.kwargs) | cli.joinStreams()
        self.p = p = mp.Pool(int(mp.cpu_count()*self.utilization), lambda: signal.signal(signal.SIGINT, signal.SIG_IGN))
        applyMp._pools.add(p)
        common = dill.dumps([self.f, self.kwargs])
        def gen():
            try:
                fs = deque()
                for i, line in zip(range(self.prefetch), it):
                    fs.append(p.apply_async(executeFunc, [common, dill.dumps(line)]))
                for line in it:
                    yield fs.popleft().get(timeout)
                    fs.append(p.apply_async(executeFunc, [common, dill.dumps(line)]))
                for f in fs: yield f.get(timeout)
            except KeyboardInterrupt as e:
                print("applyMp interrupted. Terminating pool now")
                self.p.terminate(); applyMp._pools.remove(self.p); raise e
            except Exception as e:
                print("applyMp encountered errors. Terminating pool now")
                self.p.terminate(); applyMp._pools.remove(self.p); raise e
            else:
                self.p.terminate(); applyMp._pools.remove(self.p)
        return gen()
    @staticmethod
    def clearPools():
        """Terminate all existing pools. Do this before restarting/quitting the
        script/notebook to make sure all resources (like GPU) are freed. **Update**:
        you probably won't have to call this manually anymore since version 0.9,
        but if you run into problems, try doing this."""
        for p in applyMp._pools:
            try: p.terminate()
            except: pass
        applyMp._pools = set()
    @staticmethod
    def pools():
        """Get set of all pools. Meant for debugging purposes only."""
        return applyMp._pools
    def __del__(self):
        if hasattr(self, "p"):
            self.p.terminate()
            if self.p in applyMp._pools: applyMp._pools.remove(self.p)
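# A hedged sketch of the `bs` option discussed above: grouping many cheap
# transforms into one job amortizes process-creation overhead. Timings are
# indicative only; `_demoApplyMpBs` is a hypothetical name.
def _demoApplyMpBs():
    # each job squares 32 numbers instead of 1, so far fewer jobs are scheduled
    return range(1000) | applyMp(lambda x: x**2, prefetch=10, bs=32) | cli.deref()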
# apparently, this doesn't do anything, at least in jupyter environment
atexit.register(lambda: applyMp.clearPools())
thEmptySentinel = object()
class applyTh(BaseCli):
    def __init__(self, f, prefetch:int=2, timeout:float=5, bs:int=1):
        """Kinda the same as :class:`applyMp`, but executes ``f`` on multiple
        threads, instead of on multiple processes. Advantages:

        - Relatively low overhead for thread creation
        - Fast, if ``f`` is io-bound
        - Does not have to serialize and deserialize the result, meaning iterators
          can be exchanged

        Disadvantages:

        - Still has thread creation overhead, so it's still recommended to specify ``bs``
        - Is slow if ``f`` has to obtain the GIL to be able to do anything

        All examples from :class:`applyMp` should work perfectly here."""
        super().__init__(fs=[f]); self.f = f; self.bs = bs
        self.prefetch = prefetch; self.timeout = timeout
    def __ror__(self, it):
        if self.bs > 1:
            yield from (it | cli.batched(self.bs, True) | applyTh(apply(self.f), self.prefetch, self.timeout) | cli.joinStreams()); return
        datas = deque(); it = iter(it)
        innerF = fastF(self.f); timeout = self.timeout
        def f(line, wrapper): wrapper.value = innerF(line)
        for _, line in zip(range(self.prefetch), it):
            w = k1lib.Wrapper(thEmptySentinel)
            t = threading.Thread(target=f, args=(line, w))
            t.start(); datas.append((t, w))
        for line in it:
            data = datas.popleft(); data[0].join(timeout)
            if data[1].value is thEmptySentinel:
                for data in datas: data[0].join(0.01)
                raise RuntimeError("Thread timed out!")
            yield data[1].value
            w = k1lib.Wrapper(thEmptySentinel)
            t = threading.Thread(target=f, args=(line, w))
            t.start(); datas.append((t, w))
        for i in range(len(datas)): # do it this way so that python can remove threads early, due to ref counting
            data = datas.popleft(); data[0].join(timeout)
            if data[1].value is thEmptySentinel:
                for data in datas: data[0].join(0.01)
                raise RuntimeError("Thread timed out!")
            yield data[1].value
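# A minimal io-bound sketch: threads shine when f mostly waits on the network or
# disk, since blocking calls release the GIL. `_demoApplyTh` and `fetchLen` are
# hypothetical names; any iterable of urls works.
def _demoApplyTh(urls):
    import urllib.request
    def fetchLen(url):
        with urllib.request.urlopen(url) as r: return len(r.read())
    # bs=4 groups several requests per thread to cut thread-creation overhead
    return urls | applyTh(fetchLen, prefetch=8, bs=4) | cli.deref()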
class applySerial(BaseCli):
    def __init__(self, f, includeFirst=False):
        """Applies a function repeatedly. Yields ``f(x)``, then ``f(f(x))``, then
        ``f(f(f(x)))`` and so on. If ``includeFirst`` is True, yields the raw input
        ``x`` first. Example::

            # returns [4, 8, 16, 32, 64]
            2 | applySerial(op()*2) | head(5) | deref()

        If the result of your operation is an iterator, you might want to
        :class:`~k1lib.cli.utils.deref` it, like this::

            rs = iter(range(8)) | applySerial(rows()[::2])
            # returns [0, 2, 4, 6]
            next(rs) | deref()
            # returns []. This is because all the elements are taken by the previous deref()
            next(rs) | deref()

            rs = iter(range(8)) | applySerial(rows()[::2] | deref())
            # returns [0, 2, 4, 6]
            next(rs)
            # returns [0, 4]
            next(rs)
            # returns [0]
            next(rs)

        :param f: function to apply repeatedly
        :param includeFirst: whether to include the raw input value or not"""
        self.f = f; self.includeFirst = includeFirst
    def __ror__(self, it):
        f = fastF(self.f)
        if not self.includeFirst: it = f(it)
        while True: yield it; it = f(it)# | cli.deref()
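# A quick sketch: applySerial fits "iterate f repeatedly" patterns nicely. Here it
# follows the Collatz map starting from 6; `_demoApplySerial` is a hypothetical name.
def _demoApplySerial():
    collatz = lambda n: n // 2 if n % 2 == 0 else 3 * n + 1
    # returns [6, 3, 10, 5, 16]
    return 6 | applySerial(collatz, includeFirst=True) | cli.head(5) | cli.deref()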
def replace(s:str, target:str=None, column:int=None):
    """Replaces substring ``s`` with ``target`` for each line. Example::

        # returns ['104', 'ab0c']
        ["1234", "ab23c"] | replace("23", "0") | deref()

    :param target: if not specified, then use the default delimiter specified
        in ``cliSettings``"""
    t = patchDefaultDelim(target)
    return apply(lambda e: e.replace(s, t), column)
def remove(s:str, column:int=None):
    """Removes a specific substring in each line."""
    return replace(s, "", column)
def _toop(toOp, c, force, defaultValue):
    return apply(toOp, c) | (apply(lambda x: x or defaultValue, c) if force else cli.filt(cli.op() != None, c))
def _toFloat(e) -> Union[float, None]:
    try: return float(e)
    except: return None
def toFloat(*columns:List[int], force=False):
    """Converts every row into a float. Example::

        # returns [1, 3, -2.3]
        ["1", "3", "-2.3"] | toFloat() | deref()
        # returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
        [["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()

    With weird rows::

        # returns [[1.0, 'a'], [8.0, 'c']]
        [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
        # returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
        [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, force=True) | deref()

    :param columns: if empty, then converts each row. Otherwise, converts all the
        specified columns
    :param force: if True, forces weird values to 0.0, else filters out all weird rows"""
    if len(columns) > 0:
        return cli.init.serial(*(_toop(_toFloat, c, force, 0.0) for c in columns))
    else: return _toop(_toFloat, None, force, 0.0)
def _toInt(e) -> Union[int, None]:
    try: return int(float(e))
    except: return None
def toInt(*columns:List[int], force=False):
    """Converts every row into an integer. Example::

        # returns [1, 3, -2]
        ["1", "3", "-2.3"] | toInt() | deref()

    :param columns: if empty, then converts each row. Otherwise, converts all the
        specified columns
    :param force: if True, forces weird values to 0, else filters out all weird rows

    See also: :meth:`toFloat`"""
    if len(columns) > 0:
        return cli.init.serial(*(_toop(_toInt, c, force, 0) for c in columns))
    else: return _toop(_toInt, None, force, 0)
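# A short sketch combining both converters to clean a table of string fields,
# either dropping or zero-filling malformed cells. The data and the name
# `_demoToNumbers` are made up for illustration.
def _demoToNumbers():
    rows = [["1", "2.5"], ["oops", "3"], ["4", "x"]]
    # returns [[1, 2.5]]: rows with unparseable cells are filtered out
    strict = rows | toInt(0) | toFloat(1) | cli.deref()
    # returns [[1, 2.5], [0, 3.0], [4, 0.0]]: weird values forced to 0/0.0
    forced = rows | toInt(0, force=True) | toFloat(1, force=True) | cli.deref()
    return strict, forced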
class sort(BaseCli):
    def __init__(self, column:int=0, numeric=True, reverse=False):
        """Sorts all lines based on a specific `column`. Example::

            # returns [[5, 'a'], [1, 'b']]
            [[1, "b"], [5, "a"]] | ~sort(0) | deref()
            # returns [[2, 3]]
            [[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref()
            # errors out, as you can't really compare str with int
            [[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref()

        :param column: if None, sort rows based on themselves and not an element
        :param numeric: whether to convert column to float
        :param reverse: False for smaller to bigger, True for bigger to smaller. Use
            :meth:`__invert__` to quickly reverse the order instead of using this
            param"""
        super().__init__()
        self.column = column; self.reverse = reverse; self.numeric = numeric
        self.filterF = (lambda x: float(x)) if numeric else (lambda x: x)
    def __ror__(self, it:Iterator[str]):
        super().__ror__(it); c = self.column
        if c is None:
            return it | cli.wrapList() | cli.transpose() | sort(0, self.numeric, self.reverse)
        f = self.filterF
        rows = (it | cli.isNumeric(c) if self.numeric else it) | cli.deref(maxDepth=2)
        def sortF(row):
            if len(row) > c: return f(row[c])
            return float("inf")
        return iter(sorted(rows, key=sortF, reverse=self.reverse))
    def __invert__(self):
        """Creates a clone that has the opposite sort order"""
        return sort(self.column, self.numeric, not self.reverse)
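# A brief sketch of numeric sorting and the ~ operator: numeric=True compares
# floats (so 10 sorts after 2, unlike lexicographic string order), and ~ flips the
# order without re-specifying arguments. `_demoSort` is a hypothetical name.
def _demoSort():
    rows = [[2, "b"], [10, "a"], [1, "c"]]
    asc = rows | sort(0) | cli.deref()    # returns [[1, 'c'], [2, 'b'], [10, 'a']]
    desc = rows | ~sort(0) | cli.deref()  # returns [[10, 'a'], [2, 'b'], [1, 'c']]
    return asc, desc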
class sortF(BaseCli):
    def __init__(self, f:Callable[[T], float], reverse=False):
        """Sorts rows using a function. Example::

            # returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
            ["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref()
            # returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a']
            ["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()"""
        super().__init__(fs=[f]); self.f = f; self.reverse = reverse
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        super().__ror__(it)
        return iter(sorted(list(it), key=self.f, reverse=self.reverse))
    def __invert__(self) -> "sortF":
        return sortF(self.f, not self.reverse)
class consume(BaseCli):
    def __init__(self, f:Union[BaseCli, Callable[[T], None]]):
        r"""Consumes the iterator in a side stream. Returns the iterator. Kinda
        like the bash command ``tee``. Example::

            # prints "0\n1\n2" and returns [0, 1, 2]
            range(3) | consume(headOut()) | toList()
            # prints "range(0, 3)" and returns [0, 1, 2]
            range(3) | consume(lambda it: print(it)) | toList()

        This is useful whenever you want to mutate something, but don't want to
        include the function result into the main stream."""
        super().__init__(fs=[f]); self.f = f
    def __ror__(self, it:T) -> T:
        super().__ror__(it); self.f(it); return it
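# A tiny sketch: consume lets you observe a pipeline without altering it. Note that
# f receives the iterator object itself, so don't exhaust it inside f if downstream
# clis still need the values. `_demoConsume` is a hypothetical name.
def _demoConsume():
    seen = []
    # returns [0, 1, 2]; `seen` ends up holding the iterator object, not its values
    return range(3) | consume(lambda it: seen.append(it)) | cli.toList()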
class randomize(BaseCli):
    def __init__(self, bs=100):
        """Randomize input stream. In order to be efficient, this does not convert
        the input iterator to a giant list and yield random values from that.
        Instead, this fetches ``bs`` items at a time, randomizes them, returns and
        fetches another ``bs`` items. If you want the giant-list behavior, then
        just pass in ``float("inf")``, or ``None``. Example::

            # returns [0, 1, 2, 3, 4], effectively no randomize at all
            range(5) | randomize(1) | deref()
            # returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
            range(10) | randomize(3) | deref()
            # returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
            range(10) | randomize(float("inf")) | deref()
            # same as above
            range(10) | randomize(None) | deref()"""
        super().__init__(); self.bs = bs if bs is not None else float("inf")
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        super().__ror__(it)
        for batch in it | cli.batched(self.bs, True):
            batch = list(batch); perms = torch.randperm(len(batch))
            for idx in perms: yield batch[idx]
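# A reproducibility sketch: randomize shuffles with torch.randperm, so seeding
# torch's RNG makes the shuffle deterministic. `_demoRandomize` is a hypothetical name.
def _demoRandomize():
    torch.manual_seed(42)
    a = range(10) | randomize(None) | cli.deref()
    torch.manual_seed(42)
    b = range(10) | randomize(None) | cli.deref()
    return a == b  # returns True: same seed, same order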
class StaggeredStream:
    def __init__(self, stream:Iterator[T], every:int):
        """Not intended to be instantiated by the end user. Use :class:`stagger`
        instead."""
        self.stream = stream; self.every = every
    def __iter__(self):
        for i, v in zip(range(self.every), self.stream): yield v
    def __len__(self):
        """Length of window (length of result if you were to deref it)."""
        return self.every
class stagger(BaseCli):
    """Staggers input stream into multiple stream "windows" placed serially. Best
    explained with an example::

        o = range(10) | stagger(3)
        o | deref() # returns [0, 1, 2], 1st "window"
        o | deref() # returns [3, 4, 5], 2nd "window"
        o | deref() # returns [6, 7, 8]
        o | deref() # returns [9]
        o | deref() # returns []

    This might be useful when you're constructing a data loader::

        dataset = [range(20), range(30, 50)] | transpose()
        dl = dataset | batched(3) | (transpose() | toTensor()).all() | stagger(4)
        for epoch in range(3):
            for xb, yb in dl: # looping over a window
                print(epoch)
                # then something like: model(xb)

    The above code will print 6 lines. 4 of them are "0" (because we stagger every
    4 batches), and xb's shape will be (3,) (because we batched every 3 samples).

    You should also keep in mind that this doesn't really change the property of
    the stream itself. Essentially, treat these pairs of statements as being the
    same thing::

        o = range(11, 100)

        # both return 11
        o | stagger(20) | item()
        o | item()

        # both return [11, 12, ..., 20]
        o | head(10) | deref()
        o | stagger(20) | head(10) | deref()

    Lastly, multiple iterators might be getting values from the same stream window,
    meaning::

        o = range(11, 100) | stagger(10)
        it1 = iter(o); it2 = iter(o)
        next(it1) # returns 11
        next(it2) # returns 12

    This may or may not be desirable. Also this should be obvious, but I want to
    mention this in case it's not clear to you."""
    def __init__(self, every:int):
        self.every = int(every)
    def __ror__(self, it:Iterator[T]) -> StaggeredStream:
        return StaggeredStream(iter(it), self.every)
class op(k1lib.Absorber, BaseCli):
    """Absorbs operations done on it and applies it on the stream. Based on
    :class:`~k1lib.Absorber`. Example::

        t = torch.tensor([[1, 2, 3], [4, 5, 6.0]])
        # returns [torch.tensor([[4., 5., 6., 7., 8., 9.]])]
        [t] | (op() + 3).view(1, -1).all() | deref()

    Basically, you can treat ``op()`` as the input tensor. Tbh, you can do the same
    thing with this::

        [t] | applyS(lambda t: (t+3).view(1, -1)).all() | deref()

    But that's kinda long and may not be obvious. This can be surprisingly
    resilient, as you can still combine with other cli tools as usual, for example::

        # returns [2, 3], demonstrating "&" operator
        torch.randn(2, 3) | (op().shape & identity()) | deref() | item()

        a = torch.tensor([[1, 2, 3], [7, 8, 9]])
        # returns torch.tensor([4, 5, 6]), demonstrating "+" operator for clis and not clis
        (a | op() + 3 + identity() | item() == torch.tensor([4, 5, 6])).all()

        # returns [[3], [3]], demonstrating .all() and "|" serial chaining
        torch.randn(2, 3) | (op().shape.all() | deref())

        # returns [[8, 18], [9, 19]], demonstrating you can treat `op()` as a regular function
        [range(10), range(10, 20)] | transpose() | filt(op() > 7, 0) | deref()

    This can only deal with simple operations. For complex operations, resort to
    the longer version ``applyS(lambda x: ...)`` instead!

    Performance-wise, there is some degradation, but not a lot, so don't worry
    about it. Simple operations execute pretty much on par with native lambdas::

        n = 10_000_000

        # takes 1.48s
        for i in range(n): i**2
        # takes 1.89s, 1.28x worse than for loop
        range(n) | apply(lambda x: x**2) | ignore()
        # takes 1.86s, 1.26x worse than for loop
        range(n) | apply(op()**2) | ignore()
        # takes 1.86s
        range(n) | (op()**2).all() | ignore()

    More complex operations can take more of a hit::

        # takes 1.66s
        for i in range(n): i**2-3
        # takes 2.02s, 1.22x worse than for loop
        range(n) | apply(lambda x: x**2-3) | ignore()
        # takes 2.81s, 1.69x worse than for loop
        range(n) | apply(op()**2-3) | ignore()

    Reserved operations that are not absorbed are:

    - all
    - __ror__ (__or__ still works!)
    - op_solidify"""
    def __init__(self):
        super().__init__({"_op_solidified": False})
    def op_solidify(self):
        """Use this to stop absorbing ``__call__`` operations and make it feel like
        a regular function (still absorbs other operations though)::

            f = op()**2
            3 | f # returns 9, but maybe you don't want to pipe it in
            f.op_solidify()
            f(3) # returns 9"""
        self._ab_sentinel = True
        self._op_solidified = True
        self._ab_sentinel = False
        return self
    def __ror__(self, it):
        return self.ab_operate(it)
    def __or__(self, o):
        if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__or__(o)
        return super().__or__(o)
    def __add__(self, o):
        if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__add__(o)
        return super().__add__(o)
    def __and__(self, o):
        if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__and__(o)
        return super().__and__(o)
    def __call__(self, *args, **kwargs):
        if self._op_solidified: return self.ab_operate(*args, **kwargs)
        return super().__call__(*args, **kwargs)
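# A final sketch: op_solidify turns an absorber into a plain callable, which helps
# when passing it to APIs that call their argument, like sorted's key function.
# `_demoOpSolidify` is a hypothetical name.
def _demoOpSolidify():
    squared = (op()**2).op_solidify()
    # returns [1, 2, -3]: sorted by the square of each element
    return sorted([-3, 1, 2], key=squared)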