# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for quick modifiers, think of them as changing formats
"""
__all__ = ["applyS", "aS", "apply", "applyMp", "parallel",
"applyTh", "applySerial",
"sort", "sortF", "consume", "randomize", "stagger", "op",
"integrate"]
from typing import Callable, Iterator, Any, Union, List
from k1lib.cli.init import patchDefaultDelim, BaseCli, T, fastF
import k1lib.cli as cli, numpy as np, torch, threading
import torch.multiprocessing as mp; from collections import deque
from functools import partial, update_wrapper, lru_cache
import dill, pickle, k1lib, warnings, atexit, signal, time, os, random
class applyS(BaseCli):
    def __init__(self, f:Callable[[T], T], *args, **kwargs):
"""Like :class:`apply`, but much simpler, just operating on the entire input
object, essentially. The "S" stands for "single". There's
also an alias shorthand for this called :class:`aS`. Example::
# returns 5
3 | aS(lambda x: x+2)
Like :class:`apply`, you can also use this as a decorator like this::
@aS
def f(x):
return x+2
# returns 5
3 | f
        This also decorates the returned object so that it has the same qualname,
        docstring and whatnot.
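        Since extra positional ``args`` and ``kwargs`` are forwarded straight to ``f``,
        you can also pass arguments along (a small sketch; ``bias`` is just an
        illustrative parameter name)::

            # returns 8
            3 | aS(lambda x, bias: x + bias, bias=5)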
:param f: the function to be executed
:param kwargs: other keyword arguments to pass to the function, together with ``args``"""
super().__init__(fs=[f]); self.args = args; self.kwargs = kwargs
self.f = f; update_wrapper(self, f, updated=())
    def __ror__(self, it:T) -> T: return self.f(it, *self.args, **self.kwargs)
    def __invert__(self):
        """Configures it so that it expands the arguments out.
Example::
# returns 5
[2, 3] | ~aS(lambda x, y: x + y)
def f(x, y, a=4):
return x*y + a
# returns 10
[2, 3] | ~aS(f)
# returns 11
[2, 3] | ~aS(f, a=5)"""
        f = self.f; a = self.args; kw = self.kwargs; return applyS(lambda x: f(*x, *a, **kw))
aS = applyS
class apply(BaseCli):
    def __init__(self, f:Callable[[T], T], column:int=None, cache:int=0):
        """Applies a function ``f`` to every line.
Example::
# returns [0, 1, 4, 9, 16]
range(5) | apply(lambda x: x**2) | deref()
# returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]]
torch.ones(2, 3) | apply(lambda x: x+2, 0) | deref()
You can also use this as a decorator, like this::
@apply
def f(x):
return x**2
# returns [0, 1, 4, 9, 16]
range(5) | f | deref()
You can also add a cache, like this::
def calc(i): time.sleep(0.5); return i**2
# takes 2.5s
range(5) | repeatFrom(2) | apply(calc, cache=10) | deref()
# takes 5s
range(5) | repeatFrom(2) | apply(calc) | deref()
:param column: if not None, then applies the function to that column only
        :param cache: if specified, caches this many values"""
        super().__init__(fs=[f]); self.f = f
self.column = column; self.cache = cache; self._fC = fastF(f)
if cache > 0: self._fC = lru_cache(cache)(self._fC)
    def __ror__(self, it:Iterator[str]):
c = self.column; f = self._fC
if c is None: return (f(line) for line in it)
else: return ([(e if i != c else f(e))
for i, e in enumerate(row)] for row in it)
    def __invert__(self):
"""Same mechanism as in :class:`applyS`, it expands the
arguments out. Just for convenience really. Example::
# returns [10, 12, 14, 16, 18]
[range(5), range(10, 15)] | transpose() | ~apply(lambda x, y: x+y) | deref()"""
return apply(lambda x: self.f(*x), self.column, self.cache)
def executeFunc(common, line):
    # runs inside the child process: unpickle the (function, kwargs) pair and a single input line, then execute
    import dill
    f, kwargs = dill.loads(common)
    return f(dill.loads(line), **kwargs)
def terminateGraceful(): signal.signal(signal.SIGINT, signal.SIG_IGN) # pool initializer: children ignore Ctrl+C, so only the main process handles interrupts
class applyMp(BaseCli):
    _pools = set()
    def __init__(self, f:Callable[[T], T], prefetch:int=None, timeout:float=8, utilization:float=0.8, bs:int=1, **kwargs):
"""Like :class:`apply`, but execute ``f(row)`` of each row in
multiple processes. Example::
# returns [3, 2]
["abc", "de"] | applyMp(lambda s: len(s)) | deref()
# returns [5, 6, 9]
range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref()
# returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
someList = [1, 2, 3]
["abc", "de"] | applyMp(lambda s: someList) | deref()
        Internally, this will continuously spawn new jobs up until 80% of all CPU
        cores are utilized. On POSIX systems, the default multiprocessing start method
        is ``fork()``, which roughly means that all the variables in memory will be
        copied over. This might be expensive (or might not, thanks to copy-on-write),
        so you might have to think about that. On Windows and macOS, the default start
        method is ``spawn``, meaning each child process is a completely new interpreter,
        so you have to pass in all required variables and reimport every dependency. Read
        more at https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
        If you don't wish to schedule all jobs at once, you can specify a ``prefetch``
        amount, and it will only schedule that many jobs ahead of time. Example::
range(10000) | applyMp(lambda x: x**2) | head() | deref() # 700ms
range(10000) | applyMp(lambda x: x**2, 5) | head() | deref() # 300ms
            # demonstrating there are no huge penalties even if we want all results at the same time
range(10000) | applyMp(lambda x: x**2) | deref() # 900ms
range(10000) | applyMp(lambda x: x**2, 5) | deref() # 1000ms
        The first line will schedule all jobs at once, and thus will require more RAM and
        compute power, even though we discard most of the results anyway (via the
        :class:`~k1lib.cli.filt.head` cli). The second line only schedules 5 jobs ahead of
        time, and thus will be much more efficient if you don't need all results right
        away.
.. note::
Remember that every :class:`~k1lib.cli.init.BaseCli` is also a
function, meaning that you can do stuff like::
# returns [['ab', 'ac']]
[["ab", "cd", "ac"]] | applyMp(filt(op().startswith("a")) | deref()) | deref()
Also remember that the return result of ``f`` should not be a generator.
That's why in the example above, there's a ``deref()`` inside f.
        Most of the time, you would probably want to specify ``bs`` to something bigger
        than 1 (maybe 32 or so). This executes ``f`` multiple times in a single job,
        instead of executing ``f`` only once per job, which should dramatically reduce
        the per-job dispatch and serialization overhead.
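        A small sketch of the difference (``calc`` here is a stand-in for whatever
        function you're running)::

            # dispatches 10000 jobs, each running `calc` once
            range(10000) | applyMp(calc) | deref()
            # dispatches ~313 jobs, each running `calc` 32 times internally
            range(10000) | applyMp(calc, bs=32) | deref()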
        If you encounter strange errors not seen on :class:`apply`, you can try to clear
        all pools (using :meth:`clearPools`) to terminate all child processes and thus
        free resources. In earlier versions, you had to do this manually before exiting,
        but now :class:`applyMp` is much more robust.
Also, you should not immediately assume that :class:`applyMp` will always be faster
than :class:`apply`. Remember that :class:`applyMp` will create new processes,
serialize and transfer data to them, execute it, then transfer data back. If your code
transfers a lot of data back and forth (compared to the amount of computation done), or
the child processes don't have a lot of stuff to do before returning, it may very well
be a lot slower than :class:`apply`.
        There's a potential loophole here that can make your code faster. Because the main
        process is forked (at least on Linux), every variable is still there, even the big
        ones. So, you can potentially do something like this::
bigData = [] # 1B items in the list
# summing up all items together. No input data transfers (because it's forked instead)
range(1_000_000_000) | batched(100) | applyMp(lambda r: r | apply(lambda i: bigData[i]) | toSum()) | toSum()
:param prefetch: if not specified, schedules all jobs at the same time. If
specified, schedules jobs so that there'll only be a specified amount of
jobs, and will only schedule more if results are actually being used.
        :param timeout: seconds to wait for a job before raising an error
        :param utilization: fraction of CPU cores to use: 0 for no cores, 1 for
            all of them. Defaults to 0.8
        :param bs: if specified, groups ``bs`` number of transforms into 1 job to be more
            efficient.
        :param kwargs: extra arguments to be passed to the function. ``args`` not
            included, as there are a couple of options you can pass to this cli."""
super().__init__(fs=[f]); self.f = fastF(f)
self.prefetch = prefetch or 1_000_000
self.timeout = timeout; self.utilization = utilization
self.bs = bs; self.kwargs = kwargs
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
timeout = self.timeout; it = iter(it) # really make sure it's an iterator, for prefetch
if self.bs > 1:
return it | cli.batched(self.bs, True) | applyMp(apply(self.f) | cli.deref(), self.prefetch, timeout, **self.kwargs) | cli.joinStreams()
os.environ["py_k1lib_in_applyMp"] = "True"
self.p = p = mp.Pool(int(mp.cpu_count()*self.utilization), terminateGraceful)
applyMp._pools.add(p); common = dill.dumps([self.f, self.kwargs])
def gen():
try:
fs = deque()
for i, line in zip(range(self.prefetch), it):
fs.append(p.apply_async(executeFunc, [common, dill.dumps(line)]))
for line in it:
yield fs.popleft().get(timeout)
fs.append(p.apply_async(executeFunc, [common, dill.dumps(line)]))
for f in fs: yield f.get(timeout)
except KeyboardInterrupt as e:
print("applyMp interrupted. Terminating pool now")
self.p.terminate(); applyMp._pools.remove(self.p); raise e
except Exception as e:
print("applyMp encounter errors. Terminating pool now")
self.p.terminate(); applyMp._pools.remove(self.p); raise e
else: self.p.terminate(); applyMp._pools.remove(self.p)
return gen()
    @staticmethod
    def clearPools():
"""Terminate all existing pools. Do this before restarting/quitting the
script/notebook to make sure all resources (like GPU) are freed. **Update**:
you probably won't have to call this manually anymore since version 0.9, but
        if you run into problems, try doing this. Example::
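
            applyMp.clearPools() # safe to call manually; also registered via atexit, though that may not fire in notebook environments"""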
for p in applyMp._pools:
try: p.terminate()
except: pass
applyMp._pools = set()
    @staticmethod
    def pools():
"""Get set of all pools. Meant for debugging purposes only."""
return applyMp._pools
    def __del__(self):
        return # effectively a no-op: the early return disables the cleanup below; pools are handled by clearPools()/atexit instead
        if hasattr(self, "p"):
            self.p.terminate()
            if self.p in applyMp._pools: applyMp._pools.remove(self.p)
# apparently, this doesn't do anything, at least in jupyter environment
atexit.register(lambda: applyMp.clearPools())
parallel = applyMp
thEmptySentinel = object()
class applyTh(BaseCli):
    def __init__(self, f, prefetch:int=2, timeout:float=5, bs:int=1):
"""Kinda like the same as :class:`applyMp`, but executes ``f`` on multiple
threads, instead of on multiple processes. Advantages:
- Relatively low overhead for thread creation
- Fast, if ``f`` is io-bound
- Does not have to serialize and deserialize the result, meaning iterators can be
exchanged
Disadvantages:
- Still has thread creation overhead, so it's still recommended to specify ``bs``
        - Slow if ``f`` has to hold the GIL to be able to do anything (i.e. CPU-bound tasks)
        All examples from :class:`applyMp` should work perfectly here.
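        Threads shine when ``f`` is io-bound. A quick sketch (``fetch`` is a made-up
        stand-in for some network call, and the timing is purely illustrative)::

            def fetch(x): time.sleep(0.1); return x**2 # pretend this hits the network
            # finishes in roughly 1s, instead of ~8s when run sequentially
            range(80) | applyTh(fetch, prefetch=8, bs=10) | deref()"""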
fs = [f]; super().__init__(fs=fs); self.f = fs[0]; self.bs = bs
self.prefetch = prefetch; self.timeout = timeout
    def __ror__(self, it):
if self.bs > 1:
yield from (it | cli.batched(self.bs, True) | applyTh(apply(self.f), self.prefetch, self.timeout) | cli.joinStreams()); return
datas = deque(); it = iter(it)
innerF = fastF(self.f); timeout = self.timeout
def f(line, wrapper): wrapper.value = innerF(line)
for _, line in zip(range(self.prefetch), it):
w = k1lib.Wrapper(thEmptySentinel)
t = threading.Thread(target=f, args=(line,w))
t.start(); datas.append((t, w))
for line in it:
data = datas.popleft(); data[0].join(timeout)
if data[1].value is thEmptySentinel:
for data in datas: data[0].join(0.01)
raise RuntimeError("Thread timed out!")
yield data[1].value; w = k1lib.Wrapper(thEmptySentinel)
t = threading.Thread(target=f, args=(line,w))
t.start(); datas.append((t, w))
for i in range(len(datas)): # do it this way so that python can remove threads early, due to ref counting
data = datas.popleft(); data[0].join(timeout)
if data[1].value is thEmptySentinel:
for data in datas: data[0].join(0.01)
raise RuntimeError("Thread timed out!")
yield data[1].value
class applySerial(BaseCli):
    def __init__(self, f, includeFirst=False, unpack=False):
"""Applies a function repeatedly. First yields input iterator ``x``. Then
yields ``f(x)``, then ``f(f(x))``, then ``f(f(f(x)))`` and so on. Example::
# returns [4, 8, 16, 32, 64]
2 | applySerial(op()*2) | head(5) | deref()
If the result of your operation is an iterator, you might want to
:class:`~k1lib.cli.utils.deref` it, like this::
rs = iter(range(8)) | applySerial(rows()[::2])
# returns [0, 2, 4, 6]
next(rs) | deref()
# returns []. This is because all the elements are taken by the previous deref()
next(rs) | deref()
            rs = iter(range(8)) | applySerial(rows()[::2] | deref())
            # returns [0, 2, 4, 6]
            next(rs)
            # returns [0, 4]
            next(rs)
            # returns [0]
            next(rs)
        With ``unpack=True``, each result is expanded into the function's arguments::

            # returns [[10, -6], [4, 16], [20, -12]]
            [2, 8] | applySerial(lambda a, b: (a + b, a - b), unpack=True) | head(3) | deref()
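        There's also ``includeFirst``, which yields the raw input before any application
        of ``f`` (a small sketch)::

            # returns [2, 4, 8, 16, 32]
            2 | applySerial(op()*2, includeFirst=True) | head(5) | deref()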
:param f: function to apply repeatedly
:param includeFirst: whether to include the raw input value or not
        :param unpack: whether to unpack each value into the function's
            arguments (mostly for aesthetics)"""
fs = [f]; super().__init__(fs=fs); self.f = fs[0]
self.includeFirst = includeFirst; self.unpack = unpack
    def __ror__(self, it):
f = fastF(self.f)
if self.unpack:
if not self.includeFirst: it = f(*it)
while True: yield it; it = f(*it)
else:
if not self.includeFirst: it = f(it)
while True: yield it; it = f(it)
class sort(BaseCli):
    def __init__(self, column:int=0, numeric=True, reverse=False):
"""Sorts all lines based on a specific `column`.
Example::
# returns [[5, 'a'], [1, 'b']]
[[1, "b"], [5, "a"]] | ~sort(0) | deref()
# returns [[2, 3]]
[[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref()
# errors out, as you can't really compare str with int
[[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref()
# returns [-1, 2, 3, 5, 8]
[2, 5, 3, -1, 8] | sort(None) | deref()
:param column: if None, sort rows based on themselves and not an element
:param numeric: whether to convert column to float
:param reverse: False for smaller to bigger, True for bigger to smaller. Use
:meth:`__invert__` to quickly reverse the order instead of using this param"""
self.column = column; self.reverse = reverse; self.numeric = numeric
self.filterF = (lambda x: float(x)) if numeric else (lambda x: x)
    def __ror__(self, it:Iterator[str]):
c = self.column
if c is None:
return it | cli.wrapList() | cli.transpose() | sort(0, self.numeric, self.reverse) | cli.op()[0].all()
f = self.filterF
rows = (it | cli.isNumeric(c) if self.numeric else it) | cli.deref(maxDepth=2)
def sortF(row):
if len(row) > c: return f(row[c])
return float("inf")
return sorted(rows, key=sortF, reverse=self.reverse)
    def __invert__(self):
"""Creates a clone that has the opposite sort order"""
return sort(self.column, self.numeric, not self.reverse)
class sortF(BaseCli):
    def __init__(self, f:Callable[[T], float], column:int=None, reverse=False):
"""Sorts rows using a function.
Example::
# returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref()
# returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a']
["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()"""
fs = [f]; super().__init__(fs=fs); self.f = fs[0]
self.column = column; self.reverse = reverse
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
c = self.column; f = self.f
if c is None: return sorted(list(it), key=f, reverse=self.reverse)
def sortF(row):
if len(row) > c: return f(row[c])
return float("inf")
return sorted(list(it), key=sortF, reverse=self.reverse)
[docs] def __invert__(self) -> "sortF":
return sortF(self.f, self.column, not self.reverse)
class consume(BaseCli):
    def __init__(self, f:Union[BaseCli, Callable[[T], None]]):
r"""Consumes the iterator in a side stream. Returns the iterator.
Kinda like the bash command ``tee``. Example::
# prints "0\n1\n2" and returns [0, 1, 2]
range(3) | consume(headOut()) | toList()
# prints "range(0, 3)" and returns [0, 1, 2]
range(3) | consume(lambda it: print(it)) | toList()
        This is useful whenever you want to mutate something, but don't want to
        include the function's result in the main stream."""
fs = [f]; super().__init__(fs=fs); self.f = fs[0]
    def __ror__(self, it:T) -> T:
self.f(it); return it
class randomize(BaseCli):
    def __init__(self, bs=100, seed=None):
"""Randomize input stream. In order to be efficient, this does not
convert the input iterator to a giant list and yield random values from that.
Instead, this fetches ``bs`` items at a time, randomizes them, returns and
fetch another ``bs`` items. If you want to do the giant list, then just pass
in ``float("inf")``, or ``None``. Example::
# returns [0, 1, 2, 3, 4], effectively no randomize at all
range(5) | randomize(1) | deref()
# returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
range(10) | randomize(3) | deref()
# returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
range(10) | randomize(float("inf")) | deref()
# same as above
range(10) | randomize(None) | deref()
# returns True, as the seed is the same
            range(10) | randomize(seed=4) | deref() == range(10) | randomize(seed=4) | deref()
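        Since only ``bs`` items are ever materialized at once, this also works on
        infinite streams (a quick sketch, assuming ``itertools`` is imported)::

            # yields 10 items shuffled within the first batch of 100
            itertools.count() | randomize(100) | head(10) | deref()"""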
        self.bs = bs if bs is not None else float("inf")
r = random.Random(seed)
self.gen = torch.Generator().manual_seed(r.getrandbits(63))
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
for batch in it | cli.batched(self.bs, True):
batch = list(batch); perms = torch.randperm(len(batch), generator=self.gen)
for idx in perms: yield batch[idx]
class StaggeredStream:
def __init__(self, stream:Iterator[T], every:int):
"""Not intended to be instantiated by the end user. Use :class:`stagger`
instead."""
self.stream = stream; self.every = every
def __iter__(self):
for i, v in zip(range(self.every), self.stream): yield v
def __len__(self):
"""Length of window (length of result if you were to deref it)."""
return self.every
class stagger(BaseCli):
    def __init__(self, every:int):
"""Staggers input stream into multiple stream "windows" placed serially. Best
explained with an example::
o = range(10) | stagger(3)
o | deref() # returns [0, 1, 2], 1st "window"
o | deref() # returns [3, 4, 5], 2nd "window"
o | deref() # returns [6, 7, 8]
o | deref() # returns [9]
o | deref() # returns []
This might be useful when you're constructing a data loader::
dataset = [range(20), range(30, 50)] | transpose()
dl = dataset | batched(3) | (transpose() | toTensor()).all() | stagger(4)
for epoch in range(3):
for xb, yb in dl: # looping over a window
print(epoch)
# then something like: model(xb)
        The above code will print 6 lines. 4 of them are "0" (because we stagger every 4
        batches), and xb's shape will be (3,) (because we batched every 3 samples).
        You should also keep in mind that this doesn't really change the property of the
        stream itself. Essentially, treat these pairs of statements as being the same thing::
o = range(11, 100)
# both returns 11
o | stagger(20) | item()
o | item()
# both returns [11, 12, ..., 20]
o | head(10) | deref()
o | stagger(20) | head(10) | deref()
Lastly, multiple iterators might be getting values from the same stream window,
meaning::
o = range(11, 100) | stagger(10)
it1 = iter(o); it2 = iter(o)
next(it1) # returns 11
next(it2) # returns 12
This may or may not be desirable. Also this should be obvious, but I want to
mention this in case it's not clear to you."""
self.every = int(every)
    def __ror__(self, it:Iterator[T]) -> StaggeredStream:
return StaggeredStream(iter(it), self.every)
    @staticmethod
    def tv(every:int, ratio:float=0.8):
"""Convenience method to quickly stagger train and valid datasets.
Example::
# returns [[16], [4]]
[range(100)]*2 | stagger.tv(20) | shape().all() | deref()"""
return stagger(round(every*ratio)) + stagger(round(every*(1-ratio)))
class op(k1lib.Absorber, BaseCli):
    _op_contains = deque([], 1) # last op in the form `4 in op()`
    _op_contains_inv = deque([], 1) # last op in the form `op() in [1, 2, 3]`
    def __init__(self):
"""Absorbs operations done on it and applies it on the stream. Based
on :class:`~k1lib.Absorber`. Example::
t = torch.tensor([[1, 2, 3], [4, 5, 6.0]])
# returns [torch.tensor([[4., 5., 6., 7., 8., 9.]])]
[t] | (op() + 3).view(1, -1).all() | deref()
        Basically, you can treat ``op()`` as the input tensor. Tbh, you
        can do the same thing with this::

            [t] | applyS(lambda t: (t+3).view(1, -1)).all() | deref()
But that's kinda long and may not be obvious. This can be surprisingly resilient, as
you can still combine with other cli tools as usual, for example::
# returns [2, 3], demonstrating "&" operator
torch.randn(2, 3) | (op().shape & iden()) | deref() | item()
a = torch.tensor([[1, 2, 3], [7, 8, 9]])
# returns torch.tensor([4, 5, 6]), demonstrating "+" operator for clis and not clis
(a | op() + 3 + iden() | item() == torch.tensor([4, 5, 6])).all()
# returns [[3], [3]], demonstrating .all() and "|" serial chaining
torch.randn(2, 3) | (op().shape.all() | deref())
# returns [[8, 18], [9, 19]], demonstrating you can treat `op()` as a regular function
[range(10), range(10, 20)] | transpose() | filt(op() > 7, 0) | deref()
        This can only deal with simple operations. For complex operations, resort
        to the longer version ``applyS(lambda x: ...)`` instead!
        Performance-wise, in most cases there is no degradation, so don't worry
        about it. Everything is pretty much on par with native lambdas::
n = 10_000_000
# takes 1.48s
for i in range(n): i**2
# takes 1.89s, 1.28x worse than for loop
range(n) | apply(lambda x: x**2) | ignore()
# takes 1.86s, 1.26x worse than for loop
range(n) | apply(op()**2) | ignore()
# takes 1.86s
range(n) | (op()**2).all() | ignore()
        More complex operations still retain similar speeds, as there's a JIT compiler
        embedded inside::
# takes 2.15s
for i in range(n): (i**2-3)*0.1
# takes 2.53s, 1.18x worse than for loop
range(n) | apply(lambda x: (x**2-3)*0.1) | ignore()
# takes 2.46s, 1.14x worse than for loop
range(n) | apply((op()**2-3)*0.1) | ignore()
        Experimental features::
# returns [2, 3, 4]
range(10) | filt(op() in range(2, 5)) | deref()
# returns [0, 1, 5, 6, 7, 8, 9]
range(10) | ~filt(op() in range(2, 5)) | deref()
            # will not work; if your set could potentially be empty, don't use op()
range(10) | filt(op() in []) | deref()
Reserved operations that are not absorbed are:
- all
- __ror__ (__or__ still works!)
- op_solidify"""
super().__init__({"_op_solidified": False, "_op_in_set": set()})
    @staticmethod
    def solidify(f):
"""Static equivalent of ``a.op_solidify()``.
Example::
f = op()**2
f = op.solidify(f)
If ``f`` is not an ``op``, then just return it without doing anything to it"""
if f.__class__.__name__.split(".")[-1] == "op":
f.op_solidify()
return f
    def op_solidify(self):
"""Use this to not absorb ``__call__`` operations anymore and makes it
feel like a regular function (still absorbs other operations though)::
f = op()**2
3 | f # returns 9, but may be you don't want to pipe it in
f.op_solidify()
f(3) # returns 9"""
self._ab_sentinel = True
self._op_solidified = True
self._ab_sentinel = False
return self
    def __ror__(self, it):
return self.ab_operate(it)
def __or__(self, o):
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__or__(o)
return super().__add__(o)
def __add__(self, o):
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__add__(o)
return super().__add__(o)
def __and__(self, o):
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__and__(o)
return super().__and__(o)
def __call__(self, *args, **kwargs):
if self._op_solidified: return self.ab_operate(*args, **kwargs)
return super().__call__(*args, **kwargs)
def __contains__(self, k):
cli.op._op_contains.append(self.ab_contains(k)); return True
@staticmethod
def _op_pop_contains(): # mechanism for `4 in op()`
s = "Supposed to be unreachable, tried to recover `4 in op()`, but couldn't find any."
if len(cli.op._op_contains) == 0:
raise RuntimeError(f"""{s} May be you're doing something like `filt(op() not in [1, 2, 3])`? Use `~filt(op() in [1, 2, 3])` instead!""")
return cli.op._op_contains.pop()
@staticmethod
def _op_pop_contains_inv(): # mechanism for `op() in [1, 2, 3]`
s = "Supposed to be unreachable, tried to recover `op() in [1, 2, 3]`, but couldn't find any."
if len(cli.op._op_contains_inv) == 0:
if len(cli.op._op_contains) > 0:
raise RuntimeError(f"""{s} May be you're doing something like `filt(4 not in op())`? Use `~filt(4 in op())` instead!""")
else: raise RuntimeError(f"{s} Seems like you're doing something like `filt(op() in [])`? Because of complicated reasons, the set can't be empty. Just use vanilla `inSet([])` instead")
return op._op_contains_inv.pop()
def __bool__(self): # pops last eq from the operators, mechanism for `op() in [1, 2, 3]`
step = self._ab_steps[-1]; _op_in_set = self._op_in_set
if step[0][0] != "__eq__": raise RuntimeError(f"Supposed to be unreachable, tried to pop last __eq__ step from _ab_steps, but it's actually a {step[0][0]} operator instead")
self._op_in_set.add(step[0][1]); self._ab_steps.pop()
if len(_op_in_set) == 1 and (len(self._ab_steps) == 0 or self._ab_steps[-1][0][0] != "<in set>"):
self._ab_steps.append([["<in set>", _op_in_set], lambda x: x in _op_in_set])
cli.op._op_contains_inv.append(self); return False
cli.op = op
class integrate(BaseCli):
    def __init__(self, dt=1):
"""Integrates the input.
Example::
# returns [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
range(10) | integrate() | deref()
# returns [0, 2, 6, 12, 20, 30, 42, 56, 72, 90]
range(10) | integrate(2) | deref()
        :param dt: optional step size; each element is multiplied by ``dt`` before being accumulated"""
self.dt = dt
    def __ror__(self, it):
if self.dt == 1:
s = 0
for e in it: s += e; yield s
else:
dt = self.dt; s = 0
for e in it: s += e*dt; yield s