# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This module is for functions that change the table
structure in a dramatic way. They're the core transformations
"""
from typing import List, Union, Iterator, Callable, Any, Tuple, Dict
from collections import defaultdict, Counter, deque
from k1lib.cli.init import patchDefaultDelim, BaseCli, oneToMany, T, Table, fastF
import k1lib.cli as cli
import itertools, numpy as np, torch, k1lib
__all__ = ["transpose", "reshape", "joinList", "splitW",
"joinStreams", "yieldSentinel", "joinStreamsRandom", "activeSamples",
"table", "batched", "window", "groupBy", "collate",
"insertRow", "insertColumn", "insertIdColumn",
"toDict", "toDictF", "expandE", "unsqueeze",
"count", "permute", "accumulate", "AA_", "peek", "peekF",
"repeat", "repeatF", "repeatFrom"]
class transpose(BaseCli):
    def __init__(self, dim1:int=0, dim2:int=1, fill=None):
"""Join multiple columns and loop through all rows. Aka transpose.
Example::
# returns [[1, 4], [2, 5], [3, 6]]
[[1, 2, 3], [4, 5, 6]] | transpose() | deref()
# returns [[1, 4], [2, 5], [3, 6], [0, 7]]
[[1, 2, 3], [4, 5, 6, 7]] | transpose(fill=0) | deref()
Multidimensional transpose works just like :meth:`torch.transpose` too::
            # returns (2, 7, 5, 3); a Tensor was detected, so this uses the built-in :meth:`torch.transpose`
torch.randn(2, 3, 5, 7) | transpose(3, 1) | shape()
# also returns (2, 7, 5, 3), but actually does every required computation. Can be slow if shape is huge
torch.randn(2, 3, 5, 7) | deref(igT=False) | transpose(3, 1) | shape()
        Can also work with numpy arrays (although the array has to be passed in like a function argument, and can't be piped in)::
# returns (5, 3, 2)
transpose(0, 2)(np.random.randn(2, 3, 5)).shape
Be careful with infinite streams, as transposing stream of shape (inf, 5) will
hang this operation! Either don't do it, or temporarily limit all infinite streams like
this::
with settings.cli.context(inf=21):
# returns (3, 21)
[2, 1, 3] | repeat() | transpose() | shape()
Also be careful with empty streams, as you might not get any results at all::
# returns [], as the last stream has no elements
[[1, 2], [3, 4], []] | transpose() | deref()
# returns [[1, 3, 0], [2, 4, 0]]
[[1, 2], [3, 4], []] | transpose(fill=0) | deref()
:param fill: if not None, then will try to zip longest with this fill value"""
super().__init__(); self.fill = fill
self.d1 = min(dim1, dim2); self.d2 = max(dim1, dim2)
    def __ror__(self, it:Iterator[Iterator[T]]) -> Table[T]:
d1 = self.d1; d2 = self.d2; fill = self.fill
if isinstance(it, torch.Tensor): return it.transpose(d1, d2)
if isinstance(it, np.ndarray):
dims = list(range(len(it.shape)))
temp = dims[d1]; dims[d1] = dims[d2]; dims[d2] = temp
return it.transpose(dims)
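        # for deeper transposes (d1 > 0), decompose the (d1, d2) swap into a chain of
        # adjacent-axis swaps, each one a plain 2-D transpose applied at depth i via
        # .all(i), followed by a second chain that shuffles the displaced axes back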
if d1 != 0: return it | cli.serial(*([transpose(fill=fill).all(i) for i in range(d1, d2)] + [transpose(fill=fill).all(i-1) for i in range(d2-1, d1, -1)]))
if self.fill is None: return zip(*it)
else: return itertools.zip_longest(*it, fillvalue=fill)
    @staticmethod
def fill(fill="", dim1:int=0, dim2:int=1):
"""Convenience method to fill in missing elements of a table.
Example::
# returns [[1, 2, 3], [4, 5, 0]]
[[1, 2, 3], [4, 5]] | transpose.fill(0) | deref()
# also returns [[1, 2, 3], [4, 5, 0]], demonstrating how it works underneath
[[1, 2, 3], [4, 5]] | transpose(fill=0) | transpose(fill=0) | deref()"""
return transpose(dim1, dim2, fill=fill) | transpose(dim1, dim2, fill=fill)
    @staticmethod
def wrap(f, dim1:int=0, dim2:int=1, fill=None):
"""Wraps ``f`` around 2 :class:`transpose`s, can be useful in combination with
:class:`k1lib.cli.init.mtmS`. Example::
# returns [[1, 4, 3, 4], [8, 81, 10, 11]]
[range(1, 5), range(8, 12)] | transpose.wrap(mtmS.f(apply(op()**2), 1)) | deref()
# also returns [[1, 4, 3, 4], [8, 81, 10, 11]], demonstrating the typical way to do this
[range(1, 5), range(8, 12)] | apply(op()**2, 1) | deref()
        The example given is mostly for demonstration. Most of the time, just use
        :class:`~k1lib.cli.modifier.apply` with columns instead. But sometimes you need direct
        access to a column, and this is how you can get it."""
if not isinstance(f, BaseCli): f = cli.applyS(f)
return transpose(dim1, dim2, fill) | f | transpose(dim1, dim2, fill)
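# recursively pulls elements from `it` and nests them into the shape dims[dimI:];
# e.g. with dims=(2, 3) and dimI=1, consumes 3 elements and returns them as 1 list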
def _formStructure(it, dims, dimI):
if dimI >= len(dims): return next(it)
return [_formStructure(it, dims, dimI+1) for i in range(dims[dimI])]
class reshape(BaseCli):
    def __init__(self, *dims):
"""Reshapes the input stream into the desired shape.
Example::
# returns [[0, 1, 2], [3, 4, 5]]
range(6) | reshape(2, 3) | deref()
# returns [[0, 1], [2, 3], [4, 5]]
range(6) | reshape(3, 2) | deref()
# returns [[0, 1], [2, 3], [4, 5]], stopped early
range(100) | reshape(3, 2) | deref()
# returns [[0, 1, 2], [3, 4, 5]], can leave out first dimension
range(6) | reshape(-1, 3) | deref()
            # returns [[0, 1, 2]], won't include a 2nd row, as it ran out of elements
range(5) | reshape(-1, 3) | deref()
# throws error, as it ran out of elements and can't fulfill the request
range(6) | reshape(3, 3) | deref()
Unlike :meth:`torch.reshape`, the input piped into this has to be a simple iterator.
If you have a complex data structure with multiple dimensions, turn that into a simple
iterator with :class:`joinStreams` first, like this::
# returns [[[0, 1, 2]], [[3, 4, 5]]]
[[[0], [1]], [[2], [3]], [[4], [5]]] | joinStreams(2) | reshape(2, 1, 3) | deref()"""
self.dims = dims
    def __ror__(self, it):
it = iter(it)
if self.dims[0] == -1:
try:
while True: yield _formStructure(it, self.dims, 1)
except StopIteration: pass
else:
for i in range(self.dims[0]): yield _formStructure(it, self.dims, 1)
class joinList(BaseCli):
    def __init__(self, element=None, begin=True):
"""Join element into list.
Example::
# returns [5, 2, 6, 8]
[5, [2, 6, 8]] | joinList() | deref()
# also returns [5, 2, 6, 8]
[2, 6, 8] | joinList(5) | deref()
:param element: the element to insert. If None, then takes the input [e, [...]],
else takes the input [...] as usual"""
super().__init__(); self.element = element; self.begin = begin
    def __ror__(self, it:Tuple[T, Iterator[T]]) -> Iterator[T]:
it = iter(it)
if self.element is None:
if self.begin: yield next(it); yield from next(it)
else: e = next(it); yield from next(it); yield e
else:
if self.begin: yield self.element; yield from it
else: yield from it; yield self.element
class splitW(BaseCli):
    def __init__(self, *weights:List[float]):
"""Splits elements into multiple weighted lists. If no weights are provided,
then automatically defaults to [0.8, 0.2]. Example::
# returns [[0, 1, 2, 3, 4, 5, 6, 7], [8, 9]]
range(10) | splitW(0.8, 0.2) | deref()
# same as the above
range(10) | splitW() | deref()"""
super().__init__();
if len(weights) == 0: weights = [0.8, 0.2]
self.weights = np.array(weights)
    def __ror__(self, it):
it = list(it); ws = self.weights; c = 0
ws = (ws * len(it) / ws.sum()).astype(int)
for w in ws[:-1]: yield it[c:c+w]; c += w
yield it[c:]
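# A small sketch of the remainder behavior above: weights are normalized, then
# truncated to ints, and the final split (`it[c:]`) absorbs whatever is left over:
#   range(10) | splitW(1/3, 1/3, 1/3) | deref() # returns splits of sizes [3, 3, 4]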
class joinStreams(BaseCli):
    def __init__(self, dims=1):
"""Joins multiple streams.
Example::
# returns [1, 2, 3, 4, 5]
[[1, 2, 3], [4, 5]] | joinStreams() | deref()
# returns [[0, 1], [2], [3, 4, 5], [6, 7, 8], [], [9, 10]]
[[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams() | deref()
# returns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams(2) | deref()
Sometimes, you may want to impose some dimensional structure after joining all streams
together, which :class:`reshape` does."""
if dims < 1: raise AttributeError(f"`dims` ({dims}) can't be less than 1, as it doesn't make any sense!")
self.multi = cli.serial(*(joinStreams().all(dims-d-1) for d in range(dims))) if dims > 1 else None
    def __ror__(self, streams:Iterator[Iterator[T]]) -> Iterator[T]:
if self.multi != None: yield from streams | self.multi
else:
for stream in streams: yield from stream
import random
def rand(n):
while True: yield random.randrange(n)
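# unique sentinel object: a stream can yield this to signal "no sample available
# right now"; joinStreamsRandom skips it instead of quitting (see activeSamples below)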
yieldSentinel = object()
class joinStreamsRandom(BaseCli):
    def __init__(self):
"""Join multiple streams randomly. If any streams runs out, then quits. If
any stream yields :data:`yieldSentinel`, then just ignores that result and
continue. Could be useful in active learning. Example::
# could return [0, 1, 10, 2, 11, 12, 13, ...], with max length 20, typical length 18
[range(0, 10), range(10, 20)] | joinStreamsRandom() | deref()
stream2 = [[-5, yieldSentinel, -4, -3], yieldSentinel | repeat()] | joinStreams()
# could return [-5, -4, 0, -3, 1, 2, 3, 4, 5, 6], demonstrating yieldSentinel
[range(7), stream2] | joinStreamsRandom() | deref()"""
super().__init__()
    def __ror__(self, streams:Iterator[Iterator[T]]) -> Iterator[T]:
streams = [iter(st) for st in streams]
try:
for streamIdx in rand(len(streams)):
o = next(streams[streamIdx])
                if o is not yieldSentinel: yield o # "is not" instead of "==" sidesteps numpy's elementwise equality
except StopIteration: pass
class activeSamples(BaseCli):
    def __init__(self, limit:int=100, p:float=0.95):
"""Yields active learning samples.
Example::
o = activeSamples()
ds = range(10) # normal dataset
ds = [o, ds] | joinStreamsRandom() # dataset with active learning capability
next(ds) # returns 0
next(ds) # returns 1
next(ds) # returns 2
o.append(20)
next(ds) # can return 3 or 20
            next(ds) # can return 4 or 20, depending on whether 20 came out earlier
So the point of this is to be a generator of samples. You can define your dataset
as a mix of active learning samples and standard samples. Whenever there's a data
        point that you want to focus on, you can add it to ``o``, and it will eventually yield
it.
.. warning::
            It might not be a good idea to set param ``limit`` to numbers higher than
            100. This is because the network might still not understand a wrong sample
            after being shown it multiple times, and will keep adding that wrong sample
            back in, distracting it from other samples and reducing the network's
            accuracy after active learning is removed.
            If ``limit`` is low enough (from my testing, 30-100 should be fine), then
            old wrong samples will be kicked out, allowing a fresh stream of wrong
            samples to come in and preventing the problem above. If you find that
            removing active learning makes the accuracy drop dramatically, then try
            decreasing the limit.
        :param limit: max number of active samples. Discards old samples if the
            number of samples exceeds this.
:param p: probability of actually adding the samples in"""
super().__init__(); self.p = p
self.samples = deque([], limit)
    def append(self, item):
"""Adds 1 sample."""
if random.random() < self.p: self.samples.append(item)
    def extend(self, items):
"""Adds multiple samples."""
for item in items: self.append(item)
def __iter__(self):
samples = self.samples
while True:
if len(samples) == 0: yield yieldSentinel
else: yield samples.popleft()
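# A minimal usage sketch (hypothetical training loop, not part of the library):
# feed samples the model gets wrong back into the pool, so they get revisited:
#   o = activeSamples(limit=50)
#   ds = [o, dataset] | joinStreamsRandom()
#   for sample in ds:
#       ...                         # train on sample
#       if wrong: o.append(sample)  # `wrong`: however you detect a bad prediction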
def table(delim:str=None):
"""Basically ``op().split(delim).all()``. This exists because this is used
quite a lot in bioinformatics. Example::
# returns [['a', 'bd'], ['1', '2', '3']]
["a|bd", "1|2|3"] | table("|") | deref()"""
return cli.op().split(patchDefaultDelim(delim)).all()
def _batched(it, bs, includeLast):
    l = []; it = iter(it)
    try:
        while True:
            for i in range(bs): l.append(next(it))
            yield l; l = []
    except StopIteration:
        if includeLast and len(l) > 0: yield l # don't yield an empty trailing batch
class batched(BaseCli):
    def __init__(self, bs=32, includeLast=False):
"""Batches the input stream.
Example::
# returns [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
range(11) | batched(3) | deref()
# returns [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
range(11) | batched(3, True) | deref()
# returns [[0, 1, 2, 3, 4]]
range(5) | batched(float("inf"), True) | deref()
# returns []
range(5) | batched(float("inf"), False) | deref()
Can work well and fast with :class:`torch.Tensor` and :class:`np.ndarray`::
            # both return torch.Tensor of shape (2, 3, 4, 5)
torch.randn(6, 4, 5) | batched(3)
torch.randn(7, 4, 5) | batched(3)
"""
super().__init__(); self.bs = bs; self.includeLast = includeLast
    def __ror__(self, it):
bs = self.bs; includeLast = self.includeLast
if bs == float("inf"):
if includeLast: return [it]
return []
if not includeLast and isinstance(it, k1lib.settings.cli.arrayTypes):
n = it.shape[0] // bs; it = it[:n*bs]
return it.reshape(n, bs, *it.shape[1:])
return _batched(it, bs, includeLast)
class window(BaseCli):
    def __init__(self, n, newList=False):
"""Slides window of size n forward and yields the windows.
Example::
# returns [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
range(5) | window(3) | deref()
If you are doing strange transformations to the result, like
transposing it, then it might complain that the internal deque
(double-ended queue) mutated during iteration. In that case,
        then set ``newList`` to True. It's not True by default because
        multiple lists will be created, all of which need memory
        allocation, which will be slower::
            # takes 15ms
            range(100000) | window(100) | ignore()
            # takes 48ms, because of allocating lists
            range(100000) | window(100, True) | ignore()"""
self.n = n; self.listF = (lambda x: list(x)) if newList else (lambda x: iter(x))
    def __ror__(self, it):
n = self.n; q = deque([], n); listF = self.listF
for e in it:
q.append(e)
if len(q) == n: yield listF(q); q.popleft()
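# A minimal sketch of the pitfall described above: every window is a view into one
# shared deque, so materializing several windows and consuming them later (e.g. via
# transpose()) can raise "deque mutated during iteration". newList=True sidesteps
# this by copying each window:
#   range(5) | window(3, True) | transpose() | deref() # works: [[0, 1, 2], [1, 2, 3], [2, 3, 4]]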
class groupBy(BaseCli): # TODO: doesn't work if groupBy column is a 1D tensor?
    def __init__(self, column:int, hashable=False):
"""Groups table by some column.
Example::
[[2.3, 5],
[3.4, 2],
[4.5, 2],
[5.6, 5],
[6.7, 1]] | groupBy(1) | deref()
This returns::
[[[2.3, 5],
[5.6, 5]],
[[3.4, 2],
[4.5, 2]],
[[6.7, 1]]]
        By default, the ``hashable`` param is False, which has O(n^2) time complexity,
        but can handle everything. If you benchmark everything and find out that
        this step is the bottleneck, then you can set ``hashable`` to True, which
        has O(n) time complexity. However, you have to be sure that the column's
        elements are hashable by value, so that they can be collected into a set
        internally. For example, :class:`torch.Tensor` doesn't hash by value::
# returns False
torch.tensor(2) in set([torch.tensor(2), torch.tensor(3)])
# returns True
torch.tensor(2) == torch.tensor(2)
So, you have to convert the 0-d tensors to single ints/floats first, in
order to use the fast mode.
:param column: which column to group by
:param hashable: whether the selected column is hashable or not"""
self.column = column; self.hashable = hashable
    def __ror__(self, it):
it = it | cli.deref(2); c = self.column
if self.hashable:
for v in it | cli.cut(c) | cli.toSet():
yield it | cli.filt(cli.op()[c] == v)
else:
            vs = [] # why not just convert this to a set? Because we actually want to use the "==" operator instead of hashing, as there are some subtle differences
for v in it | cli.cut(c):
if v not in vs:
vs.append(v); yield it | cli.filt(cli.op()[c] == v)
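# A minimal sketch of the fast mode described above (assuming the key column holds
# 0-d tensors): convert them to plain Python numbers first, then group:
#   data | apply(lambda t: t.item(), 1) | groupBy(1, hashable=True)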
def collate():
"""Puts individual columns into a tensor.
Example::
# returns [tensor([ 0, 10, 20]), tensor([ 1, 11, 21]), tensor([ 2, 12, 22])]
[range(0, 3), range(10, 13), range(20, 23)] | collate() | toList()"""
return transpose() | cli.apply(lambda row: torch.tensor(row))
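# A minimal sketch combining tools in this module (hypothetical `rows`: a list of
# (x, y) number pairs): batch the rows, then collate each batch into 2 tensors:
#   rows | batched(32) | apply(collate() | toList())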
def insertRow(*row:List[T]):
    """Inserts a row right before all other rows. See also: :meth:`joinList`."""
return joinList(row)
def insertColumn(*column, begin=True, fill=""):
"""Inserts a column at beginning or end.
Example::
# returns [['a', 1, 2], ['b', 3, 4]]
[[1, 2], [3, 4]] | insertColumn("a", "b") | deref()"""
return transpose(fill=fill) | joinList(column, begin) | transpose(fill=fill)
def insertIdColumn(table=False, begin=True, fill=""):
"""Inserts an id column at the beginning (or end).
Example::
# returns [[0, 'a', 2], [1, 'b', 4]]
[["a", 2], ["b", 4]] | insertIdColumn(True) | deref()
# returns [[0, 'a'], [1, 'b']]
"ab" | insertIdColumn()
    :param table: if False, then treats the input as an Iterator[str]; else treats
        it as a full-fledged table"""
f = (cli.toRange() & transpose(fill=fill)) | joinList(begin=begin) | transpose(fill=fill)
if table: return f
else: return cli.wrapList() | transpose() | f
class toDict(BaseCli):
    def __init__(self):
"""Converts 2 Iterators, 1 key, 1 value into a dictionary.
Example::
# returns {1: 3, 2: 4}
[[1, 2], [3, 4]] | toDict()"""
pass
    def __ror__(self, it:Tuple[Iterator[T], Iterator[T]]) -> dict:
return {_k:_v for _k, _v in zip(*it)}
class toDictF(BaseCli):
    def __init__(self, keyF:Callable[[Any], str]=None, valueF:Callable[[Any], Any]=None):
"""Transform an incoming stream into a dict using a function for
values. Example::
names = ["wanda", "vision", "loki", "mobius"]
names | toDictF(valueF=lambda s: len(s)) # will return {"wanda": 5, "vision": 6, ...}
names | toDictF(lambda s: s.title(), lambda s: len(s)) # will return {"Wanda": 5, "Vision": 6, ...}
"""
super().__init__(fs=[keyF, valueF]); self.keyF = keyF or (lambda s: s)
self.valueF = valueF or (lambda s: s)
    def __ror__(self, keys:Iterator[Any]) -> Dict[Any, Any]:
keyF = self.keyF; valueF = self.valueF
return {keyF(key):valueF(key) for key in keys}
class expandE(BaseCli):
    def __init__(self, f:Callable[[T], List[T]], column:int):
"""Expands table element to multiple columns.
Example::
# returns [['abc', 3, -2], ['de', 2, -5]]
[["abc", -2], ["de", -5]] | expandE(lambda e: (e, len(e)), 0) | deref()
:param f: Function that transforms 1 row element to multiple elements"""
super().__init__(fs=[f]); self.f = f; self.column = column
    def __ror__(self, it):
f = self.f; c = self.column
def gen(row):
for i, e in enumerate(row):
if i == c: yield from f(e)
else: yield e
return (gen(row) for row in it)
def unsqueeze(dim:int=0):
"""Unsqueeze input iterator.
Example::
t = [[1, 2], [3, 4], [5, 6]]
# returns (3, 2)
t | shape()
# returns (1, 3, 2)
t | unsqueeze(0) | shape()
# returns (3, 1, 2)
t | unsqueeze(1) | shape()
# returns (3, 2, 1)
t | unsqueeze(2) | shape()
Behind the scenes, it's really just ``wrapList().all(dim)``, but the "unsqueeze" name
is a lot more familiar. Also note that the inverse operation "squeeze" is sort of
``item().all(dim)``, if you're sure that this is desirable::
t = [[1, 2], [3, 4], [5, 6]]
# returns (3, 2)
t | unsqueeze(1) | item().all(1) | shape()"""
return cli.wrapList().all(dim)
class count(BaseCli):
    def __init__(self):
"""Finds unique elements and returns a table with [frequency, value, percent]
columns. Example::
# returns [[1, 'a', '33%'], [2, 'b', '67%']]
['a', 'b', 'b'] | count() | deref()"""
super().__init__()
    def __ror__(self, it:Iterator[str]):
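        # Counter needs hashable elements, so convert any list rows to tuples first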
it = it | cli.apply(lambda row: (tuple(row) if isinstance(row, list) else row))
c = Counter(it); s = sum(c.values())
for k, v in c.items(): yield [v, k, f"{round(100*v/s)}%"]
    @staticmethod
def join():
"""Joins multiple counts together.
Example::
# returns [[2, 'a', '33%'], [4, 'b', '67%']]
['a', 'b', 'b'] | repeat(2) | applyMp(count() | deref()) | count.join() | deref()
This is useful when you want to get the count of a really long list/iterator using multiple cores"""
def inner(counts):
values = defaultdict(lambda: 0)
for _count in counts:
for v, k, *_ in _count:
values[k] += v
s = values.values() | cli.toSum()
for k, v in values.items(): yield [v, k, f"{round(100*v/s)}%"]
return cli.applyS(inner)
def _permuteGen(row, pers):
row = list(row); return (row[i] for i in pers)
class permute(BaseCli):
    def __init__(self, *permutations:List[int]):
"""Permutes the columns. Acts kinda like :meth:`torch.Tensor.permute`.
Example::
# returns [['b', 'a'], ['d', 'c']]
["ab", "cd"] | permute(1, 0) | deref()"""
super().__init__(); self.permutations = permutations
    def __ror__(self, it:Iterator[str]):
p = self.permutations
for row in it: yield _permuteGen(row, p)
class accumulate(BaseCli):
    def __init__(self, columnIdx:int=0, avg=False):
"""Groups lines that have the same row[columnIdx], and
add together all other columns, assuming they're numbers. Example::
# returns [['a', 10.5, 9.5, 14.5], ['b', 1.1, 2.2, 3.3]]
[["a", 1.1, 2.2, 3.4],
["a", 1.1, 2.2, 7.8],
["a", 8.3, 5.1, 3.3],
["b", 1.1, 2.2, 3.3]] | accumulate(0) | deref()
:param columnIdx: common column index to accumulate
:param avg: calculate average values instead of sum"""
super().__init__(); self.columnIdx = columnIdx; self.avg = avg
self.dict = defaultdict(lambda: defaultdict(lambda: 0))
self.keyAppearances = defaultdict(lambda: 0)
    def __ror__(self, it:Iterator[str]):
for row in it:
row = list(row); key = row[self.columnIdx]
row.pop(self.columnIdx); self.keyAppearances[key] += 1
for i, e in enumerate(row):
try: self.dict[key][i] += float(e)
except: self.dict[key][i] = e
        for key, cols in self.dict.items():
            if self.avg:
                # divide each numeric column by how many rows shared this key;
                # non-numeric columns (where float() failed above) are left untouched
                for i, v in cols.items():
                    if isinstance(v, (int, float)): cols[i] = v / self.keyAppearances[key]
            elems = list(cols.values()); elems.insert(self.columnIdx, key); yield elems
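# A sketch of ``avg=True``, reusing the table from the docstring above:
#   [["a", 1.1, 2.2, 3.4],
#    ["a", 1.1, 2.2, 7.8],
#    ["a", 8.3, 5.1, 3.3],
#    ["b", 1.1, 2.2, 3.3]] | accumulate(0, avg=True) | deref()
#   # returns roughly [['a', 3.5, 3.17, 4.83], ['b', 1.1, 2.2, 3.3]]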
class AA_(BaseCli):
    def __init__(self, *idxs:List[int], wraps=False):
"""Returns 2 streams, one that has the selected element, and the other
the rest. Example::
# returns [5, [1, 6, 3, 7]]
[1, 5, 6, 3, 7] | AA_(1)
# returns [[5, [1, 6, 3, 7]]]
[1, 5, 6, 3, 7] | AA_(1, wraps=True)
You can also put multiple indexes through::
# returns [[1, [5, 6]], [6, [1, 5]]]
[1, 5, 6] | AA_(0, 2)
If you don't specify anything, then all indexes will be sliced::
# returns [[1, [5, 6]], [5, [1, 6]], [6, [1, 5]]]
[1, 5, 6] | AA_()
As for why the strange name, think of this operation as "AĀ". In statistics,
say you have a set "A", then "not A" is commonly written as A with an overline
"Ā". So "AA\_" represents "AĀ", and that it first returns the selection A.
:param wraps: if True, then the first example will return [[5, [1, 6, 3, 7]]]
instead, so that A has the same signature as Ā"""
super().__init__(); self.idxs = idxs; self.wraps = wraps
    def __ror__(self, it:List[Any]) -> List[List[List[Any]]]:
super().__ror__(it); idxs = self.idxs; it = list(it)
if len(idxs) == 0: idxs = range(len(it))
def gen(idx):
return [it[idx], [v for i, v in enumerate(it) if i != idx]]
if not self.wraps and len(idxs) == 1: return gen(idxs[0])
return [gen(idx) for idx in idxs]
class peek(BaseCli):
    def __init__(self):
"""Returns (firstRow, iterator). This sort of peaks at the first row,
to potentially gain some insights about the internal formats. The returned
iterator is not tampered. Example::
e, it = iter([[1, 2, 3], [1, 2]]) | peek()
print(e) # prints "[1, 2, 3]"
s = 0
for e in it: s += len(e)
print(s) # prints "5", or length of 2 lists
You kinda have to be careful about handling the ``firstRow``, because you might
inadvertently alter the iterator::
e, it = iter([iter(range(3)), range(4), range(2)]) | peek()
e = list(e) # e is [0, 1, 2]
list(next(it)) # supposed to be the same as `e`, but is [] instead
The example happens because you have already consumed all elements of the first
row, and thus there aren't any left when you try to call ``next(it)``."""
super().__init__()
    def __ror__(self, it:Iterator[T]) -> Tuple[T, Iterator[T]]:
it = iter(it); sentinel = object(); row = next(it, sentinel)
        if row is sentinel: return None, [] # "is", not "==": rows like numpy arrays overload equality
def gen(): yield row; yield from it
return row, gen()
class peekF(BaseCli):
    def __init__(self, f:Union[BaseCli, Callable[[T], T]]):
r"""Similar to :class:`peek`, but will execute ``f(row)`` and
return the input Iterator, which is not tampered. Example::
it = lambda: iter([[1, 2, 3], [1, 2]])
# prints "[1, 2, 3]" and returns [[1, 2, 3], [1, 2]]
it() | peekF(lambda x: print(x)) | deref()
# prints "1\n2\n3"
it() | peekF(headOut()) | deref()"""
super().__init__(fs=[f]); self.f = f
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
it = iter(it); sentinel = object(); row = next(it, sentinel)
        if row is sentinel: return []
def gen(): yield row; yield from it
self.f(row); return gen()
class repeat(BaseCli):
    def __init__(self, limit:int=None):
"""Yields a specified amount of the passed in object. If you intend
to pass in an iterator, then make a list out of it first, as second copy of
iterator probably won't work as you will have used it the first time. Example::
# returns [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
[1, 2, 3] | repeat(3) | toList()
        :param limit: if None, then repeats indefinitely"""
super().__init__(); self.limit = limit
    def __ror__(self, o:T) -> Iterator[T]:
limit = self.limit if self.limit != None else k1lib.settings.cli.inf
for i in itertools.count():
if i >= limit: break
yield o
def repeatF(f, limit:int=None, **kwargs):
"""Yields a specified amount generated by a specified function.
Example::
# returns [4, 4, 4]
repeatF(lambda: 4, 3) | toList()
# returns 10
repeatF(lambda: 4) | head() | shape(0)
f = lambda a: a+2
# returns [8, 8, 8]
repeatF(f, 3, a=6) | toList()
:param limit: if None, then repeats indefinitely
:param kwargs: extra keyword arguments that you can pass into the function
See also: :class:`repeatFrom`"""
f = fastF(f); limit = limit if limit != None else k1lib.settings.cli.inf
if len(kwargs) == 0:
for i in itertools.count():
if i >= limit: break
yield f()
else:
for i in itertools.count():
if i >= limit: break
yield f(**kwargs)
class repeatFrom(BaseCli):
    def __init__(self, limit:int=None):
"""Yields from a list. If runs out of elements, then do it again for
``limit`` times. Example::
# returns [1, 2, 3, 1, 2]
[1, 2, 3] | repeatFrom() | head(5) | deref()
# returns [1, 2, 3, 1, 2, 3]
[1, 2, 3] | repeatFrom(2) | deref()
:param limit: if None, then repeats indefinitely"""
super().__init__(); self.limit = limit
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
it = list(it); limit = self.limit or k1lib.settings.cli.inf
for i in itertools.count():
if i >= limit: break
yield from it
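# A minimal sketch (hypothetical `dataset`: a list of samples): stream 3 full
# epochs of the dataset in batches of 32:
#   dataset | repeatFrom(3) | batched(32) | deref()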