# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This module holds quick modifiers; think of them as operations that change the format of a stream.
"""
# Public names exported by ``from ... import *``, grouped by purpose.
__all__ = [
    # function application (single object, per row, multi-process, multi-thread, repeated)
    "applyS", "aS", "apply", "applyMp", "parallel", "applyTh", "applySerial",
    # type conversion
    "toFloat", "toInt",
    # stream modifiers
    "sort", "sortF", "consume", "randomize", "stagger", "op", "integrate",
]
from typing import Callable, Iterator, Any, Union, List
from k1lib.cli.init import patchDefaultDelim, BaseCli, T, fastF
import k1lib.cli as cli, numpy as np, torch, threading
import torch.multiprocessing as mp; from collections import deque
from functools import partial, update_wrapper, lru_cache
import dill, pickle, k1lib, warnings, atexit, signal, time, os, random
class applyS(BaseCli):
    def __init__(self, f:Callable[[T], T], **kwargs):
        """Applies ``f`` to the whole input object at once, instead of to every
        row like :class:`apply` does. The "S" stands for "single". The shorthand
        alias is :class:`aS`. Example::

            # returns 5
            3 | aS(lambda x: x+2)

        Like :class:`apply`, it also works as a decorator::

            @aS
            def f(x):
                return x+2
            # returns 5
            3 | f

        The instance copies ``f``'s qualname, docstring and similar metadata, so
        it still looks like the function it wraps.

        :param f: the function to be executed
        :param kwargs: other keyword arguments to pass to the function"""
        super().__init__(fs=[f])
        self.kwargs = kwargs
        self.f = f
        # updated=() skips merging f.__dict__ into this instance's __dict__
        update_wrapper(self, f, updated=())
    def __ror__(self, it:T) -> T:
        extra = self.kwargs
        return self.f(it, **extra)
    def __invert__(self):
        """Returns a variant that spreads the input out as positional arguments.
        Example::

            # returns 5
            [2, 3] | ~aS(lambda x, y: x + y)
            def f(x, y, a=4):
                return x*y + a
            # returns 10
            [2, 3] | ~aS(f)
            # returns 11
            [2, 3] | ~aS(f, a=5)"""
        func, kw = self.f, self.kwargs
        return applyS(lambda x: func(*x, **kw))
aS = applyS
class apply(BaseCli):
    def __init__(self, f:Callable[[T], T], column:int=None, cache:int=0):
        """Applies a function ``f`` to every line of the input.
        Example::

            # returns [0, 1, 4, 9, 16]
            range(5) | apply(lambda x: x**2) | deref()
            # returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]]
            torch.ones(2, 3) | apply(lambda x: x+2, 0) | deref()

        It can also be used as a decorator::

            @apply
            def f(x):
                return x**2
            # returns [0, 1, 4, 9, 16]
            range(5) | f | deref()

        Results can be memoized with an LRU cache::

            def calc(i): time.sleep(0.5); return i**2
            # takes 2.5s
            range(5) | repeatFrom(2) | apply(calc, cache=10) | deref()
            # takes 5s
            range(5) | repeatFrom(2) | apply(calc) | deref()

        :param column: if not None, then applies the function to that column only
        :param cache: if positive, the maximum number of results kept in an LRU cache"""
        super().__init__(fs=[f])
        self.f = f
        self.column = column
        fast = fastF(f)
        self._fC = lru_cache(cache)(fast) if cache > 0 else fast
    def __ror__(self, it:Iterator[str]):
        fn = self._fC
        col = self.column
        if col is None:
            return (fn(line) for line in it)
        def transformRow(row):
            # rebuild the row as a list, transforming only the selected column
            return [fn(e) if i == col else e for i, e in enumerate(row)]
        return (transformRow(row) for row in it)
def executeFunc(common, line):
    """Worker-side job used by :class:`applyMp`.

    :param common: dill-serialized ``[f, kwargs]`` pair shared by every job
    :param line: dill-serialized input element
    :returns: ``f(element, **kwargs)``"""
    # imported locally; presumably so workers started with the "spawn" method
    # (fresh interpreters) have it available — confirm
    import dill
    func, kw = dill.loads(common)
    arg = dill.loads(line)
    return func(arg, **kw)
def terminateGraceful():
    """Pool worker initializer: makes the current process ignore SIGINT, so a
    Ctrl+C is left for the parent process to handle."""
    ignoreHandler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignoreHandler)
class applyMp(BaseCli):
    _pools = set()
    def __init__(self, f:Callable[[T], T], prefetch:int=None, timeout:float=8, utilization:float=0.8, bs:int=1, **kwargs):
        """Like :class:`apply`, but executes ``f(row)`` of each row in
        multiple processes. Example::

            # returns [3, 2]
            ["abc", "de"] | applyMp(lambda s: len(s)) | deref()
            # returns [5, 6, 9]
            range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref()
            # returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
            someList = [1, 2, 3]
            ["abc", "de"] | applyMp(lambda s: someList) | deref()

        Internally, this creates a pool with ``utilization`` (default 80%) of all CPU
        cores, with at least 1 process. On posix systems, the default multiprocessing
        start method is ``fork()``. This sort of means that all the variables in memory
        will be copied over. This might be expensive (might also not, with copy-on-write),
        so you might have to think about that. On windows and macos, the default start
        method is ``spawn``, meaning each child process is a completely new interpreter,
        so you have to pass in all required variables and reimport every dependency.
        Read more at
        https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

        If you don't wish to schedule all jobs at once, you can specify a ``prefetch``
        amount, and it will only schedule that many jobs ahead of time. Example::

            range(10000) | applyMp(lambda x: x**2) | head() | deref() # 700ms
            range(10000) | applyMp(lambda x: x**2, 5) | head() | deref() # 300ms
            # demonstrating there're no huge penalties even if we want all results at the same time
            range(10000) | applyMp(lambda x: x**2) | deref() # 900ms
            range(10000) | applyMp(lambda x: x**2, 5) | deref() # 1000ms

        The first line will schedule all jobs at once, and thus will require more RAM and
        compute power, even though we discard most of the results anyway (the
        :class:`~k1lib.cli.filt.head` cli). The second line only schedules 5 jobs ahead of
        time, and thus will be much more efficient if you don't need all results right
        away.

        .. note::

            Remember that every :class:`~k1lib.cli.init.BaseCli` is also a
            function, meaning that you can do stuff like::

                # returns [['ab', 'ac']]
                [["ab", "cd", "ac"]] | applyMp(filt(op().startswith("a")) | deref()) | deref()

            Also remember that the return result of ``f`` should not be a generator.
            That's why in the example above, there's a ``deref()`` inside f.

        Most of the time, you would probably want to set ``bs`` to something bigger than 1
        (maybe 32 or so). This executes ``f`` multiple times in a single job, instead of
        only once per job, which dramatically reduces per-job serialization and dispatch
        overhead. ``kwargs`` are still passed to every individual ``f`` call.

        If you encounter strange errors not seen on :class:`apply`, you can try to clear all
        pools (using :meth:`clearPools`), to terminate all child processes and thus free
        resources. On earlier versions, you had to do this manually before exiting, but now
        :class:`applyMp` is much more robust.

        Also, you should not immediately assume that :class:`applyMp` will always be faster
        than :class:`apply`. Remember that :class:`applyMp` will create new processes,
        serialize and transfer data to them, execute it, then transfer data back. If your code
        transfers a lot of data back and forth (compared to the amount of computation done), or
        the child processes don't have a lot of stuff to do before returning, it may very well
        be a lot slower than :class:`apply`.

        :param prefetch: if not specified, schedules all jobs at the same time. If
            specified, schedules jobs so that there'll only be a specified amount of
            jobs, and will only schedule more if results are actually being used.
        :param timeout: seconds to wait for each job's result before raising an error
        :param utilization: fraction of cores to use. 0 for no cores (still at least 1
            process), 1 for all the cores. Defaulted to 0.8
        :param bs: if specified, groups ``bs`` number of transforms into 1 job to be more
            efficient.
        :param kwargs: extra arguments to be passed to the function. ``args`` not
            included as there're a couple of options you can pass for this cli."""
        super().__init__(fs=[f]); self.f = fastF(f)
        self.prefetch = prefetch or 1_000_000
        self.timeout = timeout; self.utilization = utilization
        self.bs = bs; self.kwargs = kwargs
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        timeout = self.timeout; it = iter(it) # really make sure it's an iterator, for prefetch
        if self.bs > 1:
            # kwargs belong to the per-element f; the batch-level function is called
            # by the inner applyMp with no extra kwargs
            f = partial(self.f, **self.kwargs) if self.kwargs else self.f
            return it | cli.batched(self.bs, True) | applyMp(apply(f) | cli.deref(), self.prefetch, timeout, self.utilization) | cli.joinStreams()
        os.environ["py_k1lib_in_applyMp"] = "True"
        # mp.Pool rejects 0 processes, which few cores or low utilization would otherwise give
        nProcs = max(1, int(mp.cpu_count()*self.utilization))
        self.p = p = mp.Pool(nProcs, terminateGraceful)
        applyMp._pools.add(p); common = dill.dumps([self.f, self.kwargs])
        def gen():
            # uses the local ``p`` (not self.p): a later __ror__ call on this same
            # instance replaces self.p, and must not be terminated by this generator
            try:
                jobs = deque()
                for _, line in zip(range(self.prefetch), it):
                    jobs.append(p.apply_async(executeFunc, [common, dill.dumps(line)]))
                for line in it:
                    yield jobs.popleft().get(timeout)
                    jobs.append(p.apply_async(executeFunc, [common, dill.dumps(line)]))
                for job in jobs: yield job.get(timeout)
            except KeyboardInterrupt:
                print("applyMp interrupted. Terminating pool now"); raise
            except Exception:
                print("applyMp encounter errors. Terminating pool now"); raise
            finally:
                # runs on normal exhaustion, on errors, and when the generator is closed
                # early (GeneratorExit); discard() tolerates a prior clearPools()
                p.terminate(); applyMp._pools.discard(p)
        return gen()
    @staticmethod
    def clearPools():
        """Terminate all existing pools. Do this before restarting/quitting the
        script/notebook to make sure all resources (like GPU) are freed. **Update**:
        you probably won't have to call this manually anymore since version 0.9, but
        if you run into problems, try doing this."""
        for p in applyMp._pools:
            try: p.terminate()
            except Exception: pass # best effort: keep terminating the remaining pools
        applyMp._pools = set()
    @staticmethod
    def pools():
        """Get set of all pools. Meant for debugging purposes only."""
        return applyMp._pools
    def __del__(self):
        if hasattr(self, "p"):
            self.p.terminate()
            applyMp._pools.discard(self.p)
# apparently, this doesn't do anything, at least in jupyter environment
atexit.register(lambda: applyMp.clearPools())
parallel = applyMp
thEmptySentinel = object() # marks a result slot whose worker thread hasn't produced a value
class applyTh(BaseCli):
    def __init__(self, f, prefetch:int=2, timeout:float=5, bs:int=1):
        """Kinda like the same as :class:`applyMp`, but executes ``f`` on multiple
        threads, instead of on multiple processes. Advantages:

        - Relatively low overhead for thread creation
        - Fast, if ``f`` is io-bound
        - Does not have to serialize and deserialize the result, meaning iterators can be
          exchanged

        Disadvantages:

        - Still has thread creation overhead, so it's still recommended to specify ``bs``
        - Is slow if ``f`` has to obtain the GIL to be able to do anything

        If ``f`` raises inside a worker thread, that same exception is re-raised when
        its result is consumed. If a result isn't ready within ``timeout`` seconds,
        ``RuntimeError("Thread timed out!")`` is raised.

        All examples from :class:`applyMp` should work perfectly here.

        :param prefetch: number of threads started ahead of consumption
        :param timeout: seconds to wait for each result
        :param bs: if bigger than 1, each thread processes a batch of ``bs`` elements"""
        fs = [f]; super().__init__(fs=fs); self.f = fs[0]; self.bs = bs
        self.prefetch = prefetch; self.timeout = timeout
    def __ror__(self, it):
        if self.bs > 1:
            batchF = fastF(self.f)
            # each batch is materialized into a list inside the worker thread; a lazy
            # ``apply`` here would just return a generator immediately, pushing all the
            # real work back onto the consuming thread
            yield from (it | cli.batched(self.bs, True) | applyTh(lambda batch: [batchF(e) for e in batch], self.prefetch, self.timeout) | cli.joinStreams()); return
        datas = deque(); it = iter(it)
        innerF = fastF(self.f); timeout = self.timeout
        def worker(line, box):
            # box = [result, exception]; the exception is stored so the consumer can re-raise it
            try: box[0] = innerF(line)
            except Exception as e: box[1] = e
        def launch(line):
            box = [thEmptySentinel, None]
            t = threading.Thread(target=worker, args=(line, box))
            t.start(); datas.append((t, box))
        def collect():
            # popleft so finished threads can be released early, due to ref counting
            t, box = datas.popleft(); t.join(timeout)
            if box[1] is not None: raise box[1]
            if box[0] is thEmptySentinel:
                for other, _ in datas: other.join(0.01)
                raise RuntimeError("Thread timed out!")
            return box[0]
        for _, line in zip(range(self.prefetch), it): launch(line)
        for line in it:
            yield collect()
            launch(line)
        while datas: yield collect()
class applySerial(BaseCli):
    def __init__(self, f, includeFirst=False, unpack=False):
        """Applies a function repeatedly, yielding ``f(x)``, ``f(f(x))``,
        ``f(f(f(x)))`` and so on forever. With ``includeFirst=True``, the raw input
        ``x`` is yielded first. Example::

            # returns [4, 8, 16, 32, 64]
            2 | applySerial(op()*2) | head(5) | deref()

        If the result of your operation is an iterator, you might want to
        :class:`~k1lib.cli.utils.deref` it, like this::

            rs = iter(range(8)) | applySerial(rows()[::2])
            # returns [0, 2, 4, 6]
            next(rs) | deref()
            # returns []. This is because all the elements are taken by the previous deref()
            next(rs) | deref()
            # returns [[10, -6], [4, 16], [20, -12]]
            [2, 8] | applySerial(lambda a, b: (a + b, a - b), unpack=True) | head(3) | deref()
            rs = iter(range(8)) | applySerial(rows()[::2] | deref())
            # returns [0, 2, 4, 6]
            next(rs)
            # returns [0, 4]
            next(rs)
            # returns [0]
            next(rs)

        :param f: function to apply repeatedly
        :param includeFirst: whether to yield the raw input value first
        :param unpack: whether to unpack each value into ``f``'s positional
            arguments, for aesthetic purposes"""
        fs = [f]; super().__init__(fs=fs); self.f = fs[0]
        self.includeFirst = includeFirst; self.unpack = unpack
    def __ror__(self, it):
        step = fastF(self.f)
        call = (lambda value: step(*value)) if self.unpack else step
        current = it if self.includeFirst else call(it)
        while True:
            yield current
            current = call(current)
def _toop(toOp, c, force, defaultValue):
    """Builds a converting cli for :class:`toFloat` / :class:`toInt`.

    :param toOp: converter returning None when an element can't be converted
    :param c: column to convert, or None to convert each row itself
    :param force: if True, failed conversions are replaced by ``defaultValue``;
        otherwise rows whose converted value is None are filtered out
    :param defaultValue: replacement used when ``force`` is True"""
    converted = apply(toOp, c)
    if force:
        # test for None explicitly: ``x or defaultValue`` would also replace
        # legitimate falsy results such as 0
        return converted | apply(lambda x: defaultValue if x is None else x, c)
    # ``!=`` (not ``is not``) is intentional: op() absorbs ``!=`` into a predicate
    return converted | cli.filt(cli.op() != None, c)
def _toFloat(e) -> Union[float, None]:
try: return float(e)
except: return None
class toFloat(BaseCli):
    def __init__(self, *columns, mode=2):
        """Converts every row into a float. Example::

            # returns [1.0, 3.0, -2.3]
            ["1", "3", "-2.3"] | toFloat() | deref()
            # returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
            [["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()

        With weird rows::

            # returns [[1.0, 'a'], [8.0, 'c']]
            [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
            # returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
            [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, mode=1) | deref()

        This also works well with :class:`torch.Tensor` and :class:`numpy.ndarray`,
        as they will not be broken up into an iterator::

            # returns a numpy array, instead of an iterator
            np.array(range(10)) | toFloat()

        :param columns: if nothing, then will convert each row. If available, then
            convert all the specified columns
        :param mode: different conversion styles

            - 0: simple ``float()`` function, fastest, but will throw errors if it can't be parsed
            - 1: if there are errors, then replace it with zero
            - 2: if there are errors, then eliminate the row"""
        self.columns = columns; self.mode = mode
    def __ror__(self, it):
        columns = self.columns; mode = self.mode
        if len(columns) == 0:
            if isinstance(it, np.ndarray): return it.astype(float)
            if isinstance(it, torch.Tensor): return it.float()
            if mode == 0: return it | apply(float)
            return it | _toop(_toFloat, None, mode == 1, 0.0)
        # honor mode 0 for columns too, instead of silently falling back to mode 2
        if mode == 0: return it | cli.init.serial(*(apply(float, c) for c in columns))
        return it | cli.init.serial(*(_toop(_toFloat, c, mode == 1, 0.0) for c in columns))
def _toInt(e) -> Union[int, None]:
try: return int(float(e))
except: return None
class toInt(BaseCli):
    def __init__(self, *columns, mode=2):
        """Converts every row into an integer. Example::

            # returns [1, 3, -2]
            ["1", "3", "-2.3"] | toInt() | deref()

        :param columns: if nothing, then will convert each row. If available, then
            convert all the specified columns
        :param mode: different conversion styles

            - 0: simple ``int()`` function, fastest, but will throw errors if it can't be parsed
            - 1: if there are errors, then replace it with zero
            - 2: if there are errors, then eliminate the row

        See also: :meth:`toFloat`"""
        self.columns = columns; self.mode = mode
    def __ror__(self, it):
        columns = self.columns; mode = self.mode
        if len(columns) == 0:
            if isinstance(it, np.ndarray): return it.astype(int)
            if isinstance(it, torch.Tensor): return it.int()
            if mode == 0: return it | apply(int)
            # integer zero (not 0.0) as the replacement, so results stay ints
            return it | _toop(_toInt, None, mode == 1, 0)
        # honor mode 0 for columns too, instead of silently falling back to mode 2
        if mode == 0: return it | cli.init.serial(*(apply(int, c) for c in columns))
        return it | cli.init.serial(*(_toop(_toInt, c, mode == 1, 0) for c in columns))
class sort(BaseCli):
    def __init__(self, column:int=0, numeric=True, reverse=False):
        """Sorts all lines by the value in a specific ``column``.
        Example::

            # returns [[5, 'a'], [1, 'b']]
            [[1, "b"], [5, "a"]] | ~sort(0) | deref()
            # returns [[2, 3]]
            [[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref()
            # errors out, as you can't really compare str with int
            [[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref()
            # returns [-1, 2, 3, 5, 8]
            [2, 5, 3, -1, 8] | sort(None) | deref()

        Rows shorter than ``column + 1`` get an infinite key. With ``numeric``,
        rows whose column isn't numeric are dropped first.

        :param column: if None, sort rows based on themselves and not an element
        :param numeric: whether to convert column to float
        :param reverse: False for smaller to bigger, True for bigger to smaller. Use
            :meth:`__invert__` to quickly reverse the order instead of using this param"""
        self.column = column; self.reverse = reverse; self.numeric = numeric
        if numeric: self.filterF = lambda x: float(x)
        else: self.filterF = lambda x: x
    def __ror__(self, it:Iterator[str]):
        col = self.column
        if col is None:
            # wrap each element into a 1-column row, sort on that column, unwrap again
            return it | cli.wrapList() | cli.transpose() | sort(0, self.numeric, self.reverse) | cli.op()[0].all()
        keyOf = self.filterF
        source = it | cli.isNumeric(col) if self.numeric else it
        rows = source | cli.deref(maxDepth=2)
        def rowKey(row):
            return keyOf(row[col]) if len(row) > col else float("inf")
        return iter(sorted(rows, key=rowKey, reverse=self.reverse))
    def __invert__(self):
        """Creates a clone that has the opposite sort order"""
        return sort(self.column, self.numeric, not self.reverse)
class sortF(BaseCli):
    def __init__(self, f:Callable[[T], float], reverse=False):
        """Sorts rows by the key that ``f`` computes for each of them.
        Example::

            # returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
            ["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref()
            # returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a']
            ["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()"""
        fs = [f]
        super().__init__(fs=fs)
        self.f = fs[0]
        self.reverse = reverse
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        ordered = sorted(it, key=self.f, reverse=self.reverse)
        return iter(ordered)
    def __invert__(self) -> "sortF":
        """Creates a clone that has the opposite sort order"""
        return sortF(self.f, not self.reverse)
class consume(BaseCli):
    def __init__(self, f:Union[BaseCli, Callable[[T], None]]):
        r"""Passes the input to ``f`` as a side effect, then returns the input
        unchanged. Kinda like the bash command ``tee``. Example::

            # prints "0\n1\n2" and returns [0, 1, 2]
            range(3) | consume(headOut()) | toList()
            # prints "range(0, 3)" and returns [0, 1, 2]
            range(3) | consume(lambda it: print(it)) | toList()

        Handy when you want to mutate something without mixing ``f``'s result
        into the main stream."""
        fs = [f]
        super().__init__(fs=fs)
        self.f = fs[0]
    def __ror__(self, it:T) -> T:
        sideEffect = self.f
        sideEffect(it)
        return it
class randomize(BaseCli):
    def __init__(self, bs=100, seed=None):
        """Randomizes the input stream. To stay efficient, it doesn't turn the
        whole input into one giant list. Instead, it takes ``bs`` items at a time,
        shuffles them, yields them, then moves on to the next ``bs`` items. For the
        giant-list behavior, pass ``float("inf")`` or ``None``. Example::

            # returns [0, 1, 2, 3, 4], effectively no randomize at all
            range(5) | randomize(1) | deref()
            # returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
            range(10) | randomize(3) | deref()
            # returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
            range(10) | randomize(float("inf")) | deref()
            # same as above
            range(10) | randomize(None) | deref()
            # returns True, as the seed is the same
            range(10) | randomize(seed=4) | deref() == range(10) | randomize(seed=4) | deref()

        :param bs: shuffle window size; None means the whole input
        :param seed: seed for the torch generator; None picks a random one"""
        self.bs = float("inf") if bs is None else bs
        seeder = random.Random(seed)
        self.gen = torch.Generator().manual_seed(seeder.getrandbits(63))
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        for chunk in it | cli.batched(self.bs, True):
            chunk = list(chunk)
            order = torch.randperm(len(chunk), generator=self.gen)
            for position in order: yield chunk[position]
class StaggeredStream:
    def __init__(self, stream:Iterator[T], every:int):
        """Not intended to be instantiated by the end user. Use :class:`stagger`
        instead."""
        self.stream = stream; self.every = every
    def __iter__(self):
        # pull at most ``every`` items, and never one extra, from the shared stream
        source = iter(self.stream)
        for _ in range(self.every):
            try: value = next(source)
            except StopIteration: return
            yield value
    def __len__(self):
        """Nominal window length (the result length if you deref it, unless the
        underlying stream runs out first)."""
        return self.every
class stagger(BaseCli):
    def __init__(self, every:int):
        """Staggers the input stream into several stream "windows" placed one after
        another. Best explained with an example::

            o = range(10) | stagger(3)
            o | deref() # returns [0, 1, 2], 1st "window"
            o | deref() # returns [3, 4, 5], 2nd "window"
            o | deref() # returns [6, 7, 8]
            o | deref() # returns [9]
            o | deref() # returns []

        This might be useful when you're constructing a data loader::

            dataset = [range(20), range(30, 50)] | transpose()
            dl = dataset | batched(3) | (transpose() | toTensor()).all() | stagger(4)
            for epoch in range(3):
                for xb, yb in dl: # looping over a window
                    print(epoch)
                    # then something like: model(xb)

        The above code will print 6 lines. 4 of them are "0" (because we stagger every 4
        batches), and xb's shape will be (3,) (because we batched every 3 samples).

        Keep in mind that this doesn't really change the properties of the stream
        itself. Essentially, treat these pairs of statements as being the same thing::

            o = range(11, 100)
            # both returns 11
            o | stagger(20) | item()
            o | item()
            # both returns [11, 12, ..., 20]
            o | head(10) | deref()
            o | stagger(20) | head(10) | deref()

        Lastly, multiple iterators might be getting values from the same stream window,
        meaning::

            o = range(11, 100) | stagger(10)
            it1 = iter(o); it2 = iter(o)
            next(it1) # returns 11
            next(it2) # returns 12

        This may or may not be desirable. It should be obvious, but it's mentioned
        here in case it's not clear."""
        self.every = int(every)
    def __ror__(self, it:Iterator[T]) -> StaggeredStream:
        stream = iter(it)
        return StaggeredStream(stream, self.every)
    @staticmethod
    def tv(every:int, ratio:float=0.8):
        """Convenience method to quickly stagger train and valid datasets.
        Example::

            # returns [[16], [4]]
            [range(100)]*2 | stagger.tv(20) | shape().all() | deref()"""
        trainWindow = round(every*ratio)
        validWindow = round(every*(1-ratio))
        return stagger(trainWindow) + stagger(validWindow)
class op(k1lib.Absorber, BaseCli):
    _op_contains = deque([], 1) # last op in the form `4 in op()`
    _op_contains_inv = deque([], 1) # last op in the form `op() in [1, 2, 3]`
    def __init__(self):
        """Absorbs operations done on it and applies it on the stream. Based
        on :class:`~k1lib.Absorber`. Example::

            t = torch.tensor([[1, 2, 3], [4, 5, 6.0]])
            # returns [torch.tensor([[4., 5., 6., 7., 8., 9.]])]
            [t] | (op() + 3).view(1, -1).all() | deref()

        Basically, you can treat ``op()`` as the input tensor. Tbh, you
        can do the same thing with this::

            [t] | applyS(lambda t: (t+3).view(1, -1)).all() | deref()

        But that's kinda long and may not be obvious. This can be surprisingly resilient, as
        you can still combine with other cli tools as usual, for example::

            # returns [2, 3], demonstrating "&" operator
            torch.randn(2, 3) | (op().shape & iden()) | deref() | item()
            a = torch.tensor([[1, 2, 3], [7, 8, 9]])
            # returns torch.tensor([4, 5, 6]), demonstrating "+" operator for clis and not clis
            (a | op() + 3 + iden() | item() == torch.tensor([4, 5, 6])).all()
            # returns [[3], [3]], demonstrating .all() and "|" serial chaining
            torch.randn(2, 3) | (op().shape.all() | deref())
            # returns [[8, 18], [9, 19]], demonstrating you can treat `op()` as a regular function
            [range(10), range(10, 20)] | transpose() | filt(op() > 7, 0) | deref()

        This can only deal with simple operations. For complex operations, resort
        to the longer version ``applyS(lambda x: ...)`` instead!

        Performance-wise, there is some, but not a lot of degradation, so don't worry
        about it. Simple operations execute pretty much on par with native lambdas::

            n = 10_000_000
            # takes 1.48s
            for i in range(n): i**2
            # takes 1.89s, 1.28x worse than for loop
            range(n) | apply(lambda x: x**2) | ignore()
            # takes 1.86s, 1.26x worse than for loop
            range(n) | apply(op()**2) | ignore()
            # takes 1.86s
            range(n) | (op()**2).all() | ignore()

        More complex operations can take more of a hit::

            # takes 1.66s
            for i in range(n): i**2-3
            # takes 2.02s, 1.22x worse than for loop
            range(n) | apply(lambda x: x**2-3) | ignore()
            # takes 2.81s, 1.69x worse than for loop
            range(n) | apply(op()**2-3) | ignore()

        Experimental features::

            # returns [2, 3, 4]
            range(10) | filt(op() in range(2, 5)) | deref()
            # returns [0, 1, 5, 6, 7, 8, 9]
            range(10) | ~filt(op() in range(2, 5)) | deref()
            # will not work, so if your set potentially does not have any element, then don't use op()
            range(10) | filt(op() in []) | deref()

        Reserved operations that are not absorbed are:

        - all
        - __ror__ (__or__ still works!)
        - op_solidify"""
        super().__init__({"_op_solidified": False, "_op_in_set": set()})
    def op_solidify(self):
        """Use this to not absorb ``__call__`` operations anymore and makes it
        feel like a regular function (still absorbs other operations though)::

            f = op()**2
            3 | f # returns 9, but may be you don't want to pipe it in
            f.op_solidify()
            f(3) # returns 9"""
        # the sentinel presumably lets this attribute write bypass absorption — confirm in k1lib.Absorber
        self._ab_sentinel = True
        self._op_solidified = True
        self._ab_sentinel = False
        return self
    def __ror__(self, it):
        return self.ab_operate(it)
    def __or__(self, o):
        # clis are chained like any other BaseCli; anything else goes to Absorber's own
        # __or__ (so ``op() | 3`` means bitwise-or, not addition)
        if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__or__(o)
        return super().__or__(o)
    def __add__(self, o):
        if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__add__(o)
        return super().__add__(o)
    def __and__(self, o):
        if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__and__(o)
        return super().__and__(o)
    def __call__(self, *args, **kwargs):
        if self._op_solidified: return self.ab_operate(*args, **kwargs)
        return super().__call__(*args, **kwargs)
    def __contains__(self, k):
        # Python coerces __contains__'s result to bool, so the real op is stashed for
        # _op_pop_contains to recover
        cli.op._op_contains.append(self.ab_contains(k)); return True
    @staticmethod
    def _op_pop_contains(): # mechanism for `4 in op()`
        s = "Supposed to be unreachable, tried to recover `4 in op()`, but couldn't find any."
        if len(cli.op._op_contains) == 0:
            raise RuntimeError(f"""{s} May be you're doing something like `filt(op() not in [1, 2, 3])`? Use `~filt(op() in [1, 2, 3])` instead!""")
        return cli.op._op_contains.pop()
    @staticmethod
    def _op_pop_contains_inv(): # mechanism for `op() in [1, 2, 3]`
        s = "Supposed to be unreachable, tried to recover `op() in [1, 2, 3]`, but couldn't find any."
        if len(cli.op._op_contains_inv) == 0:
            if len(cli.op._op_contains) > 0:
                raise RuntimeError(f"""{s} May be you're doing something like `filt(4 not in op())`? Use `~filt(4 in op())` instead!""")
            else: raise RuntimeError(f"{s} Seems like you're doing something like `filt(op() in [])`? Because of complicated reasons, the set can't be empty. Just use vanilla `inSet([])` instead")
        return op._op_contains_inv.pop()
    def __bool__(self): # pops last eq from the operators, mechanism for `op() in [1, 2, 3]`
        if len(self._ab_steps) == 0:
            raise RuntimeError("Supposed to be unreachable, tried to pop last __eq__ step from _ab_steps, but there are no steps at all. May be you're calling bool() on a bare `op()`?")
        step = self._ab_steps[-1]; _op_in_set = self._op_in_set
        if step[0][0] != "__eq__": raise RuntimeError(f"Supposed to be unreachable, tried to pop last __eq__ step from _ab_steps, but it's actually a {step[0][0]} operator instead")
        # each `elem == op()` comparison made by `in` lands here: collect elem into the set
        self._op_in_set.add(step[0][1]); self._ab_steps.pop()
        if len(_op_in_set) == 1 and (len(self._ab_steps) == 0 or self._ab_steps[-1][0][0] != "<in set>"):
            self._ab_steps.append([["<in set>", _op_in_set], lambda x: x in _op_in_set])
        # returning False makes `in` keep comparing, so every element gets collected
        cli.op._op_contains_inv.append(self); return False
cli.op = op
class integrate(BaseCli):
    def __init__(self, dt=1):
        """Integrates the input (running sum, each element scaled by ``dt``).
        Example::

            # returns [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
            range(10) | integrate() | deref()
            # returns [0, 2, 6, 12, 20, 30, 42, 56, 72, 90]
            range(10) | integrate(2) | deref()

        Works with numpy arrays / torch tensors as elements too. Every yielded
        partial sum is a fresh object.

        :param dt: Optional small step"""
        self.dt = dt
    def __ror__(self, it):
        # ``s = s + ...`` instead of ``s += ...``: once s is a numpy array / torch
        # tensor, ``+=`` updates it in place and silently rewrites every partial sum
        # already yielded (and fails when an int accumulator meets float elements)
        dt = self.dt; s = 0
        if dt == 1: # skip the multiply in the common case
            for e in it:
                s = s + e
                yield s
        else:
            for e in it:
                s = s + e*dt
                yield s