# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for quick modifiers, think of them as changing formats
"""
__all__ = ["applyS", "aS", "apply", "map_", "applyMp", "parallel", "applyCl",
"applyTh", "applySerial",
"sort", "sortF", "consume", "randomize", "stagger", "op",
"integrate"]
from typing import Callable, Iterator, Any, Union, List, Tuple
from k1lib.cli.init import patchDefaultDelim, BaseCli, T, fastF
import k1lib.cli as cli, numpy as np, threading, gc; import k1lib
from collections import deque
from functools import partial, update_wrapper, lru_cache
from k1lib.cli.typehint import *
import dill, pickle, k1lib, warnings, atexit, signal, time, os, random
try: import torch; import torch.multiprocessing as mp; hasTorch = True
except: import multiprocessing as mp; hasTorch = False
ray = k1lib.dep("ray")
settings = k1lib.settings.cli
class applyS(BaseCli):
def __init__(self, f:Callable[[T], T], *args, **kwargs):
"""Like :class:`apply`, but much simpler, just operating on the entire input
object, essentially. The "S" stands for "single". There's
also an alias shorthand for this called :class:`aS`. Example::
# returns 5
3 | aS(lambda x: x+2)
Like :class:`apply`, you can also use this as a decorator like this::
@aS
def f(x):
return x+2
# returns 5
3 | f
This also decorates the returned object so that it has the same qualname, docstring
and whatnot.
.. admonition:: Shorthands
Writing out "lambda x:" all the time is annoying, and there are ways
to quickly say ``lambda x: x+2`` like so::
3 | op()+2 # returns 5
3 | aS("x+2") # returns 5. Behind the scenes, it compiles and executes `lambda x: x+2`
The first way is to use :class:`op`, that will absorb all operations done on it,
like "+", and returns a function that essentially replays all the operations.
In the second way, you only have to pass in the string containing code that you want
done on the variable "x". Then internally, it will compile to regular Python code.
In fact, you can pass in ``op()`` or just a string to any cli that accepts any kind
of function, like :class:`~k1lib.cli.filt.filt` or :class:`apply`::
range(4) | apply("x-2") | deref()
range(4) | apply(op()-2) | deref()
range(4) | filt("x%2") | deref()
range(4) | filt(op()%2) | deref()
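Extra positional and keyword arguments are forwarded to ``f`` after the piped-in value.
A small sketch (the lambda here is purely illustrative)::
# returns 7, since this ends up calling f(3, 2, z=2)
3 | aS(lambda x, y, z=0: x + y + z, 2, z=2)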
:param f: the function to be executed
:param kwargs: other keyword arguments to pass to the function, together with ``args``"""
super().__init__(fs=[f]); self.args = args; self.kwargs = kwargs
self.f = f; self._fC = fastF(f); update_wrapper(self, f, updated=())
def _typehint(self, inp):
if self.hasHint: return self._hint
try: return self.f._typehint(inp)
except: return tAny()
def __ror__(self, it:T) -> T:
return self._fC(it, *self.args, **self.kwargs)
def __invert__(self):
"""Configures it so that it expands the arguments out.
Example::
# returns 5
[2, 3] | ~aS(lambda x, y: x + y)
def f(x, y, a=4):
return x*y + a
# returns 10
[2, 3] | ~aS(f)
# returns 11
[2, 3] | ~aS(f, a=5)"""
f = self.f; a = self.args; kw = self.kwargs; return applyS(lambda x: f(*x, *a, **kw));
aS = applyS
class apply(BaseCli):
def __init__(self, f:Callable[[T], T], column:Union[int, List[int]]=None, cache:int=0, **kwargs):
"""Applies a function f to every element in the incoming list/iterator.
Example::
# returns [0, 1, 4, 9, 16]
range(5) | apply(lambda x: x**2) | deref()
# returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]], running the function on the 0th column
torch.ones(2, 3) | apply(lambda x: x+2, 0) | deref()
# returns [[0, -1, 2, 3, -4], [2, -3, 4, 5, -6], [0, -1, 4, 9, -16]], running the function on the 1st (0-index btw) and 4th columns
[[0, 1, 2, 3, 4], [2, 3, 4, 5, 6], [0, 1, 4, 9, 16]] | apply(lambda x: -x, [1, 4]) | deref()
You can also use this as a decorator, like this::
@apply
def f(x):
return x**2
# returns [0, 1, 4, 9, 16]
range(5) | f | deref()
You can also add a cache, like this::
def calc(i): time.sleep(0.5); return i**2
# takes 2.5s
range(5) | repeatFrom(2) | apply(calc, cache=10) | deref()
# takes 5s
range(5) | repeatFrom(2) | apply(calc) | deref()
You can add custom keyword arguments into the function::
def f(x, y, z=3):
return x + y + z
# returns [15, 17, 19, 21, 23]
[range(5), range(10, 15)] | transpose() | ~apply(f, z=5) | deref()
If "apply" is too hard to remember, this cli also has an alias :class:`map_`
that kinda mimics Python's ``map()``. Also slight reminder that you can't pass
in extra positional args like in :class:`aS`, just extra keyword arguments.
:param column: if not None, then applies the function to that column or columns only
:param cache: if specified, caches this many values
:param kwargs: extra keyword arguments to pass in the function"""
super().__init__(fs=[f]); self.f = f; self.kwargs = kwargs
if column:
ex = Exception(f"Applying a function on a negative-indexed column ({column}) is not supported")
if isinstance(column, int):
if column < 0: raise ex
else:
column = list(column)
if len([c for c in column if c < 0]): raise ex
self.column = column; self.cache = cache; self._fC = fastF(f)
if cache > 0: self._fC = lru_cache(cache)(self._fC)
self.normal = self.column is None and self.cache == 0 # cached value to say that this apply is just being used as a wrapper, nothing out of the ordinary
def _typehint(self, inp):
if self.column is None:
if isinstance(inp, tListIterSet):
try: return tIter(self.f._typehint(inp.child))
except: return tIter(tAny())
return super()._typehint(inp)
def _copy(self): return apply(self.f, self.column, self.cache, **self.kwargs) # ~apply() case handled automatically
def __ror__(self, it:Iterator[str]):
c = self.column; f = self._fC; kwargs = self.kwargs
if c is None: return (f(line, **kwargs) for line in it)
else:
if isinstance(c, int): return ([(e if i != c else f(e, **kwargs)) for i, e in enumerate(row)] for row in it)
else:
ops = []
for c_ in c: a = self._copy(); a.column = c_; ops.append(a)
return it | cli.serial(*ops)
def __invert__(self):
"""Same mechanism as in :class:`applyS`, it expands the
arguments out. Just for convenience really. Example::
# returns [10, 12, 14, 16, 18]
[range(5), range(10, 15)] | transpose() | ~apply(lambda x, y: x+y) | deref()"""
return apply(lambda x: self.f(*x, **self.kwargs), self.column, self.cache)
map_ = apply
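# helper executed inside each worker process: `common` is the dill-serialized (f, kwargs)
# pair shared by all jobs, while `line` is the dill-serialized input element for this particular job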
def executeFunc(common, line):
import dill, time; f, kwargs = dill.loads(common)
res = f(dill.loads(line), **kwargs)
time.sleep(0.1); return res # suggestion by https://stackoverflow.com/questions/36359528/broken-pipe-error-with-multiprocessing-queue
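# pool initializer: child processes ignore Ctrl-C, so only the parent handles interrupts and does cleanup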
def terminateGraceful(): signal.signal(signal.SIGINT, signal.SIG_IGN)
_k1_applyMp_global_ctx = {}; _k1_applyMp_global_ctx_autoInc = k1lib.AutoIncrement(prefix="_k1_applyMp")
class applyMp(BaseCli):
_pools = set()
_torchNumThreads = None
def __init__(self, f:Callable[[T], T], prefetch:int=None, timeout:float=8, utilization:float=0.8, bs:int=1, newPoolEvery:int=0, **kwargs):
"""Like :class:`apply`, but execute a function over the input iterator
in multiple processes. Example::
# returns [3, 2]
["abc", "de"] | applyMp(len) | deref()
# returns [5, 6, 9]
range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref()
# returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
someList = [1, 2, 3]
["abc", "de"] | applyMp(lambda s: someList) | deref()
Internally, this will continuously spawn new jobs up until 80% of all CPU
cores are utilized. On posix systems, the default multiprocessing start method is
``fork()``. This sort of means that all the variables in memory will be copied
over. On windows and macos, the default start method is ``spawn``, meaning each
child process is a completely new interpreter, so you have to pass in all required
variables and reimport every dependency. Read more at https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
If you don't wish to schedule all jobs at once, you can specify a ``prefetch``
amount, and it will only schedule that much jobs ahead of time. Example::
range(10000) | applyMp(lambda x: x**2) | head() | deref() # 700ms
range(10000) | applyMp(lambda x: x**2, 5) | head() | deref() # 300ms
# demonstrating there're no huge penalties even if we want all results at the same time
range(10000) | applyMp(lambda x: x**2) | deref() # 900ms
range(10000) | applyMp(lambda x: x**2, 5) | deref() # 1000ms
The first line will schedule all jobs at once, and thus will require more RAM and
compute power, even though we discard most of the results anyway (the
:class:`~k1lib.cli.filt.head` cli). The second line only schedules 5 jobs ahead of
time, and thus will be much more efficient if you don't need all results right
away.
.. note::
Remember that every :class:`~k1lib.cli.init.BaseCli` is also a
function, meaning that you can do stuff like::
# returns [['ab', 'ac']]
[["ab", "cd", "ac"]] | applyMp(filt(op().startswith("a")) | deref()) | deref()
Also remember that the return result of ``f`` should be serializable, meaning it
should not be a generator. That's why in the example above, there's a ``deref()``
inside f. You should also convert PyTorch tensors into Numpy arrays.
Most of the time, you would probably want to specify ``bs`` to something bigger than 1
(maybe 32 or so). This will execute ``f`` multiple times in a single job,
instead of executing ``f`` only once per job, which should reduce the overhead of
process creation dramatically.
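A rough sketch of what that usually looks like::
# each job now squares 32 numbers at once instead of 1, cutting per-job overhead
range(10000) | applyMp(lambda x: x**2, bs=32) | deref()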
If you encounter strange errors not seen on :class:`apply`, you can try to clear all
pools (using :meth:`clearPools`), to terminate all child processes and thus free
resources. On earlier versions, you have to do this manually before exiting, but now
:class:`applyMp` is much more robust.
Also, you should not immediately assume that :class:`applyMp` will always be faster
than :class:`apply`. Remember that :class:`applyMp` will create new processes,
serialize and transfer data to them, execute it, then transfer data back. If your code
transfers a lot of data back and forth (compared to the amount of computation done), or
the child processes don't have a lot of stuff to do before returning, it may very well
be a lot slower than :class:`apply`.
There's a potential loophole here that can make your code faster. Because the main
process is forked (at least on linux), every variable is still there, even the big
ones. So, you can potentially do something like this::
bigData = [] # 1B items in the list
# summing up all items together. No input data transfers (because it's forked instead)
range(1_000_000_000) | batched(100) | applyMp(lambda r: r | apply(lambda i: bigData[i]) | toSum()) | toSum()
In fact, I use this loophole all the time, and have thus made the function :meth:`shared`,
so check it out.
:param prefetch: if not specified, schedules all jobs at the same time. If
specified, schedules jobs so that there'll only be a specified amount of
jobs, and will only schedule more if results are actually being used.
:param timeout: seconds to wait for job before raising an error
:param utilization: what fraction of cpu cores to use. 0 for no cores, 1 for
all the cores. Defaults to 0.8
:param bs: if specified, groups ``bs`` number of transforms into 1 job to be more
efficient.
:param kwargs: extra arguments to be passed to the function. ``args`` not
included as there're a couple of options you can pass for this cli.
:param newPoolEvery: creates a new processing pool for every specific amount of input
fed. 0 for not refreshing any pools at all. Turn this on in case your process consumes
lots of memory and you have to kill them eventually to free up some memory"""
super().__init__(fs=[f]); self.f = fastF(f)
self.prefetch = prefetch or int(1e9)
self.timeout = timeout; self.utilization = utilization
self.bs = bs; self.kwargs = kwargs; self.p = None
self.newPoolEvery = newPoolEvery; self.ps = []; self._serializeF = True
def __ror__(self, it:Iterator[T]) -> Iterator[T]:
timeout = self.timeout; it = iter(it); f = self.f # really make sure it's an iterator, for prefetch
if self.bs > 1: return it | cli.batched(self.bs, True) | applyMp(apply(f) | cli.toList(), self.prefetch, timeout, **self.kwargs) | cli.joinStreams()
def newPool():
if hasTorch:
try: applyMp._torchNumThreads = applyMp._torchNumThreads or torch.get_num_threads(); torch.set_num_threads(1)
except: pass # why do all of this? Because some strange interaction between PyTorch and multiprocessing, outlined here: https://github.com/pytorch/pytorch/issues/82843
os.environ["py_k1lib_in_applyMp"] = "True"
self.p = mp.Pool(int(mp.cpu_count()*self.utilization), terminateGraceful); self.ps.append(self.p)
if hasTorch and applyMp._torchNumThreads is not None: torch.set_num_threads(applyMp._torchNumThreads)
def intercept(it, n):
for i, e in enumerate(it):
if i % n == 0:
if self.p is not None: self.p.close(); self.ps.remove(self.p)
gc.collect(); newPool()
yield e
common = dill.dumps([f, self.kwargs])
def gen(it):
with k1lib.captureStdout(False, True) as out:
try:
if self.newPoolEvery > 0: it = intercept(it, self.newPoolEvery)
else: newPool()
fs = deque()
for i, line in zip(range(self.prefetch), it):
fs.append(self.p.apply_async(executeFunc, [common, dill.dumps(line)]))
for line in it:
yield fs.popleft().get(timeout)
fs.append(self.p.apply_async(executeFunc, [common, dill.dumps(line)]))
for f in fs: yield f.get(timeout)
except KeyboardInterrupt as e:
print("applyMp interrupted. Terminating pool now")
for p in self.ps: p.close(); p.terminate();
raise e
except Exception as e:
print("applyMp encounter errors. Terminating pool now")
for p in self.ps: p.close(); p.terminate();
raise e
else:
for p in self.ps: p.close(); p.terminate();
return gen(it)
@staticmethod
def cat(fileName: str, f:Callable, n:int=None, rS=None, **kwargs):
"""Like :meth:`applyCl.cat`, this will split a file up into multiple
sections, execute ``f`` over all sections and return the results.
Example::
fn = "~/repos/labs/k1lib/k1lib/cli/test/applyMp.cat"
"0123456789\\n"*100 | file(fn)
# returns [6, 6, 6, 7, 6, 6, 6, 7, 6, 6, 6, 7, 6, 6, 6, 8]
applyMp.cat(fn, shape(0), 16) | deref()
:param f: function to execute on an iterator of lines
:param n: how many chunks should it split the file into. Defaulted to the number of cpu cores available
:param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained
control over section boundaries so as to not make everything corrupted
:param kwargs: extra keyword arguments for :class:`applyMp`"""
return fileName | cli.splitSeek(n or os.cpu_count()) | (rS or cli.iden()) | cli.window(2) | ~applyMp(lambda x,y: cli.cat(fileName, sB=x, eB=y) | f, **kwargs)
@staticmethod
def shared(f, **kwargs):
"""Execution model where the input iterator is dereferenced and shared across
all processes, bypassing serialization. Example::
a = range(1_000_000_000) | apply(lambda x: x*1.5 - 2000) | aS(list) # giant data structure
a | batched(50_000_000, True) | applyMp(toSum()) | toSum() # has to serialize and deserialize lists of numbers, which wastes lots of cpu cycles and memory
a | applyMp.shared(toSum()) | toSum() # giant data structure is forked, no serialization happens, no memory even gets copied, much faster
In the 2nd line, most of the time is spent on serializing the data and transferring
it to other processes, while in the 3rd line, most of the time is spent on calculating
the sum instead, as the giant data structure is forked, and Linux doesn't copy it internally."""
def inner(it):
try: n = len(it)
except: it = list(it); n = len(it)
# this is pretty unintuitive right? Why do it this way? Turns out, if you were to reference `it` directly, it will store it in f's co_freevars,
# which will be serialized, defeating the purpose. Moving it to a global variable forces it to move to co_names instead, avoiding serialization. This took forever to understand
idx = _k1_applyMp_global_ctx_autoInc(); _k1_applyMp_global_ctx[idx] = it
res = range(n) | cli.batched(round(n/os.cpu_count()+1), True) | applyMp(lambda r: f(_k1_applyMp_global_ctx[idx][r.start:r.stop]), **kwargs) | aS(list)
_k1_applyMp_global_ctx[idx] = None; return res
return aS(inner)
def _copy(self): return applyMp(self.f, self.prefetch, self.timeout, self.utilization, self.bs, self.newPoolEvery, **self.kwargs)
def __invert__(self):
"""Expands the arguments out, just like :class:`apply`.
Example::
# returns [20, 20, 18, 14, 8, 0, -10, -22, -36, -52]
[range(10), range(20, 30)] | transpose() | ~applyMp(lambda x, y: y-x**2) | deref()"""
res = self._copy(); f = res.f; res.f = lambda x: f(*x); return res
@staticmethod
def clearPools():
"""Terminate all existing pools. Do this before restarting/quitting the
script/notebook to make sure all resources (like GPU) are freed. **Update**:
you probably won't have to call this manually anymore since version 0.9, but
if you run into problems, try doing this."""
for p in applyMp._pools:
try: p.terminate()
except: pass
applyMp._pools = set()
@staticmethod
def pools():
"""Get set of all pools. Meant for debugging purposes only."""
return applyMp._pools
def __del__(self):
return
if hasattr(self, "p"):
self.p.terminate();
if self.p in applyMp._pools: applyMp._pools.remove(self.p)
# apparently, this doesn't do anything, at least in jupyter environment
atexit.register(lambda: applyMp.clearPools())
parallel = applyMp
s = k1lib.Settings(); settings.add("applyCl", s, "modifier.applyCl() settings")
s.add("sudoTimeout", 300, "seconds before deleting the stored password for sudo commands")
_password = k1lib.Wrapper(None)
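# background thread body: wipes the cached sudo password every settings.applyCl.sudoTimeout seconds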
def removePw():
while True: time.sleep(settings.applyCl.sudoTimeout); _password.value = None
threading.Thread(target=removePw, daemon=True).start()
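# pins a Ray remote function to a specific node id (hard affinity, no falling back to other nodes)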
def specificNode(obj, nodeId:str):
return obj.options(scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(node_id=nodeId, soft=False))
class applyCl(BaseCli):
def __init__(self, f, prefetch=None, timeout=60, bs=1, rss:Union[dict, str]={}, pre:bool=False, orPatch=True, num_cpus=1, **kwargs):
"""Like :class:`apply`, but execute a function over the input iterator
in multiple processes on multiple nodes inside of a cluster (hence "cl"). So, just a more
powerful version of :class:`applyMp`, assuming you have a cluster to run it on.
Example::
# returns [3, 2]
["abc", "de"] | applyCl(len) | deref()
# returns [5, 6, 9]
range(3) | applyCl(lambda x, bias: x**2+bias, bias=5) | deref()
# returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
someList = [1, 2, 3]
["abc", "de"] | applyCl(lambda s: someList) | deref()
Internally, this uses the library Ray (https://www.ray.io) to do the heavy
lifting. So, :class:`applyCl` can be thought of as a thin wrapper around that
library, but still has the same consistent interface as :class:`apply` and
:class:`applyMp`. From all of my tests so far, it seems that :class:`applyCl`
works quite well and is quite robust, so if you have access to a cluster, use
it over :class:`applyMp`.
The library will connect to a Ray cluster automatically when you import
everything using ``from k1lib.imports import *``. It will execute
``import ray; ray.init()``, which is quite simple. If you have ray installed,
but do not want this default behavior, you can do this::
import k1lib
k1lib.settings.startup.init_ray = False
from k1lib.imports import *
As with :class:`applyMp`, there are pitfalls and weird quirks to multiprocessing,
on 1 or multiple nodes, so check out the docs over there to be aware of them,
as those translate well to here.
.. admonition:: Advanced use case
Not really advanced, but just a bit difficult to understand/follow. Let's say
that you want to scan through the home directory of all nodes, grab all files,
read them, and get the number of bytes they have. You can do something like this::
a = None | applyCl.aS(lambda: None | cmd("ls ~") | filt(os.path.isfile) | deref()) | deref()
b = a | ungroup(single=True, begin=True) | deref()
c = b | applyCl(cat(text=False) | shape(0), pre=True) | deref()
d = c | groupBy(0, True) | apply(item().all() | toSum(), 1) | deref()
Noted, this is relatively complex. Let's see what A, B, C and D looks like::
# A
[['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', ['Miniconda3-latest-Linux-x86_64.sh', 'mintupgrade-2023-04-01T232950.log']],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', ['5a', 'abc.jpg', 'a.txt']]]
# B
[['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 'Miniconda3-latest-Linux-x86_64.sh'],
['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 'mintupgrade-2023-04-01T232950.log'],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', '5a'],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 'abc.jpg'],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 'a.txt']]
# C
[['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 74403966],
['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 1065252],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 2601],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 16341],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 10177]]
# D
[['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 92185432],
['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 75469218]]
The steps we're concerned with are A and C. In step A, we're running 2 processes, 1 for each
node, to get all the file names in the home directory. In step C, we're running 5 processes
total, 2 on the first node and 3 on the second node. Each process reads its file as
bytes and counts up those bytes. Finally in step D, the results are grouped together and the
sizes summed.
So yeah, it's pretty nice that we did all of that in a relatively short amount of code.
The data is distributed too (reading multiple files from multiple nodes), so we're truly
not bottlenecked by anything.
:param prefetch: if not specified, schedules all jobs at the same time. If
specified, schedules jobs so that there'll only be a specified amount of
jobs, and will only schedule more if results are actually being used.
:param timeout: seconds to wait for job before raising an error
:param bs: if specified, groups ``bs`` number of transforms into 1 job to be more
efficient.
:param rss: resources required for the task. Can be {"custom_resource1": 2} or "custom_resource1" as a shortcut
:param pre: "preserve", same convention as :meth:`applyCl.aS`. If True, then allow passing
through node ids as the first column to schedule jobs on those specific nodes only
:param orPatch: whether to automatically patch __or__ function so that cli tools can
work with numpy arrays on that remote worker
:param num_cpus: how many cpu does each task take?
:param kwargs: extra arguments to be passed to the function. ``args`` not
included as there're a couple of options you can pass for this cli."""
super().__init__(fs=[f]); _fC = fastF(f); self.ogF = f; self.pre = pre
self.rss = rss = {rss: 1} if isinstance(rss, str) else rss
def remoteF(e):
if orPatch:
import k1lib; k1lib.cli.init.patchNumpy()
k1lib.cli.init.patchDict(); k1lib.cli.init.patchPandas()
return _fC(e, **kwargs)
self.remoteF = remoteF; self.f = ray.remote(resources=rss, num_cpus=num_cpus)(remoteF)
self.prefetch = prefetch or int(1e9)
self.timeout = timeout; self.bs = bs
self._copyCtx = lambda: [f, [prefetch, timeout, bs, rss, pre, orPatch, num_cpus], kwargs]
def preprocessF(f, e): # return future (if pre=False), or [nodeId, future] (if pre=True)
if pre: nodeId, e = e; return [nodeId, specificNode(f, nodeId).remote(e)]
else: return f.remote(e)
def resolveF(e):
if pre: return [e[0], ray.get(e[1], timeout=timeout)]
else: return ray.get(e, timeout=timeout)
self.preprocessF = preprocessF; self.resolveF = resolveF
def __ror__(self, it):
f = self.f; timeout = self.timeout; bs = self.bs; ogF = self.ogF; preprocessF = self.preprocessF; resolveF = self.resolveF
if bs > 1: return it | cli.batched(bs, True) | applyCl(lambda x: x | apply(ogF) | cli.aS(list), self.prefetch, timeout) | cli.joinStreams()
def gen(it):
futures = deque(); it = iter(it)
for i, e in zip(range(self.prefetch), it): futures.append(preprocessF(f, e))
for e in it: yield resolveF(futures.popleft()); futures.append(preprocessF(f, e))
for e in futures: yield resolveF(e)
return gen(it)
def __invert__(self):
"""Expands the arguments out, just like :class:`apply`.
Example::
# returns [20, 20, 18, 14, 8, 0, -10, -22, -36, -52]
[range(10), range(20, 30)] | transpose() | ~applyCl(lambda x, y: y-x**2) | deref()"""
f, rest, kwargs = self._copyCtx(); return applyCl(lambda x: f(*x), *rest, **kwargs)
@staticmethod
def nodeIds(includeSelf=True) -> List[str]:
"""Returns a list of all node ids in the current cluster.
Example::
applyCl.nodeIds() # returns something like ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', '1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068']
If you want to get nodes' metadata, then just use ray's builtin function ``ray.nodes()``
:param includeSelf: whether to include node id of the current process or not"""
res = ray.nodes() | cli.filt(lambda x: x["Alive"]) | apply(lambda x: x["NodeID"]) | aS(list)
if includeSelf: return res
res.remove(applyCl.nodeId()); return res
@staticmethod
def nodeId() -> str:
"""Returns current node id"""
return ray.runtime_context.get_runtime_context().get_node_id()
@staticmethod
def cpu() -> int:
"""Grabs the number of cpus available on this node"""
return int(applyCl.meta()["Resources"]["CPU"])
@staticmethod
def aS(f, timeout:float=8):
"""Executes function f once for all node ids that are piped in.
Example::
# returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]]
applyCl.nodeIds() | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref()
# also returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]]
None | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref()
If you want to execute f for all nodes, you can pass in None instead.
As a reminder, this kinda follows the same logic as the popular cli :class:`aS`, where
f is executed once, hence the name "apply Single". Here, the meaning of "single" is
different. It just means execute once for each node id.
:param f: main function to execute in each node. Not supposed to accept any arguments
:param timeout: seconds to wait for job before raising an error"""
f = fastF(f); g = lambda nodeId: specificNode(ray.remote(f), nodeId).remote()
final = cli.iden() & (apply(g) | aS(list) | apply(ray.get, timeout=timeout)) | cli.transpose()
return aS(lambda it: (applyCl.nodeIds() if it is None else it) | final)
@staticmethod
def cmd(s:str, timeout:float=8, sudo=False):
"""Convenience function to execute shell command on all nodes.
Example::
applyCl.cmd("mkdir -p /some/folder")
It returns [[nodeid1, output1], [nodeid2, output2]]. If you need more flexibility,
fall back to :meth:`applyCl.aS`
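A sudo call might look like this (the command itself is just an illustration)::
applyCl.cmd("apt list --upgradable", sudo=True) # prompts for the password once, then caches it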
:param s: shell command to execute
:param sudo: if True, will execute the command with sudo privileges. Will ask for password
and then cache it internally for 5 minutes"""
global _password; import getpass
if sudo:
if _password() is None:
print("Enter password:"); _password.value = getpass.getpass(prompt="")
return None | applyCl.aS(lambda: _password() | cli.cmd(f"sudo -S {s}") | cli.deref(), timeout) | cli.deref()
else: return None | applyCl.aS(lambda: None | cli.cmd(s) | cli.deref(), timeout) | cli.deref()
@staticmethod
def replicateFile(fn:str, nodeIds=None):
"""Replicates a specific file in the current node to all the other nodes.
Example::
applyCl.replicateFile("~/cron.log")
Internally, this will read chunks of 100kB of the specified file and dump it
incrementally to all other nodes, which has performance implications. To
increase or decrease the chunk size, check out :class:`~k1lib.cli.inp.cat`. This also means
you can replicate arbitrarily large files around as long as you have the disk
space for it, while ram size doesn't really matter
:param fn: file name"""
fn = os.path.expanduser(fn); dirname = os.path.dirname(fn)
if nodeIds is None: nodeIds = applyCl.nodeIds(False)
nodeIds = nodeIds | cli.wrapList().all() | cli.deref()
nodeIds | cli.insert(None, False).all() | applyCl(lambda _: None | cli.cmd(f"mkdir -p {dirname}; rm {fn}") | cli.deref(), pre=True) | cli.deref()
for chunk in cli.cat(fn, text=False, chunks=True):
nodeIds | cli.insert(chunk, False).all() | applyCl(lambda chunk: chunk >> cli.file(fn) | cli.deref(), pre=True) | cli.deref()
@staticmethod
def balanceFile(fn:str, nAs:List[str]=None, nBs:List[str]=None, rS=None):
"""Splits a specified file in node nAs and dumps other parts
to nodes nBs. Example::
applyCl.balanceFile("~/cron.log")
This will split the big file up into multiple segments (1 for each node). Then,
for each segment, it will read through it chunk by chunk into memory, and then
deposit it into the respective node. Finally, it truncates each original file
down to its segment boundary.
The main goal of this is so that you can analyze a single big (say 200GB) file
quickly. If that file is on a single node, then it will take forever, even with
:class:`applyMp`. So splitting things up on multiple nodes will make analyzing
it a lot faster.
There's also the function :meth:`balanceFolder`, which handles the opposite problem:
having lots of small (say 100MB) files. So it will try to move files around (keeping
them intact in the meantime) to different nodes so that the folder size ratio is
roughly proportional to the cpu count.
The exact split rule depends on the number of CPUs of each node. Best to see an
example::
Command: applyCl.balanceFile("~/cron.log")
Verbose command: applyCl.balanceFile("~/cron.log", ["1"], ["1", "2", "3", "4", "5"])
----------- Before -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 52 0 0 0 0
----------- After -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 8 12 16 8 8
This also works if you have files on existing nodes already, and are upgrading the
cluster::
Command: applyCl.balanceFile("~/cron.log")
Verbose command: applyCl.balanceFile("~/cron.log", ["1", "5"], ["1", "2", "3", "4", "5"])
----------- Before -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 26 0 0 26 0
----------- After -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 8 12 16 8 8
If you want to move files out of a node when decommissioning them, you can do
something like this::
Command: applyCl.decommission("~/cron.log", ["3", "4"])
Verbose command: applyCl.balanceFile("~/cron.log", ["1", "2", "3", "4", "5"], ["1", "2", "5"])
----------- Before -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 8 12 16 8 8
----------- After -----------
Node: 1 2 3 4 5
Cpu: 8 12 16 8 8
Size (GB): 15 22 0 0 15
Remember that the node ids "1", etc. are for illustrative purposes only. You should get
real node ids from :meth:`nodeIds`.
Why is the file size proportional to the number of cores on each node? Well, if
you have more cores, you should be able to process more, so as to make everything
balanced, right?
Again, this means that you can split arbitrarily large files as long as you have
the disk space for it, ram size is not a concern. How does this perform? Not
the best in the world if you don't have a lot of nodes. With sata 3 ssds, 750MB/s
ethernet, I got transfer speeds of roughly 100MB/s. This should increase as you
have more nodes based on the code structure, but I haven't tested it yet. Can
it be faster? Definitely. Am I willing to spend time optimizing it? No.
:param fn: file name
:param nAs: node ids that currently stores the file. If not specified, try to detect
what nodes the file exists in
:param nBs: node ids that will store the file after balancing everything out. If not
specified, will take all available nodes
:param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained
control over section boundaries so as to not make everything corrupted
"""
from k1lib.cli._applyCl import balanceFile
balanceFile(fn, nAs, nBs, rS or cli.iden())
@staticmethod
def decommission(fn, nAs:List[str], rS=None):
"""Convenience function for :meth:`balanceFile`. See docs over there."""
from k1lib.cli._applyCl import balanceFile
balanceFile(fn, None, applyCl.nodeIds() | ~cli.inSet(nAs) | deref(), rS or cli.iden())
@staticmethod
def cat(fn:str=None, f:Callable=None, timeout:float=60, keepNodeIds:bool=False, multiplier:int=1, includeId:bool=False):
"""Reads a file distributedly, does some operation on them, collects and
returns all of the data together. Example::
fn = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.cat.data"
("0123456789"*5 + "\\n") * 1000 | file(fn)
applyCl.balanceFile(fn)
applyCl.cat(fn, shape(0), keepNodeIds=True) | deref()
That returns something like this (for a 2-node cluster, with 2 (node A) and 4 (node B) cpus respectively)::
[['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167],
['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166],
['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167]]
Here, we're creating an initial file with 1000 lines. Then we'll split it up into
2 fragments: 334 lines and 666 lines and store them on the respective nodes. Then,
on node A, we'll split the file up into 2 parts, each with 167 lines. On node B,
we'll split the file up into 4 parts, each with around 166 lines. Then we'll
schedule 6 processes total, each dealing with 166 lines. After all of that, results
are collected together and returned.
If you want to distinguish between different processes inside f, for example you
want to write results into different files, you can do something like this::
dir_ = "~/repos/labs/k1lib/k1lib/cli/test"
fn = f"{dir_}/applyCl.cat.data"
applyCl.cmd(f"rm -r {dir_}/applyCl") # clear out old folders
applyCl.cmd(f"mkdir -p {dir_}/applyCl") # creating folders
# do processing on fn distributedly, then dump results into multiple files
applyCl.cat(fn, ~aS(lambda idx, lines: lines | shape(0) | aS(dill.dumps) | file(f"{dir_}/applyCl/{idx}.pth")), includeId=True) | deref()
# reading all files and summing them together
None | applyCl.aS(lambda: ls(f"{dir_}/applyCl")) | ungroup(single=True, begin=True) | applyCl(cat(text=False) | aS(dill.loads), pre=True) | cut(1) | toSum()
.. admonition:: Simple mode
There's also another mode that's activated whenever f is not specified that feels
more like vanilla :class:`~inp.cat`. Say you have a file on a specific node::
nodeId = "7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d"
fn = "~/ssd2/randomFile.txt"
# -------------- file is on current node --------------
cat(fn) # returns iterator of lines inside the file
fn | cat() # same thing as above
# -------------- file is on remote node --------------
[nodeId, fn] | applyCl.cat() # returns iterator of lines of the file
applyCl.cat([nodeId, fn]) # same thing
nodeId | applyCl.cat(fn) # also same thing
So yeah, there're lots of ways to just simply read a file on a remote node. Is
it too much? Probably, but the good thing is that you can pick whichever feels most intuitive
to you. Note that this mode is just for convenience, for when you want to do
exploratory analysis on a single remote file. To be efficient at bulk processing,
use the normal mode instead.
:param fn: file name
:param f: function to execute in every process
:param timeout: kills the processes if it takes longer than this amount of seconds
:param keepNodeIds: whether to keep the node id column or not
:param multiplier: by default, each node will spawn as many processes as there
are cpus. If you want to spawn more processes, change this to a higher number
:param includeId: includes a unique id for this process
"""
if f is None: # simple case
def inner(nodeId_fn:Tuple[str, str]):
nodeId, fn = nodeId_fn; seeks = [nodeId] | applyCl.aS(lambda: fn | cli.splitSeek(round(os.path.getsize(fn)/settings.cat.chunkSize+1))) | cli.cut(1) | cli.item() | cli.deref()
inter = seeks | cli.window(2) | apply(cli.wrapList() | cli.insert(nodeId)) | cli.deref()
return inter | ~applyCl(lambda sB, eB: cli.cat(fn,sB=sB,eB=eB) | cli.deref(), pre=True) | cli.cut(1) | cli.joinStreams()
# return [nodeId_fn] | applyCl(cat() | deref(), pre=True) | cut(1) | item() # direct, no chunking method
if fn is None: return aS(inner) # [nodeId, fn] | applyCl.cat()
if isinstance(fn, str): return aS(lambda nodeId: inner([nodeId, fn])) # nodeId | applyCl.cat()
else: return inner(fn) # applyCl.cat([nodeId, fn])
postprocess = cli.insertIdColumn(True, False) | ~apply(lambda x,y,z: [x,[*y,z]])
checkpoints = None | applyCl.aS(lambda: fn | cli.splitSeek(int(applyCl.meta()["Resources"]["CPU"]*multiplier)) | cli.window(2) | cli.deref()) | cli.ungroup(single=True, begin=True) | postprocess | cli.deref()
postprocess = cli.iden() if keepNodeIds else cli.cut(1)
return checkpoints | applyCl(~aS(lambda x,y,idx: cli.cat(fn, sB=x, eB=y) | ((cli.wrapList() | cli.insert(idx)) if includeId else cli.iden()) | f), pre=True, timeout=timeout, num_cpus=1) | postprocess
@staticmethod
def balanceFolder(folder:str, maxSteps:int=None, audit:bool=False):
"""Balances all files within a folder across all nodes.
Example::
base = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.balance"
# deletes old structures and making test folder
applyCl.cmd(f"rm -r {base}"); applyCl.cmd(f"mkdir -p {base}")
# creates 20 files of different sizes and dump it in the base folder of the current node
torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx}.txt")) | deref();
# transfers files between nodes such that the total folder size is proportional to the number of cpus across nodes
applyCl.balanceFolder(base)
# get folder size of all nodes
None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref()
# creates 20 additional files and dump it to the current node
torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx+20}.txt")) | deref();
# balances the tree out again
applyCl.balanceFolder(base)
# get folder size of all nodes
None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref()
So imagine that you just downloaded 1000 files to a single node on a specific folder,
but you need to analyze all of them in a distributed manner. What you can do is to
move some files to other nodes and then do your analysis. If you want to download
more files, just dump it to any node (or download distributed across all nodes),
then rebalance the folders and do your analysis.
:param folder: folder to rebalance all of the files
:param maxSteps: what's the maximum number of file transfers? By default there's no limit, so files keep being transferred until the folder sizes are balanced
:param audit: if True, don't actually move files around and just return what files are going to be moved where"""
from k1lib.cli._applyCl import balanceFolder
return balanceFolder(folder, audit, maxSteps)
@staticmethod
def download(url:str, folder:str, merge:bool=False, timeout=120, chunkTimeout=5):
"""Downloads a file distributedly to a specified folder.
Example::
url = "https://vim.kelvinho.org"
fn = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.download" # file/folder name
applyCl.download(url, fn) # will download distributedly and dump file fragments into the folder fn
applyCl.download(url, fn, True) # same as above, but collects all fragments together, places it in fn in the current node, then deletes the temporary file fragments
This only works if the server allows partial downloads btw.
:param url: url to download
:param folder: which folder to download parts into
:param merge: whether to merge all of the fragments together into a single file in the current node or not
:param timeout: timeout for each process
:param chunkTimeout: timeout for each file chunk inside each process"""
from k1lib.cli._applyCl import download
download(url, folder, merge, timeout, chunkTimeout)
@staticmethod
def diskScan(folder:str, raw=False):
"""Scans for files and folders in the specified folder for potential
distributed files and folders. A distributed file is a file that exists on more
than 1 node. A distributed folder is a folder that exists on more than 1
node and does not have any shared children. Example::
applyCl.diskScan("~/ssd2")
applyCl.diskScan("~/ssd2", True)
The first line does not return anything, but will print out something like this:
.. include:: ../literals/diskScan.rst
While the second line will return a parseable data structure instead::
[[['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity', [4113489746, 7912834090, 4164314316]],
['/home/kelvin/ssd2/data/genome/go/release_geneontology_org', [2071645117, 4172737915, 2107005131]],
['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity.backup', [568878496, 552888466, 600610083]],
['/home/kelvin/ssd2/data/genome/00-common_all.idx', [341738564, 671136833, 0]],
['/home/kelvin/ssd2/data/genome/genbank/ch1.dat.gz', [25356744, 0, 25356764]],
['/home/kelvin/ssd2/test', [136152, 273530, 136351]],
['/home/kelvin/ssd2/data/genome/genbank/ch1', [0, 0, 0]]],
[['/home/kelvin/ssd2/data/genome/dummy.txt', [1101, 1101, 1101]]],
[['/home/kelvin/ssd2/data/genome/00-All.vcf', [32737509360, 65475018903, 32737509588]],
['/home/kelvin/ssd2/data/genome/MotifFeatures/homo_sapiens.GRCh38.motif_features.gff', [13963854962, 27927709895, 13963854962]],
['/home/kelvin/ssd2/data/genome/00-common_all.vcf', [2353901811, 4707803470, 2353901831]]]]
Remember that since an operating system usually has lots of shared files
(like "~/.bashrc", for example), these might be mistaken for distributed files.
Make sure to only scan folders that you store data in, or else it'll take a long time to return.
:param folder: the folder to scan through
:param raw: whether to return raw data or display it out nicely"""
from k1lib.cli._applyCl import diskScan3, diskScan4
if raw: return diskScan4(folder)
else: return diskScan3(folder)
thEmptySentinel = object()
class applyTh(BaseCli):
def __init__(self, f, prefetch:int=None, timeout:float=5, bs:int=1, **kwargs):
"""Kinda the same as :class:`applyMp`, but executes ``f`` on multiple
threads, instead of on multiple processes. Advantages:
- Relatively low overhead for thread creation
- Fast, if ``f`` is io-bound
- Does not have to serialize and deserialize the result, meaning iterators can be
exchanged
Disadvantages:
- Still has thread creation overhead, so it's still recommended to specify ``bs``
- Is slow if ``f`` has to obtain the GIL to be able to do anything
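Despite that, a quick io-bound sketch shows where it shines (the url and the urllib usage
are purely illustrative)::
import urllib.request
# fetches the same page 10 times concurrently; io-bound work like this is where threads do well
["https://example.com"]*10 | applyTh(lambda u: urllib.request.urlopen(u).status, prefetch=10) | deref()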
All examples from :class:`applyMp` should work perfectly here."""
fs = [f]; super().__init__(fs=fs); self.f = fs[0]; self.bs = bs; self.kwargs = kwargs
self.prefetch = prefetch or int(1e9); self.timeout = timeout
def __ror__(self, it):
if self.bs > 1:
yield from (it | cli.batched(self.bs, True) | applyTh(apply(self.f), self.prefetch, self.timeout) | cli.joinStreams()); return
datas = deque(); it = iter(it); kwargs = self.kwargs
innerF = fastF(self.f); timeout = self.timeout
def f(line, wrapper): wrapper.value = innerF(line, **kwargs)
for _, line in zip(range(self.prefetch), it):
w = k1lib.Wrapper(thEmptySentinel)
t = threading.Thread(target=f, args=(line,w))
t.start(); datas.append((t, w))
for line in it:
data = datas.popleft(); data[0].join(timeout)
if data[1].value is thEmptySentinel:
for data in datas: data[0].join(0.01)
raise RuntimeError("Thread timed out!")
yield data[1].value; w = k1lib.Wrapper(thEmptySentinel)
t = threading.Thread(target=f, args=(line,w))
t.start(); datas.append((t, w))
for i in range(len(datas)): # do it this way so that python can remove threads early, due to ref counting
data = datas.popleft(); data[0].join(timeout)
if data[1].value is thEmptySentinel:
for data in datas: data[0].join(0.01)
raise RuntimeError("Thread timed out!")
yield data[1].value
def _copy(self): return applyTh(self.f, self.prefetch, self.timeout, self.bs, **self.kwargs)
def __invert__(self):
res = self._copy(); f = fastF(res.f)
kw = res.kwargs
res.f = lambda x: f(*x, **kw)
res.kwargs = {}
return res
class applySerial(BaseCli):
def __init__(self, f, *args, **kwargs):
"""Applies a function repeatedly. First yields the input ``x``, then
yields ``f(x)``, then ``f(f(x))``, then ``f(f(f(x)))`` and so on. Example::
# returns [2, 4, 8, 16, 32]
2 | applySerial(op()*2) | head(5) | deref()
If the result of your operation is an iterator, you might want to
:class:`~k1lib.cli.utils.deref` it, like this::
rs = iter(range(8)) | applySerial(rows()[::2])
# returns [0, 2, 4, 6]
rs | rows(1) | item() | deref()
# returns []. This is because all the elements are taken by the previous deref()
rs | item() | deref()
rs = iter(range(8)) | applySerial(rows()[::2] | deref())
# returns [0, 2, 4, 6]
rs | rows(1) | item()
# returns [0, 4]
rs | item() # or `next(rs)`
# returns [0]
rs | item() # or `next(rs)`
You can also expand the arguments out, just like with :class:`apply`::
# returns [[2, 8], [10, -6], [4, 16], [20, -12]]
[2, 8] | ~applySerial(lambda a, b: (a + b, a - b)) | head(4) | deref()
:param f: function to apply repeatedly"""
fs = [f]; super().__init__(fs=fs); self.f = fs[0]
self.unpack = False; self.args = args; self.kwargs = kwargs
def __ror__(self, it):
f = fastF(self.f)
if self.unpack:
while True: yield it; it = f(*it, *self.args, **self.kwargs)
else:
while True: yield it; it = f(it, *self.args, **self.kwargs)
def __invert__(self):
ans = applySerial(self.f, *self.args, **self.kwargs)
ans.unpack = True; return ans
class sort(BaseCli):
def __init__(self, column:int=0, numeric=True, reverse=False):
"""Sorts all lines based on a specific `column`.
Example::
# returns [[5, 'a'], [1, 'b']]
[[1, "b"], [5, "a"]] | ~sort(0) | deref()
# returns [[2, 3]]
[[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref()
# errors out, as you can't really compare str with int
[[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref()
# returns [-1, 2, 3, 5, 8]
[2, 5, 3, -1, 8] | sort(None) | deref()
:param column: if None, sort rows based on themselves and not an element
:param numeric: whether to convert column to float
:param reverse: False for smaller to bigger, True for bigger to smaller. Use
:meth:`__invert__` to quickly reverse the order instead of using this param"""
self.column = column; self.reverse = reverse; self.numeric = numeric
self.filterF = (lambda x: float(x)) if numeric else (lambda x: x)
def __ror__(self, it:Iterator[str]):
c = self.column
if c is None:
return it | cli.wrapList() | cli.transpose() | sort(0, self.numeric, self.reverse) | cli.op()[0].all()
f = self.filterF
rows = (it | cli.isNumeric(c) if self.numeric else it) | cli.apply(list)
def sortF(row):
if len(row) > c: return f(row[c])
return float("inf")
return sorted(rows, key=sortF, reverse=self.reverse)
def __invert__(self):
"""Creates a clone that has the opposite sort order"""
return sort(self.column, self.numeric, not self.reverse)
class sortF(BaseCli):
def __init__(self, f:Callable[[T], float], column:int=None, reverse=False):
"""Sorts rows using a function.
Example::
# returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref()
# returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a']
["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()"""
fs = [f]; super().__init__(fs=fs); self.f = fs[0]
self.column = column; self.reverse = reverse
def __ror__(self, it:Iterator[T]) -> Iterator[T]:
c = self.column; f = self.f
if c is None: return sorted(list(it), key=f, reverse=self.reverse)
def sortF(row):
if len(row) > c: return f(row[c])
return float("inf")
return sorted(list(it), key=sortF, reverse=self.reverse)
def __invert__(self) -> "sortF":
return sortF(self.f, self.column, not self.reverse)
class consume(BaseCli):
def __init__(self, f:Union[BaseCli, Callable[[T], None]]):
r"""Consumes the iterator in a side stream. Returns the iterator.
Kinda like the bash command ``tee``. Example::
# prints "0\n1\n2" and returns [0, 1, 2]
range(3) | consume(headOut()) | toList()
# prints "range(0, 3)" and returns [0, 1, 2]
range(3) | consume(lambda it: print(it)) | toList()
This is useful whenever you want to mutate something, but don't want to
include the function result into the main stream.
See also: :class:`~k1lib.cli.output.tee`"""
fs = [f]; super().__init__(fs=fs); self.f = fs[0]
def __ror__(self, it:T) -> T:
self.f(it); return it
class randomize(BaseCli):
def __init__(self, bs=100, seed=None):
"""Randomize input stream. In order to be efficient, this does not
convert the input iterator to a giant list and yield random values from that.
Instead, this fetches ``bs`` items at a time, randomizes them, returns and
fetch another ``bs`` items. If you want to do the giant list, then just pass
in ``float("inf")``, or ``None``. Example::
# returns [0, 1, 2, 3, 4], effectively no randomize at all
range(5) | randomize(1) | deref()
# returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
range(10) | randomize(3) | deref()
# returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
range(10) | randomize(float("inf")) | deref()
# same as above
range(10) | randomize(None) | deref()
# returns True, as the seed is the same
range(10) | randomize(seed=4) | deref() == range(10) | randomize(seed=4) | deref()"""
self.bs = bs if bs != None else float("inf"); self.seed = seed; self._initGenn()
def _initGenn(self):
if hasTorch:
gen = torch.Generator().manual_seed(random.Random(self.seed).getrandbits(63))
self.genn = lambda n: torch.randperm(n, generator=gen)
else: self.genn = np.random.permutation
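# the generator-backed closure in self.genn isn't picklable, so drop it when serializing and rebuild it on load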
def __getstate__(self):
genn = self.genn; self.genn = None; return self.__dict__
def __setstate__(self, d): self.__dict__.update(d); self._initGenn()
def __ror__(self, it:Iterator[T]) -> Iterator[T]:
bs = self.bs
if isinstance(it, settings.arrayTypes):
if bs is None or len(it) <= bs: return it if len(it) == 1 else it[self.genn(len(it))]
def gen():
for batch in it | cli.batched(bs, True):
batch = list(batch); perms = self.genn(len(batch))
for idx in perms: yield batch[idx]
return gen()
class StaggeredStream:
def __init__(self, stream:Iterator[T], every:int):
"""Not intended to be instantiated by the end user. Use :class:`stagger`
instead."""
self.stream = stream; self.every = every
def __iter__(self):
for i, v in zip(range(self.every), self.stream): yield v
def __len__(self):
"""Length of window (length of result if you were to deref it)."""
return self.every
class stagger(BaseCli):
def __init__(self, every:int):
"""Staggers input stream into multiple stream "windows" placed serially. Best
explained with an example::
o = range(10) | stagger(3)
o | deref() # returns [0, 1, 2], 1st "window"
o | deref() # returns [3, 4, 5], 2nd "window"
o | deref() # returns [6, 7, 8]
o | deref() # returns [9]
o | deref() # returns []
This might be useful when you're constructing a data loader::
dataset = [range(20), range(30, 50)] | transpose()
dl = dataset | batched(3) | (transpose() | toTensor()).all() | stagger(4)
for epoch in range(3):
for xb, yb in dl: # looping over a window
print(epoch)
# then something like: model(xb)
The above code will print 6 lines. 4 of them are "0" (because we stagger every 4
batches), and xb's shape will be (3,) (because we batched every 3 samples).
You should also keep in mind that this doesn't really change the property of the
stream itself. Essentially, treat these pairs of statement as being the same thing::
o = range(11, 100)
# both returns 11
o | stagger(20) | item()
o | item()
# both returns [11, 12, ..., 20]
o | head(10) | deref()
o | stagger(20) | head(10) | deref()
Lastly, multiple iterators might be getting values from the same stream window,
meaning::
o = range(11, 100) | stagger(10)
it1 = iter(o); it2 = iter(o)
next(it1) # returns 11
next(it2) # returns 12
This may or may not be desirable. Also this should be obvious, but I want to
mention this in case it's not clear to you."""
self.every = int(every)
def __ror__(self, it:Iterator[T]) -> StaggeredStream:
return StaggeredStream(iter(it), self.every)
@staticmethod
def tv(every:int, ratio:float=0.8):
"""Convenience method to quickly stagger train and valid datasets.
Example::
# returns [[16], [4]]
[range(100)]*2 | stagger.tv(20) | shape().all() | deref()"""
return stagger(round(every*ratio)) + stagger(round(every*(1-ratio)))
compareOps = {"__lt__", "__le__", "__eq__", "__ne__", "__gt__", "__ge__"}
class op(k1lib.Absorber, BaseCli):
def __init__(self):
"""Absorbs operations done on it and applies it on the stream. Based
on :class:`~k1lib.Absorber`. Example::
# returns 16
4 | op()**2
# returns 16, equivalent to the above
4 | aS(lambda x: x**2)
# returns [0, 1, 4, 9, 16]
range(5) | apply(op()**2) | deref()
# returns [0, 1, 4, 9, 16], equivalent to the above
range(5) | apply(lambda x: x**2) | deref()
Main advantage is that you don't have to waste keystrokes when you just want
to do a simple operation. How it works underneath is a little magical, so just
treat it as a blackbox. A more complex example::
t = torch.tensor([[1, 2, 3], [4, 5, 6.0]])
# returns [torch.tensor([[4., 5., 6., 7., 8., 9.]])]
[t] | (op() + 3).view(1, -1).all() | deref()
Basically, you can treat ``op()`` as the input tensor. Tbh, you
can do the same thing with this::
[t] | applyS(lambda t: (t+3).view(1, -1)).all() | deref()
But that's kinda long and may not be obvious. This can be surprisingly resilient, as
you can still combine with other cli tools as usual, for example::
# returns [2, 3], demonstrating "&" operator
torch.randn(2, 3) | (op().shape & iden()) | deref() | item()
a = torch.tensor([[1, 2, 3], [7, 8, 9]])
# returns torch.tensor([4, 5, 6]), demonstrating "+" operator for clis and not clis
(a | op() + 3 + iden() | item() == torch.tensor([4, 5, 6])).all()
# returns [[3], [3]], demonstrating .all() and "|" serial chaining
torch.randn(2, 3) | (op().shape.all() | deref())
# returns [[8, 18], [9, 19]], demonstrating you can treat `op()` as a regular function
[range(10), range(10, 20)] | transpose() | filt(op() > 7, 0) | deref()
# returns [3, 4, 5, 6, 7, 8, 9], demonstrating bounds comparison
range(100) | filt(3 <= op() < 10) | deref()
This can only deal with simple operations. For complex operations, resort
to the longer version ``aS(lambda x: ...)`` instead!
There are also operations that are difficult to achieve, like
``len(op())``, as Python is expecting an integer output, so
``op()`` can't exactly take over. Instead, you have to use :class:`aS`,
or do ``op().ab_len()``. Get a list of all of these special operations
in the source of :class:`~k1lib.Absorber`.
Performance-wise, in most cases, there is no degradation, so don't worry
about it. Everything is pretty much on par with native lambdas::
n = 10_000_000
# takes 1.48s
for i in range(n): i**2
# takes 1.89s, 1.28x worse than for loop
range(n) | apply(lambda x: x**2) | ignore()
# takes 1.86s, 1.26x worse than for loop
range(n) | apply(op()**2) | ignore()
# takes 1.86s
range(n) | (op()**2).all() | ignore()
More complex operations still retain the same speeds, as there's a JIT compiler embedded inside::
# takes 2.15s
for i in range(n): (i**2-3)*0.1
# takes 2.53s, 1.18x worse than for loop
range(n) | apply(lambda x: (x**2-3)*0.1) | ignore()
# takes 2.46s, 1.14x worse than for loop
range(n) | apply((op()**2-3)*0.1) | ignore()
Reserved operations that are not absorbed are:
- all
- __ror__ (__or__ still works!)
- ab_solidify
- op_hint"""
super().__init__({"_hint": None})
@staticmethod
def solidify(f):
"""Static equivalent of ``a.ab_solidify()``.
Example::
f = op()**2
f = op.solidify(f)
If ``f`` is not an ``op``, then just return it without doing anything to it"""
if f.__class__.__name__.split(".")[-1] == "op": f.ab_solidify()
return f
def __ror__(self, it):
return self.ab_operate(it)
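# the operators below are not absorbed: when the other operand is another cli tool, fall back to
# BaseCli's usual composition behavior instead of recording the operation on the absorber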
def __or__(self, o):
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__or__(o)
return super().__add__(o)
def __add__(self, o):
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__add__(o)
return super().__add__(o)
def __and__(self, o):
if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__and__(o)
return super().__and__(o)
def __call__(self, *args, **kwargs):
if self._ab_solidified: return self.ab_operate(*args, **kwargs)
return super().__call__(*args, **kwargs)
def _typehint(self, inp):
return self._hint if self._hint is not None else tAny()
def op_hint(self, _hint):
"""Specify output type hint"""
self._ab_sentinel = True; self._hint = _hint
self._ab_sentinel = False; return self
cli.op = op
class integrate(BaseCli):
def __init__(self, dt=1):
"""Integrates the input.
Example::
# returns [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
range(10) | integrate() | deref()
# returns [0, 2, 6, 12, 20, 30, 42, 56, 72, 90]
range(10) | integrate(2) | deref()
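With a small ``dt`` this acts like a crude Riemann sum. A sketch (the numbers are approximate)::
# last element is roughly 0.99, approximating the integral of 2x over [0, 1)
np.arange(0, 1, 0.01) | apply(lambda x: 2*x) | integrate(0.01) | deref() | aS(lambda xs: xs[-1])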
:param dt: Optional small step"""
self.dt = dt
def __ror__(self, it):
if self.dt == 1:
s = 0
for e in it: s += e; yield s
else:
dt = self.dt; s = 0
for e in it: s += e*dt; yield s