# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for functions that sort of change the table
structure in a dramatic way. They're the core transformations.
"""
from typing import List, Union, Iterator, Callable, Any, Tuple, Dict
from collections import defaultdict, Counter, deque
from k1lib.cli.init import patchDefaultDelim, BaseCli, oneToMany, fastF, yieldT
import k1lib.cli as cli; import k1lib.cli.init as init; from k1lib.cli.typehint import *
import itertools, numpy as np, k1lib, math, json; plt = k1lib.dep("matplotlib.pyplot")
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = False
__all__ = ["transpose", "T", "reshape", "insert", "splitW", "splitC",
           "joinStreams", "joinSt", "joinStreamsRandom", "activeSamples",
           "table", "batched", "batchedTrigger", "window", "groupBy", "ungroup",
           "insertColumn", "insertIdColumn", "insId",
           "unsqueeze",
           "count", "hist", "permute", "AA_", "peek", "peekF",
           "repeat", "repeatF", "repeatFrom", "oneHot", "latch"]
settings = k1lib.settings.cli
class transpose(BaseCli):                                                              # transpose
    def __init__(self, dim1:int=0, dim2:int=1, fill=None):                             # transpose
        """Join multiple columns and loop through all rows. Aka transpose.
Example::
    # returns [[1, 4], [2, 5], [3, 6]]
    [[1, 2, 3], [4, 5, 6]] | transpose() | deref()
    # returns [[1, 4], [2, 5], [3, 6], [0, 7]]
    [[1, 2, 3], [4, 5, 6, 7]] | transpose(fill=0) | deref()
Multidimensional transpose works just like :meth:`torch.transpose` too::
    # returns (2, 7, 5, 3); the input is detected as a Tensor, so the builtin :meth:`torch.transpose` is used
    torch.randn(2, 3, 5, 7) | transpose(3, 1) | shape()
    # also returns (2, 7, 5, 3), but actually does every required computation. Can be slow if shape is huge
    torch.randn(2, 3, 5, 7) | deref(igT=False) | transpose(3, 1) | shape()
Can also work with numpy arrays::
    # returns (5, 3, 2)
    np.random.randn(2, 3, 5) | transpose(0, 2) | op().shape
Be careful with infinite streams, as transposing a stream of shape (inf, 5) will
hang this operation! Either don't do it, or temporarily limit all infinite streams like
this::
    with settings.cli.context(inf=21):
        # returns (3, 21)
        [2, 1, 3] | repeat() | transpose() | shape()
Also be careful with empty streams, as you might not get any results at all::
    # returns [], as the last stream has no elements
    [[1, 2], [3, 4], []] | transpose() | deref()
    # returns [[1, 3, 0], [2, 4, 0]]
    [[1, 2], [3, 4], []] | transpose(fill=0) | deref()
This also has an alias of ``T()``, so you can make it shorter, like ``np.random.randn(2, 3, 5) | T()``.
:param fill: if not None, then will try to zip longest with this fill value"""   # transpose
        super().__init__(); self.fill = fill                                     # transpose
        self.d1 = min(dim1, dim2); self.d2 = max(dim1, dim2)                     # transpose
        self.normal = self.d1 == 0 and self.d2 == 1                              # transpose 
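    # _all_array_opt: when run under .all() at some depth (`level`), shift both dims by that depth and transpose the backing array in one go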
    def _all_array_opt(self, it, level:int): return it | transpose(self.d1 + level, self.d2 + level, self.fill) # transpose
    def _typehint(self, inp):                                                    # transpose
        if isinstance(inp, tArrayTypes):                                         # transpose
            if inp.rank is None: return inp                                      # transpose
            if inp.rank > max(self.d1, self.d2): return inp                      # transpose
            else: return tAny() # this case doesn't quite exist                  # transpose
        if self.d1 == 0 and self.d2 == 1:                                        # transpose
            if isinstance(inp, tListIterSet):                                    # transpose
                if isinstance(inp.child, tListIterSet):                          # transpose
                    return tIter(tList(inp.child.child))                         # transpose
        return tAny()                                                            # transpose
    def __ror__(self, it):                                                             # transpose
        d1 = self.d1; d2 = self.d2; fill = self.fill                             # transpose
        if isinstance(it, torch.Tensor): return it.transpose(d1, d2)             # transpose
        if isinstance(it, np.ndarray):                                           # transpose
            dims = list(range(len(it.shape)))                                    # transpose
            temp = dims[d1]; dims[d1] = dims[d2]; dims[d2] = temp                # transpose
            return it.transpose(dims)                                            # transpose
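        # generic iterators: emulate a (d1, d2) transpose by chaining plain (0, 1) transposes at increasing, then decreasing, depths via .all(i)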
        if d1 != 0 or d2 != 1: return it | cli.serial(*([transpose(fill=fill).all(i) for i in range(d1, d2)] + [transpose(fill=fill).all(i-1) for i in range(d2-1, d1, -1)])) # transpose
        if self.fill is None: return zip(*it)                                    # transpose
        else: return itertools.zip_longest(*it, fillvalue=fill)                  # transpose 
    @staticmethod                                                                      # transpose
    def fill(fill="", dim1:int=0, dim2:int=1):                                   # transpose
        """Convenience method to fill in missing elements of a table.
Example::
    # returns [[1, 2, 3], [4, 5, 0]]
    [[1, 2, 3], [4, 5]] | transpose.fill(0) | deref()
    # also returns [[1, 2, 3], [4, 5, 0]], demonstrating how it works underneath
    [[1, 2, 3], [4, 5]] | transpose(fill=0) | transpose(fill=0) | deref()"""     # transpose
        return transpose(dim1, dim2, fill=fill) | transpose(dim1, dim2, fill=fill) # transpose 
    @staticmethod                                                                      # transpose
    def wrap(f, dim1:int=0, dim2:int=1, fill=None):                              # transpose
        """Wraps ``f`` around 2 :class:`transpose`, can be useful in combination with
:class:`k1lib.cli.init.mtmS`. Example::
    # returns [[1, 4, 3, 4], [8, 81, 10, 11]]
    [range(1, 5), range(8, 12)] | transpose.wrap(mtmS.f(apply(op()**2), 1)) | deref()
    # also returns [[1, 4, 3, 4], [8, 81, 10, 11]], demonstrating the typical way to do this
    [range(1, 5), range(8, 12)] | apply(op()**2, 1) | deref()
The example is given only to demonstrate the mechanism. Most of the time, just use
:class:`~k1lib.cli.modifier.apply` with columns instead. But sometimes you need direct
access to a column, so this is how you can do it."""                             # transpose
        if not isinstance(f, BaseCli): f = cli.applyS(f)                         # transpose
        return transpose(dim1, dim2, fill) | f | transpose(dim1, dim2, fill)     # transpose 
    def _jsF(self, meta):                                                        # transpose
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # transpose
        if self.d1 != 0 or self.d2 != 1: raise Exception(f"transpose._jsF() doesn't allow complex transpose across many dimensions yet") # transpose
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.transpose({cli.kjs.v(self.fill)})", fIdx # transpose 
#tOpt.clearPasses()                                                              # transpose
def oTranspose1(cs, ts, metadata): # `transpose() | transpose().all()` to `transpose() | apply(aS(torch.stack) | transpose())` # oTranspose1
    tr, ap = cs; t = ts[0]                                                       # oTranspose1
    if (not ap.normal) or (not tr.normal): return None                           # oTranspose1
    if (not isinstance(ap.f, transpose)) or (not ap.f.normal): return None       # oTranspose1
    if isinstance(t, tListIterSet) and (isinstance(t.child, tTensor) or isinstance(t.child, tCollection) and isinstance(tLowest(*t.child.children), tTensor)): # oTranspose1
        return [transpose(), cli.apply(cli.aS(torch.stack) | transpose())]       # oTranspose1
tOpt.addPass(oTranspose1, [transpose, cli.apply])                                # oTranspose1
T = transpose                                                                    # oTranspose1
def _formStructure(it, dims, dimI):                                              # _formStructure
    if dimI >= len(dims): return next(it)                                        # _formStructure
    return [_formStructure(it, dims, dimI+1) for i in range(dims[dimI])]         # _formStructure
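# e.g. _formStructure(iter(range(6)), (2, 3), 0) -> [[0, 1, 2], [3, 4, 5]], consuming all 6 elements;
# reshape() below calls this with dimI=1 and drives the outermost dimension itself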
class reshape(BaseCli):                                                                # reshape
    def __init__(self, *dims):                                                         # reshape
        """Reshapes the input stream into the desired shape.
Example::
    # returns [[0, 1, 2], [3, 4, 5]]
    range(6) | reshape(2, 3) | deref()
    # returns [[0, 1], [2, 3], [4, 5]]
    range(6) | reshape(3, 2) | deref()
    # returns [[0, 1], [2, 3], [4, 5]], stopped early
    range(100) | reshape(3, 2) | deref()
    # returns [[0, 1, 2], [3, 4, 5]], can leave out first dimension
    range(6) | reshape(-1, 3) | deref()
    # returns [[0, 1, 2]]; the 2nd row is dropped, as the input ran out of elements
    range(5) | reshape(-1, 3) | deref()
    # throws error, as it ran out of elements and can't fulfill the request
    range(6) | reshape(3, 3) | deref()
Unlike :meth:`torch.reshape`, the input piped into this has to be a simple iterator.
If you have a complex data structure with multiple dimensions, turn that into a simple
iterator with :class:`joinStreams` first, like this::
    # returns [[[0, 1, 2]], [[3, 4, 5]]]
    [[[0], [1]], [[2], [3]], [[4], [5]]] | joinStreams(2) | reshape(2, 1, 3) | deref()""" # reshape
        self.dims = dims                                                         # reshape 
    def __ror__(self, it):                                                             # reshape
        it = iter(it)                                                            # reshape
        if self.dims[0] == -1:                                                   # reshape
            try:                                                                 # reshape
                while True: yield _formStructure(it, self.dims, 1)               # reshape
            except StopIteration: pass                                           # reshape
        else:                                                                    # reshape
            for i in range(self.dims[0]): yield _formStructure(it, self.dims, 1) # reshape  
class insert(BaseCli):                                                                 # insert
    def __init__(self, element, begin=True):                                           # insert
        """Inserts an element into the beginning or end of a list.
Example::
    # returns [5, 2, 6, 8]
    [2, 6, 8] | insert(5) | deref()
    # returns [2, 6, 8, 5]
    [2, 6, 8] | insert(5, begin=False) | deref()
    # returns [[3, 1], 2, 6, 8]
    [2, 6, 8] | insert([3, 1]) | deref()
:param element: the element to insert"""                                         # insert
        super().__init__(); self.element = element; self.begin = begin; self.expand = False # insert 
    def __ror__(self, it):                                                             # insert
        element = self.element; it = iter(it)                                    # insert
        if self.begin: yield element; yield from it                              # insert
        else: yield from it; yield element                                       # insert 
    def _jsF(self, meta):                                                        # insert
        fIdx = init._jsFAuto(); elemIdx = init._jsDAuto(); dataIdx = init._jsDAuto() # insert
        return f"const {elemIdx} = {json.dumps(self.element)}; const {fIdx} = ({dataIdx}) => {dataIdx}.insert({elemIdx}, {cli.kjs.v(self.begin)})", fIdx # insert 
class splitW(BaseCli):                                                                 # splitW
    def __init__(self, *weights:List[float]):                                          # splitW
        """Splits elements into multiple weighted lists. If no weights are provided,
then automatically defaults to [0.8, 0.2]. Example::
    # returns [[0, 1, 2, 3, 4, 5, 6, 7], [8, 9]]
    range(10) | splitW(0.8, 0.2) | deref()
    # same as the above
    range(10) | splitW() | deref()
This also works with array types::
    torch.randn(100, 3) | splitW() # returns 2 tensors with shapes (80, 3) and (20, 3)
See also: :class:`splitC`"""                                                     # splitW
        super().__init__();                                                      # splitW
        if len(weights) == 0: weights = [0.8, 0.2]                               # splitW
        self.weights = np.array(weights)                                         # splitW 
    def __ror__(self, it):                                                             # splitW
        try: it[0]; len(it)                                                      # splitW
        except: it = list(it)                                                    # splitW
        ws = self.weights; c = 0; ws = (ws * len(it) / ws.sum()).astype(int)     # splitW
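        # e.g. len(it)=10, weights (0.8, 0.2) -> ws = [8, 2]; any rounding remainder is absorbed by the final it[c:] slice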
        for w in ws[:-1]: yield it[c:c+w]; c += w                                # splitW
        yield it[c:]                                                             # splitW 
    def _jsF(self, meta):                                                        # splitW
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # splitW
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.splitW({cli.kjs.vs(self.weights) | cli.join(', ')})", fIdx # splitW 
class splitC(BaseCli):                                                                 # splitC
    def __init__(self, *checkpoints:List[float]):                                      # splitC
        """Splits elements into multiple checkpoint-delimited lists.
Example::
    # returns [[0, 1], [2, 3, 4], [5, 6, 7, 8, 9]]
    range(10) | splitC(2, 5) | deref()
    # returns ['01', '234', '56789']
    "0123456789" | splitC(2, 5) | deref()
Here, you're specifying 2 checkpoints, 2 and 5, so it will split
the list up into 3 sections. First section is 0-2, second section
is 2-5, third section is 5-end. You can pass in fractional
checkpoints too::
    # returns [[0, 1], [2, 3, 4, 5], [6, 7, 8, 9]]
    range(10) | splitC(0.2, 0.6) | deref()
This cli might be unintuitive to remember, so if you want to just
split it up into 2 parts, check out :meth:`~k1lib.cli.filt.head.split`.
If you want to split things up by weighted length, check
out :class:`splitW`"""                                                           # splitC
        self.checkpoints = checkpoints | cli.aS(np.array)                        # splitC
        self.intMode = checkpoints | cli.apply(lambda x: int(x) == x) | cli.aS(all) # splitC 
    def __ror__(self, it):                                                             # splitC
        try: it[0]; len(it)                                                      # splitC
        except: it = list(it)                                                    # splitC
        cs = self.checkpoints                                                    # splitC
        if not self.intMode: cs = (cs * len(it)).astype(int)                     # splitC
        cs = sorted(cs); yield it[:cs[0]]                                        # splitC
        for i in range(len(cs)-1): yield it[cs[i]:cs[i+1]]                       # splitC
        yield it[cs[-1]:]                                                        # splitC 
    def _jsF(self, meta):                                                        # splitC
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # splitC
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.splitC({cli.kjs.vs(self.checkpoints) | cli.join(', ')})", fIdx # splitC 
class joinStreams(BaseCli):                                                            # joinStreams
    def __init__(self, dims=1):                                                        # joinStreams
        """Joins multiple streams.
Example::
    # returns [1, 2, 3, 4, 5]
    [[1, 2, 3], [4, 5]] | joinStreams() | deref()
    # returns [[0, 1], [2], [3, 4, 5], [6, 7, 8], [], [9, 10]]
    [[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams() | deref()
    # returns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    [[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams(2) | deref()
If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
automatically use the C-accelerated version, like this::
    # returns Tensor with shape (6, 4)
    torch.randn(2, 3, 4) | joinStreams()
    # returns array with shape (6, 4)
    np.random.randn(2, 3, 4) | joinStreams()
Sometimes, you may want to impose some dimensional structure after joining all streams
together, which :class:`reshape` does.
If "joinStreams" is too unintuitive to remember, there's also an alias called
:class:`joinSt`.
:param dims: how many ``joinStreams()`` do you want to do consecutively?"""      # joinStreams
        if dims < 0: raise AttributeError(f"`dims` ({dims}) can't be less than 0, as it doesn't make any sense!") # joinStreams
        self.dims = dims; self.multi = cli.serial(*(joinStreams() for d in range(dims))) if dims > 1 else None # joinStreams 
    def _all_array_opt(self, it, level:int): sh = it | cli.shape(); return it.reshape([*sh[:level], sh[level:level+self.dims+1] | cli.toProd(), *sh[level+self.dims+1:]]) # joinStreams
    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Any]:               # joinStreams
        if isinstance(streams, settings.arrayTypes): return self._all_array_opt(streams, 0) # joinStreams
        if self.multi != None:                                                   # joinStreams
            return streams | self.multi                                          # joinStreams
        elif self.dims == 1:                                                     # joinStreams
            def gen():                                                           # joinStreams
                for stream in streams: yield from stream                         # joinStreams
            return gen()                                                         # joinStreams
        else: return streams                                                     # joinStreams 
    def _jsF(self, meta):                                                        # joinStreams
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # joinStreams
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.joinSt({cli.kjs.v(self.dims)})", fIdx # joinStreams 
joinSt = joinStreams                                                             # joinStreams
def probScale(ps, t): # t from 0 -> 1, for typical usage                         # probScale
    l = np.log(ps); avg = l.mean()                                               # probScale
    a = (l-avg)*t+avg; a -= a.max()                                              # probScale
    ans = np.exp(a); return ans/ans.sum()                                        # probScale
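# Illustrative behavior (a sketch; np is numpy, imported above):
#   probScale(np.array([0.1, 0.9]), 0)  # -> array([0.5, 0.5]), t=0 flattens everything to uniform
#   probScale(np.array([0.1, 0.9]), 1)  # -> array([0.1, 0.9]), t=1 keeps the original ratios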
import random                                                                    # probScale
def rand(n, ps=None):                                                            # rand
    if ps is None:                                                               # rand
        while True: yield random.randrange(n)                                    # rand
    else:                                                                        # rand
        while True: yield from np.random.choice(n, size=100, p=ps)               # rand
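# rand(n) yields an endless stream of uniform ints in [0, n); if ps is given, it samples
# indices with those probabilities, buffering 100 draws per np.random.choice call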
class joinStreamsRandom(BaseCli):                                                      # joinStreamsRandom
    def __init__(self, alpha=0, ps=None):                                              # joinStreamsRandom
        """Joins multiple streams randomly. If any stream runs out, then it quits. If
any stream yields :data:`~k1lib.cli.init.yieldT`, then it just ignores that result and
continues. Could be useful in active learning. Example::
    # could return [0, 1, 10, 2, 11, 12, 13, ...], with max length 20, typical length 18
    [range(0, 10), range(10, 20)] | joinStreamsRandom() | deref()
    stream2 = [[-5, yieldT, -4, -3], yieldT | repeat()] | joinStreams()
    # could return [-5, -4, 0, -3, 1, 2, 3, 4, 5, 6], demonstrating yieldT
    [range(7), stream2] | joinStreamsRandom() | deref()
By default, all streams are treated equally, and are yielded with equal probabilities.
However, you can tweak these probabilities a little bit, to your liking. This is
controlled by the parameter ``alpha``:
.. image:: ../images/probScale.png
If ``alpha`` is 0, then all probabilities will be the same. If ``alpha`` is 1,
then all probabilities are proportional to the length of the input stream. The
original intention was to vary ``alpha`` just from 0 to 1, but it can actually
be any number::
    [range(0, 10), range(10, 100)] | joinStreamsRandom(0) | shape(0) # returns around 21, because it favors both streams equally
    [range(0, 10), range(10, 100)] | joinStreamsRandom(1) | shape(0) # returns around 90, because it favors the second array 9x more
    [range(0, 10), range(10, 100)] | joinStreamsRandom(100) | shape(0) # returns 90, because it highly favors the second array
    [range(0, 10), range(10, 100)] | joinStreamsRandom(-100) | shape(0) # returns 10, because it highly favors the first array
:param alpha: if not zero, does a weighted joining, instead of totally uniform probability
:param ps: if specified, use these probabilities, else try to determine from the lengths of the input streams""" # joinStreamsRandom
        super().__init__(); self.alpha = alpha; self.ps = ps                     # joinStreamsRandom 
    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Any]:               # joinStreamsRandom
        alpha = self.alpha; ps = self.ps; streams = list(streams); nStreams = len(streams) # joinStreamsRandom
        if alpha != 0:                                                           # joinStreamsRandom
            if ps is None:                                                       # joinStreamsRandom
                try: ps = np.array([len(st) for st in streams])                  # joinStreamsRandom
                except:                                                          # joinStreamsRandom
                    streams = [list(st) for st in streams]                       # joinStreamsRandom
                    ps = np.array([len(st) for st in streams])                   # joinStreamsRandom
            else: ps = np.array(list(ps))*1.0                                    # joinStreamsRandom
        else: ps = np.array([1/nStreams]*nStreams)                               # joinStreamsRandom
        ps = probScale(ps/ps.sum(), alpha)                                       # joinStreamsRandom
        streams = [iter(st) for st in streams]                                   # joinStreamsRandom
        try:                                                                     # joinStreamsRandom
            for streamIdx in rand(len(streams), ps):                             # joinStreamsRandom
                o = next(streams[streamIdx])                                     # joinStreamsRandom
                if o is not yieldT: yield o # identity check ("is not") instead of ==, to dodge numpy's elementwise == # joinStreamsRandom
        except StopIteration: pass                                               # joinStreamsRandom 
    def _jsF(self, meta):                                                        # joinStreamsRandom
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # joinStreamsRandom
        if self.alpha != 0 and not self.ps is None: raise Exception("joinStreamsRandom._jsF() doesn't allow custom alpha and ps yet") # joinStreamsRandom
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.joinStreamsRandom()", fIdx # joinStreamsRandom 
class activeSamples(BaseCli):                                                          # activeSamples
    def __init__(self, limit:int=100, p:float=0.95):                                   # activeSamples
        """Yields active learning samples.
Example::
    o = activeSamples()
    ds = range(10) # normal dataset
    ds = [o, ds] | joinStreamsRandom() # dataset with active learning capability
    next(ds) # returns 0
    next(ds) # returns 1
    next(ds) # returns 2
    o.append(20)
    next(ds) # can return     3     or 20
    next(ds) # can return (4 or 20) or 4
So the point of this is to be a generator of samples. You can define your dataset
as a mix of active learning samples and standard samples. Whenever there's a data
point that you want to focus on, you can add it to ``o`` and it will eventually yield
it.
.. warning::
    It might not be a good idea to set param ``limit`` to numbers higher than
    100. This is because the network might still not understand a wrong sample
    after being shown it multiple times, and will keep adding that wrong sample
    back in, distracting it from other samples and reducing the network's
    accuracy once active learning is removed.
    If ``limit`` is low enough (from my testing, 30-100 should be fine), then
    old wrong samples will be kicked out, allowing a fresh stream of wrong
    samples to come in and preventing the problem above. If you find that
    removing active learning makes the accuracy drop dramatically, then try
    decreasing the limit.
:param limit: max number of active samples. Discards samples if number of samples
    is over this.
:param p: probability of actually adding the samples in"""                       # activeSamples
        super().__init__(); self.p = p                                           # activeSamples
        self.samples = deque([], limit)                                          # activeSamples 
    def append(self, item):                                                            # activeSamples
        """Adds 1 sample."""                                                     # activeSamples
        if random.random() < self.p: self.samples.append(item)                   # activeSamples 
    def extend(self, items):                                                           # activeSamples
        """Adds multiple samples."""                                             # activeSamples
        for item in items: self.append(item)                                     # activeSamples 
    def __iter__(self):                                                          # activeSamples
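        # yields yieldT whenever empty, so a downstream joinStreamsRandom() skips this stream instead of exhausting it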
        samples = self.samples                                                   # activeSamples
        while True:                                                              # activeSamples
            if len(samples) == 0: yield yieldT                                   # activeSamples
            else: yield samples.popleft()                                        # activeSamples 
def table(delim:str=None):                                                             # table
    """Basically ``op().split(delim).all()``. This exists because it's used
quite a lot in bioinformatics. Example::
    # returns [['a', 'bd'], ['1', '2', '3']]
    ["a|bd", "1|2|3"] | table("|") | deref()"""                                  # table
    return cli.op().split(patchDefaultDelim(delim)).all()                        # table 
def _batch(it, bs, includeLast, incl):                                           # _batch
    l = []; it = iter(it); sentinel = object(); last = sentinel                  # _batch
    try:                                                                         # _batch
        if incl:                                                                 # _batch
            while True:                                                          # _batch
                if last is not sentinel: l.append(last)                          # _batch
                for i in range(bs): l.append(next(it))                           # _batch
                last = next(it); l.append(last)                                  # _batch
                yield l; l = []                                                  # _batch
        else:                                                                    # _batch
            while True:                                                          # _batch
                for i in range(bs): l.append(next(it))                           # _batch
                yield l; l = []                                                  # _batch
    except StopIteration:                                                        # _batch
        if includeLast and len(l) > 0: yield l                                   # _batch
def _batchRange(it, bs, includeLast, incl):                                      # _batchRange
    start, stop, step = it.start, it.stop, it.step                               # _batchRange
    lastCur = start; cur = lastCur + bs*step                                     # _batchRange
    if incl:                                                                     # _batchRange
        while cur <= stop: yield range(lastCur, min(cur+1, stop), step); lastCur = cur; cur += bs*step # _batchRange
    else:                                                                        # _batchRange
        while cur <= stop: yield range(lastCur, cur, step); lastCur = cur; cur += bs*step # _batchRange
    if includeLast and lastCur < stop: yield range(lastCur, stop, step)          # _batchRange
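# e.g. _batchRange(range(0, 11), 3, False, False) yields range(0, 3), range(3, 6), range(6, 9)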
def _batchSliceable(it, bs, includeLast, incl):                                  # _batchSliceable
    cur = 0; n = len(it)                                                         # _batchSliceable
    if incl:                                                                     # _batchSliceable
        while (cur+1)*bs <= n: yield it[cur*bs:(cur+1)*bs+1]; cur += 1           # _batchSliceable
    else:                                                                        # _batchSliceable
        while (cur+1)*bs <= n: yield it[cur*bs:(cur+1)*bs]; cur += 1             # _batchSliceable
    if includeLast:                                                              # _batchSliceable
        ans = it[cur*bs:(cur+1)*bs]                                              # _batchSliceable
        if len(ans) > 0: yield ans                                               # _batchSliceable
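# e.g. _batchSliceable("0123456789", 3, True, False) yields "012", "345", "678", then the leftover "9" (since includeLast=True)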
class batched(BaseCli):                                                                # batched
    def __init__(self, bs=32, includeLast=False, incl=False):                          # batched
        """Batches the input stream.
Example::
    # returns [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    range(11) | batched(3) | deref()
    # returns [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
    range(11) | batched(3, True) | deref()
    # returns [[0, 1, 2, 3, 4]]
    range(5) | batched(float("inf"), True) | deref()
    # returns []
    range(5) | batched(float("inf"), False) | deref()
    # returns [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]], includes the first element of the next batch!
    range(11) | batched(3, incl=True) | deref()
    # returns [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9], [9, 10]]
    range(11) | batched(3, True, incl=True) | deref()
Can work well and fast with :class:`torch.Tensor` and :class:`numpy.ndarray`::
    # both returns torch.Tensor of shape (2, 3, 4, 5)
    torch.randn(6, 4, 5) | batched(3)
    torch.randn(7, 4, 5) | batched(3)
Also, if the input is a :class:`range`, then for performance, a bunch of other
ranges will be returned, instead of a bunch of lists::
    # returns [range(0, 3), range(3, 6), range(6, 9)]
    range(11) | batched(3) | toList()
See also: :class:`window`, :class:`batchedTrigger`
:param bs: batch size
:param includeLast: whether to include the last incomplete batch or not
:param incl: whether to have inclusive bounds or not"""                          # batched
        super().__init__(); self.bs = bs; self.includeLast = includeLast; self.incl = incl # batched 
    def _all_array_opt(self, it, level):                                         # batched
        if self.includeLast: return NotImplemented                               # batched
        bs = self.bs; a = [slice(None, None, None)]*level; n = it.shape[level] // bs; it = it[(*a, slice(None, n*bs))] # batched
        return it.reshape(*it.shape[:level], n, bs, *it.shape[level+1:])         # batched
    def __ror__(self, it):                                                             # batched
        bs = self.bs; includeLast = self.includeLast; incl = self.incl           # batched
        if bs == float("inf"): return [it] if includeLast else []                # batched
        if not incl and isinstance(it, k1lib.settings.cli.arrayTypes):           # batched
            if (not includeLast) or (it.shape[0]%bs == 0): n = it.shape[0] // bs; it = it[:n*bs]; return it.reshape(n, bs, *it.shape[1:]) # batched
        if not incl and isinstance(it, range): return _batchRange(it, bs, includeLast, incl) # batched
        try: it[0]; len(it); return _batchSliceable(it, bs, includeLast, incl)   # batched
        except: return _batch(it, bs, includeLast, incl)                         # batched 
    def _jsF(self, meta):                                                        # batched
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # batched
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.batched({cli.kjs.v(self.bs)}, {cli.kjs.v(self.includeLast)})", fIdx # batched 
class batchedTrigger(BaseCli): # batches the data using a trigger column               # batchedTrigger
    def __init__(self, col:int=None, value:"any"=None, delta:float=None, adj:bool=True): # value is to only yield segments that have the specified value # batchedTrigger
        """Like :class:`batched`, this batches rows/elements up, but based on
a trigger value instead. See :class:`~k1lib.cli.filt.trigger` for more
discussion. Normal mode::
    data = [[1, "a"], [1, "b"], [2, "c"], [3, "d"], [3, "e"], [1, "f"]]
    # returns [[1, 1], [2], [3, 3], [1]]
    [1, 1, 2, 3, 3, 1] | batchedTrigger()  | deref()
    # throws error, does not make sense to invert search. Can only invert if .value is specified
    [1, 1, 2, 3, 3, 1] | ~batchedTrigger() | deref()
    # returns [[[1, 'a'], [1, 'b']], [[2, 'c']], [[3, 'd'], [3, 'e']], [[1, 'f']]]
    data | batchedTrigger(0) | deref()
In this mode, elements of the specified column that have the same value consecutively
will be grouped together into a batch. Note the difference between this and :class:`groupBy`.
Specific-value mode::
    # returns [[1, 1], [1]]
    [1, 1, 2, 3, 3, 1] | batchedTrigger(value=1)  | deref()
    # returns [[2], [3, 3]], essentially only yielding the batches that don't match the specified value
    [1, 1, 2, 3, 3, 1] | ~batchedTrigger(value=1) | deref()
    # returns [[[1, 'a'], [1, 'b']], [[1, 'f']]]
    data | batchedTrigger(0, value=1) | deref()
This mode is pretty much the same as previously, but will only return
the batches whose specified columns match (or don't match) a specific value.
Finally, the delta mode::
    # returns [[1, 1.1, 1.2], [2], [3], [3.55, 3.8, 3.9, 4.1], [6], [10]]
    [1, 1.1, 1.2, 2, 3, 3.55, 3.8, 3.9, 4.1, 6, 10] | batchedTrigger(delta=0.5) | deref()
    # returns [[1, 1.1, 1.2], [2], [3], [3.55, 3.8, 3.9], [4.1], [6], [10]]
    [1, 1.1, 1.2, 2, 3, 3.55, 3.8, 3.9, 4.1, 6, 10] | batchedTrigger(delta=0.5, adj=False) | deref()
    # .col also works here, but it's too long to fit here
In contrast to the previous 2 modes, this does not rely on exact values. Instead,
2 consecutive elements are considered to be in the same batch if their difference
is less than the provided ``delta`` value. Within delta mode, there's the parameter
.adj as well. If it's True, then the delta is measured between the last element
(3.9) and the current element (4.1), while if .adj=False, the delta is measured between
the first element in the group (3.55) and the current element (4.1, which exceeds
delta, so a new batch is started).
See also: :class:`~k1lib.cli.filt.trigger`
:param col: column of the trigger value. Can be None, int, or list[int]
:param value: if trigger value equals to this, then yields the batch
:param delta: if trigger value changes more than this, then yields the batch
:param adj: "adjacent", only makes sense if ``delta != None``. See example above""" # batchedTrigger
        self.col = col; self.value = value; self.delta = delta; self.adj = adj; self.inverted = False # batchedTrigger
        if value is not None and delta is not None: raise Exception(f"Can't specify precise value and fuzzy values (by specifying delta) at the same time. Does not make sense! Leave either .value or .delta empty") # batchedTrigger 
    def __invert__(self):                                                              # batchedTrigger
        if self.value is None: raise Exception("batchedTrigger().value is None! Inverting it would not make sense!") # batchedTrigger
        res = batchedTrigger(self.col, self.value); res.inverted = not self.inverted; return res # batchedTrigger 
    def __ror__(self, it):                                                             # batchedTrigger
        col = self.col; value = self.value; delta = self.delta; adj = self.adj; inv = self.inverted # batchedTrigger
        arr = []; lastE = object() # sentinel                                    # batchedTrigger
        # also, the reason for so many if statements because I don't want to have the ifs inside the for loops, cause that's gonna be slow # batchedTrigger
        if value is None and delta is None: # normal mode                        # batchedTrigger
            if col is None:                                                      # batchedTrigger
                for e in it:                                                     # batchedTrigger
                    if e == lastE: arr.append(e)                                 # batchedTrigger
                    else:                                                        # batchedTrigger
                        if len(arr): yield arr                                   # batchedTrigger
                        arr = [e]                                                # batchedTrigger
                    lastE = e                                                    # batchedTrigger
            elif isinstance(col, int):                                           # batchedTrigger
                for row in it:                                                   # batchedTrigger
                    e = row[col]                                                 # batchedTrigger
                    if e == lastE: arr.append(row)                               # batchedTrigger
                    else:                                                        # batchedTrigger
                        if len(arr): yield arr                                   # batchedTrigger
                        arr = [row]                                              # batchedTrigger
                    lastE = e                                                    # batchedTrigger
            else:                                                                # batchedTrigger
                for row in it:                                                   # batchedTrigger
                    e = tuple([row[x] for x in col])                             # batchedTrigger
                    if e == lastE: arr.append(row)                               # batchedTrigger
                    else:                                                        # batchedTrigger
                        if len(arr): yield arr                                   # batchedTrigger
                        arr = [row]                                              # batchedTrigger
                    lastE = e                                                    # batchedTrigger
            if len(arr): yield arr                                               # batchedTrigger
        elif value is not None: # normal mode, but only returns specified value  # batchedTrigger
            if col is None:                                                      # batchedTrigger
                for e in it:                                                     # batchedTrigger
                    if e == lastE: arr.append(e)                                 # batchedTrigger
                    else:                                                        # batchedTrigger
                        if len(arr) and (lastE == value) ^ inv: yield arr        # batchedTrigger
                        arr = [e]                                                # batchedTrigger
                    lastE = e                                                    # batchedTrigger
            elif isinstance(col, int):                                           # batchedTrigger
                for row in it:                                                   # batchedTrigger
                    e = row[col]                                                 # batchedTrigger
                    if e == lastE: arr.append(row)                               # batchedTrigger
                    else:                                                        # batchedTrigger
                        if len(arr) and (lastE == value) ^ inv: yield arr        # batchedTrigger
                        arr = [row]                                              # batchedTrigger
                    lastE = e                                                    # batchedTrigger
            else:                                                                # batchedTrigger
                value = tuple(value)                                             # batchedTrigger
                for row in it:                                                   # batchedTrigger
                    e = tuple([row[x] for x in col])                             # batchedTrigger
                    if e == lastE: arr.append(row)                               # batchedTrigger
                    else:                                                        # batchedTrigger
                        if len(arr) and (lastE == value) ^ inv: yield arr        # batchedTrigger
                        arr = [row]                                              # batchedTrigger
                    lastE = e                                                    # batchedTrigger
            if len(arr) and (lastE == value) ^ inv: yield arr                    # batchedTrigger
        elif delta is not None: # delta mode                                     # batchedTrigger
            lastE = float("-inf"); arr = []                                      # batchedTrigger
            if adj:                                                              # batchedTrigger
                if col is None:                                                  # batchedTrigger
                    for e in it:                                                 # batchedTrigger
                        if abs(e-lastE) < delta: arr.append(e)                   # batchedTrigger
                        else:                                                    # batchedTrigger
                            if len(arr): yield arr                               # batchedTrigger
                            arr = [e]                                            # batchedTrigger
                        lastE = e                                                # batchedTrigger
                elif isinstance(col, int):                                       # batchedTrigger
                    for row in it:                                               # batchedTrigger
                        e = row[col]                                             # batchedTrigger
                        if abs(e-lastE) < delta: arr.append(row)                 # batchedTrigger
                        else:                                                    # batchedTrigger
                            if len(arr): yield arr                               # batchedTrigger
                            arr = [row]                                          # batchedTrigger
                        lastE = e                                                # batchedTrigger
                else: raise Exception("Multiple columns not supported in delta mode, as it doesn't make any sense") # batchedTrigger
            else:                                                                # batchedTrigger
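                # non-adjacent mode: lastE only updates when a new batch starts, so deltas are measured against the batch's first element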
                if col is None:                                                  # batchedTrigger
                    for e in it:                                                 # batchedTrigger
                        if abs(e-lastE) < delta: arr.append(e)                   # batchedTrigger
                        else:                                                    # batchedTrigger
                            if len(arr): yield arr                               # batchedTrigger
                            arr = [e]; lastE = e                                 # batchedTrigger
                elif isinstance(col, int):                                       # batchedTrigger
                    for row in it:                                               # batchedTrigger
                        e = row[col]                                             # batchedTrigger
                        if abs(e-lastE) < delta: arr.append(row)                 # batchedTrigger
                        else:                                                    # batchedTrigger
                            if len(arr): yield arr                               # batchedTrigger
                            arr = [row]; lastE = e                               # batchedTrigger
                else: raise Exception("Multiple columns not supported in delta mode, as it doesn't make any sense") # batchedTrigger
            if len(arr): yield arr                                               # batchedTrigger
        else: raise Exception("Unreachable")                                     # batchedTrigger  
nothing = object()                                                               # batchedTrigger
class window(BaseCli):                                                                 # window
    def __init__(self, n, newList=False, pad=nothing):                                 # window
        """Slides a window of size n forward and yields the windows.
Example::
    # returns [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
    range(5) | window(3) | deref()
    # returns [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, None], [4, None, None]]
    range(5) | window(3, pad=None) | deref()
If you are doing strange transformations to the result, like
transposing it, then it might complain that the internal deque
(double-ended queue) mutated during iteration. In that case,
set ``newList`` to True. It's not True by default because
multiple lists will be created, all of which need memory
allocation, which is slower::
    # takes 15ms
    range(100000) | window(100) | ignore()
    # takes 48ms, because of allocating lists
    range(100000) | window(100, True) | ignore()
See also: :class:`~batched`
:param n: size of the window
:param newList: whether to create a new list out of every window or
    not. If False (default), less robust but faster. If True, more
    robust but slower
:param pad: whether to pad the output stream on the end, so that it
    has the same number of elements as the input stream or not"""                # window
        self.n = n; self.listF = (lambda x: list(x)) if newList else (lambda x: iter(x)) # window
        self.pad = pad; self.padBool = pad is not nothing # why do this? Cause in applyMp, "nothing" takes on multiple identities # window 
    def __ror__(self, it):                                                             # window
        n = self.n; pad = self.pad; q = deque([], n); listF = self.listF         # window
        for e in it:                                                             # window
            q.append(e)                                                          # window
            if len(q) == n: yield listF(q); q.popleft()                          # window
        if self.padBool:                                                         # window
            for i in range(n-1): q.append(pad); yield listF(q); q.popleft()      # window 
    def _jsF(self, meta):                                                        # window
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # window
        if self.padBool: raise Exception("window._jsF() doesn't support custom pad value yet") # window
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.window({cli.kjs.v(self.n)})", fIdx # window 
class groupBy(BaseCli):                                                                # groupBy
    def __init__(self, column:int, separate:bool=False, removeCol:bool=None):          # groupBy
        """Groups table by some column.
Example::
    a = [[2.3, 5],
         [3.4, 2],
         [4.5, 2],
         [5.6, 5],
         [6.7, 1]]
    a | groupBy(1) | deref()
This returns::
    [[[6.7, 1]],
     [[3.4, 2], [4.5, 2]],
     [[2.3, 5], [5.6, 5]]]
Should have O(n log(n)) time complexity. What if ``separate`` is True::
    a | groupBy(1, True) | deref()
This returns::
    [[1, [[6.7]]],
     [2, [[3.4], [4.5]]],
     [5, [[2.3], [5.6]]]]
What if ``removeCol`` is False::
    a | groupBy(1, True, False) | deref()
This returns::
    [[1, [[6.7, 1]]],
     [2, [[3.4, 2], [4.5, 2]]],
     [5, [[2.3, 5], [5.6, 5]]]]
There's another perspective and way to think about this operation. A lot of
libraries (like pandas) expect the uncompressed, "flat" version (the variable
``a`` in the examples above). But throughout my time using cli, the grouped
version (separate=True) is usually much more useful and amenable to
transformations. It also occupies less memory, as the columns with duplicated
elements are deleted.
So, you can sort of think of :class:`groupBy` as converting pandas dataframes
into a more easily digestible form. But because of the prevalence of those
libraries, after doing all the transformations you want, sometimes it's
necessary to flatten it again, which :class:`ungroup` does.
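For example, a round trip through :class:`groupBy` and :class:`ungroup` (a sketch
reusing the variable ``a`` from above) could look like this::
    # returns [[1, 6.7], [2, 3.4], [2, 4.5], [5, 2.3], [5, 5.6]]
    a | groupBy(1, True) | ungroup(False) | deref()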
If you want to group text lines by some pattern in them, :class:`~k1lib.cli.grep.grep`
with ``sep=True`` might be better for you.
:param column: which column to group by
:param separate: whether to separate out the column to sort of form a dict or not. See example
:param removeCol: whether to remove the grouped-by column. Defaults to True if
    ``separate=True``, and False if ``separate=False``"""                        # groupBy
        self.column = column; self.separate = separate                           # groupBy
        if removeCol is None: removeCol = separate # if separate, then remove cols, else don't do it # groupBy
        self.removeCol = removeCol                                               # groupBy 
    def __ror__(self, it):                                                             # groupBy
        it = [list(e) for e in it]; c = self.column; separate = self.separate; removeCol = self.removeCol # groupBy
        it = it | cli.sort(c, False); sentinel = object(); it = iter(it); a = [next(it, sentinel)] # groupBy
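        # sorting first makes equal values adjacent, so a single linear pass can collect the groups (hence the O(n log n) claim above)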
        if a[0] is sentinel: return                                              # groupBy
        v = a[0][c]                                                              # groupBy
        try:                                                                     # groupBy
            while True:                                                          # groupBy
                e = next(it)                                                     # groupBy
                if e[c] == v: a.append(e)                                        # groupBy
                else:                                                            # groupBy
                    if removeCol: a = a | ~cli.cut(c)                            # groupBy
                    yield [v, a] if separate else a; a = [e]; v = a[0][c]        # groupBy
        except StopIteration:                                                    # groupBy
            if len(a) > 0:                                                       # groupBy
                if removeCol: a = a | ~cli.cut(c)                                # groupBy
                yield [v, a] if separate else a                                  # groupBy 
    def _jsF(self, meta):                                                        # groupBy
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # groupBy
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.groupBy({cli.kjs.v(self.column)}, {cli.kjs.v(self.separate)}, {cli.kjs.v(self.removeCol)})", fIdx # groupBy 
class ungroup(BaseCli):                                                                # ungroup
    def __init__(self, single=True, begin=True, insertCol:bool=True):                  # ungroup
        """Ungroups things that were grouped using a specific mode of
:class:`groupBy`. Particularly useful to transform some complex data
structure into a flat dataframe so that you can plug into pandas. Example::
    # returns [[3, 1.2], [3, 3.4], [5, 6], [5, 8], [5, 11]]
    [[3, [1.2, 3.4]], [5, [6, 8, 11]]] | ungroup() | deref()
    # returns [[3, 1.2], [3, 3.4], [5, 6], [5, 8], [5, 11]]
    [[3, [[1.2], [3.4]]], [5, [[6], [8], [11]]]] | ungroup(False) | deref()
    # returns [[1.2, 3], [3.4, 3], [6, 5], [8, 5], [11, 5]]
    [[3, [1.2, 3.4]], [5, [6, 8, 11]]] | ungroup(begin=False) | deref()
:param single: whether the table in each group has a single column or not
:param begin: whether to insert the column at the beginning or at the end.
    Only works if ``insertCol`` is True
:param insertCol: whether to insert the column into the table or not"""          # ungroup
        self.single = single; self.begin = begin; self.insertCol = insertCol     # ungroup 
    def __ror__(self, it):                                                             # ungroup
        preprocess = cli.apply(cli.wrapList().all(), 1) if self.single else cli.iden(); begin = self.begin # ungroup
        if self.insertCol: return it | preprocess | ~cli.apply(lambda x, arr: arr | cli.insert(x, begin).all()) | cli.joinStreams() # ungroup
        else: return it | preprocess | cli.cut(1) | cli.joinStreams()            # ungroup 
    def _jsF(self, meta):                                                        # ungroup
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # ungroup
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.ungroup({cli.kjs.v(self.single)}, {cli.kjs.v(self.begin)}, {cli.kjs.v(self.insertCol)})", fIdx # ungroup 
class insertColumn(BaseCli):                                                           # insertColumn
    def __init__(self, column:List[Any], begin=True, fill=""):                         # insertColumn
        """Inserts a column at beginning or end.
Example::
    # returns [['a', 1, 2], ['b', 3, 4]]
    [[1, 2], [3, 4]] | insertColumn(["a", "b"]) | deref()
    # returns [[1, 2, 'a'], [3, 4, 'b']]
    [[1, 2], [3, 4]] | insertColumn(["a", "b"], begin=False) | deref()"""        # insertColumn
        self.column = column; self.begin = begin; self.fill = fill               # insertColumn
        self._f = transpose.wrap(insert(column, begin=begin), fill=fill)         # insertColumn 
    def __ror__(self, it): return it | self._f                                         # insertColumn
    def _jsF(self, meta): return self._f._jsF(meta)                              # insertColumn 
class insertIdColumn(BaseCli):                                                         # insertIdColumn
    def __init__(self, table=False, begin=True):                                       # insertIdColumn
        """Inserts an id column at the beginning (or end).
Example::
    # returns [[0, 'a', 2], [1, 'b', 4]]
    [["a", 2], ["b", 4]] | insertIdColumn(True) | deref()
    # returns [[0, 'a'], [1, 'b']]
    "ab" | insertIdColumn() | deref()
This has a shorter alias called :class:`insId`
:param table: if False, then inserts the id column into an Iterator[str]; else treats
    the input as a full-fledged table"""                                             # insertIdColumn
        self.table = table; self.begin = begin                                   # insertIdColumn 
[docs]    def __ror__(self, it):                                                       # insertIdColumn
        if self.table:                                                           # insertIdColumn
            if self.begin: return ([i, *e] for i, e in enumerate(it))            # insertIdColumn
            else: return ([*e, i] for i, e in enumerate(it))                     # insertIdColumn
        else:                                                                    # insertIdColumn
            if self.begin: return ([i, e] for i, e in enumerate(it))             # insertIdColumn
            else: return ([e, i] for i, e in enumerate(it))                      # insertIdColumn 
    def _jsF(self, meta):                                                        # insertIdColumn
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # insertIdColumn
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.insertIdColumn({cli.kjs.v(self.table)}, {cli.kjs.v(self.begin)})", fIdx # insertIdColumn 
insId = insertIdColumn                                                           # insertIdColumn
[docs]def unsqueeze(dim:int=0):                                                        # unsqueeze
    """Unsqueeze input iterator.
Example::
    t = [[1, 2], [3, 4], [5, 6]]
    # returns (3, 2)
    t | shape()
    # returns (1, 3, 2)
    t | unsqueeze(0) | shape()
    # returns (3, 1, 2)
    t | unsqueeze(1) | shape()
    # returns (3, 2, 1)
    t | unsqueeze(2) | shape()
Behind the scenes, it's really just ``wrapList().all(dim)``, but the "unsqueeze" name
is a lot more familiar. Also note that the inverse operation "squeeze" is sort of
``item().all(dim)``, if you're sure that this is desirable::
    t = [[1, 2], [3, 4], [5, 6]]
    # returns (3, 2)
    t | unsqueeze(1) | item().all(1) | shape()"""                                # unsqueeze
    return cli.wrapList().all(dim)                                               # unsqueeze 
def oUnsqueeze(cs, ts, metadata): # reminder: if change this then also change the example in llvm.rst # oUnsqueeze
    a = cs[0]; t = ts[0]; i = 0;                                                 # oUnsqueeze
    if not isinstance(t, tArrayTypes): return None                               # oUnsqueeze
    while isinstance(a, cli.apply) and a.normal: i += 1; a = a.f                 # oUnsqueeze
    if not isinstance(a, cli.wrapList): return None                              # oUnsqueeze
    t = t.__class__(t.child, t.rank+1 if t.rank is not None else None)           # oUnsqueeze
    if isinstance(t, tNpArray): return [cli.aS(lambda x: np.expand_dims(x, i)).hint(t)] # oUnsqueeze
    else: return [cli.aS(lambda x: x.unsqueeze(i)).hint(t)]                      # oUnsqueeze
tOpt.addPass(oUnsqueeze, [cli.apply], 4)                                         # oUnsqueeze
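# What the pass above buys you, as a rough sketch: in an optimized pipeline, an
# unsqueeze() on an array collapses into one native call (np.expand_dims for numpy,
# Tensor.unsqueeze for torch) instead of wrapping rows one by one:
#     t = np.random.randn(3, 2)
#     t | unsqueeze(1) | cli.shape() # returns (3, 1, 2)
#     np.expand_dims(t, 1).shape     # also (3, 1, 2), what the optimizer emits instead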
[docs]class count(BaseCli):                                                            # count
[docs]    def __init__(self, max=False):                                               # count
        """Finds unique elements and returns a table with [frequency, value, percent]
columns. Example::
    # returns [[1, 'a', '33%'], [2, 'b', '67%']]
    ['a', 'b', 'b'] | count() | deref()
    # returns [[1, 'a', '50%'], [2, 'b', '100%']]
    ['a', 'b', 'b'] | count(max=True) | deref()
:param max: if True, percentages are of the max value, else they're of the sum
"""                                                                              # count
        super().__init__(); self._max = max                                      # count 
    def _typehint(self, inp):                                                    # count
        i = tAny()                                                               # count
        if isinstance(inp, tListIterSet): i = inp.child                          # count
        return tIter(tCollection(int, i, str))                                   # count
[docs]    def __ror__(self, it:Iterator[str]):                                         # count
        it = it | cli.apply(lambda row: (tuple(row) if isinstance(row, list) else row)) # count
        c = Counter(it); s = max(c.values()) if self._max else sum(c.values())   # count
        return [[v, k, f"{round(100*v/s)}%"] for k, v in c.items()] # has to scan through the entire thing anyway, which is long enough already, so just turn it into a list # count 
[docs]    @staticmethod                                                                # count
    def join():                                                                  # count
        """Joins multiple counts together.
Example::
    # returns [[2, 'a', '33%'], [4, 'b', '67%']]
    ['a', 'b', 'b'] | repeat(2) | applyMp(count() | deref()) | count.join() | deref()
This is useful when you want to get the count of a really long list/iterator using multiple cores""" # count
        def inner(counts):                                                       # count
            values = defaultdict(lambda: 0)                                      # count
            for _count in counts:                                                # count
                if _count is None: continue                                      # count
                for v, k, *_ in _count:                                          # count
                    values[k] += v                                               # count
            s = values.values() | cli.toSum()                                    # count
            return [[v, k, f"{round(100*v/s)}%"] for k, v in values.items()]     # count
        return cli.applyS(inner)                                                 # count 
    def _jsF(self, meta):                                                        # count
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # count
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.count()", fIdx          # count 
[docs]class hist(BaseCli):                                                             # hist
[docs]    def __init__(self, bins:int=30, dropZero:bool=False):                        # hist
        """Bins a long 1d array. Effectively creating a historgram, without
actually plotting it. Example::
    np.random.randn(1000) | hist(5)
That returns something like::
    (array([-2.31449761, -1.17406889, -0.03364017,  1.10678854,  2.24721726]),
     array([ 41, 207, 432, 265,  55]),
     1.1404287156493986)
This format goes with :meth:`~matplotlib.pyplot.bar` directly like this::
    np.random.randn(1000) | hist(10) | ~aS(plt.bar)
If you have tons of data that's handled in multiple processes, but you want to
get an overall histogram, you can do something like this::
    # bad solution, runs slow, accurate
    fileNames | applyMp(cat() | toFloat() | aS(list)) | joinStreams() | hist() | ~aS(plt.bar)
    # good solution, runs fast, slightly inaccurate
    fileNames | applyMp(cat() | toFloat() | hist(300)) | hist.join() | ~aS(plt.bar)
Let's say each process has 10M records, and that you have 1000 processes in total.
In the first solution, you transfer all records (10B records total) to a single process,
then calculate their histogram. The transfer overhead is going to be absolutely
enormous, as is the computation. This really defeats the purpose of doing
multiprocessing.
In the second solution, you "convert" the 10M records into 600 numbers for each process,
which scales up to 600k numbers across all processes. Although big, that's certainly
manageable with current hardware, so the data transfer cost is not a lot at all. The
histogram merging part also executes relatively fast, as it only creates an internal
array of length 3M. See :meth:`hist.join` for param details
:param bins: how many bins should the histogram have?
:param dropZero: if True, will eliminate buckets that have no elements in them"""  # hist
        self.bins = bins; self.dropZero = dropZero                               # hist 
[docs]    def __ror__(self, it):                                                       # hist
        if not isinstance(it, settings.arrayTypes): it = list(it)                # hist
        y, x = np.histogram(it, bins=self.bins)                                  # hist
        delta = x[1] - x[0]; x = (x[1:] + x[:-1])/2                              # hist
        if self.dropZero: idx = y > 0; return x[idx], y[idx], delta              # hist
        return x, y, delta                                                       # hist 
[docs]    @staticmethod                                                                # hist
    def join(scale:float=1e4, bins:int=None, log:bool=True, xlog:bool=False):    # hist
        """Joins multiple histograms together.
Example::
    a = np.random.randn(1000); b = np.random.randn(1000)+3
    [a, b] | joinStreams() | hist() | head(2) | ~aS(plt.plot) # ---------------------------------- Ground truth
    [a, b] | hist(300).all() | hist.join(scale=1e4) | head(2) | ~aS(plt.plot) # ------------------ Log joining
    [a, b] | hist(300).all() | hist.join(scale=1e4, log=False) | head(2) | ~aS(plt.plot, ".") # -- Linear joining
    plt.legend(["Ground truth", "Log", "Linear"]); plt.grid(True); plt.ylabel("Frequency"); plt.xlabel("Value");
This results in this:
.. image:: ../images/hist1.png
As you can see, this process is only approximate, but is accurate enough in everyday
use. If you are a normal user, then this is probably enough. However, if you're a
mathematician and really care about the accuracy of this, read on.
.. admonition:: Performance vs accuracy
    As mentioned in :class:`hist`, joining histograms from across processes can really speed
    things up big time. But the joining process is complicated, with multiple parameters
    and different tradeoffs for each config. In this example, scale = 1e4, bins = 30,
    OG bins = 300, log = True. "OG bins" is the number of bins coming into :meth:`hist.join`.
    To get the best accuracy possible, set scale and OG bins high. If better
    performance is desired, first lower scale, then lower OG bins, then finally lower
    bins.
.. admonition:: Log scale
    Take a look at this piece of code::
        a, b = np.random.randn(1000)*1, np.random.randn(3000000)*0.3+3
        [a, b] | joinStreams() | hist() | head(2) | ~aS(plt.plot)
        [a, b] | hist(300).all() | hist.join(scale=1e4) | head(2) | ~aS(plt.plot)
        [a, b] | hist(300).all() | hist.join(scale=1e4, log=False) | head(2) | ~aS(plt.plot, ".")
        plt.yscale("log"); plt.legend(["Ground truth", "Log", "Linear"]); plt.grid(True); plt.ylabel("Frequency"); plt.xlabel("Value");
    This results in:
    .. image:: ../images/hist2.png
    This shows how log mode is generally better than linear mode when the frequencies span
    multiple orders of magnitude. So why not delete linear mode entirely? Well, I have
    not formally proved that the log-scale case fully covers the linear case, although in
    practice it seems so. So just to be cautious, let's leave it in.
.. admonition:: Scale
    The setup is just like in the "Log scale" section, but with scale = 1e3 instead of the default 1e4:
    .. image:: ../images/hist3.png
    Remember that the higher the scale, the more accurate the result, but also the longer it
    runs. If the difference between the high and low frequencies is bigger than scale, then
    the low-frequency values are dropped.
:param scale: how big a range of frequencies do we want to capture?
:param bins: output bins. If not specified, automatically defaults to 1/10 of the
    original number of bins
:param log: whether to transform everything to log scale internally on the y axis
:param xlog: whether to transform everything to log scale internally on the x axis""" # hist
        def inner(it):                                                           # hist
            it = it | (cli.apply(cli.apply(math.log), 0) if xlog else cli.iden()) | cli.deref() # hist
            _bins = bins if bins is not None else it | cli.cut(0) | cli.shape(0).all() | cli.toMean() | cli.op()/10 | cli.aS(int) # hist
            maxY = max(it | cli.cut(1) | cli.toMax().all() | cli.toMax() | cli.op()/scale, 1e-9) # hist
            if log: it = it | cli.apply(lambda x: np.log(x+1e-9)-math.log(maxY) | cli.aS(np.exp) | cli.aS(np.round) | cli.op().astype(int), 1) | cli.deref() # hist
            else: it = it | cli.apply(lambda y: (y/maxY).astype(int), 1) | cli.deref() # hist
            x, y, delta = it | cli.cut(0, 1) | cli.transpose().all() | cli.joinStreams() | ~cli.apply(lambda a,b: [a]*b) | cli.joinStreams() | cli.aS(list) | cli.aS(np.array) | hist(_bins) # hist
            return x | ((cli.apply(math.exp) | cli.deref()) if xlog else cli.iden()), y*maxY, delta # hist
        return cli.aS(inner)                                                     # hist  
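# The core intuition behind hist.join()'s linear mode, as a hedged numpy-only sketch
# (the function name and defaults here are illustrative, not library API): each partial
# histogram (x, y, delta) is expanded back into approximate samples, with each bin
# center repeated y/maxY times, and the pooled samples are then re-binned:
#     def _joinHistsSketch(hists, bins=30, scale=1e4):
#         maxY = max(max(y.max() for _, y, _ in hists)/scale, 1e-9)
#         samples = np.concatenate([np.repeat(x, (y/maxY).astype(int)) for x, y, _ in hists])
#         y, edges = np.histogram(samples, bins=bins)
#         return (edges[1:] + edges[:-1])/2, y*maxY, edges[1] - edges[0]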
def _permuteGen(row, pers):                                                      # _permuteGen
    row = list(row); return (row[i] for i in pers)                               # _permuteGen
[docs]class permute(BaseCli):                                                          # permute
[docs]    def __init__(self, *permutations:List[int]):                                 # permute
        """Permutes the columns. Acts kinda like :meth:`torch.Tensor.permute`.
Example::
    # returns [['b', 'a'], ['d', 'c']]
    ["ab", "cd"] | permute(1, 0) | deref()"""                                    # permute
        super().__init__(); self.permutations = permutations                     # permute 
[docs]    def __ror__(self, it:Iterator[str]):                                         # permute
        p = self.permutations                                                    # permute
        for row in it: yield _permuteGen(row, p)                                 # permute 
    def _jsF(self, meta):                                                        # permute
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # permute
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.permute({cli.kjs.vs(self.permutations) | cli.join(', ')})", fIdx # permute 
[docs]class AA_(BaseCli):                                                              # AA_
[docs]    def __init__(self, *idxs:List[int], wraps=False):                            # AA_
        """Returns 2 streams, one that has the selected element, and the other
the rest. Example::
    # returns [5, [1, 6, 3, 7]]
    [1, 5, 6, 3, 7] | AA_(1)
    # returns [[5, [1, 6, 3, 7]]]
    [1, 5, 6, 3, 7] | AA_(1, wraps=True)
You can also put multiple indexes through::
    # returns [[1, [5, 6]], [6, [1, 5]]]
    [1, 5, 6] | AA_(0, 2)
If you don't specify anything, then all indexes will be sliced::
    # returns [[1, [5, 6]], [5, [1, 6]], [6, [1, 5]]]
    [1, 5, 6] | AA_()
As for why the strange name, think of this operation as "AĀ". In statistics,
say you have a set "A"; then "not A" is commonly written as A with an overline,
"Ā". So "AA\_" represents "AĀ", and it first returns the selection A.
:param wraps: if True, then the first example will return [[5, [1, 6, 3, 7]]]
    instead, so that A has the same signature as Ā"""                            # AA_
        super().__init__(); self.idxs = idxs; self.wraps = wraps                 # AA_ 
[docs]    def __ror__(self, it:List[Any]) -> List[List[List[Any]]]:                    # AA_
        super().__ror__(it); idxs = self.idxs; it = list(it)                     # AA_
        if len(idxs) == 0: idxs = range(len(it))                                 # AA_
        def gen(idx):                                                            # AA_
            return [it[idx], [v for i, v in enumerate(it) if i != idx]]          # AA_
        if not self.wraps and len(idxs) == 1: return gen(idxs[0])                # AA_
        return [gen(idx) for idx in idxs]                                        # AA_  
[docs]class peek(BaseCli):                                                             # peek
[docs]    def __init__(self):                                                          # peek
        """Returns (firstRow, iterator). This sort of peaks at the first row,
to potentially gain some insights about the internal formats. The returned
iterator is not tampered. Example::
    e, it = iter([[1, 2, 3], [1, 2]]) | peek()
    print(e) # prints "[1, 2, 3]"
    s = 0
    for e in it: s += len(e)
    print(s) # prints "5", or length of 2 lists
You kinda have to be careful about handling the ``firstRow``, because you might
inadvertently alter the iterator::
    e, it = iter([iter(range(3)), range(4), range(2)]) | peek()
    e = list(e) # e is [0, 1, 2]
    list(next(it)) # supposed to be the same as `e`, but is [] instead
The example happens because you have already consumed all elements of the first
row, and thus there aren't any left when you try to call ``next(it)``."""        # peek
        super().__init__()                                                       # peek 
[docs]    def __ror__(self, it:Iterator[Any]) -> Tuple[Any, Iterator[Any]]:            # peek
        it = iter(it); sentinel = object(); row = next(it, sentinel)             # peek
        if row is sentinel: return None, []                                      # peek
        def gen(): yield row; yield from it                                      # peek
        return row, gen()                                                        # peek  
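# If the first row is itself a one-shot iterator, a hedged way around the pitfall in
# peek()'s docstring is to materialize that row and chain it back in manually:
#     it = iter([iter(range(3)), range(4), range(2)])
#     first = list(next(it))            # [0, 1, 2], fully materialized
#     it = itertools.chain([first], it) # push the materialized row back onto the stream
#     list(next(it))                    # [0, 1, 2] again, nothing was lost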
[docs]class peekF(BaseCli):                                                            # peekF
[docs]    def __init__(self, f:Union[BaseCli, Callable[[Any], Any]]):                  # peekF
        r"""Similar to :class:`peek`, but will execute ``f(row)`` and
return the input Iterator, which is not tampered. Example::
    it = lambda: iter([[1, 2, 3], [1, 2]])
    # prints "[1, 2, 3]" and returns [[1, 2, 3], [1, 2]]
    it() | peekF(lambda x: print(x)) | deref()
    # prints "1\n2\n3"
    it() | peekF(headOut()) | deref()"""                                         # peekF
        super().__init__(fs=[f]); self.f = f                                     # peekF 
[docs]    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:                        # peekF
        it = iter(it); sentinel = object(); row = next(it, sentinel)             # peekF
        if row is sentinel: return []                                            # peekF
        def gen(): yield row; yield from it                                      # peekF
        self.f(row); return gen()                                                # peekF  
settings.add("repeat", k1lib.Settings().add("infBs", 100, "if dealing with infinite lists, how many elements at a time should be processed?"), "settings related to repeat() and repeatFrom()") # peekF
[docs]class repeat(BaseCli):                                                           # repeat
[docs]    def __init__(self, limit:int=None):                                          # repeat
        """Yields a specified amount of the passed in object. If you intend
to pass in an iterator, then make a list out of it first, as second copy of
iterator probably won't work as you will have used it the first time. Example::
    # returns [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
    [1, 2, 3] | repeat(3) | deref()
If ``limit`` is not specified, then this will repeat indefinitely, which may or may
not cause a problem downstream. It could be that some downstream operators want to
get the full view of all elements in the input iterator, but because it's an infinite
iterator, it will never complete executing and get the full view that it wants::
    [1, 2] |  repeat() | deref()  | headOut() # will hang indefinitely, fill RAM up and crash after a while
    [1, 2] | (repeat() | deref()) | headOut() # will not hang, and actually print out the first 10 lines, each line is "[1, 2]"
The solution is to just make a group that starts with ``repeat()``. Then ``repeat()``
can capture all downstream operations, and actually batch up the infinite iterator,
pass each batch into the downstream operations, and merge the results together. The
batch size can be controlled by ``settings.cli.repeat.infBs = 200`` if you need it
for some reason.
See also: :meth:`repeatF`
:param limit: if None, then repeats indefinitely"""                              # repeat
        super().__init__(capture=True); self.limit = limit                       # repeat 
    def _typehint(self, inp): return tIter(inp)                                  # repeat
    def _all_array_opt(self, it, level):                                         # repeat
        if self.limit is None or self.limit == float("inf"): return NotImplemented # repeat
        sh = it | cli.shape(); s = slice(None, None, None)                       # repeat
        it = it[(*[s]*level, None)]; newSh = [*sh[:level], self.limit, *sh[level:]] # repeat
        if isinstance(it, np.ndarray): return np.broadcast_to(it, newSh) | self.capturedSerial.all(level) # repeat
        elif hasTorch and isinstance(it, torch.Tensor): return it.expand(newSh) | self.capturedSerial.all(level) # repeat
        return NotImplemented                                                    # repeat
[docs]    def __ror__(self, o:Any) -> Iterator[Any]:                                   # repeat
        limit = self.limit if self.limit != None else k1lib.settings.cli.inf; ser = self.capturedSerial # repeat
        def gen():                                                               # repeat
            for i in itertools.count():                                          # repeat
                if i >= limit: break                                             # repeat
                yield o                                                          # repeat
        if limit < float("inf"): return self._all_array_opt(o, 0) if isinstance(o, settings.arrayTypes) else (gen() | ser) # repeat
        return (gen() | ser) if len(self.capturedClis) == 0 else gen() | cli.batched(settings.repeat.infBs) | cli.apply(ser) | cli.joinStreams() # repeat 
    def _jsF(self, meta):                                                        # repeat
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # repeat
        if self.limit is None: raise Exception("repeat._jsF() can't repeat the input array infinitely many times, as there's no iterator concept in JS") # repeat
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.repeat({cli.kjs.v(self.limit)})", fIdx # repeat 
class _repeatF(BaseCli):                                                         # _repeatF
    def __init__(self, limit, kwargs): super().__init__(capture=True); self.limit = limit; self.kwargs = kwargs # _repeatF
    def __ror__(self, f):                                                        # _repeatF
        f = fastF(f); limit = self.limit; kwargs = self.kwargs; ser = self.capturedSerial # _repeatF
        limit = limit if limit != None else k1lib.settings.cli.inf               # _repeatF
        def gen():                                                               # _repeatF
            if len(kwargs) == 0:                                                 # _repeatF
                for i in itertools.count():                                      # _repeatF
                    if i >= limit: break                                         # _repeatF
                    yield f()                                                    # _repeatF
            else:                                                                # _repeatF
                for i in itertools.count():                                      # _repeatF
                    if i >= limit: break                                         # _repeatF
                    yield f(**kwargs)                                            # _repeatF
        if limit < float("inf"): return gen() | ser                              # _repeatF
        return (gen() | ser) if len(self.capturedClis) == 0 else gen() | cli.batched(settings.repeat.infBs) | cli.apply(ser) | cli.joinStreams() # _repeatF
[docs]def repeatF(f=None, limit:int=None, **kwargs):                                   # repeatF
    """Yields a specified amount generated by a specified function.
Example::
    repeatF(lambda: 4, 3) | deref()          # returns [4, 4, 4]
    (lambda: 4) | repeatF(limit=3) | deref() # returns [4, 4, 4]
    repeatF(lambda: 4) | head() | shape(0)   # returns 10
    f = lambda a: a+2
    repeatF(f, 3, a=6) | deref()        # returns [8, 8, 8]
    f | repeatF(limit=3, a=6) | deref() # returns [8, 8, 8]
See :class:`repeat` for a discussion on how to deal with infinite iterators.
If you want to do something like that, then you have to pipe in f instead of
specifying f in the params directly.
:param f: function to repeat. If None, then you can pipe the function in
:param limit: if None, then repeats indefinitely
:param kwargs: extra keyword arguments that you can pass into the function
See also: :class:`repeatFrom`"""                                                 # repeatF
    return _repeatF(limit, kwargs) if f is None else f | _repeatF(limit, kwargs) # repeatF 
[docs]class repeatFrom(BaseCli):                                                       # repeatFrom
[docs]    def __init__(self, limit:int=None):                                          # repeatFrom
        """Yields from a list. If runs out of elements, then do it again for
``limit`` times. Example::
    # returns [1, 2, 3, 1, 2]
    [1, 2, 3] | repeatFrom() | head(5) | deref()
    # returns [1, 2, 3, 1, 2, 3]
    [1, 2, 3] | repeatFrom(2) | deref()
See :class:`repeat` for a discussion on how to deal with infinite iterators
.. note::
    For advanced users who want to modify the resulting stream mid-way, read this section.
    Because this reuses elements inside the input iterator, it's necessary
    that the input feels like a list, not an iterator. So in order to make
    this work::
        # returns [1, 2, 3, 1, 2, 3]
        iter([1, 2, 3]) | repeatFrom(2) | deref()
    It's necessary to turn the input iterator into a list. However, sometimes you
    may want to update the input iterator values, so as to make things extra
    dynamic, like this::
        l = [1, 2, 3]
        def g(): yield from l; yield from l
        def h():
            for i, e in enumerate(g()):
                if i == 3: l.append(5) # modifies the list mid-way
                yield e
        h() | deref() # returns [1, 2, 3, 1, 2, 3, 5]
    But if you do this, it wouldn't work::
        l = [1, 2, 3]
        def h():
            for i, e in enumerate(iter(l) | repeatFrom(2)):
                if i == 3: l.append(5)
                yield e
        h() | deref() # returns [1, 2, 3, 1, 2, 3]
    This is because internally, :class:`repeatFrom` turns the iterator into a
    list, and continues yielding from that list, and thus won't use the updated
    values. To do it, you have to make the input feels like a list (can get length)::
        l = [1, 2, 3]
        def h():
            for i, e in enumerate(l | repeatFrom(2)): # ---------------- changed section
                if i == 3: l.append(5)
                yield e
        h() | deref() # returns [1, 2, 3, 1, 2, 3, 5]
:param limit: if None, then repeats indefinitely"""                              # repeatFrom
        super().__init__(capture=True); self.limit = limit                       # repeatFrom 
    def _typehint(self, inp):                                                    # repeatFrom
        i = tAny()                                                               # repeatFrom
        if isinstance(inp, tListIterSet): i = inp.child                          # repeatFrom
        if isinstance(inp, tArrayTypes): i = inp                                 # repeatFrom
        return tIter(i)                                                          # repeatFrom
    def _all_array_opt(self, it, level:int):                                     # repeatFrom
        if self.limit is None or self.limit == float("inf"): return NotImplemented # repeatFrom
        return it | (repeat(self.limit) | joinStreams()).all(level) | self.capturedSerial.all(level) # repeatFrom
[docs]    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:                        # repeatFrom
        limit = self.limit or k1lib.settings.cli.inf; ser = self.capturedSerial  # repeatFrom
        def gen(it):                                                             # repeatFrom
            try: len(it)                                                         # repeatFrom
            except: it = list(it)                                                # repeatFrom
            for i in itertools.count():                                          # repeatFrom
                if i >= limit: break                                             # repeatFrom
                yield from it                                                    # repeatFrom
        if limit < float("inf"): return self._all_array_opt(it, 0) if isinstance(it, settings.arrayTypes) else (gen(it) | ser) # repeatFrom
        return (gen(it) | ser) if len(self.capturedClis) == 0 else gen(it) | cli.batched(settings.repeat.infBs) | cli.apply(ser) | cli.joinStreams() # repeatFrom 
    def _jsF(self, meta):                                                        # repeatFrom
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # repeatFrom
        if self.limit is None: raise Exception(f"repeatFrom._jsF() can't repeat the input array infinitely many times, as there's no iterator concept in JS") # repeatFrom
        return f"const {fIdx} = ({dataIdx}) => {dataIdx}.repeatFrom({cli.kjs.v(self.limit)})", fIdx # repeatFrom 
def oneHotRow(i, n): ans = [0]*n; ans[i] = 1; return ans                         # oneHotRow
[docs]class oneHot(BaseCli):                                                           # oneHot
    _groups = {}                                                                 # oneHot
[docs]    def __init__(self, col, n:int=0, group:str=None, sep:bool=False):            # oneHot
        """One-hot encode some column in a table.
Example::
    a = [
     [1, 2, "A"],
     [3, 4, "B"],
     [5, 6, "C"]]
    b = [
     [7, 8, "A"],
     [9, 10, "B"],
     [11, 12, "B"]]
    [*a, *b] | oneHot(2) | deref()
    [*a, *b] | oneHot(2, 3, "abcd") | deref()
Last 2 statements both return this::
    [[1, 2, 1, 0, 0],
     [3, 4, 0, 1, 0],
     [5, 6, 0, 0, 1],
     [7, 8, 1, 0, 0],
     [9, 10, 0, 1, 0],
     [11, 12, 0, 1, 0]]
You can also separate the encoded column out like this::
    [*a, *b] | oneHot(2, sep=True) | deref()
Which returns this::
    [[1, 2, [1, 0, 0]],
     [3, 4, [0, 1, 0]],
     [5, 6, [0, 0, 1]],
     [7, 8, [1, 0, 0]],
     [9, 10, [0, 1, 0]],
     [11, 12, [0, 1, 0]]]
The natural way to do this is to use it without the ``n`` and ``group`` parameters.
But sometimes, your one-hot encoding is spread across multiple datasets in
multiple dataloaders, and so the order and length of the encoding might not be
the same, which will mess up your training process.
That's why you can specify ``group``, which will share encoding information
across all :class:`oneHot` clis that have the same group name. If you choose to
do this, then you also have to specify the size of the encoding, because
the cli can't really infer the size when it potentially has not seen all the data,
right?
:param col: which column to one-hot encode and expand into
:param n: (optional) total number of different elements
:param group: (optional) group name
:param sep: (optional) whether to separate the variable out into its own list""" # oneHot
        self.col = col; self.n = n; self.group = group; self.sep = sep           # oneHot
        if (n != 0) != (group is not None): # raises if exactly one of `n` and `group` is specified # oneHot
            raise Exception("You have to specify both `n` and `group` at the same time if you want to use them") # oneHot
        if group is not None:                                                    # oneHot
            if group not in oneHot._groups:                                      # oneHot
                oneHot._groups[group] = dict()                                   # oneHot
            self.d = oneHot._groups[group]                                       # oneHot
        else: self.d = dict()                                                    # oneHot 
    def _typehint(self, inp):                                                    # oneHot
         # TODO                                                                  # oneHot
        if isinstance(inp, tListIterSet):                                        # oneHot
            if isinstance(inp.child, tListIterSet):                              # oneHot
                pass                                                             # oneHot
            elif isinstance(inp.child, tListIterSet):                            # oneHot
                pass                                                             # oneHot
            pass                                                                 # oneHot
        return tIter(tAny())                                                     # oneHot
[docs]    def __ror__(self, it):                                                       # oneHot
        c = self.col; d = self.d; n = self.n; sep = self.sep                     # oneHot
        if n == 0:                                                               # oneHot
            it = it | cli.deref(2); n = it | cli.cut(c) | cli.aS(set) | cli.shape(0) # oneHot
        for row in it:                                                           # oneHot
            e = row[c]                                                           # oneHot
            try: e[0]; len(e); e = tuple(e) # sequences become tuples, so they can be used as dict keys # oneHot
            except: pass                    # scalars are left as-is; they're hashable already # oneHot
            if e not in d: d[e] = oneHotRow(len(d), n)                           # oneHot
            if sep: yield [*row[:c], d[e], *row[c+1:]]                           # oneHot
            else:   yield [*row[:c], *d[e], *row[c+1:]]                          # oneHot
        return # note: everything below is an older implementation and is unreachable # oneHot
        _d = it | cli.cut(c) | cli.aS(set) | cli.sort(None, False) | cli.deref(); n = len(_d) # oneHot
        _d = _d | insertIdColumn(begin=False) | cli.apply(cli.aS(oneHotRow, n), 1) | transpose() | cli.toDict() # oneHot
        for row in it: yield [*row[:c], *_d[row[c]], *row[c+1:]]                 # oneHot  
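# A hedged sketch of the `group` mechanism described above, reusing the tables `a` and
# `b` from oneHot's docstring: both clis below consult the same shared dict, so "A"
# maps to the same one-hot row in both datasets, even if the labels show up in
# different orders:
#     trainData = a | oneHot(2, 3, "labels") | cli.deref()
#     valData   = b | oneHot(2, 3, "labels") | cli.deref()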
[docs]class latch(BaseCli):                                                            # latch
[docs]    def __init__(self, timeline, before=True, startTok=None, endTok=None):       # latch
        """Latches 1 timeline (B) onto another timeline (A).
Example::
    a = [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"], [6, "f"], [7, "g"], [8, "h"], [9, "i"]]
    b = [[2.1, "2.1"], [3.1, "3.1"]]
    c = [[4.5, "4.5"], [7.3, "7.3"]]
    a[:5] | latch(b)        | deref() # returns [[1, 'a', None], [2, 'b', None], [3, 'c', '2.1'], [4, 'd', '3.1'], [5, 'e', '3.1']]
    a[:5] | latch(b, False) | deref() # returns [[1, 'a', '2.1'], [2, 'b', '2.1'], [3, 'c', '3.1'], [4, 'd', None], [5, 'e', None]]
    b     | latch(a)        | deref() # returns [[2.1, '2.1', 'b'], [3.1, '3.1', 'c']]
    b     | latch(a, False) | deref() # returns [[2.1, '2.1', 'c'], [3.1, '3.1', 'd']]
    # latching timelines that don't overlap
    b | latch(c)        | deref() # returns [[2.1, '2.1', None], [3.1, '3.1', None]]
    b | latch(c, False) | deref() # returns [[2.1, '2.1', '4.5'], [3.1, '3.1', '4.5']]
    c | latch(b)        | deref() # returns [[4.5, '4.5', '3.1'], [7.3, '7.3', '3.1']]
    c | latch(b, False) | deref() # returns [[4.5, '4.5', None], [7.3, '7.3', None]]
    # multiple latches also work fine
    a | latch(b) | latch(c) | deref()
Final command returns this::
    [[1, 'a', None, None],
     [2, 'b', None, None],
     [3, 'c', '2.1', None],
     [4, 'd', '3.1', None],
     [5, 'e', '3.1', '4.5'],
     [6, 'f', '3.1', '4.5'],
     [7, 'g', '3.1', '4.5'],
     [8, 'h', '3.1', '7.3'],
     [9, 'i', '3.1', '7.3']]
So essentially, there're 2 timelines, "a" and "b". "a" is usually more densely
packed than "b". The goal is to merge these 2 timelines together, trying to insert
b's data points at various points inside a. Try to figure out the pattern and
behavior from the example.
This cli makes several assumptions:
- All timelines inside the argument (if ``a | latch(b)``, then I'm talking about b) have only 2 columns: (time, data)
- The timeline piped in (if ``a | latch(b)``, then I'm talking about a) can have multiple columns, but the first column has to be time
- All timelines are already sorted by time, to avoid resorting
See also: :class:`~k1lib.cli.utils.syncStepper`
:param timeline: sparser timeline
:param before: whether to use the timeline B's point before or after timeline A's point
:param startTok: token to use for A's samples where B has not gone into the picture yet
:param endTok: token to use for A's samples where B has yielded all elements"""  # latch
        self.timeline = timeline; self.before = before; self.startTok = startTok; self.endTok = endTok # latch 
[docs]    def __ror__(self, it):                                                       # latch
        timeline = [[[-float("inf"), self.startTok]], self.timeline, [[float("inf"), self.endTok]]] | cli.joinSt() # latch
        before = self.before; lastT, lastT_desc = next(timeline); t, t_desc = next(timeline) # latch
        for row in it:                                                           # latch
            while row[0] > t:                                                    # latch
                lastT = t; lastT_desc = t_desc                                   # latch
                t, t_desc = next(timeline)                                       # latch
            yield [*row, lastT_desc if before else t_desc]                       # latch
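# A hedged usage sketch of latch() with more lifelike data (all names illustrative):
# annotating a dense sensor timeline with the most recent, sparser state-change event:
#     readings = [[t, 20 + t] for t in range(9)]   # dense timeline A: (time, temperature)
#     modes = [[2.5, "heating"], [6.5, "cooling"]] # sparse timeline B: (time, mode)
#     readings | latch(modes, startTok="idle") | cli.deref()
#     # rows before t=2.5 carry "idle", rows from t=3 to t=6 carry "heating", the rest "cooling"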