# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for functions that sort of changes the table
structure in a dramatic way. They're the core transformations
"""
from typing import List, Union, Iterator, Callable, Any, Tuple, Dict
from collections import defaultdict, Counter, deque
from k1lib.cli.init import patchDefaultDelim, BaseCli, oneToMany, T, Table, fastF, yieldT
import k1lib.cli as cli; from k1lib.cli.typehint import *
import itertools, numpy as np, k1lib, math; import matplotlib.pyplot as plt
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = True
__all__ = ["transpose", "reshape", "insert", "splitW", "splitC",
"joinStreams", "flatten", "joinStreamsRandom", "activeSamples",
"table", "batched", "window", "groupBy", "ungroup",
"insertColumn", "insertIdColumn",
"expandE", "unsqueeze",
"count", "hist", "permute", "accumulate", "AA_", "peek", "peekF",
"repeat", "repeatF", "repeatFrom", "oneHot", "indexTable"]
settings = k1lib.settings.cli
[docs]class transpose(BaseCli):
[docs] def __init__(self, dim1:int=0, dim2:int=1, fill=None):
"""Join multiple columns and loop through all rows. Aka transpose.
Example::
# returns [[1, 4], [2, 5], [3, 6]]
[[1, 2, 3], [4, 5, 6]] | transpose() | deref()
# returns [[1, 4], [2, 5], [3, 6], [0, 7]]
[[1, 2, 3], [4, 5, 6, 7]] | transpose(fill=0) | deref()
Multidimensional transpose works just like :meth:`torch.transpose` too::
# returns (2, 7, 5, 3), but detected Tensor, so it will use builtin :meth:`torch.transpose`
torch.randn(2, 3, 5, 7) | transpose(3, 1) | shape()
# also returns (2, 7, 5, 3), but actually does every required computation. Can be slow if shape is huge
torch.randn(2, 3, 5, 7) | deref(igT=False) | transpose(3, 1) | shape()
Can also work with numpy arrays::
# returns (5, 3, 2)
np.random.randn(2, 3, 5) | transpose(0, 2) | op().shape
Be careful with infinite streams, as transposing stream of shape (inf, 5) will
hang this operation! Either don't do it, or temporarily limit all infinite streams like
this::
with settings.cli.context(inf=21):
# returns (3, 21)
[2, 1, 3] | repeat() | transpose() | shape()
Also be careful with empty streams, as you might not get any results at all::
# returns [], as the last stream has no elements
[[1, 2], [3, 4], []] | transpose() | deref()
# returns [[1, 3, 0], [2, 4, 0]]
[[1, 2], [3, 4], []] | transpose(fill=0) | deref()
:param fill: if not None, then will try to zip longest with this fill value"""
super().__init__(); self.fill = fill
self.d1 = min(dim1, dim2); self.d2 = max(dim1, dim2)
self.normal = self.d1 == 0 and self.d2 == 1
def _typehint(self, inp):
if isinstance(inp, tArrayTypes):
if inp.rank is None: return inp
if inp.rank > max(self.d1, self.d2): return inp
else: return tAny() # this case doesn't quite exist
if self.d1 == 0 and self.d2 == 1:
if isinstance(inp, tListIterSet):
if isinstance(inp.child, tListIterSet):
return tIter(tList(inp.child.child))
return tAny()
[docs] def __ror__(self, it:Iterator[Iterator[T]]) -> Table[T]:
d1 = self.d1; d2 = self.d2; fill = self.fill
if isinstance(it, torch.Tensor): return it.transpose(d1, d2)
if isinstance(it, np.ndarray):
dims = list(range(len(it.shape)))
temp = dims[d1]; dims[d1] = dims[d2]; dims[d2] = temp
return it.transpose(dims)
if d1 != 0 or d2 != 1: return it | cli.serial(*([transpose(fill=fill).all(i) for i in range(d1, d2)] + [transpose(fill=fill).all(i-1) for i in range(d2-1, d1, -1)]))
if self.fill is None: return zip(*it)
else: return itertools.zip_longest(*it, fillvalue=fill)
[docs] @staticmethod
def fill(fill="", dim1:int=0, dim2:int=1):
"""Convenience method to fill in missing elements of a table.
Example::
# returns [[1, 2, 3], [4, 5, 0]]
[[1, 2, 3], [4, 5]] | transpose.fill(0) | deref()
# also returns [[1, 2, 3], [4, 5, 0]], demonstrating how it works underneath
[[1, 2, 3], [4, 5]] | transpose(fill=0) | transpose(fill=0) | deref()"""
return transpose(dim1, dim2, fill=fill) | transpose(dim1, dim2, fill=fill)
[docs] @staticmethod
def wrap(f, dim1:int=0, dim2:int=1, fill=None):
"""Wraps ``f`` around 2 :class:`transpose`, can be useful in combination with
:class:`k1lib.cli.init.mtmS`. Example::
# returns [[1, 4, 3, 4], [8, 81, 10, 11]]
[range(1, 5), range(8, 12)] | transpose.wrap(mtmS.f(apply(op()**2), 1)) | deref()
# also returns [[1, 4, 3, 4], [8, 81, 10, 11]], demonstrating the typical way to do this
[range(1, 5), range(8, 12)] | apply(op()**2, 1) | deref()
The example given is sort of to demonstrate this only. Most of the time, just use
:class:`~k1lib.cli.modifier.apply` with columns instead. But sometimes you need direct
access to a column, so this is how you can do it."""
if not isinstance(f, BaseCli): f = cli.applyS(f)
return transpose(dim1, dim2, fill) | f | transpose(dim1, dim2, fill)
#tOpt.clearPasses()
def oTranspose1(cs, ts, metadata): # `transpose() | transpose().all()` to `transpose() | apply(aS(torch.stack) | transpose())`
tr, ap = cs; t = ts[0]
if (not ap.normal) or (not tr.normal): return None
if (not isinstance(ap.f, transpose)) or (not ap.f.normal): return None
if isinstance(t, tListIterSet) and (isinstance(t.child, tTensor) or isinstance(t.child, tCollection) and isinstance(tLowest(*t.child.children), tTensor)):
return [transpose(), cli.apply(cli.aS(torch.stack) | transpose())]
tOpt.addPass(oTranspose1, [transpose, cli.apply])
def _formStructure(it, dims, dimI):
if dimI >= len(dims): return next(it)
return [_formStructure(it, dims, dimI+1) for i in range(dims[dimI])]
[docs]class reshape(BaseCli):
[docs] def __init__(self, *dims):
"""Reshapes the input stream into the desired shape.
Example::
# returns [[0, 1, 2], [3, 4, 5]]
range(6) | reshape(2, 3) | deref()
# returns [[0, 1], [2, 3], [4, 5]]
range(6) | reshape(3, 2) | deref()
# returns [[0, 1], [2, 3], [4, 5]], stopped early
range(100) | reshape(3, 2) | deref()
# returns [[0, 1, 2], [3, 4, 5]], can leave out first dimension
range(6) | reshape(-1, 3) | deref()
# returns [[0, 1, 2]], won't include 2nd element, as it ran out of elements
range(5) | reshape(-1, 3) | deref()
# throws error, as it ran out of elements and can't fulfill the request
range(6) | reshape(3, 3) | deref()
Unlike :meth:`torch.reshape`, the input piped into this has to be a simple iterator.
If you have a complex data structure with multiple dimensions, turn that into a simple
iterator with :class:`joinStreams` first, like this::
# returns [[[0, 1, 2]], [[3, 4, 5]]]
[[[0], [1]], [[2], [3]], [[4], [5]]] | joinStreams(2) | reshape(2, 1, 3) | deref()"""
self.dims = dims
[docs] def __ror__(self, it):
it = iter(it)
if self.dims[0] == -1:
try:
while True: yield _formStructure(it, self.dims, 1)
except StopIteration: pass
else:
for i in range(self.dims[0]): yield _formStructure(it, self.dims, 1)
[docs]class insert(BaseCli):
[docs] def __init__(self, element, begin=True):
"""Join element into list.
Example::
# returns [5, 2, 6, 8]
[2, 6, 8] | insert(5) | deref()
# returns [2, 6, 8, 5]
[2, 6, 8] | insert(5, begin=False) | deref()
# returns [[3, 1], 2, 6, 8]
[2, 6, 8] | insert([3, 1]) | deref()
:param element: the element to insert"""
super().__init__(); self.element = element; self.begin = begin; self.expand = False
[docs] def __ror__(self, it:Tuple[T, Iterator[T]]) -> Iterator[T]:
element = self.element; it = iter(it)
if self.begin: yield element; yield from it
else: yield from it; yield element
[docs]class splitW(BaseCli):
[docs] def __init__(self, *weights:List[float]):
"""Splits elements into multiple weighted lists. If no weights are provided,
then automatically defaults to [0.8, 0.2]. Example::
# returns [[0, 1, 2, 3, 4, 5, 6, 7], [8, 9]]
range(10) | splitW(0.8, 0.2) | deref()
# same as the above
range(10) | splitW() | deref()
This also works with array types::
torch.randn(100, 3) | splitW() # returns 2 tensors with shapes (80, 3) and (20, 3)
See also: :class:`splitC`"""
super().__init__();
if len(weights) == 0: weights = [0.8, 0.2]
self.weights = np.array(weights)
[docs] def __ror__(self, it):
try: it[0]; len(it)
except: it = list(it)
ws = self.weights; c = 0; ws = (ws * len(it) / ws.sum()).astype(int)
for w in ws[:-1]: yield it[c:c+w]; c += w
yield it[c:]
[docs]class splitC(BaseCli):
[docs] def __init__(self, *checkpoints:List[float]):
"""Splits elements into multiple checkpoint-delimited lists.
Example::
# returns [[0, 1], [2, 3, 4], [5, 6, 7, 8, 9]]
range(10) | splitC(2, 5) | deref()
# returns ['01', '234', '56789']
"0123456789" | splitC(2, 5) | deref()
Here, you're specifying 2 checkpoints, 2 and 5, so it will split
the list up into 3 sections. First section is 0-2, second section
is 2-5, third section is 5-end. You can pass in fractional
checkpoints too::
# returns [[0, 1], [2, 3, 4, 5], [6, 7, 8, 9]]
range(10) | splitC(0.2, 0.6) | deref()
This cli might be unintuitive to remember, so if you want to just
split it up into 2 parts, check out :meth:`~k1lib.cli.filt.head.split`.
If you want to split things up by weighted length, check
out :class:`splitW`"""
self.checkpoints = checkpoints | cli.aS(np.array)
self.intMode = checkpoints | cli.apply(lambda x: int(x) == x) | cli.aS(all)
[docs] def __ror__(self, it):
try: it[0]; len(it)
except: it = list(it)
cs = self.checkpoints
if not self.intMode: cs = (cs * len(it)).astype(int)
cs = sorted(cs); yield it[:cs[0]]
for i in range(len(cs)-1): yield it[cs[i]:cs[i+1]]
yield it[cs[-1]:]
[docs]class joinStreams(BaseCli):
[docs] def __init__(self, dims=1):
"""Joins multiple streams.
Example::
# returns [1, 2, 3, 4, 5]
[[1, 2, 3], [4, 5]] | joinStreams() | deref()
# returns [[0, 1], [2], [3, 4, 5], [6, 7, 8], [], [9, 10]]
[[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams() | deref()
# returns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams(2) | deref()
If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
automatically use the C-accelerated version, like this::
# returns Tensor with shape (6, 4)
torch.randn(2, 3, 4) | joinStreams()
# returns array with shape (6, 4)
np.random.randn(2, 3, 4) | joinStreams()
Sometimes, you may want to impose some dimensional structure after joining all streams
together, which :class:`reshape` does.
If "joinStreams" is too unintuitive to remember, there's also an alias called
:class:`flatten`.
:param dims: how many ``joinStreams()`` do you want to do consecutively?"""
if dims < 1: raise AttributeError(f"`dims` ({dims}) can't be less than 1, as it doesn't make any sense!")
self.multi = cli.serial(*(joinStreams() for d in range(dims))) if dims > 1 else None
[docs] def __ror__(self, streams:Iterator[Iterator[T]]) -> Iterator[T]:
if self.multi != None:
return streams | self.multi
def gen(): yield from streams | self.multi
return gen()
else:
if isinstance(streams, settings.arrayTypes): return streams.reshape(-1, *streams.shape[2:])
else:
def gen():
for stream in streams: yield from stream
return gen()
flatten = joinStreams
def probScale(ps, t): # t from 0 -> 1, for typical usage
l = np.log(ps); avg = l.mean()
a = (l-avg)*t+avg; a -= a.max()
ans = np.exp(a); return ans/ans.sum()
import random
def rand(n, ps=None):
if ps is None:
while True: yield random.randrange(n)
else:
while True: yield from np.random.choice(n, size=100, p=ps)
[docs]class joinStreamsRandom(BaseCli):
[docs] def __init__(self, alpha=0, ps=None):
"""Join multiple streams randomly. If any streams runs out, then quits. If
any stream yields :data:`~k1lib.cli.init.yieldT`, then just ignores that result and
continue. Could be useful in active learning. Example::
# could return [0, 1, 10, 2, 11, 12, 13, ...], with max length 20, typical length 18
[range(0, 10), range(10, 20)] | joinStreamsRandom() | deref()
stream2 = [[-5, yieldT, -4, -3], yieldT | repeat()] | joinStreams()
# could return [-5, -4, 0, -3, 1, 2, 3, 4, 5, 6], demonstrating yieldT
[range(7), stream2] | joinStreamsRandom() | deref()
By default, all streams are treated equally, and are yielded with equal probabilities.
However, you can tweak these probabilities a little bit, to your liking. This is
controlled by the parameter ``alpha``:
.. image:: ../images/probScale.png
If ``alpha`` is 0, then all probabilities will be the same. If ``alpha`` is 1,
then all probabilities are proportional to the length of the input stream. The
original intention was to vary ``alpha`` just from 0 to 1, but it can actually
be of any number::
[range(0, 10), range(10, 100)] | joinStreamsRandom(0) | shape(0) # returns around 21, because it favors both streams equally
[range(0, 10), range(10, 100)] | joinStreamsRandom(1) | shape(0) # returns around 90, because it favors the second array 9x more
[range(0, 10), range(10, 100)] | joinStreamsRandom(100) | shape(0) # returns 90, because it highly favors the second array
[range(0, 10), range(10, 100)] | joinStreamsRandom(-100) | shape(0) # returns 10, because it highly favors the first array
:param alpha: if not zero, does a weighted joining, instead of totally uniform probability
:param ps: if specified, use these probabilities, else try to determine from the lengths of the input streams"""
super().__init__(); self.alpha = alpha; self.ps = ps
[docs] def __ror__(self, streams:Iterator[Iterator[T]]) -> Iterator[T]:
alpha = self.alpha; ps = self.ps; streams = list(streams); nStreams = len(streams)
if alpha != 0:
if ps is None:
try: ps = np.array([len(st) for st in streams])
except:
streams = [list(st) for st in streams]
ps = np.array([len(st) for st in streams])
else: ps = np.array(list(ps))*1.0
else: ps = np.array([1/nStreams]*nStreams)
ps = probScale(ps/ps.sum(), alpha)
streams = [iter(st) for st in streams]
try:
for streamIdx in rand(len(streams), ps):
o = next(streams[streamIdx])
if not o is yieldT: yield o # "not is" to fix numpy `==`
except StopIteration: pass
[docs]class activeSamples(BaseCli):
[docs] def __init__(self, limit:int=100, p:float=0.95):
"""Yields active learning samples.
Example::
o = activeSamples()
ds = range(10) # normal dataset
ds = [o, ds] | joinStreamsRandom() # dataset with active learning capability
next(ds) # returns 0
next(ds) # returns 1
next(ds) # returns 2
o.append(20)
next(ds) # can return 3 or 20
next(ds) # can return (4 or 20) or 4
So the point of this is to be a generator of samples. You can define your dataset
as a mix of active learning samples and standard samples. Whenever there's a data
point that you want to focus on, you can add it to ``o`` and it will eventially yield
it.
.. warning::
It might not be a good idea to set param ``limit`` to higher numbers than
100. This is because, the network might still not understand a wrong sample
after being shown multiple times, and will keep adding that wrong sample
back in, distracting it from other samples, and reduce network's accuracy
after removing active learning from it.
If ``limit`` is low enough (from my testing, 30-100 should be fine), then
old wrong samples will be kicked out, allowing for a fresh stream of wrong
samples coming in, and preventing the problem above. If you found that
removing active learning makes the accuracy drops dramatically, then try
decreasing the limit.
:param limit: max number of active samples. Discards samples if number of samples
is over this.
:param p: probability of actually adding the samples in"""
super().__init__(); self.p = p
self.samples = deque([], limit)
[docs] def append(self, item):
"""Adds 1 sample."""
if random.random() < self.p: self.samples.append(item)
[docs] def extend(self, items):
"""Adds multiple samples."""
for item in items: self.append(item)
def __iter__(self):
samples = self.samples
while True:
if len(samples) == 0: yield yieldT
else: yield samples.popleft()
[docs]def table(delim:str=None):
"""Basically ``op().split(delim).all()``. This exists because this is used
quite a lot in bioinformatics. Example::
# returns [['a', 'bd'], ['1', '2', '3']]
["a|bd", "1|2|3"] | table("|") | deref()"""
return cli.op().split(patchDefaultDelim(delim)).all()
def _batch(it, bs, includeLast):
l = []; it = iter(it)
try:
while True:
for i in range(bs): l.append(next(it))
yield l; l = []
except StopIteration:
if includeLast and len(l) > 0: yield l
def _batchRange(it, bs, includeLast):
start, stop, step = it.start, it.stop, it.step
lastCur = start; cur = lastCur + bs*step
while cur <= stop:
yield range(lastCur, cur, step)
lastCur = cur; cur += bs*step
if includeLast and lastCur < stop:
yield range(lastCur, stop, step)
def _batchSliceable(it, bs, includeLast):
cur = 0; n = len(it)
while (cur+1)*bs <= n:
yield it[cur*bs:(cur+1)*bs]
cur += 1
if includeLast:
ans = it[cur*bs:(cur+1)*bs]
if len(ans) > 0: yield ans
[docs]class batched(BaseCli):
[docs] def __init__(self, bs=32, includeLast=False):
"""Batches the input stream.
Example::
# returns [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
range(11) | batched(3) | deref()
# returns [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
range(11) | batched(3, True) | deref()
# returns [[0, 1, 2, 3, 4]]
range(5) | batched(float("inf"), True) | deref()
# returns []
range(5) | batched(float("inf"), False) | deref()
Can work well and fast with :class:`torch.Tensor` and :class:`numpy.ndarray`::
# both returns torch.Tensor of shape (2, 3, 4, 5)
torch.randn(6, 4, 5) | batched(3)
torch.randn(7, 4, 5) | batched(3)
Also, if input is a :class:`range`, then to save time, a bunch of other
ranges will be returned, instead of a bunch of lists, for performance::
# returns [range(0, 3), range(3, 6), range(6, 9)]
range(11) | batched(3) | toList()
"""
super().__init__(); self.bs = bs; self.includeLast = includeLast
[docs] def __ror__(self, it):
bs = self.bs; includeLast = self.includeLast
if bs == float("inf"):
if includeLast: return [it]
return []
if not includeLast and isinstance(it, k1lib.settings.cli.arrayTypes):
n = it.shape[0] // bs; it = it[:n*bs]
return it.reshape(n, bs, *it.shape[1:])
if isinstance(it, range): return _batchRange(it, bs, includeLast)
try: it[0]; len(it); return _batchSliceable(it, bs, includeLast)
except: return _batch(it, bs, includeLast)
nothing = object()
[docs]class window(BaseCli):
[docs] def __init__(self, n, newList=False, pad=nothing):
"""Slides window of size n forward and yields the windows.
Example::
# returns [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
range(5) | window(3) | deref()
# returns [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, None], [4, None, None]]
range(5) | window(3, pad=None) | deref()
If you are doing strange transformations to the result, like
transposing it, then it might complain that the internal deque
(double-ended queue) mutated during iteration. In that case,
then set ``newList`` to True. It's not True by default because
multiple lists will be created, all of which needs memory
allocation, which will be slower::
# takes 15ms
range(100000) | window(100) | ignore()
# takes 48ms, because of allocating lists
range(100000) | window(100) | ignore()
:param n: size of the window
:param newList: whether to create a new list out of every window or
not. If False (default), less robust but faster. If True, more
robust but slower
:param pad: whether to pad the output stream on the end, so that it
has the same number of elements as the input stream or not"""
self.n = n; self.listF = (lambda x: list(x)) if newList else (lambda x: iter(x))
self.pad = pad; self.padBool = pad is not nothing # why do this? Cause in applyMp, "nothing" takes on multiple identities
[docs] def __ror__(self, it):
n = self.n; pad = self.pad; q = deque([], n); listF = self.listF
for e in it:
q.append(e)
if len(q) == n: yield listF(q); q.popleft()
if self.padBool:
for i in range(n-1):
q.append(pad)
yield listF(q); q.popleft()
[docs]class groupBy(BaseCli):
[docs] def __init__(self, column:int, separate:bool=False, removeCol:bool=None):
"""Groups table by some column.
Example::
a = [[2.3, 5],
[3.4, 2],
[4.5, 2],
[5.6, 5],
[6.7, 1]]
a | groupBy(1) | deref()
This returns::
[[[6.7, 1]],
[[3.4, 2], [4.5, 2]],
[[2.3, 5], [5.6, 5]]]
Should have O(n log(n)) time complexity. What if ``separate`` is True::
a | groupBy(1, True) | deref()
This returns::
[[1, [[6.7]]],
[2, [[3.4], [4.5]]],
[5, [[2.3], [5.6]]]]
What if ``removeCol`` is False::
a | groupBy(1, True, False) | deref()
This returns::
[[1, [[6.7, 1]]],
[2, [[3.4, 2], [4.5, 2]]],
[5, [[2.3, 5], [5.6, 5]]]]
There's another perspective and way to think about this operation. A lot of
libraries (like pandas) expect the uncompressed, "flat" version (the variable
``a`` in the examples above). But throughout my time using cli, the grouped
version (separate=True) is usually much more useful and amenable to
transformations. It also occupies less memory too, as the columns with duplicated
elements are deleted.
So, you can sort of think :class:`groupBy` is converting pandas dataframes
into a more easily digestible form. But because the prevalence of those libraries,
after doing all the transformations you want, sometimes it's necessary to flatten
it again, which :class:`ungroup` does.
If you want to group text lines by some pattern im them, :class:`~k1lib.cli.grep.grep`
might be better for you.
:param column: which column to group by
:param separate: whether to separate out the column to sort of form a dict or not. See example
:param removeCol: whether to remove the grouped-by column. Defaults to True if
``separate=True``, and False if ``separate=False``"""
self.column = column; self.separate = separate
if removeCol is None: removeCol = separate # if separate, then remove cols, else don't do it
self.removeCol = removeCol
[docs] def __ror__(self, it):
it = it | cli.deref(2); c = self.column; separate = self.separate; removeCol = self.removeCol
it = it | cli.sort(c, False); sentinel = object()
it = iter(it); a = [next(it, sentinel)]
if a[0] is sentinel: return
v = a[0][c]
try:
while True:
e = next(it)
if e[c] == v: a.append(e)
else:
if removeCol: a = a | ~cli.cut(c)
if separate: yield [v, a]
else: yield a
a = [e]; v = a[0][c]
except StopIteration:
if len(a) > 0:
if removeCol: a = a | ~cli.cut(c)
if separate: yield [v, a]
else: yield a
[docs]class ungroup(BaseCli):
[docs] def __init__(self, single=False, begin=False, insertCol:bool=True):
"""Ungroups things that were grouped using a specific mode of
:class:`groupBy`. Particularly useful to transform some complex data
structure into a flat dataframe so that you can plug into pandas. Example::
a = [[2.3, 5], [3.4, 2], [4.5, 2], [5.6, 5], [6.7, 1]]
# returns [[6.7, 1], [3.4, 2], [4.5, 2], [2.3, 5], [5.6, 5]]
a | groupBy(1, True) | ungroup() | deref()
# returns [[6.7], [3.4], [4.5], [2.3], [5.6]]
a | groupBy(1, True) | ungroup(False) | deref()
Just as a reminder, this is the output of :class:`groupBy` after executing ``a | groupBy(1, True)``::
[[1, [[6.7]]],
[2, [[3.4], [4.5]]],
[5, [[2.3], [5.6]]]]
A lot of times, your data is a little bit different, like this perhaps::
[[1, [6.7]],
[2, [3.4, 4.5]],
[5, [2.3, 5.6]]]
A way to fix this would be to add ``apply(wrapList().all(), 1)`` before :class:`ungroup`.
But because this is so common, I've added in the parameter ``single`` for that. Just set
it to True::
# returns [[6.7, 1], [3.4, 2], [4.5, 2], [2.3, 5], [5.6, 5]]
[[1, [6.7]],
[2, [3.4, 4.5]],
[5, [2.3, 5.6]]] | ungroup(single=True)
:param single: whether the table in each group has a single column or not
:param begin: whether to insert the column at the beginning or at the end.
Only works if ``insertCol`` is True
:param insertCol: whether to insert the column into the table or not"""
self.insertCol = insertCol; self.single = single; self.begin = begin
[docs] def __ror__(self, it):
preprocess = cli.apply(cli.wrapList().all(), 1) if self.single else cli.iden(); begin = self.begin
if self.insertCol: return it | preprocess | ~cli.apply(lambda x, arr: arr | cli.insert(x, begin).all()) | cli.joinStreams()
else: return it | preprocess | cli.cut(1) | cli.joinStreams()
[docs]class insertColumn(BaseCli):
[docs] def __init__(self, column:List[T], begin=True, fill=""):
"""Inserts a column at beginning or end.
Example::
# returns [['a', 1, 2], ['b', 3, 4]]
[[1, 2], [3, 4]] | insertColumn(["a", "b"]) | deref()
# returns [[1, 2, 'a'], [3, 4, 'b']]
[[1, 2], [3, 4]] | insertColumn(["a", "b"], begin=False) | deref()"""
self.column = column; self.begin = begin; self.fill = fill
[docs] def __ror__(self, it):
return it | transpose(fill=self.fill) | insert(self.column, begin=self.begin) | transpose(fill=self.fill)
[docs]def insertIdColumn(table=False, begin=True):
"""Inserts an id column at the beginning (or end).
Example::
# returns [[0, 'a', 2], [1, 'b', 4]]
[["a", 2], ["b", 4]] | insertIdColumn(True) | deref()
# returns [[0, 'a'], [1, 'b']]
"ab" | insertIdColumn()
:param table: if False, then insert column to an Iterator[str], else treat
input as a full fledged table"""
def a(it):
if table:
if begin: return ([i, *e] for i, e in enumerate(it))
else: return ([*e, i] for i, e in enumerate(it))
else:
if begin: return ([i, e] for i, e in enumerate(it))
else: return ([e, i] for i, e in enumerate(it))
return cli.aS(a)
[docs]class expandE(BaseCli):
[docs] def __init__(self, f:Callable[[T], List[T]], column:int):
"""Expands table element to multiple columns.
Example::
# returns [['abc', 3, -2], ['de', 2, -5]]
[["abc", -2], ["de", -5]] | expandE(lambda e: (e, len(e)), 0) | deref()
:param f: Function that transforms 1 row element to multiple elements"""
super().__init__(fs=[f]); self.f = f; self.column = column
[docs] def __ror__(self, it):
f = self.f; c = self.column
def gen(row):
for i, e in enumerate(row):
if i == c: yield from f(e)
else: yield e
return (gen(row) for row in it)
[docs]def unsqueeze(dim:int=0):
"""Unsqueeze input iterator.
Example::
t = [[1, 2], [3, 4], [5, 6]]
# returns (3, 2)
t | shape()
# returns (1, 3, 2)
t | unsqueeze(0) | shape()
# returns (3, 1, 2)
t | unsqueeze(1) | shape()
# returns (3, 2, 1)
t | unsqueeze(2) | shape()
Behind the scenes, it's really just ``wrapList().all(dim)``, but the "unsqueeze" name
is a lot more familiar. Also note that the inverse operation "squeeze" is sort of
``item().all(dim)``, if you're sure that this is desirable::
t = [[1, 2], [3, 4], [5, 6]]
# returns (3, 2)
t | unsqueeze(1) | item().all(1) | shape()"""
return cli.wrapList().all(dim)
def oUnsqueeze(cs, ts, metadata): # reminder: if change this then also change the example in llvm.rst
a = cs[0]; t = ts[0]; i = 0;
if not isinstance(t, tArrayTypes): return None
while isinstance(a, cli.apply) and a.normal: i += 1; a = a.f
if not isinstance(a, cli.wrapList): return None
t = t.__class__(t.child, t.rank+1 if t.rank is not None else None)
if isinstance(t, tNpArray): return [cli.aS(lambda x: np.expand_dims(x, i)).hint(t)]
else: return [cli.aS(lambda x: x.unsqueeze(i)).hint(t)]
tOpt.addPass(oUnsqueeze, [cli.apply], 4)
[docs]class count(BaseCli):
[docs] def __init__(self):
"""Finds unique elements and returns a table with [frequency, value, percent]
columns. Example::
# returns [[1, 'a', '33%'], [2, 'b', '67%']]
['a', 'b', 'b'] | count() | deref()"""
super().__init__()
def _typehint(self, inp):
i = tAny()
if isinstance(inp, tListIterSet): i = inp.child
return tIter(tCollection(int, i, str))
[docs] def __ror__(self, it:Iterator[str]):
it = it | cli.apply(lambda row: (tuple(row) if isinstance(row, list) else row))
c = Counter(it); s = sum(c.values())
return [[v, k, f"{round(100*v/s)}%"] for k, v in c.items()] # has to scan through the entire thing anyway, which is long enough already, so just turn it into a list
[docs] @staticmethod
def join():
"""Joins multiple counts together.
Example::
# returns [[2, 'a', '33%'], [4, 'b', '67%']]
['a', 'b', 'b'] | repeat(2) | applyMp(count() | deref()) | count.join() | deref()
This is useful when you want to get the count of a really long list/iterator using multiple cores"""
def inner(counts):
values = defaultdict(lambda: 0)
for _count in counts:
if _count is None: continue
for v, k, *_ in _count:
values[k] += v
s = values.values() | cli.toSum()
return [[v, k, f"{round(100*v/s)}%"] for k, v in values.items()]
return cli.applyS(inner)
[docs]class hist(BaseCli):
[docs] def __init__(self, bins:int=30):
"""Bins a long 1d array. Effectively creating a historgram, without
actually plotting it. Example::
np.random.randn(1000) | hist(5)
That returns something like::
(array([-2.31449761, -1.17406889, -0.03364017, 1.10678854, 2.24721726]),
array([ 41, 207, 432, 265, 55]),
1.1404287156493986)
This format goes with :meth:`~matplotlib.pyplot.bar` directly like this::
np.random.randn(1000) | hist(10) | ~aS(plt.bar)
If you have tons of data that's handled in multiple processes, but you want to
get an overall histogram, you can do something like this::
# bad solution, runs slow, accurate
fileNames | applyMp(cat() | toFloat() | aS(list)) | joinStreams() | hist() | ~aS(plt.bar)
# good solution, runs fast, slightly inaccurate
fileNames | applyMp(cat() | toFloat() | hist(300)) | hist.join() | ~aS(plt.bar)
Let's say in each process, you have 10M records, and that you have 1000 processes in total.
In the first solution, you transfer all records (10B records total) to a single process,
then calculate the histogram of them. The transfer overhead is going to be absolutely
enourmous, as well as the computation. This really defeats the purpose of doing
multiprocessing.
In the second solution, you "convert" 10M records into 600 numbers for each process,
which scales up to 600k numbers for all processes. Although big, but certainly
manageable with current hardware. So the data transfer cost is not a lot at all. The
histogram merging part also executes relatively fast, as it only creates an internal
array of 3M length. See over :meth:`hist.join` for param details
:params bins: how many bins should the histogram has?"""
self.bins = bins
[docs] def __ror__(self, it):
if not isinstance(it, settings.arrayTypes): it = list(it)
y, x = np.histogram(it, bins=self.bins)
delta = x[1] - x[0]; x = (x[1:] + x[:-1])/2
return x, y, delta
[docs] @staticmethod
def join(scale:float=1e4, bins:int=None, log:bool=True, xlog:bool=False):
"""Joins multiple histograms together.
Example::
a = np.random.randn(1000); b = np.random.randn(1000)+3
[a, b] | joinStreams() | hist() | head(2) | ~aS(plt.plot) # ---------------------------------- Ground truth
[a, b] | hist(300).all() | hist.join(scale=1e4) | head(2) | ~aS(plt.plot) # ------------------ Log joining
[a, b] | hist(300).all() | hist.join(scale=1e4, log=False) | head(2) | ~aS(plt.plot, ".") # -- Linear joining
plt.legend(["Ground truth", "Log", "Linear"]); plt.grid(True); plt.ylabel("Frequency"); plt.xlabel("Value");
This results in this:
.. image:: ../images/hist1.png
As you can see, this process is only approximate, but is accurate enough in everyday
use. If you are a normal user, then this is probably enough. However, if you're a
mathematician and really care about the accuracy of this, read on.
.. admonition:: Performance vs accuracy
As mentioned in :class:`hist`, joining histograms from across processes can really speed
things up big time. But the joining process is complicated, with multiple parameters
and different tradeoffs for each config. In this example, scale = 1e4, bins = 30,
OG bins = 300, log = True. "OG bins" is the number of bins coming into :meth:`hist.join`
To get the best accuracy possible, you should set scale and OG bins high. If better
performance is desired, you should first lower scale, then lower OG bins, then finally lower
bins.
.. admonition:: Log scale
Take a look at this piece of code::
a, b = np.random.randn(1000)*1, np.random.randn(3000000)*0.3+3
[a, b] | joinStreams() | hist() | head(2) | ~aS(plt.plot)
[a, b] | hist(300).all() | hist.join(scale=1e4) | head(2) | ~aS(plt.plot)
[a, b] | hist(300).all() | hist.join(scale=1e4, log=False) | head(2) | ~aS(plt.plot, ".")
plt.yscale("log"); plt.legend(["Ground truth", "Log", "Linear"]); plt.grid(True); plt.ylabel("Frequency"); plt.xlabel("Value");
This results in:
.. image:: ../images/hist2.png
This shows how log mode is generally better than linear mode when the frequencies span
across multiple orders of magnitude. So why not delete linear mode directly? Well, I have
not formally proved that log scale case fully covers the linear case, although in practice
it seems so. So just to be cautious, let's leave it in
.. admonition:: Scale
The setup is just like in the "Log scale" section, but with scale = 1e3 instead of the default 1e4:
.. image:: ../images/hist3.png
Remember that the higher the scale, the more accurate, but also the longer it runs. If
the difference in high and low frequencies are bigger than scale, then the low
frequency values are dropped.
:param scale: how big a range of frequencies do we want to capture?
:param bins: output bins. If not specified automatically defaults to 1/10 the
original number of bins
:param log: whether to transform everything to log scale internally on the y axis
:param xlog: whether to transform everything to log scale internally on the x axis"""
def inner(it):
it = it | (cli.apply(cli.apply(math.log), 0) if xlog else cli.iden()) | cli.deref()
_bins = bins if bins is not None else it | cli.cut(0) | cli.shape(0).all() | cli.toMean() | cli.op()/10 | cli.aS(int)
maxY = max(it | cli.cut(1) | cli.toMax().all() | cli.toMax() | cli.op()/scale, 1e-9)
if log: it = it | cli.apply(lambda x: np.log(x+1e-9)-math.log(maxY) | cli.aS(np.exp) | cli.aS(np.round) | cli.op().astype(int), 1) | cli.deref()
else: it = it | cli.apply(lambda y: (y/maxY).astype(int), 1) | cli.deref()
x, y, delta = it | cli.cut(0, 1) | cli.transpose().all() | cli.joinStreams() | ~cli.apply(lambda a,b: [a]*b) | cli.joinStreams() | cli.aS(list) | cli.aS(np.array) | hist(_bins)
return x | ((cli.apply(math.exp) | cli.deref()) if xlog else cli.iden()), y*maxY, delta
return cli.aS(inner)
def _permuteGen(row, pers):
row = list(row); return (row[i] for i in pers)
[docs]class permute(BaseCli):
[docs] def __init__(self, *permutations:List[int]):
"""Permutes the columns. Acts kinda like :meth:`torch.Tensor.permute`.
Example::
# returns [['b', 'a'], ['d', 'c']]
["ab", "cd"] | permute(1, 0) | deref()"""
super().__init__(); self.permutations = permutations
[docs] def __ror__(self, it:Iterator[str]):
p = self.permutations
for row in it: yield _permuteGen(row, p)
[docs]class accumulate(BaseCli):
[docs] def __init__(self, columnIdx:int=0, avg=False):
"""Groups lines that have the same row[columnIdx], and
add together all other columns, assuming they're numbers. Example::
# returns [['a', 10.5, 9.5, 14.5], ['b', 1.1, 2.2, 3.3]]
[["a", 1.1, 2.2, 3.4],
["a", 1.1, 2.2, 7.8],
["a", 8.3, 5.1, 3.3],
["b", 1.1, 2.2, 3.3]] | accumulate(0) | deref()
:param columnIdx: common column index to accumulate
:param avg: calculate average values instead of sum"""
super().__init__(); self.columnIdx = columnIdx; self.avg = avg
self.dict = defaultdict(lambda: defaultdict(lambda: 0))
self.keyAppearances = defaultdict(lambda: 0)
[docs] def __ror__(self, it:Iterator[str]):
for row in it:
row = list(row); key = row[self.columnIdx]
row.pop(self.columnIdx); self.keyAppearances[key] += 1
for i, e in enumerate(row):
try: self.dict[key][i] += float(e)
except: self.dict[key][i] = e
for key, cols in self.dict.items():
ncol = len(cols)
if self.avg:
for i, col in enumerate(cols):
if isinstance(col, (int, float)): cols[i] /= self.keyAppearances[key]
elems = list(cols.values()); elems.insert(self.columnIdx, key); yield elems
[docs]class AA_(BaseCli):
[docs] def __init__(self, *idxs:List[int], wraps=False):
"""Returns 2 streams, one that has the selected element, and the other
the rest. Example::
# returns [5, [1, 6, 3, 7]]
[1, 5, 6, 3, 7] | AA_(1)
# returns [[5, [1, 6, 3, 7]]]
[1, 5, 6, 3, 7] | AA_(1, wraps=True)
You can also put multiple indexes through::
# returns [[1, [5, 6]], [6, [1, 5]]]
[1, 5, 6] | AA_(0, 2)
If you don't specify anything, then all indexes will be sliced::
# returns [[1, [5, 6]], [5, [1, 6]], [6, [1, 5]]]
[1, 5, 6] | AA_()
As for why the strange name, think of this operation as "AĀ". In statistics,
say you have a set "A", then "not A" is commonly written as A with an overline
"Ā". So "AA\_" represents "AĀ", and that it first returns the selection A.
:param wraps: if True, then the first example will return [[5, [1, 6, 3, 7]]]
instead, so that A has the same signature as Ā"""
super().__init__(); self.idxs = idxs; self.wraps = wraps
[docs] def __ror__(self, it:List[Any]) -> List[List[List[Any]]]:
super().__ror__(it); idxs = self.idxs; it = list(it)
if len(idxs) == 0: idxs = range(len(it))
def gen(idx):
return [it[idx], [v for i, v in enumerate(it) if i != idx]]
if not self.wraps and len(idxs) == 1: return gen(idxs[0])
return [gen(idx) for idx in idxs]
[docs]class peek(BaseCli):
[docs] def __init__(self):
"""Returns (firstRow, iterator). This sort of peaks at the first row,
to potentially gain some insights about the internal formats. The returned
iterator is not tampered. Example::
e, it = iter([[1, 2, 3], [1, 2]]) | peek()
print(e) # prints "[1, 2, 3]"
s = 0
for e in it: s += len(e)
print(s) # prints "5", or length of 2 lists
You kinda have to be careful about handling the ``firstRow``, because you might
inadvertently alter the iterator::
e, it = iter([iter(range(3)), range(4), range(2)]) | peek()
e = list(e) # e is [0, 1, 2]
list(next(it)) # supposed to be the same as `e`, but is [] instead
The example happens because you have already consumed all elements of the first
row, and thus there aren't any left when you try to call ``next(it)``."""
super().__init__()
[docs] def __ror__(self, it:Iterator[T]) -> Tuple[T, Iterator[T]]:
it = iter(it); sentinel = object(); row = next(it, sentinel)
if row == sentinel: return None, []
def gen(): yield row; yield from it
return row, gen()
[docs]class peekF(BaseCli):
[docs] def __init__(self, f:Union[BaseCli, Callable[[T], T]]):
r"""Similar to :class:`peek`, but will execute ``f(row)`` and
return the input Iterator, which is not tampered. Example::
it = lambda: iter([[1, 2, 3], [1, 2]])
# prints "[1, 2, 3]" and returns [[1, 2, 3], [1, 2]]
it() | peekF(lambda x: print(x)) | deref()
# prints "1\n2\n3"
it() | peekF(headOut()) | deref()"""
super().__init__(fs=[f]); self.f = f
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[T]:
it = iter(it); sentinel = object(); row = next(it, sentinel)
if row == sentinel: return []
def gen(): yield row; yield from it
self.f(row); return gen()
[docs]class repeat(BaseCli):
[docs] def __init__(self, limit:int=None):
"""Yields a specified amount of the passed in object. If you intend
to pass in an iterator, then make a list out of it first, as second copy of
iterator probably won't work as you will have used it the first time. Example::
# returns [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
[1, 2, 3] | repeat(3) | toList()
:param repeat: if None, then repeats indefinitely"""
super().__init__(); self.limit = limit
def _typehint(self, inp): return tIter(inp)
[docs] def __ror__(self, o:T) -> Iterator[T]:
limit = self.limit if self.limit != None else k1lib.settings.cli.inf
for i in itertools.count():
if i >= limit: break
yield o
[docs]def repeatF(f, limit:int=None, **kwargs):
"""Yields a specified amount generated by a specified function.
Example::
# returns [4, 4, 4]
repeatF(lambda: 4, 3) | toList()
# returns 10
repeatF(lambda: 4) | head() | shape(0)
f = lambda a: a+2
# returns [8, 8, 8]
repeatF(f, 3, a=6) | toList()
:param limit: if None, then repeats indefinitely
:param kwargs: extra keyword arguments that you can pass into the function
See also: :class:`repeatFrom`"""
f = fastF(f); limit = limit if limit != None else k1lib.settings.cli.inf
if len(kwargs) == 0:
for i in itertools.count():
if i >= limit: break
yield f()
else:
for i in itertools.count():
if i >= limit: break
yield f(**kwargs)
[docs]class repeatFrom(BaseCli):
[docs] def __init__(self, limit:int=None):
"""Yields from a list. If runs out of elements, then do it again for
``limit`` times. Example::
# returns [1, 2, 3, 1, 2]
[1, 2, 3] | repeatFrom() | head(5) | deref()
# returns [1, 2, 3, 1, 2, 3]
[1, 2, 3] | repeatFrom(2) | deref()
.. note::
For advanced users who wants to modify the resulting stream mid-way, read this section
Because this reuses elements inside the input iterator, it's necessary
that the input feels like a list and not an iterator. So in order to make
this work::
# returns [1, 2, 3, 1, 2, 3]
iter([1, 2, 3]) | repeatFrom(2) | deref()
It's necessary to turn the input iterator into a list. However, sometimes you
may want to update the input iterator values, so as to make things extra
dynamic, like this::
l = [1, 2, 3]
def g(): yield from l; yield from l
def h():
for i, e in enumerate(g()):
if i == 3: l.append(5) # modifies the list mid-way
yield e
h() | deref() # returns [1, 2, 3, 1, 2, 3, 5]
But if you do this, it wouldn't work::
l = [1, 2, 3]
def h():
for i, e in enumerate(iter(l) | repeatFrom(2)):
if i == 3: l.append(5)
yield e
h() | deref() # returns [1, 2, 3, 1, 2, 3]
This is because internally, :class:`repeatFrom` turns the iterator into a
list, and continues yielding from that list, and thus won't use the updated
values. To do it, you have to make the input feels like a list (can get length)::
l = [1, 2, 3]
def h():
for i, e in enumerate(l | repeatFrom(2)):
if i == 3: l.append(5)
yield e
h() | deref() # returns [1, 2, 3, 1, 2, 3, 5]
:param limit: if None, then repeats indefinitely"""
super().__init__(); self.limit = limit
def _typehint(self, inp):
i = tAny()
if isinstance(inp, tListIterSet): i = inp.child
if isinstance(inp, tArrayTypes): i = inp
return tIter(i)
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[T]:
try: len(it)
except: it = list(it)
limit = self.limit or k1lib.settings.cli.inf
for i in itertools.count():
if i >= limit: break
yield from it
def oneHotRow(i, n): ans = [0]*n; ans[i] = 1; return ans
[docs]class oneHot(BaseCli):
_groups = {}
[docs] def __init__(self, col, n:int=0, group:str=None, sep:bool=False):
"""One-hot encode some column in a table.
Example::
a = [
[1, 2, "A"],
[3, 4, "B"],
[5, 6, "C"]]
b = [
[7, 8, "A"],
[9, 10, "B"],
[11, 12, "B"]]
[*a, *b] | oneHot(2) | deref()
[*a, *b] | oneHot(2, 3, "abcd") | deref()
Last 2 statements both return this::
[[1, 2, 1, 0, 0],
[3, 4, 0, 1, 0],
[5, 6, 0, 0, 1],
[7, 8, 1, 0, 0],
[9, 10, 0, 1, 0],
[11, 12, 0, 1, 0]]
You can also separate the encoded column out like this::
[*a, *b] | oneHot(2, sep=True) | deref()
Which returns this::
[[1, 2, [1, 0, 0]],
[3, 4, [0, 1, 0]],
[5, 6, [0, 0, 1]],
[7, 8, [1, 0, 0]],
[9, 10, [0, 1, 0]],
[11, 12, [0, 1, 0]]]
The natural way to do this is to use with without ``n`` and ``group`` parameters.
But sometimes, your one hot encoding is spreaded across multiple datasets in
multiple dataloaders, and so the order and length of the encoding might not be
the same, which will mess up your training process.
That's why, you can specify ``group``, which will share encoding information
across all :class:`oneHot` clis that have the same group name. If you choose to
do this then you have to also specify what's the size of the encoding, because
the cli can't really infer the size when it potentially has not seen all the data
right?
:param col: which column one hot encode and expand into
:param n: (optional) total number of different elements
:param group: (optional) group name
:param sep: (optional) whether to separate the variable out into its own list"""
self.col = col; self.n = n; self.group = group; self.sep = sep
if (n != 0 and group is not None) and (n == 0 or group is None):
raise Exception("You have to specify both `n` and `group` at the same time if you want to use them")
if group is not None:
if group not in oneHot._groups:
oneHot._groups[group] = dict()
self.d = oneHot._groups[group]
else: self.d = dict()
def _typehint(self, inp):
# TODO
if isinstance(inp, tListIterSet):
if isinstance(inp.child, tListIterSet):
pass
elif isinstance(inp.child, tListIterSet):
pass
pass
return tIter(tAny())
[docs] def __ror__(self, it):
c = self.col; d = self.d; n = self.n; sep = self.sep
if n == 0:
it = it | cli.deref(2); n = it | cli.cut(c) | cli.aS(set) | cli.shape(0)
for row in it:
e = row[c]
try: e[0]; len(e)
except: e = list(e)
if e not in d: d[e] = oneHotRow(len(d), n)
if sep: yield [*row[:c], d[e], *row[c+1:]]
else: yield [*row[:c], *d[e], *row[c+1:]]
return
_d = it | cli.cut(c) | cli.aS(set) | cli.sort(None, False) | cli.deref(); n = len(_d)
_d = _d | insertIdColumn(begin=False) | cli.apply(cli.aS(oneHotRow, n), 1) | transpose() | cli.toDict()
for row in it: yield [*row[:c], *_d[row[c]], *row[c+1:]]
[docs]class indexTable(BaseCli):
[docs] def __init__(self, *cols):
"""Indexes a table by some columns.
Example::
a = [
[0, 3, 0.1],
[0, 4, 0.2],
[1, 3, 0.3],
[1, 4, 0.4],
]
# returns {3: [[0, 3, 0.1], [1, 3, 0.3]], 4: [[0, 4, 0.2], [1, 4, 0.4]]}
a | indexTable(1)
# returns {0: [[0, 3, 0.1], [0, 4, 0.2]], 1: [[1, 3, 0.3], [1, 4, 0.4]]}
a | indexTable(0)
# returns {3: {0: [[0, 3, 0.1]], 1: [[1, 3, 0.3]]}, 4: {0: [[0, 4, 0.2]], 1: [[1, 4, 0.4]]}}
a | indexTable(1, 0)"""
self.cols = cols
[docs] def __ror__(self, it):
if len(self.cols) == 0: return it
col = self.cols[0]
return it | groupBy(col) | cli.apply(lambda group: [group[0][col], group | indexTable(*self.cols[1:])]) | cli.toDict()