# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for quick modifiers, think of them as changing formats
"""
__all__ = ["apply", "applyMp", "applyMpBatched", "applyS",
"lstrip", "rstrip", "strip",
"upper", "lower", "replace", "remove", "toFloat", "toInt",
"sort", "sortF", "consume", "randomize", "stagger", "op"]
from typing import Callable, Iterator, Any, Union, List
from k1lib.cli.init import patchDefaultDelim, BaseCli, settings, T
import k1lib.cli as cli, numpy as np, torch
import multiprocessing as mp; from collections import deque
from functools import partial, update_wrapper
import dill, pickle, k1lib, warnings
def executeFunc(common, line):
import dill
f, args, kwargs = dill.loads(common)
return f(dill.loads(line), *args, **kwargs)
[docs]class applyMp(BaseCli):
_pools = set()
[docs] def __init__(self, f:Callable[[T], T], prefetch:int=None, timeout:float=2, *args, **kwargs):
"""Like :class:`apply`, but execute ``f(row)`` of each row in
multiple processes. Example::
# returns [3, 2]
["abc", "de"] | applyMp(lambda s: len(s)) | deref()
# returns [5, 6, 9]
range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref()
# returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
someList = [1, 2, 3]
["abc", "de"] | applyMp(lambda s: someList) | deref()
Internally, this will continuously spawn new jobs up until 80% of all CPU
cores are utilized. On posix systems, the default multiprocessing start method is
``fork()``. This sort of means that all the variables in memory will be copied
over. This might be expensive (might also not, with copy-on-write), so you might
have to think about that. On windows and macos, the default start method is
``spawn``, meaning each child process is a completely new interpreter, so you have
to pass in all required variables and reimport every dependencies. Read more at
https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
If you don't wish to schedule all jobs at once, you can specify a ``prefetch``
amount, and it will only schedule that much jobs ahead of time. Example::
range(10000) | applyMp(lambda x: x**2) | head() | deref() # 700ms
range(10000) | applyMp(lambda x: x**2, 5) | head() | deref() # 300ms
# demonstrating there're no huge penalties even if we want all results at the same time
range(10000) | applyMp(lambda x: x**2) | deref() # 900ms
range(10000) | applyMp(lambda x: x**2, 5) | deref() # 1000ms
The first line will schedule all jobs at once, and thus will require more RAM and
compute power, even though we discard most of the results anyway (the
:class:`~k1lib.cli.filt.head` cli). The second line only schedules 5 jobs ahead of
time, and thus will be extremely more efficient if you don't need all results right
away.
.. note::
Remember that every :class:`~k1lib.cli.init.BaseCli` is also a
function, meaning that you can do stuff like::
# returns [['ab', 'ac']]
[["ab", "cd", "ac"]] | applyMp(startswith("a") | deref()) | deref()
Also remember that the return result of ``f`` should not be a generator.
That's why in the example above, there's a ``deref()`` inside f.
One last thing. Remember to close all pools (using :meth:`clearPools`) so that
all child processes are terminated, and that resources are freed. Let's say if
you use CUDA tensors, but have not close all pools yet, then it is possible
that CUDA memory is not freed. I learned this the hard way.
:param prefetch: if not specified, schedules all jobs at the same time. If
specified, schedules jobs so that there'll only be a specified amount of
jobs, and will only schedule more if results are actually being used.
:param timeout: seconds to wait for job before raising an error
:param args: extra arguments to be passed to the function. ``kwargs`` too"""
super().__init__(); self.f = f; self.prefetch = prefetch or 1_000_000
self.timeout = timeout; self.args = args; self.kwargs = kwargs
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[T]:
super().__ror__(it); it = iter(it) # really make sure it's an iterator, for prefetch
self.p = p = mp.Pool(mp.cpu_count()*4//5)
applyMp._pools.add(p); timeout = self.timeout
common = dill.dumps([self.f, self.args, self.kwargs])
def gen():
fs = deque()
for i, line in zip(range(self.prefetch), it):
fs.append(p.apply_async(executeFunc, [common, dill.dumps(line)]))
for line in it:
yield fs.popleft().get(timeout)
fs.append(p.apply_async(executeFunc, [common, dill.dumps(line)]))
for f in fs: yield f.get(timeout)
return gen()
[docs] @staticmethod
def clearPools():
"""Terminate all existing pools. Do this before restarting/quitting the
script/notebook to make sure all resources (like GPU) are freed."""
for p in applyMp._pools:
try: p.terminate()
except: pass
applyMp._pools = set()
[docs] @staticmethod
def pools():
"""Get set of all pools. Meant for debugging purposes only."""
return applyMp._pools
[docs]class applyS(BaseCli):
[docs] def __init__(self, f:Callable[[T], T]):
"""Like :class:`apply`, but much simpler, just operating on the entire input
object, essentially. The "S" stands for "single". Example::
# returns 5
3 | applyS(lambda x: x+2)
Like :class:`apply`, you can also use this as a decorator like this::
@applyS
def f(x):
return x+2
# returns 5
3 | f
This also decorates the returned object so that it has same qualname, docstring
and whatnot."""
super().__init__(); self.f = f
update_wrapper(self, f)
[docs] def __ror__(self, it:T) -> T:
if settings["useCtx"]: super().__ror__(it)
return self.f(it)
[docs] def all(self):
return apply(self.f)
[docs]class apply(BaseCli):
[docs] def __init__(self, f:Callable[[str], str], column:int=None):
"""Applies a function f to every line.
Example::
# returns [0, 1, 4, 9, 16]
range(5) | apply(lambda x: x**2) | deref()
# returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]]
torch.ones(2, 3) | apply(lambda x: x+2, 0) | deref()
You can also use this as a decorator, like this::
@apply
def f(x):
return x**2
# returns [0, 1, 4, 9, 16]
range(5) | f | deref()
:param column: if not None, then applies the function to that column only"""
super().__init__();
self.f = f.f if isinstance(f, applyS) else f
self.column = column
[docs] def __ror__(self, it:Iterator[str]):
super().__ror__(it); f = self.f; c = self.column
if c is None: return (f(line) for line in it)
else: return ([(e if i != c else f(e))
for i, e in enumerate(row)] for row in it)
[docs]def applyMpBatched(f, bs=32, prefetch=2, timeout=5):
"""Pretty much the same as :class:`applyMp` and has the same feel to it
too. Iterator[A] goes in, Iterator[B] goes out, and you specify `f(A) -> B`.
However, this will launch jobs that will execute multiple f(), instead of
1 job per execution. All examples from :class:`applyMp` should work perfectly
here."""
return cli.batched(bs) | applyMp(apply(f) | cli.deref(True), prefetch, timeout) | cli.joinStreams()
[docs]def lstrip(column:int=None, char:str=None):
"""Strips left of every line.
Example::
# returns ['12 ', '34']
[" 12 ", " 34"] | lstrip() | deref()"""
return apply(lambda e: e.lstrip(char), column)
[docs]def rstrip(column:int=None, char:str=None):
"""Strips right of every line"""
return apply(lambda e: e.rstrip(char), column)
[docs]def strip(column:int=None, char:str=None):
"""Strips both sides of every line"""
return apply(lambda e: e.strip(char), column)
[docs]def upper(column:int=None):
"""Makes all characters uppercase.
Example::
# returns ['ABCDE', '123R']
["abcde", "123r"] | upper() | deref()"""
return apply(lambda e: e.upper(), column)
[docs]def lower(column:int=None):
"""Makes all characters lowercase"""
return apply(lambda e: e.lower(), column)
[docs]def replace(s:str, target:str=None, column:int=None):
"""Replaces substring `s` with `target` for each line.
Example::
# returns ['104', 'ab0c']
["1234", "ab23c"] | replace("23", "0") | deref()
:param target: if not specified, then use the default delimiter specified
in ``cliSettings``"""
t = patchDefaultDelim(target)
return apply(lambda e: e.replace(s, t), column)
[docs]def remove(s:str, column:int=None):
"""Removes a specific substring in each line."""
return replace(s, "", column)
def _op(toOp, c, force, defaultValue):
return apply(toOp, c) | (apply(lambda x: x or defaultValue, c) if force else (~cli.isValue(None, c)))
def _toFloat(e) -> Union[float, None]:
try: return float(e)
except: return None
[docs]def toFloat(*columns:List[int], force=False):
"""Converts every row into a float. Example::
# returns [1, 3, -2.3]
["1", "3", "-2.3"] | toFloat() | deref()
# returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
[["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()
With weird rows::
# returns [[1.0, 'a'], [8.0, 'c']]
[["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
# returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
[["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, force=True) | deref()
:param columns: if nothing, then will convert each row. If available, then
convert all the specified columns
:param force: if True, forces weird values to 0.0, else filters out all weird rows"""
if len(columns) > 0:
return cli.init.serial(*(_op(_toFloat, c, force, 0.0) for c in columns))
else: return _op(_toFloat, None, force, 0.0)
def _toInt(e) -> Union[int, None]:
try: return int(float(e))
except: return None
[docs]def toInt(*columns:List[int], force=False):
"""Converts every row into an integer. Example::
# returns [1, 3, -2]
["1", "3", "-2.3"] | toInt() | deref()
:param columns: if nothing, then will convert each row. If available, then
convert all the specified columns
:param force: if True, forces weird values to 0, else filters out all weird rows
See also: :meth:`toFloat`"""
if len(columns) > 0:
return cli.init.serial(*(_op(_toInt, c, force, 0) for c in columns))
else: return _op(_toInt, None, force, 0)
[docs]class sort(BaseCli):
[docs] def __init__(self, column:int=0, numeric=True, reverse=False):
"""Sorts all lines based on a specific `column`.
Example::
# returns [[5, 'a'], [1, 'b']]
[[1, "b"], [5, "a"]] | ~sort(0) | deref()
# returns [[2, 3]]
[[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref()
# errors out, as you can't really compare str with int
[[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref()
:param column: if None, sort rows based on themselves and not an element
:param numeric: whether to convert column to float
:param reverse: False for smaller to bigger, True for bigger to smaller. Use
:meth:`__invert__` to quickly reverse the order instead of using this param"""
super().__init__()
self.column = column; self.reverse = reverse; self.numeric = numeric
self.filterF = (lambda x: float(x)) if numeric else (lambda x: x)
[docs] def __ror__(self, it:Iterator[str]):
super().__ror__(it); c = self.column
if c is None:
return it | cli.wrapList() | cli.transpose() | sort(0, self.numeric, self.reverse)
f = self.filterF
rows = list(it | cli.isNumeric(c) if self.numeric else it)
def sortF(row):
if len(row) > c: return f(row[c])
return float("inf")
return iter(sorted(rows, key=sortF, reverse=self.reverse))
[docs] def __invert__(self):
"""Creates a clone that has the opposite sort order"""
return sort(self.column, self.numeric, not self.reverse)
[docs]class sortF(BaseCli):
[docs] def __init__(self, f:Callable[[T], float], reverse=False):
"""Sorts rows using a function.
Example::
# returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref()
# returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a']
["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()"""
super().__init__(); self.f = f; self.reverse = reverse
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[T]:
super().__ror__(it)
return iter(sorted(list(it), key=self.f, reverse=self.reverse))
[docs] def __invert__(self) -> "sortF":
return sortF(self.f, not self.reverse)
[docs]class consume(BaseCli):
[docs] def __init__(self, f:Union[BaseCli, Callable[[T], None]]):
r"""Consumes the iterator in a side stream. Returns the iterator.
Kinda like the bash command ``tee``. Example::
# prints "0\n1\n2" and returns [0, 1, 2]
range(3) | consume(headOut()) | toList()
# prints "range(0, 3)" and returns [0, 1, 2]
range(3) | consume(lambda it: print(it)) | toList()
This is useful whenever you want to mutate something, but don't want to
include the function result into the main stream."""
super().__init__(); self.f = f
[docs] def __ror__(self, it:T) -> T:
super().__ror__(it); self.f(it); return it
[docs]class randomize(BaseCli):
[docs] def __init__(self, bs=100):
"""Randomize input stream. In order to be efficient, this does not
convert the input iterator to a giant list and yield random values from that.
Instead, this fetches ``bs`` items at a time, randomizes them, returns and
fetch another ``bs`` items. If you want to do the giant list, then just pass
in ``float("inf")``, or ``None``. Example::
# returns [0, 1, 2, 3, 4], effectively no randomize at all
range(5) | randomize(1) | deref()
# returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
range(10) | randomize(3) | deref()
# returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
range(10) | randomize(float("inf")) | deref()
# same as above
range(10) | randomize(None) | deref()"""
super().__init__(); self.bs = bs if bs != None else float("inf")
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[T]:
super().__ror__(it)
for batch in it | cli.batched(self.bs, True):
batch = list(batch); perms = torch.randperm(len(batch))
for idx in perms: yield batch[idx]
class StaggeredStream:
def __init__(self, stream:Iterator[T], every:int):
"""Not intended to be instantiated by the end user. Use :class:`stagger`
instead."""
self.stream = stream; self.every = every
def __iter__(self):
for i, v in zip(range(self.every), self.stream): yield v
def __len__(self):
"""Length of window (length of result if you were to deref it)."""
return self.every
[docs]class stagger(BaseCli):
"""Staggers input stream into multiple stream "windows" placed serially. Best
explained with an example::
o = range(10) | stagger(3)
o | deref() # returns [0, 1, 2], 1st "window"
o | deref() # returns [3, 4, 5], 2nd "window"
o | deref() # returns [6, 7, 8]
o | deref() # returns [9]
o | deref() # returns []
This might be useful when you're constructing a data loader::
dataset = [range(20), range(30, 50)] | transpose()
dl = dataset | batched(3) | (transpose() | toTensor()).all() | stagger(4)
for epoch in range(3):
for xb, yb in dl: # looping over a window
print(epoch)
# then something like: model(xb)
The above code will print 6 lines. 4 of them is "0" (because we stagger every 4
batches), and xb's shape' will be (3,) (because we batched every 3 samples).
You should also keep in mind that this doesn't really change the property of the
stream itself. Essentially, treat these pairs of statement as being the same thing::
o = range(11, 100)
# both returns 11
o | stagger(20) | item()
o | item()
# both returns [11, 12, ..., 20]
o | head(10) | deref()
o | stagger(20) | head(10) | deref()
Lastly, multiple iterators might be getting values from the same stream window,
meaning::
o = range(11, 100) | stagger(10)
it1 = iter(o); it2 = iter(o)
next(it1) # returns 11
next(it2) # returns 12
This may or may not be desirable. Also this should be obvious, but I want to
mention this in case it's not clear to you."""
def __init__(self, every:int):
self.every = int(every)
[docs] def __ror__(self, it:Iterator[T]) -> StaggeredStream:
return StaggeredStream(iter(it), self.every)
[docs]class op(k1lib.Absorber, BaseCli):
"""Absorbs operations done on it and applies it on the stream. Based
on :class:`~k1lib.Absorber`. Example::
t = torch.tensor([[1, 2, 3], [4, 5, 6.0]])
# returns [torch.tensor([[4., 5., 6., 7., 8., 9.]])]
[t] | (op() + 3).view(1, -1).all() | deref(True)
Basically, you can treat ``op()`` as the input tensor. Tbh, you
can do the same thing with this::
[t] | applyS(lambda t: (t+3).view(-1, 1)).all() | deref(True)
But that's kinda long and may not be obvious. This can be surprisingly resilient, as
you can still combine with other cli tools as usual, for example::
# returns [2, 3], demonstrating "&" operator
torch.randn(2, 3) | (op().shape & identity()) | deref(True) | item()
a = torch.tensor([[1, 2, 3], [7, 8, 9]])
# returns torch.tensor([4, 5, 6]), demonstrating "+" operator for clis and not clis
(a | op() + 3 + identity() | item() == torch.tensor([4, 5, 6])).all()
# returns [[3], [3]], demonstrating .all() and "|" serial chaining
torch.randn(2, 3) | (op().shape.all() | deref())"""
[docs] def __ror__(self, it):
return self.ab_operate(it)
def __or__(self, o):
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__or__(o)
return super().__add__(o)
def __add__(self, o):
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__add__(o)
return super().__add__(o)
def __and__(self, o):
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__and__(o)
return super().__and__(o)