# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This module is for quick modifiers: think of them as changing formats, or modifying an
array in place
"""
__all__ = ["applyS", "aS", "apply", "map_", "applyMp", "parallel", "applyCl",
           "applyTh", "applySerial",
           "sort", "sortF", "consume", "randomize", "stagger", "op",
           "integrate", "roll", "clamp"]
from typing import Callable, Iterator, Any, Union, List, Tuple
from k1lib.cli.init import patchDefaultDelim, BaseCli, fastF; import k1lib.cli.init as init
import k1lib.cli as cli, numpy as np, threading, gc; import k1lib
from collections import deque, defaultdict
from functools import partial, update_wrapper, lru_cache
from k1lib.cli.typehint import *
import dill, pickle, json, k1lib, warnings, atexit, signal, time, os, random, sys
try: import torch; import torch.multiprocessing as mp; hasTorch = True
except: import multiprocessing as mp; hasTorch = False
try: import ray; hasRay = True
except: hasRay = False
settings = k1lib.settings.cli
class applyS(BaseCli):                                                           # applyS
    def __init__(self, f:Callable[[Any], Any], *args, **kwargs):                 # applyS
        """Like :class:`apply`, but much simpler, just operating on the entire input
object, essentially. The "S" stands for "single". There's
also an alias shorthand for this called :class:`aS`. Example::
    # returns 5
    3 | aS(lambda x: x+2)
Like :class:`apply`, you can also use this as a decorator like this::
    @aS
    def f(x):
        return x+2
    # returns 5
    3 | f
This also decorates the returned object so that it has the same qualname, docstring
and whatnot.
.. admonition:: Shorthands
    Writing out "lambda x:" all the time is annoying, and there are ways
    to quickly say ``lambda x: x+2`` like so::
        3 | op()+2 # returns 5
        3 | aS("x+2") # returns 5. Behind the scenes, it compiles and execute `lambda x: x+2`
    The first way is to use :class:`op`, which absorbs all operations done on it,
    like "+", and returns a function that essentially replays those operations.
    In the second way, you only have to pass in a string containing the code that you want
    run on the variable "x". Internally, it's compiled into regular Python code.
    In fact, you can pass in ``op()`` or just a string to any cli that accepts any kind
    of function, like :class:`~k1lib.cli.filt.filt` or :class:`apply`::
        range(4) | apply("x-2") | deref()
        range(4) | apply(op()-2) | deref()
        range(4) | filt("x%2") | deref()
        range(4) | filt(op()%2) | deref()
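You can also pass extra positional and keyword arguments, and they are forwarded
to the function on each call. A quick sketch of that forwarding::

    # returns 12
    3 | aS(lambda x, y: x*y, 4)
    # returns 14
    3 | aS(lambda x, y, z=0: x*y + z, 4, z=2)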
:param f: the function to be executed
:param args: other positional arguments to pass to the function
:param kwargs: other keyword arguments to pass to the function"""               # applyS
        super().__init__(fs=[f]); self.args = args; self.kwargs = kwargs         # applyS
        self.f = f; self._fC = fastF(f); update_wrapper(self, f, updated=())     # applyS
        self.inverted = False; self.preInvAS = None                              # applyS 
    def _typehint(self, inp):                                                    # applyS
        if self.hasHint: return self._hint                                       # applyS
        try: return self.f._typehint(inp)                                        # applyS
        except: return tAny()                                                    # applyS
    def __ror__(self, it:Any) -> Any:                                            # applyS
        return self._fC(it, *self.args, **self.kwargs)                           # applyS 
    def __invert__(self):                                                        # applyS
        """Configures it so that it expand the arguments out.
Example::
    # returns 5
    [2, 3] | ~aS(lambda x, y: x + y)
    def f(x, y, a=4):
        return x*y + a
    # returns 10
    [2, 3] | ~aS(f)
    # returns 11
    [2, 3] | ~aS(f, a=5)"""                                                      # applyS
        if self.inverted: raise Exception("Doesn't support __invert__()ing multiple times") # applyS
        f = self.f; a = self.args; kw = self.kwargs; res = applyS(lambda x: f(*x, *a, **kw)); # applyS
        res.inverted = True; res.preInvAS = self; return res                     # applyS 
    def _jsF(self, meta):                                                        # applyS
        # if len(self.kwargs) > 0: raise Exception("JS does not have the concept of keyword arguments") # applyS
        # if len(self.args) > 0: raise Exception("aS._jsF() doesn't support *args yet") # applyS
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto(); inverted = False # applyS
        if self.inverted: self = self.preInvAS; inverted = True                  # applyS
        # lookup for custom _jsF() functions                                     # applyS
        header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("aS", meta), self.kwargs, self.args)) # applyS
        # TODO: might want to inject args right here, on the JS side, instead of on the Python side # applyS
        if inverted: return f"{header}\nconst {fIdx} = {'async ' if _async else ''}({dataIdx}) => {{ return {'await ' if _async else ''}{dataIdx}.aSInv{'_async' if _async else ''}({_fIdx}); }}", fIdx # applyS
        else: return header, _fIdx                                               # applyS 
        # for x,y in self.pattern | grep("\ue157", sep=True).till("\ue239") | cli.apply(cli.join("")) | cli.filt("x") | cli.apply(lambda x: [x, x.replace("\ue157", "${").replace("\ue239", "}")]): p = p.replace(x, y) # applyS
        # return f"const {fIdx} = ({dataIdx}) => {dataIdx}.grep(`{p}`)", fIdx    # applyS
aS = applyS                                                                      # applyS
def _allOpt_gen(a, ir:List[int], n:int): # a is a complex, deref-ed structure    # _allOpt_gen
    if n == 0: return                                                            # _allOpt_gen
    ir.append(len(a))                                                            # _allOpt_gen
    for e in a: _allOpt_gen(e, ir, n-1)                                          # _allOpt_gen
def _allOpt_genIr(a, n=None) -> "(derefed structure, flattened structure, ir, depth)": # a is a complex, not yet deref-ed structure # _allOpt_genIr
    a = (a | cli.deref()) if n is None else (a | cli.deref(n)); ir = []; n = n or (a | cli.shape() | cli.shape(0)) # _allOpt_genIr
    _allOpt_gen(a, ir, n); return a, list(a | cli.joinStreams(n-1)), ir, n       # _allOpt_genIr
def _allOpt_recover(b:Iterator["data_structure"], ir:Iterator[int], n): # assumes b and ir are iterators # _allOpt_recover
    ans = []; l = next(ir)                                                       # _allOpt_recover
    for i in range(l):                                                           # _allOpt_recover
        if n-1 > 0: ans.append(_allOpt_recover(b, ir, n-1))                      # _allOpt_recover
        else: ans.append(next(b)); #print("  "*(4-n) + f"appended: {ans}");      # _allOpt_recover
    return ans                                                                   # _allOpt_recover
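# A rough sketch of the IR round trip above (illustrative values only):         # _allOpt_recover
#   ir = []; _allOpt_gen([[1, 2], [3]], ir, 2)           # ir becomes [2, 2, 1]: sublist lengths, depth-first # _allOpt_recover
#   _allOpt_recover(iter([1, 2, 3]), iter([2, 2, 1]), 2) # rebuilds [[1, 2], [3]] from the flattened stream # _allOpt_recover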
class apply(BaseCli):                                                            # apply
    def __init__(self, f:Callable[[Any], Any], column:Union[int, List[int]]=None, cache:int=0, **kwargs): # apply
        """Applies a function f to every element in the incoming list/iterator.
Example::
    # returns [0, 1, 4, 9, 16]
    range(5) | apply(lambda x: x**2) | deref()
    # returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]], running the function on the 0th column
    torch.ones(2, 3) | apply(lambda x: x+2, 0) | deref()
    # returns [[0, -1, 2, 3, -4], [2, -3, 4, 5, -6], [0, -1, 4, 9, -16]], running the function on the 1st (0-index btw) and 4th columns
    [[0, 1, 2, 3, 4], [2, 3, 4, 5, 6], [0, 1, 4, 9, 16]] | apply(lambda x: -x, [1, 4]) | deref()
You can also use this as a decorator, like this::
    @apply
    def f(x):
        return x**2
    # returns [0, 1, 4, 9, 16]
    range(5) | f | deref()
You can also add a cache, like this::
    def calc(i): time.sleep(0.5); return i**2
    # takes 2.5s
    range(5) | repeatFrom(2) | apply(calc, cache=10) | deref()
    # takes 5s
    range(5) | repeatFrom(2) | apply(calc) | deref()
You can add custom keyword arguments into the function::
    def f(x, y, z=3):
        return x + y + z
    # returns [15, 17, 19, 21, 23]
    [range(5), range(10, 15)] | transpose() | ~apply(f, z=5) | deref()
If "apply" is too hard to remember, this cli also has an alias :class:`map_`
that kinda mimics Python's ``map()``. Also, a slight reminder: you can't pass
in extra positional args like in :class:`aS`, only extra keyword arguments.
See also: :class:`aS`, :class:`~k1lib.cli.filt.filt`
.. admonition:: JS transpiler notes
    So, because JS doesn't have the concept of keyword arguments, ``kwargs`` will have its values
    extracted, then injected as positional arguments in the transpiled JS function.
:param column: if not None, then applies the function to that column or columns only
:param cache: if specified, then caches this many values
:param kwargs: extra keyword arguments to pass in the function"""                # apply
        super().__init__(fs=[f]); self.f = f; self.kwargs = kwargs # f is the original operator, _fC is # apply
        if column: # quick type checks                                           # apply
            ex = Exception(f"Applying a function on a negative-indexed column ({column}) is not supported") # apply
            if isinstance(column, int):                                          # apply
                if column < 0: raise ex                                          # apply
            else:                                                                # apply
                column = list(column)                                            # apply
                if len([c for c in column if c < 0]): raise ex                   # apply
        self.column = column; self.cache = cache; self._fC = fastF(f)            # apply
        if cache > 0: self._fC = lru_cache(cache)(self._fC)                      # apply
        self.normal = self.column is None and self.cache == 0 and len(kwargs) == 0 # cached value to say that this apply is just being used as a wrapper, nothing out of the ordinary, like custom columns, cache or custom kwargs # apply
        if self.normal: # just propagating information upward, to save runtime graph analysis time # apply
            try: self._propagatedF = f._propagatedF; self._applyDepth = f._applyDepth + 1 # assuming f is another apply() # apply
            except: self._propagatedF = f; self._applyDepth = 1                  # apply
        else: self._propagatedF = None; self._applyDepth = 1 # might have to rethink if this depth should be 1 or not # apply
        # optimization 1: BaseCli._all_array_opt(), aimed at accelerating array types # apply
        self.__arrayTypeF = None # None for not formulated yet, 0 for cannot formulate a faster operation, else the cached, accelerated function (that might not work) # apply
        # optimization 2: BaseCli._all_opt(), aimed at accelerating language models # apply
        self.__allOptF = None # None for not formulated yet, 0 for cannot formulate a faster operation, else the cached, accelerated function (that will guaranteed to work) # apply
        self.inverted = False; self.preInvApply = None # for ._jsF(), to contain information about the apply() pre __invert__(), so that .jsF() can extract out information # apply 
    @property                                                                    # apply
    def _arrayTypeF(self): # optimization 1: returns None or the function (that might not work) # apply
        if self.__arrayTypeF == 0: return None                                   # apply
        if self.__arrayTypeF is None:                                            # apply
            arrs = []; last = self # figure out the depth                        # apply
            while isinstance(last, apply) and last.normal: arrs.append(last); last = last.f # apply
            depth = len(arrs)                                                    # apply
            if depth == 0: self.__arrayTypeF = 0; return None                    # apply
            if isinstance(last, cli.serial): # breaks up the serial: (A | B.all(2)).all(3) -> A.all(3) | B.all(5) # apply
                self.__arrayTypeF = cli.serial(*[(e if isinstance(e, BaseCli) else aS(e)).all(depth) for e in last.clis]); return self.__arrayTypeF # apply
            else: # it | A.all(3) -> A._all_array_opt(it, 3). This function might return NotImplemented, which means it can't figure out how to utilize the speed up # apply
                self.__arrayTypeF = aS(lambda it: last._all_array_opt(it, depth)); return self.__arrayTypeF # apply
        return self.__arrayTypeF                                                 # apply
    @property                                                                    # apply
    def _allOptF(self): # optimization 2: returns None or the function (that has to work all the time!) # apply
        if self.__allOptF == 0: return None                                      # apply
        if self._propagatedF is None or not hasattr(self._propagatedF, "_all_opt"): self.__allOptF = 0; return None # apply
        f = self._propagatedF._all_opt                                           # apply
        def inner(it): # has to regenerate the IR on each pass through. Slow (O(30*n) or so), but the function this is supposed to run (LLMs), are even slower, so this is fine for now # apply
            a, af, ir, n = _allOpt_genIr(it, self._applyDepth) # af = a flat     # apply
            return _allOpt_recover(iter(f(af)), iter(ir), n)                     # apply
        return inner                                                             # apply
    def _typehint(self, inp):                                                    # apply
        if self.column is None:                                                  # apply
            if isinstance(inp, tListIterSet):                                    # apply
                try: return tIter(self.f._typehint(inp.child))                   # apply
                except: return tIter(tAny())                                     # apply
        return super()._typehint(inp)                                            # apply
    def _copy(self): return apply(self.f, self.column, self.cache, **self.kwargs) # ~apply() case handled automatically # apply
    def __ror__(self, it:Iterator[str]):                                         # apply
        c = self.column; f = self._fC; kwargs = self.kwargs                      # apply
        if c is None:                                                            # apply
            if self.normal:                                                      # apply
                if isinstance(it, settings.arrayTypes): # optimization 1         # apply
                    af = self._arrayTypeF                                        # apply
                    if af is not None: # there's a lot of code here, but it doesn't impact perf because it's done once for each array object # apply
                        try:                                                     # apply
                            ans = af(it)                                         # apply
                            if ans is not NotImplemented: return ans             # apply
                        except Exception as e: pass                              # apply
                        self.__arrayTypeF = 0 # tried to use the accelerated version, but failed, so won't ever try the accelerated version again # apply
                elif self._allOptF is not None: return self._allOptF(it) # optimization 2, for LLMs # apply
            return (f(line, **kwargs) for line in it)                            # apply
        elif isinstance(c, int):                                                 # apply
            def gen(it):                                                         # apply
                for row in it: row = list(row); row[c] = f(row[c], **kwargs); yield row # apply
            return gen(it) # return ([(e if i != c else f(e, **kwargs)) for i, e in enumerate(row)] for row in it) # old version # apply
        else: # List[int]                                                        # apply
            def gen(it):                                                         # apply
                for row in it:                                                   # apply
                    row = list(row)                                              # apply
                    for c_ in c: row[c_] = f(row[c_], **kwargs)                  # apply
                    yield row                                                    # apply
            return gen(it)                                                       # apply 
    def __invert__(self):                                                        # apply
        """Same mechanism as in :class:`applyS`, it expands the
arguments out. Just for convenience really. Example::
    # returns [10, 12, 14, 16, 18]
    [range(5), range(10, 15)] | transpose() | ~apply(lambda x, y: x+y) | deref()""" # apply
        if self.inverted: raise Exception("Doesn't support __invert__()ing multiple times") # apply
        res = apply(lambda x: self._fC(*x, **self.kwargs), self.column, self.cache) # apply
        res.preInvApply = self; res.inverted = True; return res                  # apply 
    def _jsF(self, meta):                                                        # apply
        if self.cache != 0: raise Exception("apply._jsF() doesn't support caching values yet") # apply
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); kwIdx = init._jsDAuto(); argIdx = init._jsDAuto(); inverted = False # apply
        if self.inverted: self = self.preInvApply; inverted = True               # apply
        header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("apply", meta), self.kwargs)) # apply
        return f"{header}\nconst {kwIdx} = {json.dumps(self.kwargs)};\nconst {fIdx} = {'async ' if _async else ''}({dataIdx}) => {dataIdx}.apply{'_async' if _async else ''}({'async ' if _async else ''}({argIdx}) => {'await ' if _async else ''}{_fIdx}({'...' if inverted else ''}{argIdx}), {cli.kjs.v(self.column)}, {kwIdx}, false)", fIdx # apply
                                                                                 # apply
        # old code below, with args (self, kast, jsFnVars:"list[str]", **kwargs) # apply
        argVars = kast.kast_lambda(self.f); var = ",".join(argVars)              # apply
        fn, header = kast.kast_prepareFunc(self.f, [*argVars, *jsFnVars])        # apply
        col = self.column                                                        # apply
        if col is None: return f".apply(({var}) => {fn})", header                # apply
        else:                                                                    # apply
            cols = [col] if isinstance(col, int) else col                        # apply
            return "".join([f".apply(({var}) => {fn}, {c})" for c in cols]), header # apply 
map_ = apply                                                                     # apply
cloudpickle = k1lib.dep("cloudpickle")                                           # apply
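# executeFunc() runs inside each worker process: it deserializes the shared [f, kwargs] blob and one input element, calls f and returns the result # executeFunc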
def executeFunc(common, line, usingDill):                                        # executeFunc
    import time                                                                  # executeFunc
    if usingDill:                                                                # executeFunc
        import dill; f, kwargs = dill.loads(common)                              # executeFunc
        res = f(dill.loads(line), **kwargs)                                      # executeFunc
    else:                                                                        # executeFunc
        import cloudpickle; f, kwargs = cloudpickle.loads(common)                # executeFunc
        res = f(cloudpickle.loads(line), **kwargs)                               # executeFunc
    time.sleep(0.1); return res # suggestion by https://stackoverflow.com/questions/36359528/broken-pipe-error-with-multiprocessing-queue # executeFunc
def terminateGraceful(): signal.signal(signal.SIGINT, signal.SIG_IGN)            # terminateGraceful
_k1_applyMp_global_ctx = {}; _k1_applyMp_global_ctx_autoInc = k1lib.AutoIncrement(prefix="_k1_applyMp") # terminateGraceful
class applyMp(BaseCli):                                                          # applyMp
    _pools = set()                                                               # applyMp
    _torchNumThreads = None                                                      # applyMp
    def __init__(self, f:Callable[[Any], Any], prefetch:int=None, timeout:float=8, utilization:float=0.8, bs:int=1, newPoolEvery:int=0, **kwargs): # applyMp
        """Like :class:`apply`, but execute a function over the input iterator
in multiple processes. Example::
    # returns [3, 2]
    ["abc", "de"] | applyMp(len) | deref()
    # returns [5, 6, 9]
    range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref()
    # returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
    someList = [1, 2, 3]
    ["abc", "de"] | applyMp(lambda s: someList) | deref()
Internally, this will continuously spawn new jobs up until 80% of all CPU
cores are utilized. On POSIX systems, the default multiprocessing start method is
``fork()``. This roughly means that all the variables in memory will be copied
over. On Windows and macOS, the default start method is ``spawn``, meaning each
child process is a completely new interpreter, so you have to pass in all required
variables and reimport every dependency. Read more at https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
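If you explicitly want ``spawn`` semantics, you can request them yourself before any
pool is created. A rough sketch, using the standard :mod:`multiprocessing` API rather
than anything :class:`applyMp` exposes::

    import multiprocessing as mp
    # may only be called once, early on, before any pools/processes exist
    mp.set_start_method("spawn")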
If you don't wish to schedule all jobs at once, you can specify a ``prefetch``
amount, and it will only schedule that many jobs ahead of time. Example::
    range(10000) | applyMp(lambda x: x**2)    | head() | deref() # 700ms
    range(10000) | applyMp(lambda x: x**2, 5) | head() | deref() # 300ms
    # demonstrating there're no huge penalties even if we want all results at the same time
    range(10000) | applyMp(lambda x: x**2)    | deref() # 900ms
    range(10000) | applyMp(lambda x: x**2, 5) | deref() # 1000ms
The first line will schedule all jobs at once, and thus will require more RAM and
compute power, even though we discard most of the results anyway (via the
:class:`~k1lib.cli.filt.head` cli). The second line only schedules 5 jobs ahead of
time, and thus will be far more efficient if you don't need all results right
away.
.. note::
    Remember that every :class:`~k1lib.cli.init.BaseCli` is also a
    function, meaning that you can do stuff like::
        # returns [['ab', 'ac']]
        [["ab", "cd", "ac"]] | applyMp(filt(op().startswith("a")) | deref()) | deref()
    Also remember that the return result of ``f`` should be serializable, meaning it
    should not be a generator. That's why in the example above, there's a ``deref()``
    inside ``f``. You should also convert PyTorch tensors into NumPy arrays.
Most of the time, you would probably want to set ``bs`` to something bigger than 1
(maybe 32 or so). This executes ``f`` multiple times in a single job,
instead of executing ``f`` only once per job, which should reduce the overhead of
process creation dramatically.
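A rough sketch of where ``bs`` goes (same job as before, just batched)::

    # 10000 jobs, 1 element each: lots of process overhead
    range(10000) | applyMp(lambda x: x**2) | deref()
    # ~313 jobs, 32 elements each: much less overhead
    range(10000) | applyMp(lambda x: x**2, bs=32) | deref()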
If you encounter strange errors not seen on :class:`apply`, you can try to clear all
pools (using :meth:`clearPools`), to terminate all child processes and thus free
resources. In earlier versions, you had to do this manually before exiting, but now
:class:`applyMp` is much more robust.
Also, you should not immediately assume that :class:`applyMp` will always be faster
than :class:`apply`. Remember that :class:`applyMp` will create new processes,
serialize and transfer data to them, execute it, then transfer data back. If your code
transfers a lot of data back and forth (compared to the amount of computation done), or
the child processes don't have a lot of stuff to do before returning, it may very well
be a lot slower than :class:`apply`.
There's a potential loophole here that can make your code faster. Because the main
process is forked (at least on Linux), every variable is still there, even the big
ones. So, you can potentially do something like this::
    bigData = [] # 1B items in the list
    # summing up all items together. No input data transfers (because it's forked instead)
    range(1_000_000_000) | batched(100) | applyMp(lambda r: r | apply(lambda i: bigData[i]) | toSum()) | toSum()
In fact, I use this loophole all the time, and have thus made the function :meth:`shared`,
so check it out.
:param prefetch: if not specified, schedules all jobs at the same time. If
    specified, schedules jobs so that there'll only be a specified amount of
    jobs, and will only schedule more if results are actually being used.
:param timeout: seconds to wait for job before raising an error
:param utilization: what fraction of the cpu cores to use: 0 for no cores, 1 for
    all the cores. Defaults to 0.8
:param bs: if specified, groups ``bs`` number of transforms into 1 job to be more
    efficient.
:param kwargs: extra arguments to be passed to the function. ``args`` not
    included as there're a couple of options you can pass for this cli.
:param newPoolEvery: creates a new processing pool for every specific amount of input
    fed. 0 for not refreshing any pools at all. Turn this on in case your process consumes
    lots of memory and you have to kill them eventually to free up some memory""" # applyMp
        super().__init__(fs=[f]); self.f = fastF(f)                              # applyMp
        self.prefetch = prefetch or int(1e9)                                     # applyMp
        self.timeout = timeout; self.utilization = utilization                   # applyMp
        self.bs = bs; self.kwargs = kwargs; self.p = None                        # applyMp
        self.newPoolEvery = newPoolEvery; self.ps = []; self._serializeF = True  # applyMp 
    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:                        # applyMp
        timeout = self.timeout; it = iter(it); f = self.f # really make sure it's an iterator, for prefetch # applyMp
        if self.bs > 1: return it | cli.batched(self.bs, True) | applyMp(apply(f) | cli.toList(), self.prefetch, timeout, **self.kwargs) | cli.joinStreams() # applyMp
        def newPool():                                                           # applyMp
            if hasTorch:                                                         # applyMp
                try: applyMp._torchNumThreads = applyMp._torchNumThreads or torch.get_num_threads(); torch.set_num_threads(1) # applyMp
                except: pass # why do all of this? Because some strange interaction between PyTorch and multiprocessing, outlined here: https://github.com/pytorch/pytorch/issues/82843 # applyMp
            os.environ["py_k1lib_in_applyMp"] = "True"                           # applyMp
            self.p = mp.Pool(int(mp.cpu_count()*self.utilization), terminateGraceful); self.ps.append(self.p) # applyMp
            if hasTorch and applyMp._torchNumThreads is not None: torch.set_num_threads(applyMp._torchNumThreads) # applyMp
        def intercept(it, n):                                                    # applyMp
            for i, e in enumerate(it):                                           # applyMp
                if i % n == 0:                                                   # applyMp
                    if self.p is not None: self.p.close(); self.ps.remove(self.p) # applyMp
                    gc.collect(); newPool()                                      # applyMp
                yield e                                                          # applyMp
        try: common = dill.dumps([f, self.kwargs]); usingDill = True             # applyMp
        except: common = cloudpickle.dumps([f, self.kwargs]); usingDill = False  # applyMp
        def gen(it):                                                             # applyMp
            with k1lib.captureStdout(False, True) as out:                        # applyMp
                try:                                                             # applyMp
                    if self.newPoolEvery > 0: it = intercept(it, self.newPoolEvery) # applyMp
                    else: newPool()                                              # applyMp
                    fs = deque()                                                 # applyMp
                    for i, line in zip(range(self.prefetch), it):                # applyMp
                        fs.append(self.p.apply_async(executeFunc, [common, dill.dumps(line), usingDill])) # applyMp
                    for line in it:                                              # applyMp
                        yield fs.popleft().get(timeout)                          # applyMp
                        fs.append(self.p.apply_async(executeFunc, [common, dill.dumps(line), usingDill])) # applyMp
                    for f in fs: yield f.get(timeout)                            # applyMp
                except KeyboardInterrupt as e:                                   # applyMp
                    print("applyMp interrupted. Terminating pool now")           # applyMp
                    for p in self.ps: p.close(); p.terminate();                  # applyMp
                    raise e                                                      # applyMp
                except Exception as e:                                           # applyMp
                    print("applyMp encounter errors. Terminating pool now")      # applyMp
                    for p in self.ps: p.close(); p.terminate();                  # applyMp
                    raise e                                                      # applyMp
                else:                                                            # applyMp
                    for p in self.ps: p.close(); p.terminate();                  # applyMp
        return gen(it)                                                           # applyMp 
    @staticmethod                                                                # applyMp
    def cat(fileName: str, f:Callable, n:int=None, rS=None, **kwargs):           # applyMp
        """Like :meth:`applyCl.cat`, this will split a file up into multiple
sections, execute ``f`` over all sections and return the results.
Example::
    fn = "~/repos/labs/k1lib/k1lib/cli/test/applyMp.cat"
    "0123456789\\n"*100 | file(fn)
    # returns [6, 6, 6, 7, 6, 6, 6, 7, 6, 6, 6, 7, 6, 6, 6, 8]
    applyMp.cat(fn, shape(0), 16) | deref()
:param f: function to execute on an iterator of lines
:param n: how many chunks to split the file into. Defaults to the number of cpu cores available
:param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained
    control over section boundaries so as to not make everything corrupted
:param kwargs: extra keyword arguments for :class:`applyMp`"""                   # applyMp
        return fileName | cli.splitSeek(n or os.cpu_count()) | (rS or cli.iden()) | cli.window(2) | ~applyMp(lambda x,y: cli.cat(fileName, sB=x, eB=y) | f, **kwargs) # applyMp 
    @staticmethod                                                                # applyMp
    def shared(f, **kwargs):                                                     # applyMp
        """Execution model where the input iterator is dereferenced and shared across
all processes, bypassing serialization. Example::
    a = range(1_000_000_000) | apply(lambda x: x*1.5 - 2000) | aS(list) # giant data structure
    a | batched(50_000_000, True) | applyMp(toSum()) | toSum() # has to serialize and deserialize lists of numbers, which wastes lots of cpu cycles and memory
    a | applyMp.shared(toSum()) | toSum() # giant data structure is forked, no serialization happens, no memory even gets copied, much faster
In the 2nd line, most of the time is spent on serializing the data and transferring
it to other processes, while in the 3rd line, most of the time is spent on calculating
the sum instead, as the giant data structure is forked, and Linux doesn't copy it internally.""" # applyMp
        def inner(it):                                                           # applyMp
            try: n = len(it)                                                     # applyMp
            except: it = list(it); n = len(it)                                   # applyMp
            # this is pretty unintuitive right? Why do it this way? Turns out, if you were to reference `it` directly, it will store it in f's co_freevars, # applyMp
            # which will be serialized, defeating the purpose. Moving it to a global variable forces it to move to co_names instead, avoiding serialization. This took forever to understand # applyMp
            idx = _k1_applyMp_global_ctx_autoInc(); _k1_applyMp_global_ctx[idx] = it # applyMp
            res = range(n) | cli.batched(round(n/os.cpu_count()+1), True) | applyMp(lambda r: f(_k1_applyMp_global_ctx[idx][r.start:r.stop]), **kwargs) | aS(list) # applyMp
            _k1_applyMp_global_ctx[idx] = None; return res                       # applyMp
        return aS(inner)                                                         # applyMp 
    def _copy(self): return applyMp(self.f, self.prefetch, self.timeout, self.utilization, self.bs, self.newPoolEvery, **self.kwargs) # applyMp
    def __invert__(self):                                                        # applyMp
        """Expands the arguments out, just like :class:`apply`.
Example::
    # returns [20, 20, 18, 14, 8, 0, -10, -22, -36, -52]
    [range(10), range(20, 30)] | transpose() | ~applyMp(lambda x, y: y-x**2) | deref()""" # applyMp
        res = self._copy(); f = res.f; res.f = lambda x: f(*x); return res       # applyMp 
    @staticmethod                                                                # applyMp
    def clearPools():                                                            # applyMp
        """Terminate all existing pools. Do this before restarting/quitting the
script/notebook to make sure all resources (like GPU) are freed. **Update**:
you probably won't have to call this manually anymore since version 0.9, but
if you run into problems, try doing this."""                                     # applyMp
        for p in applyMp._pools:                                                 # applyMp
            try: p.terminate()                                                   # applyMp
            except: pass                                                         # applyMp
        applyMp._pools = set()                                                   # applyMp 
    @staticmethod                                                                # applyMp
    def pools():                                                                 # applyMp
        """Get set of all pools. Meant for debugging purposes only."""           # applyMp
        return applyMp._pools                                                    # applyMp 
    def __del__(self):                                                           # applyMp
        return                                                                   # applyMp
        if hasattr(self, "p"):                                                   # applyMp
            self.p.terminate();                                                  # applyMp
            if self.p in applyMp._pools: applyMp._pools.remove(self.p)           # applyMp 
# apparently, this doesn't do anything, at least in jupyter environment          # applyMp
atexit.register(lambda: applyMp.clearPools())                                    # applyMp
parallel = applyMp                                                               # applyMp
s = k1lib.Settings(); settings.add("applyCl", s, "modifier.applyCl() settings")  # applyMp
s.add("sudoTimeout", 300, "seconds before deleting the stored password for sudo commands") # applyMp
s.add("cpuLimit", None, "if specified (int), will not schedule more jobs if the current number of assigned cpus exceeds this") # applyMp
_password = k1lib.Wrapper(None); _cpuUsed = k1lib.Wrapper(0)                     # applyMp
def removePw():                                                                  # removePw
    while True: time.sleep(settings.applyCl.sudoTimeout); _password.value = None # removePw
threading.Thread(target=removePw, daemon=True).start()                           # removePw
_nodeIdsCache = k1lib.Wrapper([])                                                # removePw
def specificNode(f, nodeId:str, num_gpus=0): # modify a function so that it will run on a specific node only # specificNode
    if num_gpus > 0:                                                             # specificNode
        #return f.options(num_gpus=num_gpus, scheduling_strategy="SPREAD")       # specificNode
        return f.options(num_gpus=num_gpus, scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(node_id=nodeId, soft=False)) # specificNode
    else: return f.options(scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(node_id=nodeId, soft=False)) # specificNode
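# e.g. specificNode(rayF, nodeId).remote(...) pins the task to that exact node; soft=False means Ray won't fall back to another node # specificNode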
def exportSe(se):                                                                # exportSe
    if not isinstance(se, k1lib.Settings): return se                             # exportSe
    return {k:exportSe(v) for k,v in se.__dict__.items() if not k.startswith("_")} # exportSe
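# Rough shape of the output: exportSe(k1lib.settings) -> {"cli": {...}, "startup": {...}, ...}, plain nested dicts that serialize cleanly # exportSe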
def movePropsSe(obj, se):                                                        # movePropsSe
    d = se.__dict__; keys = [e for e in d.keys() if not e.startswith("_")]       # movePropsSe
    for key in keys:                                                             # movePropsSe
        if key not in obj: continue                                              # movePropsSe
        if isinstance(d[key], k1lib.Settings): movePropsSe(obj[key], d[key])     # movePropsSe
        else: d[key] = obj[key]                                                  # movePropsSe
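# movePropsSe() is roughly the inverse of exportSe(): it copies values from the plain dict `obj` back into a Settings tree, used below to sync settings onto remote workers # movePropsSe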
_applyCl_soCache = set() # dynamic libraries (.so) that have been installed across all nodes, so we don't have to reinstall them # movePropsSe
class applyCl(BaseCli):                                                          # applyCl
    def __init__(self, f, prefetch=None, timeout=60, bs=1, rss:Union[dict, str]={}, pre:bool=False, num_cpus=1, num_gpus=0, memory=None, resolve=True, **kwargs): # applyCl
        """Like :class:`apply`, but execute a function over the input iterator
in multiple processes on multiple nodes inside of a cluster (hence "cl"). So, just a more
powerful version of :class:`applyMp`, assuming you have a cluster to run it on.
Example::
    # returns [3, 2]
    ["abc", "de"] | applyCl(len) | deref()
    # returns [5, 6, 9]
    range(3) | applyCl(lambda x, bias: x**2+bias, bias=5) | deref()
    # returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
    someList = [1, 2, 3]
    ["abc", "de"] | applyCl(lambda s: someList) | deref()
    nIds = applyCl.nodeIds()
    # returns [[<nodeId1>, 0], [<nodeId2>, 1], ...], demonstrating preserve mode
    [nIds, range(10)] | transpose() | applyCl(lambda x: x**2, pre=True) | deref()
    # executes the function, but stores the result on remote nodes, instead of copying result to this node
    a = range(5) | applyCl(lambda x: x**2, resolve=False) | deref()
    # returns [0, 1, 4, 9, 16]
    a | applyCl(lambda x: x) | deref()
Summary of all modes of operation::
    # Data types:
    # - 1:   literal value, just a normal Python object
    # - or1: ray.ObjectRef object - Ray's reference to a remote object living somewhere
    # - h1:  Handle        object - k1lib's reference to a remote object, obtained if `resolve` is set to False. Use `h.get()` to fetch the actual value
    # - n1:  node id, string
    [1/or1/h1, 2/or2/h2] | applyCl(...)                                              # returns [1, 2, 3]. "1/or1/h1" means that the input can be a list of literals, ObjectRef, or Handle
    [1/or1/h1, 2/or2/h2] | applyCl(..., resolve=False)                               # returns [h1, h2, h3]
    [[n1/h1,   1/or1/h3], [n2/h2, 2/or2/h4]] | applyCl(..., pre=True)                # returns [[n1/h1, 1],  [n2/h2, 2]], executed on n1/h1, h3 is copied over
    [[n1/h1,   1/or1/h3], [n2/h2, 2/or2/h4]] | applyCl(..., pre=True, resolve=False) # returns [[n1/h1, h3], [n2/h2, h4]]
    [n1, n2] | applyCl.aS(lambda: ...)                # returns [[n1, 1],  [n2, 2]]
    None     | applyCl.aS(lambda: ...)                # returns [[n1, 1],  [n2, 2], ...], executes once on all nodes
    [n1, n2] | applyCl.aS(lambda: ..., resolve=False) # returns [[n1, h1], [n2, h2]]
Internally, this uses the library Ray (https://www.ray.io) to do the heavy
lifting. So, :class:`applyCl` can be thought of as a thin wrapper around that
library, but still has the same consistent interface as :class:`apply` and
:class:`applyMp`. From all of my tests so far, it seems that :class:`applyCl`
works quite well and is quite robust, so if you have access to a cluster, use
it over :class:`applyMp`.
The library will connect to a Ray cluster automatically when you import
everything using ``from k1lib.imports import *``. It will execute
``import ray; ray.init()``, which is quite simple. If you have ray installed,
but don't want this default behavior, you can do this::
    import k1lib
    k1lib.settings.startup.init_ray = False
    from k1lib.imports import *
As with :class:`applyMp`, there are pitfalls and weird quirks to multiprocessing,
on 1 or multiple nodes, so check out the docs over there to be aware of them,
as those translate well to here.
There's more extensive documentation in these notebooks: `27-multi-node <https://mlexps.com/other/27-multi-node/>`_ and
`30-applyCl-benchmarks <https://mlexps.com/other/30-applyCl-benchmarks/>`_, if you want to get more of
a feel for this tool.
.. admonition:: Time sharing the cluster
    Let's say that the cluster is located in a company, and that multiple users want to
    access it; then you might have to think about it a little more. Say the cluster has
    60 cores, and someone has launched a long-running job: 2160 tasks, 10 minutes/task,
    1 core/task, totalling 6 hours. Now you want to launch another job that has 20 tasks,
    requiring 10 cores, 1 second/task, totalling 2 seconds on an idle cluster.
    All modern schedulers (Ray, Slurm, Spark, etc) can't schedule your 20 tasks immediately.
    They have to wait for some running tasks to finish before scheduling yours. This means you
    have to wait on average for 5-10 minutes before all of your tasks finish. This might be
    fine if you've used Slurm a lot, but it's extremely not okay for me and my patience. The whole
    point of a cluster is to get results immediately, within a few seconds. So here's a
    workaround::
        # long running task, on notebook 1
        from k1lib.imports import *
        settings.cli.applyCl.cpuLimit = 40
        range(2160) | applyCl(lambda x: time.sleep(10*60)) | ignore() # long running task
        # short running task, on notebook 2
        from k1lib.imports import *
        range(20) | applyCl(lambda x: time.sleep(1)) | ignore() # short running task, should finish almost immediately
    Essentially, there's that setting that you can adjust. Like with Ray's ``num_cpus``,
    this is merely a suggestion to my library to not schedule jobs past that cpu limit,
    but you can circumvent it in some strange edge cases that I'm too lazy to implement.
    Likewise, when you schedule a Ray task, you can specify that it will only take 1 cpu,
    but you can end up forking it into 5 different processes, which can cause congestion
    and memory thrashing. If Ray doesn't do it right (possibly impossible to do so anyway)
    then do I really have to?
.. admonition:: Advanced use case
    Not really advanced, but just a bit difficult to understand/follow. Let's say
    that you want to scan through the home directory of all nodes, grab all files,
    read them, and get the number of bytes they have. You can do something like this::
        a = None | applyCl.aS(lambda: None | cmd("ls ~") | filt(os.path.isfile) | deref()) | deref()
        b = a | ungroup() | deref()
        c = b | applyCl(cat(text=False) | shape(0), pre=True) | deref()
        d = c | groupBy(0, True) | apply(item().all() | toSum(), 1) | deref()
    Granted, this is relatively complex. Let's see what A, B, C and D look like::
        # A
        [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', ['Miniconda3-latest-Linux-x86_64.sh', 'mintupgrade-2023-04-01T232950.log']],
         ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', ['5a', 'abc.jpg', 'a.txt']]]
        # B
        [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 'Miniconda3-latest-Linux-x86_64.sh'],
         ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 'mintupgrade-2023-04-01T232950.log'],
         ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', '5a'],
         ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 'abc.jpg'],
         ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 'a.txt']]
        # C
        [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 74403966],
         ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 1065252],
         ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 2601],
         ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 16341],
         ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 10177]]
        # D
        [['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 92185432],
         ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 75469218]]
    The steps we're concerned with are A and C. In step A, we're running 2 processes, 1 for each
    node, to get all the file names in the home directory. In step C, we're running 5 processes
    total, 2 on the first node and 3 on the second node. Each process reads its file as
    bytes and counts those bytes up. Finally in step D, the results are grouped together and the
    sizes summed.
    So yeah, it's pretty nice that we did all of that in a relatively short amount of code.
    The data is distributed too (reading multiple files from multiple nodes), so we're truly
    not bottlenecked by anything.
.. admonition:: Context object handle
    Let's say you have these unresolved handles::
        # creates a bunch of infinite random number generators, one on each node
        its = None | applyCl.aS(lambda: repeatF(lambda: random.randint(2, 15)), resolve=False) | deref()
        # gets the next value of all generators, can return [4, 4, 9, 2] for a 4-node cluster
        its | cut(1) | applyCl(lambda x: next(x)) | deref()
        # gets the next value of all generators, add 0 to the 1st generator, 1 to the 2nd generator, etc, then return the resulting output that might look like [3, 16, 10, 7]
        [its | cut(1), range(4)] | transpose() | applyCl(lambda x: next(ctxHandle) + x, pre=True) | cut(1) | deref()
    So, the special thing about this is the variable ``ctxHandle`` on the last line. That is a
    special variable that is injected along the way. Why all this complexity?
    The whole idea with unresolved object handles is that you have a distributed complex
    data structure that can't be serialized and juggled around easily. That's the `its` handles
    in the example. Then, you might want to feed in some (simple, serializable) input X,
    change the complex data structure in its own process, then return some (simple, serializable)
    output Y. In the example, X is range(4), while Y is the resulting number array.
.. admonition:: Cython
    Even with running everything distributedly like this, you might run into speed issues.
    Then, you'll essentially have 2 options: first, write (pleasant) Cython code, or
    second, write (unpleasant) C/C++ Python extensions. If you were to choose the C/C++
    option, then here's the flow:
    - Develop Python C extension, export everything as a shared library (a single .so file)
    - Execute ``applyCl.installSo("library.so")`` to install the library to all nodes
    - Use functions provided by your library normally, like ``import yourlibrary; range(10) | applyCl(yourlibrary.afunction) | deref()``
    But applyCl can deal with cython functions directly in your notebook. Here's the flow:
    - Annotate a code cell with the magic "%%cython", write Cython code as usual
    - Just use that function normally
    Let's see an example::
        # ---------- code cell 1 ----------
        from k1lib.imports import *      # cython ipython extension is automatically loaded
        # ---------- code cell 2 ----------
        %%cython
        from k1lib.cli import ls         # demonstrating that you can use all of the existing tools and libraries as usual
        cdef g(a:int): return f"{a} 123" # demonstrating that you can refactor out to other functions
        def f (a:int): return [g(a), ls(".")]
        # ---------- code cell 3 ----------
        range(10) | applyCl(f) | deref()
    You only have to install Cython on the current node and not the other nodes. Also note
    that currently, this only supports you passing in Cython-compiled functions directly into
    ``applyCl()`` or ``applyCl.aS()``. You can't pass a normal Python function that uses a
    Cython function like this::
        # ---------- code cell 1 ----------
        from k1lib.imports import *
        # ---------- code cell 2 ----------
        %%cython
        from k1lib.cli import ls          # note: have to reimport here because all the symbols inside this code block are independent from the rest of the notebook
        cpdef g(a:int): return f"{a} 123"
        # ---------- code cell 3 ----------
        def f (a:int): return [g(a), ls(".")]
        range(10) | applyCl(f) | deref() # this throws an import error, as the compiled code won't be installed on the remote nodes
    This behavior can potentially be fixed in the future, but I'm lazy and it's not hard
    to follow the rules. The dynamic libraries will be installed in the working directory.
    You can delete them after a coding session to free up some space, but they're likely to be
    tiny, so you don't really have to worry about it.
    Also, like everything else in parallel programming, please benchmark absolutely everything,
    because Cython might even be slower if internally you're constantly allocating space for
    large data structures, compared to the cli tools' lazy execution model. For operations
    that work on giant files, I actually find it very difficult to gain any appreciable speedups
    using Cython, as cli tools are already pretty optimized, so the best task for this is probably
    long-running, complex mathematical modelling, not generic text manipulation.
.. warning::
    Just like with any other parallel processing model, there are some quirks that
    can happen behind the scenes that aren't quite what you expected, as this is
    incredibly tricky. Dig into Ray's serialization page (https://docs.ray.io/en/latest/ray-core/objects/serialization.html)
    or their whitepapers (https://docs.ray.io/en/latest/ray-contribute/whitepaper.html)
    to get a feel for how it all works underneath. The notable quirks that you might need to think about are:
    - A lot of the internal code assumes that you're on a Unix system, preferably Linux,
      so it might not work on other platforms like Windows. But honestly, screw Windows.
:param prefetch: if not specified, schedules all jobs at the same time. If
    specified, schedules jobs so that there'll only be a specified amount of
    jobs, and will only schedule more if results are actually being used.
:param timeout: seconds to wait for job before raising an error
:param bs: if specified, groups ``bs`` number of transforms into 1 job to be more
    efficient.
:param rss: resources required for the task. Can be {"custom_resource1": 2} or "custom_resource1" as a shortcut
:param pre: "preserve", same convention as :meth:`applyCl.aS`. If True, then allow passing
    through node ids as the first column to shedule jobs on those specific nodes only
:param num_cpus: how many cpus does each task take?
:param memory: how much memory to give to the task, in bytes
:param resolve: whether to resolve the outputs or not. Set this to False to not move
    memory to the requesting node and cache the big data structure on the remote node
:param kwargs: extra arguments to be passed to the function. ``args`` not
    included as there're a couple of options you can pass for this cli."""       # applyCl
        super().__init__(fs=[f]); _fC = fastF(f); self.ogF = f; self.pre = pre; self.num_cpus = num_cpus # applyCl
        try: # f might be a string, so can't do f.__module__                     # applyCl
            isCythonFunc = "cython" in f.__module__                              # applyCl
            if isCythonFunc: applyCl.installSo(sys.modules[f.__module__].__file__) # applyCl
        except: isCythonFunc = False                                             # applyCl
        self.rss = rss = {rss: 1} if isinstance(rss, str) else rss               # applyCl
        cwd = os.getcwd(); se = exportSe(k1lib.settings)                         # applyCl
        def remoteF(s, e, idxCtxHandle=None): # function that will be executed on remote node. Have to setup environment a little bit before executing # applyCl
            # e: the real element. s is just e's storage context, in case e is a Handle. Else s is not used and can be None. Why? Because Actors can't be # applyCl
            # serialized directly with cloudpickle, but it can be passed as function parameters, which Ray will do some special sauce serialization # applyCl
            import k1lib; movePropsSe(se, k1lib.settings) # do this to sync current settings with the remote worker nodes # applyCl
            if k1lib.settings.startup.or_patch.numpy: k1lib.cli.init.patchNumpy() # applyCl
            if k1lib.settings.startup.or_patch.dict: k1lib.cli.init.patchDict()  # applyCl
            if k1lib.settings.startup.or_patch.pandas: k1lib.cli.init.patchPandas() # applyCl
            import os; os.makedirs(cwd, exist_ok = True); os.chdir(cwd)          # applyCl
            if isinstance(e, Handle): e.setStorage(s); e = e.get()               # applyCl
            if idxCtxHandle: _fC.__globals__["ctxHandle"] = s.d[idxCtxHandle]    # applyCl
            return _fC(e, **kwargs)                                              # applyCl
        # self.remoteF = remoteF; f = ray.remote(resources=rss, num_cpus=num_cpus, **({"memory": memory} if memory else {}))(remoteF) # applyCl
        rssKw = {"resources": rss, "num_cpus": num_cpus, "num_gpus": num_gpus, **({"memory": memory} if memory else {})}#; rssKw = None # applyCl
        self.prefetch = prefetch or int(1e8); self.timeout = timeout; self.bs = bs # applyCl
        self._copyCtx = lambda: [f, [prefetch, timeout, bs, rss, pre, num_cpus, num_gpus, memory, resolve], kwargs] # applyCl
        nodeId = applyCl.nodeId(); rssF = ray.remote(**rssKw)(remoteF) # f that has constraints injected into it # applyCl
        def preprocessF(e): # return Handle (if pre=False), or [nodeId, Handle] (if pre=True). f is remoteF, core element can be a Handle, real object, or ObjectRef # applyCl
            if resolve: # storage location managed by ray, returns or2/h2 or [nId/h1, or2/h2] # applyCl
                if pre:                                                          # applyCl
                    a, b = e; s = extractStorage(b)                              # applyCl
                    if isinstance(a, Handle): h = a.deposit(b); return [a, h.executeAsync(remoteF, rssKw, a.idx)], [a,h] # applyCl
                    else: return [a, specificNode(rssF, a, num_gpus=num_gpus).remote(s, b)], [] # applyCl
                else:                                                            # applyCl
                    if isinstance(e, Handle): return e.executeAsync(remoteF, rssKw), [e] # applyCl
                    else: return rssF.remote(None, e), []                        # applyCl
            else: # storage location explicitly managed by me, returns h2 or [nId/h1, h2] # applyCl
                storageWarmup()                                                  # applyCl
                if pre:                                                          # applyCl
                    a, b = e; s = extractStorage(b)                              # applyCl
                    if isinstance(a, Handle): # [h1, 2/or2/h2]                   # applyCl
                        h = a.deposit(b) # first deposits b into a's storage context to get handle. h and a have the same storage context # applyCl
                        a.report("report7"); b.report("report7")                 # applyCl
                        return [a, h.executeAsync(remoteF, rssKw, a.idx)], [a,b,h] # then executes it in h's storage context # applyCl
                    else: # [nId, 2/or2/h2]                                      # applyCl
                        if isinstance(b, Handle) and b.nodeId == nodeId: return [a, b.executeAsync(remoteF, rssKw)], [b] # [nId, h2], if h2 is on nId, then use h2's storage context # applyCl
                        h = Handle.create(b, a, num_gpus=num_gpus); return [a, h.executeAsync(remoteF, rssKw)], [h] # create storage context on `a`, deposits b on it, then execute # applyCl
                else: # 1/or1/h1                                                 # applyCl
                    s = extractStorage(e)                                        # applyCl
                    if isinstance(e, Handle): return e.executeAsync(remoteF, rssKw), [e] # has storage context already, execute on it directly # applyCl
                    else: h = Handle.create(e, num_gpus=num_gpus); return h.executeAsync(remoteF, rssKw), [h] # create storage context, deposit e on it, then execute # applyCl
        @ray.remote                                                              # applyCl
        def resolveFRemote(o): return 1                                          # applyCl
        def resolveF(e):                                                         # applyCl
            e = e[0] # why do this? Because of a pretty obscure bug related to the reference counting mechanism I have here # applyCl
            # in preprocessF(), it returns [meats, Python references to (potential) Handles] # applyCl
            # I have to do this in order to keep the Handles alive. After resolveF(), everything is settled, so # applyCl
            # old Handles can safely be deleted                                  # applyCl
            if resolve:                                                          # applyCl
                if pre: a, b = e; return [a, b.block().get() if isinstance(b, Handle) else ray.get(b)] # applyCl
                else: return e.block().get() if isinstance(e, Handle) else ray.get(e) # applyCl
            else:                                                                # applyCl
                return [e[0], e[1].block()] if pre else e.block() # don't resolve to this node, but still block execution until that object is resolvable # applyCl
        self.preprocessF = preprocessF; self.resolveF = resolveF; applyCl.preprocessF = preprocessF; applyCl.resolveF = resolveF # references for lprun so that I can benchmark these 2 functions # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def installSo(fn:str, force:bool=False):                                     # applyCl
        """Installs dynamic library (.so file) to all nodes.
:param fn: file name of the shared library
:param force: force reinstall even if the library is already on the remote node""" # applyCl
        basename = os.path.basename(fn)                                          # applyCl
        if not force and basename in _applyCl_soCache: return                    # applyCl
        print("Installing dynamic library to all nodes... ", end=""); _applyCl_soCache.add(basename); contents = cli.cat(fn, False) # applyCl
        None | applyCl.aS(lambda: contents | cli.file(basename)) | cli.ignore(); print("Done") # applyCl 
[docs]    def __ror__(self, it):                                                       # applyCl
        timeout = self.timeout; bs = self.bs; ogF = self.ogF; preprocessF = self.preprocessF; resolveF = self.resolveF # applyCl
        if bs > 1: return it | cli.batched(bs, True) | applyCl(lambda x: x | apply(ogF) | cli.aS(list), self.prefetch, timeout) | cli.joinStreams() # applyCl
        def gen(it):                                                             # applyCl
            futures = deque(); it = iter(it); n = self.num_cpus; limit = settings.applyCl.cpuLimit or int(1e9) # applyCl
            for i, e in zip(range(min(self.prefetch, (limit-_cpuUsed.value)//n)), it): # try to anticipate how much resources can be consumed ahead of time and only schedule that much, to prevent deadlocks when multiple applyCl() is called, but their parent process has not consumed the yield statement, so the cpu count doesn't get decremented # applyCl
                while _cpuUsed.value + n > limit: time.sleep(0.1) # this is a very rudimentary lock. Doesn't have to be accurate though, and Python's GIL ensures atomicity # applyCl
                futures.append(preprocessF(e)); _cpuUsed.value += n              # applyCl
            for e in it: yield resolveF(futures.popleft()); futures.append(preprocessF(e)) # free and allocate cpu slot immediately, so no while loop necessary # applyCl
            for e in futures: res = resolveF(e); _cpuUsed.value -= n; yield res  # applyCl
        applyCl.rorGen = gen                                                     # applyCl
        return gen(it)                                                           # applyCl 
[docs]    def __invert__(self):                                                        # applyCl
        """Expands the arguments out, just like :class:`apply`.
Example::
    # returns [20, 20, 18, 14, 8, 0, -10, -22, -36, -52]
    [range(10), range(20, 30)] | transpose() | ~applyCl(lambda x, y: y-x**2) | deref()""" # applyCl
        f, rest, kwargs = self._copyCtx(); return applyCl(lambda x: f(*x), *rest, **kwargs) # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def nodeIds(includeSelf=True) -> List[str]:                                  # applyCl
        """Returns a list of all node ids in the current cluster.
Example::
    applyCl.nodeIds() # returns something like ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', '1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068']
If you want to get nodes' metadata, then just use ray's builtin function ``ray.nodes()``
:param includeSelf: whether to include node id of the current process or not"""  # applyCl
        res = ray.nodes() | cli.filt(lambda x: x["Alive"]) | apply(lambda x: x["NodeID"]) | aS(list) # applyCl
        if includeSelf: return res                                               # applyCl
        res.remove(applyCl.nodeId()); return res                                 # applyCl 
[docs]    @staticmethod                                                                # applyCl
    @lru_cache                                                                   # applyCl
    def nodeId() -> str:                                                         # applyCl
        """Returns current node id"""                                            # applyCl
        return ray.runtime_context.get_runtime_context().get_node_id()           # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def cpu() -> int:                                                            # applyCl
        """Grabs the number of cpus available on this node"""                    # applyCl
        return int(applyCl.meta()["Resources"]["CPU"])                           # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def aS(f, **kwargs):                                                         # applyCl
        """Executes function f once for all node ids that are piped in.
Example::
    # returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]]
    applyCl.nodeIds() | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref()
    # same as above, demonstrating passing in a list of nodeIds
    ["1051da...", "7bb387..."] | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref()
    # same as above, demonstrating passing in "None" for all nodeIds in the cluster
    None | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref()
If you want to execute f for all nodes, you can pass in None instead.
As a reminder, this kinda follows the same logic as the popular cli :class:`aS`, where
f is executed once, hence the name "apply Single". Here, the meaning of "single" is
different: it just means execute once for each node id. If you want to quickly execute
a function on a single node, without all the fuss, there's a short form that you can use::
    # returns ['Desktop', 'Downloads'], demonstrating that you can also pass in a single node id
    "1051da..." | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref()
:param f: main function to execute in each node. Not supposed to accept any arguments
:param kwargs: keyword arguments for the main :class:`applyCl` function"""       # applyCl
        f = fastF(f); final = apply(lambda x: [x, 0]) | applyCl(lambda _: f(), pre=True, **kwargs) # applyCl
        def inner(it):                                                           # applyCl
            shortform = False                                                    # applyCl
            if it is None: it = applyCl.nodeIds()                                # applyCl
            if isinstance(it, str): it = [it]; shortform = True                  # applyCl
            if it | ~cli.inSet(_nodeIdsCache()) | cli.shape(0) > 0: # caching nodeIds(), because that takes a surprising amount of time # applyCl
                _nodeIdsCache.value = applyCl.nodeIds(); outliers = it | ~cli.inSet(_nodeIdsCache()) | cli.deref() # applyCl
                if len(outliers) > 0: raise Exception(f"These nodes cannot be found: {outliers}") # applyCl
            return it | final | ((cli.cut(1) | cli.item()) if shortform else cli.iden()) # applyCl
        return aS(inner)                                                         # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def cmd(s:str, sudo=False, nodeIds=None, **kwargs):                          # applyCl
        """Convenience function to execute shell command on all nodes.
Example::
    applyCl.cmd("mkdir -p /some/folder")
It returns [[nodeid1, output1], [nodeid2, output2]]. If you need more flexibility,
fall back to :meth:`applyCl.aS`
:param s: shell command to execute
:param sudo: if True, will execute the command with sudo privileges. Will ask for the password
    and then cache it internally for 5 minutes
:param nodeIds: node ids to execute the command on. If not specified, executes on all nodes
:param kwargs: keyword arguments to pass to :class:`applyCl`"""                  # applyCl
        global _password; import getpass                                         # applyCl
        if sudo:                                                                 # applyCl
            if _password() is None:                                              # applyCl
                print("Enter password:"); _password.value = getpass.getpass(prompt="") # applyCl
            return   nodeIds | applyCl.aS(lambda: _password() | cli.cmd(f"sudo -S {s}") | cli.deref(), **kwargs) | cli.deref() # applyCl
        else: return nodeIds | applyCl.aS(lambda: None        | cli.cmd(s)              | cli.deref(), **kwargs) | cli.deref() # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def lookup():                                                                # applyCl
        """Tries to lookup a particular file to see on which node it's at.
Example::
    # returns [[nodeId, "something.txt"], [nodeId, "abc.jpg"]]
    ["something.txt", "abc.jpg"] | applyCl.lookup()
    # returns [nodeId, "something.txt"]
    "something.txt" | applyCl.lookup()
Files that don't exist won't be included in the result, and files that
exist on multiple nodes will be returned multiple times. The output format
is such that I can pipe it into applyCl(..., pre=True) and have it execute
some function that I want. This is pretty much just a convenience function."""   # applyCl
        def inner(fns):                                                          # applyCl
            fns = fns | cli.deref(); single = isinstance(fns, str)               # applyCl
            if single: fns = [fns]                                               # applyCl
            ans = None | applyCl.aS(lambda: fns | cli.iden() & (apply(os.path.expanduser) | apply(os.path.exists)) | cli.transpose() | cli.filt("x", 1) | cli.cut(0) | cli.deref()) | cli.ungroup() | cli.deref() # applyCl
            return ans[0] if single else ans                                     # applyCl
        return cli.aS(inner)                                                     # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def replicateFile(fn:str, nodeIds=None):                                     # applyCl
        """Replicates a specific file in the current node to all the other nodes.
Example::
    applyCl.replicateFile("~/cron.log")
Internally, this will read chunks of 100kB of the specified file and dump it
incrementally to all other nodes, which has implications on performance. To
increase or decrease it, check out :class:`~k1lib.cli.inp.cat`. This also means
you can replicate arbitrarily large files around as long as you have the disk
space for it, while ram size doesn't really matter
Please note that this operation is not symmetric. Unlike :meth:`balanceFile` and
:meth:`balanceFolder`, which can be invoked on any node and will roughly
do the same thing (rebalance everything out), this operation can do totally
different things depending on which node you run it on. Let's say the file exists
on nodes A and B, but not on nodes C and D. If you run this function on either
node A or B, it will replicate the file to C and D. However, if you run this
function on node C or D, it will instead throw an error since the file doesn't
exist.
:param fn: file name"""                                                          # applyCl
        fn = os.path.expanduser(fn); dirname = os.path.dirname(fn)               # applyCl
        # checking if there's an existing file already. If there is, then don't try to copy data to that node # applyCl
        if nodeIds is None: canSize = os.path.getsize(fn); nodeIds = None | applyCl.aS(lambda: os.path.getsize(fn) if os.path.exists(fn) else 0) | cli.filt(cli.op() != canSize, 1) | cli.cut(0) | cli.deref() # applyCl
        nodeIds = nodeIds | cli.wrapList().all() | cli.deref()                   # applyCl
        nodeIds | cli.insert(None, False).all() | applyCl(lambda _: None | cli.cmd(f"mkdir -p {dirname}; rm -rf {fn}") | cli.deref(), pre=True) | cli.deref() # applyCl
        for chunk in cli.cat(fn, text=False, chunks=True): nodeIds | cli.insert(chunk, False).all() | applyCl(lambda chunk: chunk >> cli.file(fn) | cli.deref(), pre=True) | cli.deref() # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def balanceFile(fn:str, nAs:List[str]=None, nBs:List[str]=None, rS=None, chunkSize:int=100_000_000): # applyCl
        """Splits a specified file in node nAs and dumps other parts
to nodes nBs. Example::
    applyCl.balanceFile("~/cron.log")
This will split the big file up into multiple segments (1 for each node). Then
for each segment, it will read through it chunk by chunk into memory, and then
deposit it into the respective node. Finally, it truncates the original file
down to its segment boundary.
The main goal of this is so that you can analyze a single big (say 200GB) file
quickly. If that file is on a single node, then it will take forever, even with
:class:`applyMp`. So splitting things up on multiple nodes will make analyzing
it a lot faster.
There's also the function :meth:`balanceFolder`, which has the opposite problem of
having lots of small (say 100MB) files. So it will try to move files around (keeping
them intact in the meantime) to different nodes so that the folder size ratio is
roughly proportional to the cpu count.
The exact split rule depends on the number of CPUs of each node. Best to see an
example::
    Command:         applyCl.balanceFile("~/cron.log")
    Verbose command: applyCl.balanceFile("~/cron.log", ["1"], ["1", "2", "3", "4", "5"])
    ----------- Before -----------
    Node:      1  2  3  4 5
    Cpu:       8  12 16 8 8
    Size (GB): 52 0  0  0 0
    ----------- After  -----------
    Node:      1  2  3  4 5
    Cpu:       8  12 16 8 8
    Size (GB): 8  12 16 8 8
This also works if you have files on existing nodes already, and are upgrading the
cluster::
    Command:         applyCl.balanceFile("~/cron.log")
    Verbose command: applyCl.balanceFile("~/cron.log", ["1", "5"], ["1", "2", "3", "4", "5"])
    ----------- Before -----------
    Node:      1  2  3  4  5
    Cpu:       8  12 16 8  8
    Size (GB): 26 0  0  26 0
    ----------- After  -----------
    Node:      1  2  3  4  5
    Cpu:       8  12 16 8  8
    Size (GB): 8  12 16 8  8
If you want to move files out of a node when decommissioning them, you can do
something like this::
    Command:         applyCl.decommission("~/cron.log", ["3", "4"])
    Verbose command: applyCl.balanceFile("~/cron.log", ["1", "2", "3", "4", "5"], ["1", "2", "5"])
    ----------- Before -----------
    Node:      1  2  3  4 5
    Cpu:       8  12 16 8 8
    Size (GB): 8  12 16 8 8
    ----------- After  -----------
    Node:      1  2  3  4 5
    Cpu:       8  12 16 8 8
    Size (GB): 15 22 0  0 15
Remember that the node ids "1", etc. are for illustrative purposes only. You should get
real node ids from :meth:`nodeIds`.
Why is the file size proportional to the number of cores on each node? Well, if
you have more cores, you should be able to process more, so as to make everything
balanced, right?
Again, this means that you can split arbitrarily large files as long as you have
the disk space for it; ram size is not a concern. How does this perform? Not
the best in the world if you don't have a lot of nodes. With sata 3 ssds, 750MB/s
ethernet, I got transfer speeds of roughly 100MB/s. This should increase as you
have more nodes based on the code structure, but I haven't tested it yet. Can
it be faster? Definitely. Am I willing to spend time optimizing it? No.
:param fn: file name
:param nAs: node ids that currently stores the file. If not specified, try to detect
    what nodes the file exists in
:param nBs: node ids that will store the file after balancing everything out. If not
    specified, will take all available nodes
:param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained
    control over section boundaries so as to not make everything corrupted
:param chunkSize: see :meth:`balanceFolder`
"""                                                                              # applyCl
        from k1lib.cli._applyCl import balanceFile                               # applyCl
        with settings.cat.context(chunkSize=chunkSize): balanceFile(fn, nAs, nBs, rS) # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def decommissionFile(fn, nAs:List[str], rS=None, chunkSize:int=100_000_000): # applyCl
        """Convenience function for :meth:`balanceFile`. See docs over there.""" # applyCl
        from k1lib.cli._applyCl import balanceFile                               # applyCl
        with settings.cat.context(chunkSize=chunkSize): balanceFile(fn, None, applyCl.nodeIds() | ~cli.inSet(nAs) | cli.deref(), rS) # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def cat(fn:str=None, f:Callable=None, nodeIds=None, timeout:float=60, pre:bool=False, multiplier:int=1, includeId:bool=False, resolve:bool=True): # applyCl
        """Reads a file distributedly, does some operation on them, collects and
returns all of the data together. Example::
    fn = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.cat.data"
    ("0123456789"*5 + "\\n") * 1000 | file(fn)
    applyCl.balanceFile(fn)
    applyCl.cat(fn, shape(0), pre=True) | deref()
That returns something like this (for a 2-node cluster, with 2 (node A) and 4 (node B) cpus respectively)::
    [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167],
     ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167],
     ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166],
     ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167],
     ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166],
     ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167]]
Here, we're creating an initial file with 1000 lines. Then we'll split it up into
2 fragments: 334 lines and 666 lines and store them on the respective nodes. Then,
on node A, we'll split the file up into 2 parts, each with 167 lines. On node B,
we'll split the file up into 4 parts, each with around 166 lines. Then we'll
schedule 6 processes total, each dealing with roughly 166 lines. After all of that,
results are collected together and returned.
If you want to distinguish between different processes inside f, for example you
want to write results into different files, you can do something like this::
    dir_ = "~/repos/labs/k1lib/k1lib/cli/test"
    fn = f"{dir_}/applyCl.cat.data"
    applyCl.cmd(f"rm -r {dir_}/applyCl")    # clear out old folders
    applyCl.cmd(f"mkdir -p {dir_}/applyCl") # creating folders
    # do processing on fn distributedly, then dump results into multiple files
    applyCl.cat(fn, ~aS(lambda idx, lines: lines | shape(0) | aS(dill.dumps) | file(f"{dir_}/applyCl/{idx}.pth")), includeId=True) | deref()
    # reading all files and summing them together
    None | applyCl.aS(lambda: ls(f"{dir_}/applyCl")) | ungroup() | applyCl(cat(text=False) | aS(dill.loads), pre=True) | cut(1) | toSum()
.. admonition:: Simple mode
    There's also another mode that's activated whenever f is not specified that feels
    more like vanilla :class:`~inp.cat`. Say you have a file on a specific node::
        nodeId = "7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d"
        fn = "~/ssd2/randomFile.txt"
        # -------------- file is on current node --------------
        cat(fn) # returns iterator of lines inside the file
        fn | cat() # same thing as above
        # -------------- file is on remote node --------------
        [nodeId, fn] | applyCl.cat() # returns iterator of lines of the file
        applyCl.cat([nodeId, fn]) # same thing
        nodeId | applyCl.cat(fn) # also same thing
    So yeah, there're lots of ways to just simply read a file on a remote node. Is
    it too much? Probably, but good thing is that you can pick any that's intuitive
    for you. Note that this mode is just for convenience only, for when you want to do
    exploratory analysis on a single remote file. To be efficient at bulk processing,
    use the normal mode instead.
:param fn: file name
:param f: function to execute in every process
:param nodeIds: only read file from these nodes
:param timeout: kills the processes if it takes longer than this amount of seconds
:param pre: "preserve" mode, just like in :class:`applyCl`. Whether to keep the node id column or not
:param multiplier: by default, each node will spawn as many processes as there
    are cpus. Sometimes you want to spawn more processes; change this to a higher number
:param includeId: includes a unique id for this process (just normal integers from 0 to n)
:param resolve: whether to resolve the remote objects or not
"""                                                                              # applyCl
        fn = os.path.expanduser(fn) if fn is not None else None                  # applyCl
        if f is None: # simple case                                              # applyCl
            def inner(nodeId_fn:Tuple[str, str]):                                # applyCl
                nodeId, fn = nodeId_fn; seeks = [nodeId] | applyCl.aS(lambda: fn | cli.splitSeek(round(os.path.getsize(fn)/settings.cat.chunkSize+1))) | cli.cut(1) | cli.item() | cli.deref() # applyCl
                inter = seeks | cli.window(2) | apply(cli.wrapList() | cli.insert(nodeId)) | cli.deref() # applyCl
                return inter | ~applyCl(lambda sB, eB: cli.cat(fn,sB=sB,eB=eB) | cli.deref(), pre=True) | cli.cut(1) | cli.joinStreams() # applyCl
                # return [nodeId_fn] | applyCl(cat() | deref(), pre=True) | cut(1) | item() # direct, no chunking method # applyCl
            if fn is None: return aS(inner) # [nodeId, fn] | applyCl.cat()       # applyCl
            if isinstance(fn, str): return aS(lambda nodeId: inner([nodeId, fn])) # nodeId | applyCl.cat() # applyCl
            else: return inner(fn) # applyCl.cat([nodeId, fn])                   # applyCl
        nodeIds = nodeIds or (applyCl.nodeIds() | applyCl.aS(lambda: os.path.exists(fn)) | cli.filt(cli.op(), 1) | cli.cut(0) | cli.deref()) # applyCl
        checkpoints = nodeIds | applyCl.aS(lambda: fn | cli.splitSeek(int(applyCl.meta()["Resources"]["CPU"]*multiplier)) | cli.window(2) | cli.deref()) | cli.ungroup() | cli.insertIdColumn(True, False) | ~apply(lambda x,y,z: [x,[*y,z]]) | cli.deref() # applyCl
        return checkpoints | applyCl(~aS(lambda x,y,idx: cli.cat(fn, sB=x, eB=y) | ((cli.wrapList() | cli.insert(idx)) if includeId else cli.iden()) | f), pre=True, timeout=timeout, num_cpus=1, resolve=resolve) | (cli.iden() if pre else cli.cut(1)) # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def replicateFolder(folder:str, nodeIds=None):                               # applyCl
        """Replicates a specific folder in the current node to all the other nodes.
Example::
    applyCl.replicateFolder("~/ssd2/data/owl")
This just lists out all files recursively in the specified folder, then replicates each file using :meth:`replicateFile`""" # applyCl
        applyCl.getFilesInFolder(folder) | applyCl(lambda fn: applyCl.replicateFile(fn, nodeIds), num_cpus=0.1) | cli.deref() # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def balanceFolder(folder:str, maxSteps:int=None, audit:bool=False, bs:int=5, chunkSize:int=100_000_000): # applyCl
        """Balances all files within a folder across all nodes.
Example::
    # make the chunk size huge so that transfers become faster
    settings.cli.cat.chunkSize = 100_000_000
    base = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.balance"
    # deletes old structures and making test folder
    applyCl.cmd(f"rm -r {base}"); applyCl.cmd(f"mkdir -p {base}")
    # creates 20 files of different sizes and dump it in the base folder of the current node
    torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx}.txt")) | deref();
    # transfers files between nodes such that the total folder size is proportional to the number of cpus across nodes
    applyCl.balanceFolder(base)
    # get folder size of all nodes
    None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref()
    # creates 20 additional files and dump it to the current node
    torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx+20}.txt")) | deref();
    # balances the tree out again
    applyCl.balanceFolder(base)
    # get folder size of all nodes
    None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref()
So imagine that you just downloaded 1000 files to a single node on a specific folder,
but you need to analyze all of them in a distributed manner. What you can do is to
move some files to other nodes and then do your analysis. If you want to download
more files, just dump it to any node (or download distributed across all nodes),
then rebalance the folders and do your analysis.
Also, internally, it splits files into multiple chunks, transfers the chunks to other
nodes and appends them to the correct files. It uses :meth:`~k1lib.cli.inp.cat` to split up
the file, which has settings under ``settings.cli.cat``. By default, the chunk size is
100k bytes, which I think is the sweet spot because :meth:`~k1lib.cli.inp.cat` also
supports remote files accessed over the internet, and sometimes the library is used on
systems with very little memory. But for this use case, where you already have the insane
hardware for this, 100kB is extremely small and will slow transfer rates to a crawl,
so in this function, the chunk size will temporarily be set to the ``chunkSize``
parameter, which is 100MB by default.
:param folder: folder to rebalance all of the files
:param maxSteps: the maximum number of file transfers. By default has no limit, so files are transferred until the folder sizes are balanced out
:param audit: if True, don't actually move files around and just return what files are going to be moved where
:param bs: batch size for transporting this many files at once. Increase to make it faster, but with the
    penalty of the progress bar not updating as frequently
:param chunkSize: file chunk size to split up and send to other nodes
"""                                                                              # applyCl
        from k1lib.cli._applyCl import balanceFolder                             # applyCl
        with settings.cat.context(chunkSize=chunkSize): return balanceFolder(folder, audit, maxSteps, bs=bs) # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def decommissionFolder(folder:str, nAs:List[str], maxSteps:int=10000, audit:bool=False, timeout:float=3600, bs:int=5, chunkSize:int=100_000_000): # applyCl
        """Like :meth:`decommissionFile`, but works for distributed folders instead.
:param nAs: list of node ids to migrate files away from
:param maxSteps: limits the total number of optimization steps. Normally don't have to specify,
    but just here in case it runs for too long trying to optimize the folder structure
:param audit: if True, just returns the file movements it's planning to do
:param bs: batch size for transporting this many files at once. Increase to make it faster, but with the
    penalty of the progress bar not updating as frequently
:param chunkSize: see :meth:`balanceFolder`
"""                                                                              # applyCl
        from k1lib.cli._applyCl import decommissionFolder                        # applyCl
        with settings.cat.context(chunkSize=chunkSize): return decommissionFolder(folder, nAs, audit=audit, maxSteps=maxSteps, timeout=timeout, bs=bs) # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def pruneFolder(folder):                                                     # applyCl
        """Removes empty directories recursively from a root folder."""          # applyCl
        def inner(folder):                                                       # applyCl
            folder = os.path.expanduser(folder)                                  # applyCl
            dirs, files = folder | cli.ls() | cli.filt(os.path.isdir).split() | cli.deref() # applyCl
            if len(files) > 0: return                                            # applyCl
            dirs | apply(inner) | cli.ignore() # recurse into subfolders first   # applyCl
            if folder | cli.ls() | cli.shape(0) == 0: None | cli.cmd(f"rm -rf {folder}") | cli.ignore() # applyCl
        None | applyCl.aS(lambda: inner(folder)) | cli.deref()                   # applyCl
[docs]    @staticmethod                                                                # applyCl
    def diskScan(folder:str, raw=False, accurate=True, f=None):                  # applyCl
        """Scans for files and folders in the specified folder for potential
distributed files and folders. A distributed file is a file that exists on more
than 1 node. A distributed folder is a folder that exists on more than 1
node and does not have any shared children. Example::
    applyCl.diskScan("~/ssd2")
    applyCl.diskScan("~/ssd2", True)
The first line does not return anything, but will print out something like this:
.. include:: ../literals/diskScan.rst
While the second line will return a parseable data structure instead::
    [[['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity', [4113489746, 7912834090, 4164314316]],
      ['/home/kelvin/ssd2/data/genome/go/release_geneontology_org', [2071645117, 4172737915, 2107005131]],
      ['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity.backup', [568878496, 552888466, 600610083]],
      ['/home/kelvin/ssd2/data/genome/00-common_all.idx', [341738564, 671136833, 0]],
      ['/home/kelvin/ssd2/data/genome/genbank/ch1.dat.gz', [25356744, 0, 25356764]],
      ['/home/kelvin/ssd2/test', [136152, 273530, 136351]],
      ['/home/kelvin/ssd2/data/genome/genbank/ch1', [0, 0, 0]]],
     [['/home/kelvin/ssd2/data/genome/dummy.txt', [1101, 1101, 1101]]],
     [['/home/kelvin/ssd2/data/genome/00-All.vcf', [32737509360, 65475018903, 32737509588]],
      ['/home/kelvin/ssd2/data/genome/MotifFeatures/homo_sapiens.GRCh38.motif_features.gff', [13963854962, 27927709895, 13963854962]],
      ['/home/kelvin/ssd2/data/genome/00-common_all.vcf', [2353901811, 4707803470, 2353901831]]]]
Remember that since an operating system usually has lots of shared files
(like "~/.bashrc", for example), these might be mistaken for distributed files.
Make sure to only scan folders that you store data in, or else it'll take a long time to return.
:param folder: the folder to scan through
:param raw: whether to return raw data or display it out nicely
:param accurate: if True, returns size when you read all files into RAM. If False
    returns size occupied by the entire file/folder (will be larger because files
    are arranged into different blocks in the underlying disk)
:param f: optional post process function applied after getting the raw results, if ``raw=False``""" # applyCl
        from k1lib.cli._applyCl import diskScan4, diskScan5                      # applyCl
        if raw: return diskScan4(folder, accurate=accurate)                      # applyCl
        else: return diskScan5(folder, accurate=accurate, f=(f or cli.iden()))   # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def balancedNodeIds():                                                       # applyCl
        """Returns a stream of node ids that's balanced based on cpu count/performance.
Example::
    # returns list of 10 node ids: ["abc...", "def...", "abc...", ...]
    applyCl.balancedNodeIds() | head() | deref()
"""                                                                              # applyCl
        from k1lib.cli._applyCl import balancedNodeIds                           # applyCl
        return balancedNodeIds()                                                 # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def balancedCpus():                                                          # applyCl
        """Returns Dict[nodeId (str) -> #cpu (int))]. Could be useful to know
how much to split up files and folders according to your custom rules. Example::
    # returns {"abc...": 8, "def...": 19, "ghi...": 88} for 7700 (4c8t), 10700k (8c16t) and 13900k (24c32t)
    applyCl.balancedCpus()
"""                                                                              # applyCl
        from k1lib.cli._applyCl import loadTestGuard                             # applyCl
        return loadTestGuard()                                                   # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def loadTest():                                                              # applyCl
        """Performs a load test on the cluster.
Example::
    applyCl.loadTest()
What is a load test? It basically tries to perform some intensive and
long-running calculations on all processes on all nodes in the cluster
to know how good are each individual nodes. This is useful information
because whenever you try to split a file up to form a distributed file,
or move files in a folder around to form a distributed folder, the amount
of data each node gets is going to be proportional to this performance
information. More powerful nodes will have more data to process, so that
the total running time across all nodes is going to roughly be the same.
But isn't cpu count good enough for this? No, not actually. The i7 7700
has 4 cores, 8 threads, and the i9 13900k has 8 performance cores and 16
efficiency cores, totalling 32 threads. You would expect the
13900k to be 4x (32/8=4) or 6x (24/4=6) more powerful than the 7700, but
it's actually 10x more powerful.
The test itself takes around 1-2 minutes, and the test results are going
to be saved locally in the folder "~/.k1lib/", so that it can use that
info directly in future runs."""                                                 # applyCl
        from k1lib.cli._applyCl import loadTest                                  # applyCl
        return loadTest()                                                        # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def getFolderSize(folder:str=None) -> int:                                   # applyCl
        """Shortcut function to get size of a folder on the current node."""     # applyCl
        from k1lib.cli._applyCl import getFolderSize                             # applyCl
        if folder is None: return getFolderSize                                  # applyCl
        return folder | getFolderSize                                            # applyCl 
[docs]    @staticmethod                                                                # applyCl
    def getFilesInFolder(folder:str=None):                                       # applyCl
        """Shortcut function to list out all files (recursively) inside a folder on the current node.""" # applyCl
        from k1lib.cli._applyCl import getFilesInFolder                          # applyCl
        if folder is None: return getFilesInFolder                               # applyCl
        return folder | getFilesInFolder                                         # applyCl  
import random                                                                    # applyCl
requests = k1lib.dep("requests")                                                 # applyCl
if hasRay:                                                                       # applyCl
    @ray.remote(num_cpus=0)                                                      # applyCl
    class Storage: # a wrapper for specific objects, kinda like ObjectRef, but it's an ObjectRef in my control. Should not be serialized to every other place # applyCl
        def __init__(self, v=None):                                              # applyCl
            self.lockExecute = threading.Lock(); self.lockIncref = threading.Lock(); self.lockDecref = threading.Lock() # applyCl
            self.nodeId = applyCl.nodeId(); self.d = {}; self.refs = defaultdict(lambda: 0) # applyCl
            self.autoInc = k1lib.AutoIncrement(prefix="_d")                      # applyCl
            self.idx = f"{self.nodeId}_" + f"{random.random()}"[:10] + f"_{time.time()}" # applyCl
            self.idx2 = f"{random.randint(100, 900)}"                            # applyCl
            self.deposit(v)                                                      # applyCl
        def getIdx(self): return self.idx                                        # applyCl
        def getMeta(self): return [self.nodeId, self.idx, self.idx2]             # applyCl
        def lookup(self, idx:str): return self.d[idx]                            # applyCl
        def remove(self, idx:str): del self.d[idx]                               # applyCl
        def keys(self): return list(self.d.keys())                               # applyCl
        def incref(self, idx:str):                                               # applyCl
            # with self.lockIncref:                                              # applyCl
            if idx is not None:                                                  # applyCl
                # requests.get(f"http://192.168.1.133:8892/increfd/{self.idx2}/{idx}") # applyCl
                if self.refs[idx] <= 0: raise Exception(f"incref-ing object {self.idx2}/{idx} that has already been deleted") # applyCl
                self.refs[idx] += 1#; return self                                # applyCl
        def decref(self, idx:str):                                               # applyCl
            # with self.lockDecref:                                              # applyCl
            if idx is not None:                                                  # applyCl
                self.refs[idx] -= 1; v = self.d[idx]                             # applyCl
                if self.refs[idx] == 0: self.remove(idx)                         # applyCl
                if self.refs[idx] < 0: raise Exception(f"decref-ing object {self.idx2}/{idx} that no longer exists") # applyCl
                # requests.get(f"http://192.168.1.133:8892/decrefd/{self.idx2}/{idx}")#; return self # applyCl
        def deposit(self, v:"1/or1/h1", s:"Storage"=None) -> "idx:str": # injecting s into v if v is a Handle # applyCl
            if isinstance(v, Handle): v.setStorage(s); v = v.get()               # applyCl
            if isinstance(v, ray.ObjectRef): v = ray.get(v)                      # applyCl
            idx = self.autoInc()                                                 # applyCl
            # requests.get(f"http://192.168.1.133:8892/deposit/{self.idx2}/{idx}/{v}"[:100]) # applyCl
            self.d[idx] = v; self.refs[idx] += 1; return idx                     # applyCl
        def execute(self, f, idx:str, idxCtxHandle:str=None) -> "idx:str":       # applyCl
            # requests.get(f"http://192.168.1.133:8892/execute/{self.idx2}/{idx}") # applyCl
            # if idxCtxHandle: f.__globals__["ctxHandle"] = self.d[idxCtxHandle] # injecting in global variable # applyCl
            with self.lockExecute: return self.deposit(f(self, self.d[idx], idxCtxHandle)) # executing "pure" f with some argument taken from the storage # applyCl
        def __getstate__(self): raise Exception("Can't be serialized!")          # applyCl
else:                                                                            # applyCl
    class Storage: pass                                                          # applyCl
_storages = {} # nodeId -> {idx: int, ss: [Storage]}, idx for current index to yield the storage # applyCl
_cpuCounts = {} # nodeId -> int                                                  # applyCl
_nodeIdsGen = None                                                               # applyCl
def getStorage(nodeId:str=None, num_gpus=0) -> Storage:                          # getStorage
    """Handles creating storage contexts. This is created mainly because it's
costly to actually instantiate a new actor (1-2 second/actor!), so this will
spawn new storage contexts till the cpu count of each node is reached. From
then on, it will keep reusing old storage contexts"""                            # getStorage
    global _nodeIdsGen; _nodeIdsGen = _nodeIdsGen or applyCl.balancedNodeIds()   # getStorage
    nodeId = nodeId or next(_nodeIdsGen)                                         # getStorage
    if num_gpus == 0:                                                            # getStorage
        if nodeId not in _storages or nodeId not in _cpuCounts:                  # getStorage
            _storages[nodeId] = {"idx": 0, "ss": []}                             # getStorage
            _cpuCounts[nodeId] = nodeId | applyCl.aS(lambda: os.cpu_count())     # getStorage
        if len(_storages[nodeId]["ss"]) < _cpuCounts[nodeId]: # add new storage context # getStorage
            _storages[nodeId]["ss"].append(specificNode(Storage, nodeId).remote()) # getStorage
        idx = _storages[nodeId]["idx"]; res = _storages[nodeId]["ss"][idx]       # getStorage
        _storages[nodeId]["idx"] = (idx+1)%_cpuCounts[nodeId]; return res        # getStorage
    else: return specificNode(Storage, nodeId, num_gpus).remote()                # getStorage
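# a hedged usage sketch of the round-robin behavior above (assumes a live ray cluster; nodeId is a hypothetical id from applyCl.nodeIds()): # getStorage
#   s1 = getStorage(nodeId) # spawns a fresh Storage actor on that node (costly: 1-2s/actor) # getStorage
#   s2 = getStorage(nodeId) # keeps spawning new actors until the node's cpu count is reached # getStorage
#   s3 = getStorage(nodeId) # ...after which the existing actors are recycled round-robin # getStorage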
def extractStorage(x): return x.storage if isinstance(x, Handle) else None       # extractStorage
class Handle: # specific object in storage, pretty much ObjectRef that I'm in control of. Can be serialized to every other place # Handle
    def __init__(self, storage, idx:str=None, _or=None):                         # Handle
        self.storage = storage; self.idx = idx; self._or = _or                   # Handle
        self.nodeId, self.storageId, self.idx2 = ray.get(storage.getMeta.remote()); self.weakref = False # whether this Handle should decrement reference count of storage element or not # Handle
    @staticmethod                                                                # Handle
    def create(v, nodeId:str=None, num_gpus=0) -> "Handle": # creates new storage and put value into it # Handle
        s = getStorage(nodeId, num_gpus); return Handle(s, ray.get(s.deposit.remote(v, extractStorage(v)))) # Handle
    def deposit(self, v) -> "Handle": # put new value into this handle's Storage # Handle
        return Handle(self.storage, ray.get(self.storage.deposit.remote(v, extractStorage(v)))) # Handle
    def execute(self, f, s:"Storage"=None, kwargs=None) -> "Handle": # f is a normal function. Blocks until finished executing # Handle
        if kwargs:                                                               # Handle
            kwargs.pop("num_gpus", None)                                         # Handle
            @ray.remote(**kwargs)                                                # Handle
            def inner(sto, s): return sto.execute.remote(f, self.idx)            # Handle
            return Handle(self.storage, ray.get(ray.get(inner.remote(self.storage, s)))) # Handle
        else: return Handle(self.storage, ray.get(self.storage.execute.remote(f, self.idx))) # Handle
    def block(self) -> "Handle": # block execution and finalize Handle's state   # Handle
        if self._or: self.idx = ray.get(self._or)                                # Handle
        return self                                                              # Handle
    def executeAsync(self, f, kwargs=None, idxCtxHandle:str=None) -> "Handle": # f is a normal function. Returns immediately, finalize it by calling .block() # Handle
        # requests.get(f"https://logs.mlexps.com/async12/{self.idx}/N/{self.idx2}") # Handle
        if kwargs: # idxCtxHandle is the (optional) string of the background handle object (left arg in pre=True mode), to be dynamically injected into the "ctxHandle" global variable # Handle
            # i1 = self.idx; i2 = self.idx2                                      # Handle
            kwargs.pop("num_gpus", None)                                         # Handle
            @ray.remote(**kwargs)                                                # Handle
            def inner(sto):                                                      # Handle
                # requests.get(f"https://logs.mlexps.com/async56/{i1}/N/{i2}")   # Handle
                return ray.get(sto.execute.remote(f, self.idx, idxCtxHandle))    # Handle
            # requests.get(f"https://logs.mlexps.com/async34/{self.idx}/N/{self.idx2}") # Handle
            return Handle(self.storage, _or=inner.remote(self.storage))          # Handle
        else: return Handle(self.storage, _or=self.storage.execute.remote(f, self.idx, idxCtxHandle)) # Handle
    def get(self): return ray.get(self.storage.lookup.remote(self.idx))          # Handle
    def setStorage(self, s): # inject storage dependency into this handle, increment storage's refcount # Handle
        if s: self.storage = s; self.storage.incref.remote(self.idx); self.weakref = False # Handle
    def __repr__(self): return f"<Handle idx={self.idx} storage={self.storageId}>" # Handle
    def __getstate__(self): d = dict(self.__dict__); d["storage"] = None; return d # Handle
    def __setstate__(self, d): self.__dict__.update(d); self.weakref = True # reconstructed Handles don't decrement reference count of variable, because Actors can't be serialized # Handle
    # def report(self, s): requests.get(f"http://192.168.1.133:8892/{s}/{self.idx2}/{self.idx}") # Handle
    def __del__(self):                                                           # Handle
        # requests.get(f"http://192.168.1.133:8892/handdel/{self.idx2}/{self.idx}") # Handle
        # print(f"storage: {self.storage}, nodeId: {self.nodeId}")               # Handle
        if not self.weakref: self.storage.decref.remote(self.idx)                # Handle
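# a hedged lifecycle sketch of Handle (assumes a live ray cluster; not from the original source): # Handle
#   h = Handle.create(42)                                # deposits 42 into some node's Storage actor, refcount 1 # Handle
#   h2 = h.executeAsync(lambda s, e, ctx: e + 1).block() # runs the function inside the same storage context # Handle
#   h2.get()                                             # returns 43 # Handle
#   del h, h2                                            # decrefs let the Storage actor free both stored values # Handle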
@lru_cache                                                                       # Handle
def storageWarmup(): print("Warming up distributed storage..."); None | applyCl.aS(lambda: os.cpu_count()) | ~apply(lambda x,y: [x]*y) | cli.joinSt() | apply(getStorage) | cli.ignore(); print("Finished warming up") # storageWarmup
def storageSize(): return _storages.values() | cli.op()["ss"].all() | cli.joinSt() | apply(lambda s: ray.get(s.keys.remote())) | cli.joinSt() | cli.shape(0) # storageSize
thEmptySentinel = object()                                                       # storageSize
[docs]class applyTh(BaseCli):                                                          # applyTh
[docs]    def __init__(self, f, prefetch:int=None, timeout:float=5, bs:int=1, **kwargs): # applyTh
        """Kinda like the same as :class:`applyMp`, but executes ``f`` on multiple
threads, instead of on multiple processes. Advantages:
- Relatively low overhead for thread creation
- Fast, if ``f`` is io-bound
- Does not have to serialize and deserialize the result, meaning iterators can be
  exchanged
Disadvantages:
- Still has thread creation overhead, so it's still recommended to specify ``bs``
- Is slow if ``f`` has to obtain the GIL to be able to do anything
All examples from :class:`applyMp` should work perfectly here.
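For instance, a hedged sketch of an io-bound workload (``urls`` and the use of the
``requests`` library here are placeholder assumptions)::
    # fetches pages concurrently; bs=8 groups every 8 urls per thread to amortize thread-creation overhead
    urls | applyTh(lambda url: requests.get(url).text, bs=8) | deref()"""        # applyTh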
        fs = [f]; super().__init__(fs=fs); self.f = fs[0]; self.bs = bs; self.kwargs = kwargs # applyTh
        self.prefetch = prefetch or int(1e9); self.timeout = timeout             # applyTh 
[docs]    def __ror__(self, it):                                                       # applyTh
        if self.bs > 1:                                                          # applyTh
            yield from (it | cli.batched(self.bs, True) | applyTh(apply(self.f), self.prefetch, self.timeout) | cli.joinStreams()); return # applyTh
        datas = deque(); it = iter(it); kwargs = self.kwargs                     # applyTh
        innerF = fastF(self.f); timeout = self.timeout                           # applyTh
        def f(line, wrapper): wrapper.value = innerF(line, **kwargs)             # applyTh
        for _, line in zip(range(self.prefetch), it):                            # applyTh
            w = k1lib.Wrapper(thEmptySentinel)                                   # applyTh
            t = threading.Thread(target=f, args=(line,w))                        # applyTh
            t.start(); datas.append((t, w))                                      # applyTh
        for line in it:                                                          # applyTh
            data = datas.popleft(); data[0].join(timeout)                        # applyTh
            if data[1].value is thEmptySentinel:                                 # applyTh
                for data in datas: data[0].join(0.01)                            # applyTh
                raise RuntimeError("Thread timed out!")                          # applyTh
            yield data[1].value; w = k1lib.Wrapper(thEmptySentinel)              # applyTh
            t = threading.Thread(target=f, args=(line,w))                        # applyTh
            t.start(); datas.append((t, w))                                      # applyTh
        for i in range(len(datas)): # do it this way so that python can remove threads early, due to ref counting # applyTh
            data = datas.popleft(); data[0].join(timeout)                        # applyTh
            if data[1].value is thEmptySentinel:                                 # applyTh
                for data in datas: data[0].join(0.01)                            # applyTh
                raise RuntimeError("Thread timed out!")                          # applyTh
            yield data[1].value                                                  # applyTh 
    def _copy(self): return applyTh(self.f, self.prefetch, self.timeout, self.bs, **self.kwargs) # applyTh
[docs]    def __invert__(self):                                                        # applyTh
        res = self._copy(); f = fastF(res.f)                                     # applyTh
        kw = res.kwargs                                                          # applyTh
        res.f = lambda x: f(*x, **kw)                                            # applyTh
        res.kwargs = {}                                                          # applyTh
        return res                                                               # applyTh  
[docs]class applySerial(BaseCli):                                                      # applySerial
[docs]    def __init__(self, f, *args, **kwargs):                                      # applySerial
        """Applies a function repeatedly. First yields input iterator ``x``. Then
yields ``f(x)``, then ``f(f(x))``, then ``f(f(f(x)))`` and so on. Example::
    # returns [2, 4, 8, 16, 32]
    2 | applySerial(op()*2) | head(5) | deref()
If the result of your operation is an iterator, you might want to
:class:`~k1lib.cli.utils.deref` it, like this::
    rs = iter(range(8)) | applySerial(rows()[::2])
    # returns [0, 2, 4, 6]
    rs | rows(1) | item() | deref()
    # returns []. This is because all the elements are taken by the previous deref()
    rs | item() | deref()
    # returns [[2, 8], [10, -6], [4, 16], [20, -12]]
    [2, 8] | ~applySerial(lambda a, b: (a + b, a - b)) | head(4) | deref()
    rs = iter(range(8)) | applySerial(rows()[::2] | deref())
    # returns [0, 2, 4, 6]
    rs | rows(1) | item()
    # returns [0, 4]
    rs | item() # or `next(rs)`
    # returns [0]
    rs | item() # or `next(rs)`
:param f: function to apply repeatedly"""                                        # applySerial
        fs = [f]; super().__init__(fs=fs); self.f = fs[0]                        # applySerial
        self.unpack = False; self.args = args; self.kwargs = kwargs              # applySerial 
[docs]    def __ror__(self, it):                                                       # applySerial
        f = fastF(self.f)                                                        # applySerial
        if self.unpack:                                                          # applySerial
            while True: yield it; it = f(*it, *self.args, **self.kwargs)         # applySerial
        else:                                                                    # applySerial
            while True: yield it; it = f(it, *self.args, **self.kwargs)          # applySerial 
[docs]    def __invert__(self):                                                        # applySerial
        ans = applySerial(self.f, *self.args, **self.kwargs)                     # applySerial
        ans.unpack = True; return ans                                            # applySerial  
def argsort(it, key=None, reverse=False):                                        # argsort
    if isinstance(it, settings.arrayTypes): return np.argsort(it) # this mode ignores key and reverse! # argsort
    if key: return sorted(range(len(it)), key=lambda i: key(it[i]), reverse=reverse) # argsort
    else: return sorted(range(len(it)), key=it.__getitem__, reverse=reverse)     # argsort
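# e.g. argsort([3, 1, 2]) == [1, 2, 0], while argsort([3, 1, 2], reverse=True) == [0, 2, 1] # argsort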
[docs]class sort(BaseCli):                                                             # sort
[docs]    def __init__(self, column:int=0, numeric=True, reverse=False, unsort=False): # sort
        """Sorts all lines based on a specific `column`.
Example::
    # returns [[5, 'a'], [1, 'b']]
    [[1, "b"], [5, "a"]] | ~sort(0) | deref()
    # returns [[2, 3]]
    [[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref()
    # returns [[2, 3], [5, 'a'], [1, 'b']]: with numeric=False, values are compared as strings
    [[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref()
    # returns [-1, 2, 3, 5, 8]
    [2, 5, 3, -1, 8] | sort(None) | deref()
.. admonition:: unsort
    This is how it works::
        a = np.array([1, 5, 9, 2, 6, 3, 7, 4, 8])
        # returns np.array([1, 5, 9, 2, 6, 3, 7, 4, 8])
        a | sort(None, unsort=True)
        # returns np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]), normal sort
        a | sort(None)
        # returns np.array([-3.5,  0.5,  4.5, -2.5,  1.5, -1.5,  2.5, -0.5,  3.5]): sorts, does the transformation, then unsorts
        a | (sort(None, unsort=True) | aS(lambda x: x - x[-1]/2))
        # returns np.array([12.25,  0.25, 20.25,  6.25,  2.25,  2.25,  6.25,  0.25, 12.25])
        a | (sort(None, unsort=True) | aS(lambda x: (x - x[-1]/2)**2))
    How this works: it sorts everything as usual, executes the captured
    transformation on the sorted data, then unsorts the result. This is for scenarios
    where an operation needs to operate on sorted data, but you still want to keep
    the original ordering for some reason.
:param column: if None, sort rows based on themselves and not an element
:param numeric: whether to convert column to float
:param reverse: False for smaller to bigger, True for bigger to smaller. Use
    :meth:`__invert__` to quickly reverse the order instead of using this param
:param unsort: whether to sort and then unsort the input or not"""               # sort
        super().__init__(capture=True)                                           # sort
        self.column = column; self.reverse = reverse; self.numeric = numeric; self.unsort = unsort # sort
        self.filterF = (lambda x: float(x)) if numeric else (lambda x: str(x))   # sort 
    def _all_array_opt(self, it, level):                                         # sort
        if self.unsort: return NotImplemented # too complex to think about right now # sort
        c = self.column; reverse = self.reverse; p = [slice(None, None, None)]*level; p1 = (*p, slice(None, None, None)); ser = self.capturedSerial # sort
        if c is None and len(it.shape)-level != 1: raise Exception(f"Expected sort(None) to take in 1-d array, but the array has shape {it.shape[level:]}") # sort
        if c is not None and len(it.shape)-level != 2: raise Exception(f"Expected sort(col) to take in 2-d array, but the array has shape {it.shape[level:]}") # sort
        bm = np if isinstance(it, np.ndarray) else (torch if (hasTorch and isinstance(it, torch.Tensor)) else None) # sort
        if bm is not None:                                                       # sort
            try:                                                                 # sort
                if c is None: b = bm.argsort(it);   b = bm.flip(b, (level,)) if reverse else b; return bm.gather(it, level, b) | ser.all(level) # sort
                else: b = bm.argsort(it[(*p1, c)]); b = bm.flip(b, (level,)) if reverse else b; return bm.gather(it, level, b[(*p1, None)].expand(it.shape)) | ser.all(level) # sort
            except Exception as e: print(e)                                      # sort
        return NotImplemented                                                    # sort
[docs]    def __ror__(self, it:Iterator[str]):                                         # sort
        c = self.column; reverse = self.reverse; bm = None; ser = self.capturedSerial # sort
        if isinstance(it, settings.arrayTypes):                                  # sort
            if c is None and len(it.shape) != 1: raise Exception(f"Expected sort(None) to take in a 1-d array, but the array has shape {it.shape}") # sort
            if c is not None and len(it.shape) != 2: raise Exception(f"Expected sort(col) to take in a 2-d array, but the array has shape {it.shape}") # sort
            bm = np if isinstance(it, np.ndarray) else (torch if (hasTorch and isinstance(it, torch.Tensor)) else None) # sort
            if bm:                                                               # sort
                arg = bm.argsort(it) if c is None else bm.argsort(it[:,c]); arg = bm.flip(arg, (0,)) if reverse else arg # sort
                return it[arg] | ser | (cli.rows(*argsort(arg)) if self.unsort else cli.iden()) # sort
        f = self.filterF                                                         # sort
        rows = list(it) if c is None else list((it | cli.isNumeric(c) if self.numeric else it) | cli.apply(list)) # sort
        def sortF(row):                                                          # sort
            if len(row) > c: return f(row[c])                                    # sort
            return float("inf")                                                  # sort
        if self.unsort:                                                          # sort
            arg = argsort(rows, f if c is None else sortF, self.reverse)         # sort
            return rows | cli.rows(*arg) | ser | cli.rows(*argsort(arg))         # sort
        return sorted(rows, key=f if c is None else sortF, reverse=self.reverse) | ser # sort 
[docs]    def __invert__(self):                                                        # sort
        """Creates a clone that has the opposite sort order"""                   # sort
        return sort(self.column, self.numeric, not self.reverse, self.unsort)    # sort 
    def _jsF(self, meta):                                                        # sort
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto() # sort
        if self.unsort: raise Exception("sort._jsF() unsort mode is unavailable, as the cli capture mechanism isn't possible in JS") # sort
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.ksort({cli.kjs.v(self.column)}, {cli.kjs.v(self.numeric)}, {cli.kjs.v(self.reverse)})", fIdx # sort 
[docs]class sortF(BaseCli):                                                            # sortF
[docs]    def __init__(self, f:Callable[[Any], float], column:int=None, reverse=False): # sortF
        """Sorts rows using a function.
Example::
    # returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
    ["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref()
    # returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a']
    ["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()"""  # sortF
        fs = [f]; super().__init__(fs=fs); self.f = fs[0]; self._fC = fastF(self.f) # sortF
        self.column = column; self.reverse = reverse                             # sortF 
[docs]    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:                        # sortF
        c = self.column; f = self._fC                                            # sortF
        if c is None: return sorted(list(it), key=f, reverse=self.reverse)       # sortF
        def sortF(row):                                                          # sortF
            if len(row) > c: return f(row[c])                                    # sortF
            return float("inf")                                                  # sortF
        return sorted(list(it), key=sortF, reverse=self.reverse)                 # sortF 
[docs]    def __invert__(self) -> "sortF":                                             # sortF
        return sortF(self.f, self.column, not self.reverse)                      # sortF 
    def _jsF(self, meta):                                                        # sortF
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); argIdx = init._jsDAuto() # sortF
        header, _fIdx, _async = k1lib.kast.asyncGuard(k1lib.kast.prepareFunc3(self.f, ("sortF", meta))) # sortF
        return f"{header}\nconst {fIdx} = {'async ' if _async else ''}({dataIdx}) => {{ return {'await ' if _async else ''}{dataIdx}.sortF{'_async' if _async else ''}(({argIdx}) => {_fIdx}({argIdx}), {cli.kjs.v(self.column)}, {cli.kjs.v(self.reverse)}); }}", fIdx # sortF 
[docs]class consume(BaseCli):                                                          # consume
[docs]    def __init__(self, f:Union[BaseCli, Callable[[Any], None]]):                 # consume
        r"""Consumes the iterator in a side stream. Returns the iterator.
Kinda like the bash command ``tee``. Example::
    # prints "0\n1\n2" and returns [0, 1, 2]
    range(3) | consume(headOut()) | toList()
    # prints "range(0, 3)" and returns [0, 1, 2]
    range(3) | consume(lambda it: print(it)) | toList()
This is useful whenever you want to mutate something, but don't want to
include the function result into the main stream.
See also: :class:`~k1lib.cli.output.tee`"""                                      # consume
        fs = [f]; super().__init__(fs=fs); self.f = fs[0]                        # consume 
[docs]    def __ror__(self, it):                                                       # consume
        self.f(it); return it                                                    # consume  
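# One caveat worth sketching: if the input is a one-shot iterator and f exhausts it,
# nothing is left for the main stream afterwards::
#     # returns [], since sum() already consumed the iterator
#     iter(range(3)) | consume(lambda xs: sum(xs)) | toList()
#     # returns [0, 1, 2], since lists can be iterated again
#     list(range(3)) | consume(lambda xs: sum(xs)) | toList()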
[docs]class randomize(BaseCli):                                                        # randomize
[docs]    def __init__(self, bs=100, seed=None):                                       # randomize
        """Randomize input stream. In order to be efficient, this does not
convert the input iterator to a giant list and yield random values from that.
Instead, this fetches ``bs`` items at a time, randomizes them, returns and
fetch another ``bs`` items. If you want to do the giant list, then just pass
in ``float("inf")``, or ``None``. Example::
    # returns [0, 1, 2, 3, 4], effectively no randomize at all
    range(5) | randomize(1) | deref()
    # returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
    range(10) | randomize(3) | deref()
    # returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
    range(10) | randomize(float("inf")) | deref()
    # same as above
    range(10) | randomize(None) | deref()
    # returns True, as the seed is the same
    range(10) | randomize(seed=4) | deref() == range(10) | randomize(seed=4) | deref()
Note that when ``seed`` is specified, it will randomize all input
iterators the same way and independently of each other. Meaning::
    r = randomize(seed=42)
    range(10) | r | deref() #      returns [6, 9, 1, 2, 0, 8, 3, 5, 4, 7]
    range(10) | r | deref() # also returns [6, 9, 1, 2, 0, 8, 3, 5, 4, 7]
This may or may not be what you want, but I think it's desirable.
:param bs: batch size
:param seed: if specified, will always randomize the input iterator in the same way""" # randomize
        self.bs = bs if bs is not None else float("inf"); self.seed = seed       # randomize
        if seed is not None and not hasTorch: raise Exception("Seeded randomize() depends on PyTorch. Please install it first") # randomize 
    def _newGenn(self):                                                          # randomize
        if self.seed is None: return np.random.permutation                       # randomize
        gen = torch.Generator().manual_seed(random.Random(self.seed).getrandbits(63)) # randomize
        return lambda n: torch.randperm(n, generator=gen)                        # randomize
[docs]    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:                        # randomize
        bs = self.bs                                                             # randomize
        if isinstance(it, settings.arrayTypes):                                  # randomize
            if bs is None or len(it) <= bs: return it if len(it) == 1 else it[self._newGenn()(len(it))] # randomize
        def gen():                                                               # randomize
            genn = self._newGenn()                                               # randomize
            for batch in it | cli.batched(bs, True):                             # randomize
                batch = list(batch); perms = genn(len(batch))                    # randomize
                for idx in perms: yield batch[idx]                               # randomize
        return gen()                                                             # randomize 
    def _jsF(self, meta):                                                        # randomize
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # randomize
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.randomize({cli.kjs.v(self.seed)})", fIdx # randomize 
class StaggeredStream:                                                           # StaggeredStream
    def __init__(self, stream:Iterator[Any], every:int):                         # StaggeredStream
        """Not intended to be instantiated by the end user. Use :class:`stagger`
instead."""                                                                      # StaggeredStream
        self.stream = stream; self.every = every                                 # StaggeredStream
    def __iter__(self):                                                          # StaggeredStream
        for i, v in zip(range(self.every), self.stream): yield v                 # StaggeredStream
    def __len__(self):                                                           # StaggeredStream
        """Length of window (length of result if you were to deref it)."""       # StaggeredStream
        return self.every                                                        # StaggeredStream
[docs]class stagger(BaseCli):                                                          # stagger
[docs]    def __init__(self, every:int):                                               # stagger
        """Staggers input stream into multiple stream "windows" placed serially. Best
explained with an example::
    o = range(10) | stagger(3)
    o | deref() # returns [0, 1, 2], 1st "window"
    o | deref() # returns [3, 4, 5], 2nd "window"
    o | deref() # returns [6, 7, 8]
    o | deref() # returns [9]
    o | deref() # returns []
This might be useful when you're constructing a data loader::
    dataset = [range(20), range(30, 50)] | transpose()
    dl = dataset | batched(3) | (transpose() | toTensor()).all() | stagger(4)
    for epoch in range(3):
        for xb, yb in dl: # looping over a window
            print(epoch)
            # then something like: model(xb)
The above code will print 6 lines. 4 of them are "0" (because we stagger every 4
batches), and xb's shape will be (3,) (because we batch every 3 samples).
You should also keep in mind that this doesn't really change the property of the
stream itself. Essentially, treat these pairs of statement as being the same thing::
    o = range(11, 100)
    # both returns 11
    o | stagger(20) | item()
    o | item()
    # both returns [11, 12, ..., 20]
    o | head(10) | deref()
    o | stagger(20) | head(10) | deref()
Lastly, multiple iterators might be getting values from the same stream window,
meaning::
    o = range(11, 100) | stagger(10)
    it1 = iter(o); it2 = iter(o)
    next(it1) # returns 11
    next(it2) # returns 12
This may or may not be desirable. Also this should be obvious, but I want to
mention this in case it's not clear to you."""                                   # stagger
        self.every = int(every)                                                  # stagger 
[docs]    def __ror__(self, it:Iterator[Any]) -> StaggeredStream:                      # stagger
        return StaggeredStream(iter(it), self.every)                             # stagger 
[docs]    @staticmethod                                                                # stagger
    def tv(every:int, ratio:float=0.8):                                          # stagger
        """Convenience method to quickly stagger train and valid datasets.
Example::
    # returns [[16], [4]]
    [range(100)]*2 | stagger.tv(20) | shape().all() | deref()"""                 # stagger
        return stagger(round(every*ratio)) + stagger(round(every*(1-ratio)))     # stagger  
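# A small sketch of what stagger.tv() expands to: tv(every, ratio) is simply
# stagger(round(every*ratio)) + stagger(round(every*(1-ratio))), so given two parallel
# copies of a dataset, the first stream acts as the train window and the second as the
# valid window::
#     # returns [[14], [6]], window sizes for a 70/30 split
#     [range(100)]*2 | stagger.tv(20, 0.7) | shape().all() | deref()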
compareOps = {"__lt__", "__le__", "__eq__", "__ne__", "__gt__", "__ge__"}        # stagger
[docs]class op(k1lib.Absorber, BaseCli):                                               # op
[docs]    def __init__(self):                                                          # op
        """Absorbs operations done on it and applies it on the stream. Based
on :class:`~k1lib.Absorber`. Example::
    # returns 16
    4 | op()**2
    # returns 16, equivalent to the above
    4 | aS(lambda x: x**2)
    # returns [0, 1, 4, 9, 16]
    range(5) | apply(op()**2) | deref()
    # returns [0, 1, 4, 9, 16], equivalent to the above
    range(5) | apply(lambda x: x**2) | deref()
Main advantage is that you don't have to waste keystrokes when you just want
to do a simple operation. How it works underneath is a little magical, so just
treat it as a blackbox. A more complex example::
    t = torch.tensor([[1, 2, 3], [4, 5, 6.0]])
    # returns [torch.tensor([[4., 5., 6., 7., 8., 9.]])]
    [t] | (op() + 3).view(1, -1).all() | deref()
Basically, you can treat ``op()`` as the input tensor. Tbh, you
can do the same thing with this::
    [t] | applyS(lambda t: (t+3).view(1, -1)).all() | deref()
But that's kinda long and may not be obvious. This can be surprisingly resilient, as
you can still combine with other cli tools as usual, for example::
    # returns [2, 3], demonstrating "&" operator
    torch.randn(2, 3) | (op().shape & iden()) | deref() | item()
    a = torch.tensor([[1, 2, 3], [7, 8, 9]])
    # returns torch.tensor([4, 5, 6]), demonstrating "+" operator for clis and not clis
    (a | op() + 3 + iden() | item() == torch.tensor([4, 5, 6])).all()
    # returns [[3], [3]], demonstrating .all() and "|" serial chaining
    torch.randn(2, 3) | (op().shape.all() | deref())
    # returns [[8, 18], [9, 19]], demonstrating you can treat `op()` as a regular function
    [range(10), range(10, 20)] | transpose() | filt(op() > 7, 0) | deref()
    # returns [3, 4, 5, 6, 7, 8, 9], demonstrating bounds comparison
    range(100) | filt(3 <= op() < 10) | deref()
This can only deal with simple operations. For complex operations, resort
to the longer form ``aS(lambda x: ...)`` instead!
There are also operations that are difficult to achieve, like
``len(op())``, as Python is expecting an integer output, so
``op()`` can't exactly take over. Instead, you have to use :class:`aS`,
or do ``op().ab_len()``. Get a list of all of these special operations
in the source of :class:`~k1lib.Absorber`.
Performance-wise, in most cases, there is no degradation, so don't worry
about it. Everything is pretty much on par with native lambdas::
    n = 10_000_000
    # takes 1.48s
    for i in range(n): i**2
    # takes 1.89s, 1.28x worse than for loop
    range(n) | apply(lambda x: x**2) | ignore()
    # takes 1.86s, 1.26x worse than for loop
    range(n) | apply(op()**2) | ignore()
    # takes 1.86s
    range(n) | (op()**2).all() | ignore()
More complex operations still retain the same speeds, as there's a JIT compiler embedded within::
    # takes 2.15s
    for i in range(n): (i**2-3)*0.1
    # takes 2.53s, 1.18x worse than for loop
    range(n) | apply(lambda x: (x**2-3)*0.1) | ignore()
    # takes 2.46s, 1.14x worse than for loop
    range(n) | apply((op()**2-3)*0.1) | ignore()
Reserved operations that are not absorbed are:
- all
- __ror__ (__or__ still works!)
- ab_solidify
- op_hint"""                                                                     # op
        super().__init__({"_hint": None})                                        # op 
[docs]    @staticmethod                                                                # op
    def solidify(f):                                                             # op
        """Static equivalent of ``a.ab_solidify()``.
Example::
    f = op()**2
    f = op.solidify(f)
If ``f`` is not an ``op``, then just return it without doing anything to it"""   # op
        if f.__class__.__name__.split(".")[-1] == "op": f.ab_solidify()          # op
        return f                                                                 # op 
[docs]    def __ror__(self, it):                                                       # op
        return self.ab_operate(it)                                               # op 
    def __or__(self, o):                                                         # op
        if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__or__(o)  # op
        return super().__add__(o)                                                # op
    def __add__(self, o):                                                        # op
        if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__add__(o) # op
        return super().__add__(o)                                                # op
    def __and__(self, o):                                                        # op
        if isinstance(o, BaseCli): return super(k1lib.Absorber, self).__and__(o) # op
        return super().__and__(o)                                                # op
    def __call__(self, *args, **kwargs):                                         # op
        if self._ab_solidified: return self.ab_operate(*args, **kwargs)          # op
        return super().__call__(*args, **kwargs)                                 # op
    def _typehint(self, inp):                                                    # op
        return self._hint if self._hint is not None else tAny()                  # op
[docs]    def op_hint(self, _hint):                                                    # op
        """Specify output type hint"""                                           # op
        self._ab_sentinel = True; self._hint = _hint                             # op
        self._ab_sentinel = False; return self                                   # op 
    def _jsF(self, meta): return cli.aS(self)._jsF(meta)                         # op 
cli.op = op                                                                      # op
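# A minimal sketch of using an op() as a plain callable outside of a pipeline: it has
# to be solidified first, otherwise the call itself would just get absorbed::
#     f = op()**2 + 1
#     f.ab_solidify() # or f = op.solidify(f)
#     # returns 10
#     f(3)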
[docs]class integrate(BaseCli):                                                        # integrate
[docs]    def __init__(self, dt=1):                                                    # integrate
        """Integrates the input.
Example::
    # returns [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
    range(10) | integrate() | deref()
    # returns [0, 2, 6, 12, 20, 30, 42, 56, 72, 90]
    range(10) | integrate(2) | deref()
:param dt: optional step size; each element is multiplied by ``dt`` before being accumulated""" # integrate
        self.dt = dt                                                             # integrate 
[docs]    def __ror__(self, it):                                                       # integrate
        if self.dt == 1:                                                         # integrate
            s = 0                                                                # integrate
            for e in it: s += e; yield s                                         # integrate
        else:                                                                    # integrate
            dt = self.dt; s = 0                                                  # integrate
            for e in it: s += e*dt; yield s                                      # integrate  
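# A minimal equivalence sketch (assuming numpy): integrate(dt) is a cumulative left
# Riemann sum, the same as np.cumsum(xs) * dt for numeric input::
#     # returns True
#     (range(10) | integrate(0.5) | deref()) == list(np.cumsum(np.arange(10)) * 0.5)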
[docs]class roll(BaseCli):                                                             # roll
[docs]    def __init__(self, shift:int):                                               # roll
        """Rolls the input some amount of shift.
Example::
    # returns [7, 8, 9, 0, 1, 2, 3, 4, 5, 6]
    range(10) | roll(3)
:param shift: shift amount"""                                                    # roll
        self.shift = shift                                                       # roll 
[docs]    def __ror__(self, it):                                                       # roll
        shift = self.shift                                                       # roll
        if isinstance(it, np.ndarray): return np.roll(it, shift)                 # roll
        if hasTorch and isinstance(it, torch.Tensor): return torch.roll(it, shift) # roll
        try: it[0]; len(it)                                                      # roll
        except: it = list(it)                                                    # roll
        return [*it[-shift:], *it[:-shift]]                                      # roll  
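# A minimal sketch of the shift direction (mirroring np.roll): a positive shift moves
# elements toward the end, wrapping them around to the front; a negative shift does
# the opposite::
#     # returns [8, 9, 0, 1, 2, 3, 4, 5, 6, 7]
#     range(10) | roll(2)
#     # returns [2, 3, 4, 5, 6, 7, 8, 9, 0, 1]
#     range(10) | roll(-2)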
[docs]class clamp(BaseCli):                                                            # clamp
[docs]    def __init__(self, col=None, min=0, max=1):                                  # clamp
        """Clamps input list/array between 2 values.
Example::
    # returns [3, 3, 3, 3, 4, 5, 6, 7, 7, 7]
    range(10) | clamp(None, 3, 7) | deref()
    # clamps column 1 (0-indexed, i.e. the 2nd column) between 0 and 2
    np.random.randn(10, 3) | clamp(1, 0, 2)
:param col: column to clamp"""                                                   # clamp
        self.col = col; self.min = min; self.max = max                           # clamp 
[docs]    def __ror__(self, it):                                                       # clamp
        col = self.col; min_ = self.min; max_ = self.max                         # clamp
        if isinstance(it, np.ndarray):                                           # clamp
            if col is None: return np.clip(it, min_, max_)                       # clamp
            else: a = np.copy(it); a[:,col] = np.clip(a[:,col], min_, max_); return a # clamp
        if hasTorch and isinstance(it, torch.Tensor):                            # clamp
            if col is None: return torch.clamp(it, min_, max_)                   # clamp
            else: a = torch.clone(it); a[:,col] = torch.clamp(a[:,col], min_, max_); return a # clamp
        return it | cli.apply(lambda x: max(min(x, max_), min_), self.col)       # clamp
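# A minimal equivalence sketch (assuming numpy): clamp(col, lo, hi) on a 2-d array is
# np.clip() applied to a single column, leaving every other column untouched::
#     a = np.array([[5.0, -1.0], [0.5, 3.0]])
#     # returns array([[2. , -1. ], [0.5,  3. ]]), only column 0 clamped to [0, 2]
#     a | clamp(0, 0, 2)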