# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for functions that cuts out specific parts of the table
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set, Tuple
from k1lib.cli.init import BaseCli, Table, T, fastF
import k1lib.cli as cli; import k1lib, os, math
from k1lib.cli.typehint import *
import numpy as np; from collections import deque
try: import torch; hasTorch = True
except: hasTorch = False
# Public names exported by this module (what ``from ... import *`` picks up).
__all__ = ["filt", "filter_", "inSet", "contains", "empty",
"isNumeric", "instanceOf",
"head", "tail", "cut", "rows",
"intersection", "union", "unique", "breakIf", "mask", "tryout"]
# Shortcut to the cli settings object; ``settings.arrayTypes`` is what the clis
# below use to detect array-like inputs.
settings = k1lib.settings.cli
class filt(BaseCli):
    def __init__(self, predicate:Callable[[T], bool], column:int=None, catchErrors:bool=False):
        """Filters out elements.
        Examples::

            # returns [2, 6], grabbing all the even elements
            [2, 3, 5, 6] | filt(lambda x: x%2 == 0) | deref()
            # returns [3, 5], grabbing all the odd elements
            [2, 3, 5, 6] | ~filt(lambda x: x%2 == 0) | deref()
            # returns [[2, 'a'], [6, 'c']], grabbing all the even elements in the 1st column
            [[2, "a"], [3, "b"], [5, "a"], [6, "c"]] | filt(lambda x: x%2 == 0, 0) | deref()
            # throws error, because strings can't mod divide
            [1, 2, "b", 8] | filt(lambda x: x % 2 == 0) | deref()
            # returns [2, 8]
            [1, 2, "b", 8] | filt(lambda x: x % 2 == 0, catchErrors=True) | deref()

        You can also pass in :class:`~k1lib.cli.modifier.op`, for extra intuitiveness::

            # returns [2, 6]
            [2, 3, 5, 6] | filt(op() % 2 == 0) | deref()
            # returns ['abc', 'a12']
            ["abc", "def", "a12"] | filt(op().startswith("a")) | deref()
            # returns [3, 4, 5, 6, 7, 8, 9]
            range(100) | filt(3 <= op() < 10) | deref()

        If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
        automatically use the C-accelerated versions if possible, like this::

            # returns np.array([2, 3, 4]), instead of iter([2, 3, 4])
            np.array([1, 2, 3, 4]) | filt(lambda x: x>=2) | deref()
            # returns [2, 3, 4], instead of np.array([2, 3, 4]), because `math.exp` can't operate on numpy arrays
            np.array([1, 2, 3, 4]) | filt(lambda x: math.exp(x) >= 3) | deref()

        The accelerated path is only taken when the predicate, applied to the
        whole array, returns an array-typed mask. If it raises, or returns
        anything else (a plain bool for example), elements are filtered one
        by one instead.

        If you need more extensive filtering capabilities involving text, check out :class:`~k1lib.cli.grep.grep`

        If "filt" is too hard to remember, this cli also has an alias :class:`filter_`
        that kinda mimics Python's ``filter()``.

        :param predicate: function that returns True or False
        :param column: if not specified, then filters elements of the input
            array, else filters the specific column only
        :param catchErrors: whether to catch errors in the function or not (reject
            elements that raise errors). Runs slower if enabled though. Note that
            an inverted filter negates the already-wrapped predicate, so with
            ``~filt(..., catchErrors=True)`` elements that raise are kept"""
        fs = [predicate]; super().__init__(fs)
        if column is not None and column < 0: raise Exception(f"Filtering using a function on a negative-indexed column ({column}) is not supported")
        f = fs[0]; _fP = fastF(f); self.column = column
        if catchErrors:
            def g(x):
                try: return _fP(x)
                except Exception: return False # a failing element counts as rejected
            self.predicate = g
        else: self.predicate = _fP

    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Applies the filter to ``it``.

        Array-typed inputs are first tried as a single vectorized mask
        operation; everything else (and every failed vectorized attempt)
        is filtered lazily, element by element."""
        p = self.predicate; c = self.column; arrayTypes = settings.arrayTypes
        if c is None:
            if isinstance(it, arrayTypes):
                try:
                    m = p(it)
                    # only an array-typed result is a usable element-wise mask; a scalar
                    # bool (e.g. from the catchErrors wrapper) would index the array wrongly
                    if isinstance(m, arrayTypes): return it[m]
                except Exception: pass # predicate can't handle whole arrays, go element-wise
            return (l for l in it if p(l))
        if isinstance(it, arrayTypes):
            try:
                m = p(it[:,c])
                if isinstance(m, arrayTypes): return it[m]
            except Exception: pass # same best-effort fallback as above
        def gen():
            for es in it:
                es = list(es)
                # rows too short to have column `c` are dropped
                if c < len(es) and p(es[c]): yield es
        return gen()

    def __invert__(self):
        """Negate the condition"""
        def f(s):
            # can raise on array input, but that's ok: that's the signal telling
            # __ror__ to not use the vectorized path
            res = self.predicate(s)
            if isinstance(s, settings.arrayTypes) and isinstance(res, settings.arrayTypes): return ~res
            return not res
        return filt(f, self.column)

    def __neg__(self):
        """Also negates the condition"""
        return ~self

    def split(self):
        """Splits the input into positive and negative samples.
        Example::

            # returns [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
            range(10) | filt(lambda x: x%2 == 0).split() | deref()
            # also returns [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]], exactly like above
            range(10) | filt(lambda x: x%2 == 0) & filt(lambda x: x%2 != 0) | deref()"""
        f = self.predicate; c = self.column; return filt(f, c) & ~filt(f, c)
filter_ = filt
def inSet(values:Set[Any], column:int=None) -> filt:
    """Keeps only the lines that are members of ``values``.
    Example::

        # returns [2, 3]
        range(5) | inSet([2, 8, 3]) | deref()
        # returns [0, 1, 4]
        range(5) | ~inSet([2, 8, 3]) | deref()

    :param values: allowed values. Anything that is not already a set or
        a dict is converted to a set once, up front
    :param column: forwarded to :class:`filt`"""
    lookup = values if isinstance(values, (set, dict)) else set(values)
    def isMember(l): return l in lookup
    return filt(isMember, column)
def contains(s:str, column:int=None) -> filt:
    """Keeps only the lines that contain the given substring. Sort of similar
    to :class:`~k1lib.cli.grep.grep`, but this is simpler, and can be inverted.
    Example::

        # returns ['abcd', '2bcr']
        ["abcd", "0123", "2bcr"] | contains("bc") | deref()

    :param s: value tested with ``s in element``
    :param column: forwarded to :class:`filt`"""
    def hasIt(e): return s in e
    return filt(hasIt, column)
class empty(BaseCli):
    def __init__(self, reverse=False):
        """Keeps only the streams that are empty. Almost always used inverted,
        but "empty" is a short, sweet name that's easy to remember. Example::

            # returns [[1, 2], ['a']]
            [[], [1, 2], [], ["a"]] | ~empty() | deref()

        :param reverse: not intended to be used by the end user. Do ``~empty()`` instead."""
        super().__init__()
        self.reverse = reverse

    def __ror__(self, streams:Iterator[Iterator[T]]) -> Iterator[Iterator[T]]:
        """Peeks every stream with ``cli.peek()`` and yields the peeked
        stream object when it compares (un)equal to ``[]``, depending on
        the ``reverse`` flag."""
        wantNonEmpty = self.reverse
        for stream in streams:
            try:
                _first, rest = stream | cli.peek()
                keep = (rest != []) if wantNonEmpty else (rest == [])
                if keep: yield rest
            except StopIteration: pass

    def __invert__(self):
        """Returns a new :class:`empty` with the ``reverse`` flag flipped."""
        return empty(not self.reverse)
def isNumeric(column:int=None) -> filt:
    """Filters out a line if that column is not a number.
    Example::

        # returns [0, 2, '3']
        [0, 2, "3", "a"] | isNumeric() | deref()
        # returns [1], objects that can't be converted at all are rejected too
        [1, None, [2]] | isNumeric() | deref()

    :param column: forwarded to :class:`filt`"""
    def f(v):
        # float() raises ValueError for unparsable strings and TypeError for
        # objects it can't convert at all (None, lists, ...); both mean "not numeric"
        try: float(v)
        except (ValueError, TypeError): return False
        return True
    return filt(f, column)
def instanceOf(cls:Union[type, Tuple[type]], column:int=None) -> filt:
    """Keeps only the lines that are an instance of the given type(s).
    Example::

        # returns [2]
        [2, 2.3, "a"] | instanceOf(int) | deref()
        # returns [2, 2.3]
        [2, 2.3, "a"] | instanceOf((int, float)) | deref()

    :param cls: a type, or a tuple of types. A list is accepted as well
        and turned into a tuple so ``isinstance`` can use it
    :param column: forwarded to :class:`filt`"""
    types = tuple(cls) if isinstance(cls, list) else cls
    def matches(e): return isinstance(e, types)
    return filt(matches, column)
def sliceable(it):
    """Returns True if ``it`` supports both ``it[0]`` and ``len(it)``.

    This is only a probe: any ordinary exception raised by either operation
    (including IndexError for an empty sequence) means "not sliceable".
    KeyboardInterrupt and SystemExit are deliberately not swallowed."""
    try:
        it[0]
        len(it)
    except Exception:
        return False
    return True
def _head(n, inverted, it):
it = iter(it)
if n is None:
if not inverted: yield from it
else: return
elif n >= 0:
if not inverted:
for i, line in zip(range(n), it): yield line
else:
for i, line in zip(range(n), it): pass
yield from it
else:
if not inverted: # head to -3
n = abs(n); queue = deque()
for line in it:
queue.append(line)
if len(queue) > n: yield queue.popleft()
else: yield from deque(it, -n) # -3 to end
class head(BaseCli):
    def __init__(self, n=10):
        """Only outputs first ``n`` elements. You can also negate it (like
        ``~head(5)``), which then only outputs after first ``n`` lines. Examples::

            "abcde" | head(2) | deref() # returns ["a", "b"]
            "abcde" | ~head(2) | deref() # returns ["c", "d", "e"]
            "0123456" | head(-3) | deref() # returns ['0', '1', '2', '3']
            "0123456" | ~head(-3) | deref() # returns ['4', '5', '6']
            "012" | head(None) | deref() # returns ['0', '1', '2']
            "012" | ~head(None) | deref() # returns []

        You can also pass in fractional head::

            range(20) | head(0.25) | deref() # returns [0, 1, 2, 3, 4], or the first 25% of samples

        Also works well and fast with :class:`numpy.ndarray`, :class:`torch.Tensor`
        and other sliceable types::

            # returns (10,)
            np.linspace(1, 3) | head(10) | shape()"""
        super().__init__()
        self.n = n
        self.inverted = False

    def _typehint(self, inp):
        if isinstance(inp, tListIter) or isinstance(inp, tArrayTypes): return inp
        return str if inp == str else tIter(tAny())

    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Slices ``it`` directly when it is sliceable, otherwise falls back
        to the lazy generator. A fractional ``n`` needs the length, so a
        non-sliceable input is turned into a list first."""
        n = self.n; flipped = self.inverted
        if n is not None and round(n) != n: # fractional head
            seq = it if sliceable(it) else list(it)
            cutAt = int(len(seq)*n)
            if flipped: return seq[cutAt:]
            return seq[:cutAt]
        if n is None and flipped: return [] # special case
        if not sliceable(it): return _head(n, flipped, it)
        if flipped: return it[n:]
        return it[:n]

    def __invert__(self):
        """Returns a new :class:`head` with the same ``n`` and the inverted flag flipped."""
        other = head(self.n)
        other.inverted = not self.inverted
        return other

    def split(self):
        """Splits the list up into a head and tail sections.
        Example::

            # returns [[0, 1, 2, 3], [4, 5, 6, 7, 8, 9]]
            range(10) | head(4).split() | deref()

        This only splits it into 2 parts. If you want to split it up
        into many more parts with specified checkpoints, check out
        :class:`~k1lib.cli.structural.splitC`."""
        rest = ~self
        return self & rest
def tail(n:int=10):
    """Basically an inverted :class:`head`.
    Examples::

        range(10) | tail(3) | deref() # returns [7, 8, 9]
        range(10) | tail(0) | deref() # returns []

    :param n: how many elements to keep from the end"""
    # -0 == 0, so ~head(-0) would mean "skip nothing" and return everything;
    # "the last 0 elements" has to be handled separately
    if n == 0: return head(0)
    return ~head(-n)
class lazyList:
    """Index-only view over an iterator that pulls and caches just as many
    elements as the largest index requested so far."""
    def __init__(self, it):
        self.it = iter(it); self.elems = []
    def __getitem__(self, idx):
        """Returns element ``idx``, consuming the underlying iterator as needed.

        :raises IndexError: if the iterator runs out before reaching ``idx``"""
        elems = self.elems
        missing = idx + 1 - len(elems)
        if missing > 0:
            it = self.it
            try:
                for _ in range(missing): elems.append(next(it))
            except StopIteration:
                # a leaked StopIteration would become RuntimeError inside a
                # generator (PEP 479); behave like a normal list instead
                raise IndexError("lazyList index out of range") from None
        return elems[idx]
class rows(BaseCli):
    def __init__(self, *rows:List[int]):
        """Selects specific elements given an iterator of indexes.
        Space complexity O(1) as a list is not constructed (unless you're
        slicing it in really weird way). Example::

            "0123456789" | rows(2) | toList() # returns ["2"]
            "0123456789" | rows(5, 8) | toList() # returns ["5", "8"]
            "0123456789" | rows()[2:5] | toList() # returns ["2", "3", "4"]
            "0123456789" | ~rows()[2:5] | toList() # returns ["0", "1", "5", "6", "7", "8", "9"]
            "0123456789" | ~rows()[:7:2] | toList() # returns ['1', '3', '5', '7', '8', '9']
            "0123456789" | rows()[:-4] | toList() # returns ['0', '1', '2', '3', '4', '5']
            "0123456789" | ~rows()[:-4] | toList() # returns ['6', '7', '8', '9']

        Why it's called "rows" is because I couldn't find a good name for
        it. There was :class:`cut`, which the name of an actual bash cli
        that selects out columns given indices. When I needed a way to
        do what this cli does, it was in the context of selecting out rows,
        so the name stuck.

        If you want to just pick out the nth item from the iterator, instead of doing
        this::

            iter(range(10)) | rows(3) | item() # returns 3

        ... you can use the shorthand :class:`~k1lib.cli.utils.rItem` instead::

            iter(range(10)) | rItem(3) # returns 3

        :param rows: ints for the row indices"""
        super().__init__() # same base-class initialisation the other clis in this module perform
        if len(rows) == 1 and isinstance(rows[0], slice):
            self.slice = rows[0]; self.idxMode = False
        else: self.rows = rows; self.sortedRows = sorted(rows); self.idxMode = True
        self.inverted = False

    def __getitem__(self, _slice):
        """Builds the cli for ``rows()[start:stop:step]``, keeping the
        current inverted state. Simple prefix/suffix slices are delegated to
        :class:`head`, which can stream instead of building a list."""
        start, stop, step = _slice.start, _slice.stop, _slice.step
        inverted = self.inverted
        if step is None or step == 1:
            if start is None and stop is None:
                # non-inverted [:] is the identity; inverted [:] goes the generic way below
                if not inverted: return cli.iden()
            elif start is None: return ~head(stop) if inverted else head(stop)
            elif stop is None: return head(start) if inverted else ~head(start)
        elif step == 0: return cli.ignore()
        answer = rows(_slice); answer.inverted = inverted; return answer

    def __invert__(self):
        """Returns an inverted copy, leaving this object untouched, so an
        instance can safely be reused after ``~instance`` was taken."""
        answer = rows(*self.rows) if self.idxMode else rows(self.slice)
        answer.inverted = not self.inverted; return answer

    def __ror__(self, it:Iterator[str]):
        """Yields the selected (or, when inverted, the non-selected) elements."""
        if not self.inverted:
            if self.idxMode:
                if len(self.rows) == 0: return # nothing requested, nothing to yield
                # negative indexes need the full length, else elements are pulled lazily
                it = list(it) if self.sortedRows[0] < 0 else lazyList(it)
                for idx in self.rows: yield it[idx]
            else: yield from list(it)[self.slice]
        else:
            it = list(it); n = len(it)
            if self.idxMode:
                idxs = set((e if e >= 0 else n+e) for e in self.rows)
            else: idxs = set(range(n)[self.slice])
            yield from (e for i, e in enumerate(it) if i not in idxs)
class cut(BaseCli):
    def __init__(self, *columns:List[int]):
        """Cuts out specific columns, sliceable. Examples::

            ["0123456789", "abcdefghij"] | cut(5, 8) | deref() # returns [['5', '8'], ['f', 'i']]
            ["0123456789", "abcdefghij"] | cut(8, 5) | deref() # returns [['8', '5'], ['i', 'f']], demonstrating permutation-safe
            ["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
            ["0123456789"] | cut(8, 5) | deref() # returns [['8', '5']], demonstrating permutation-safe
            ["0123456789", "abcdefghij"] | cut(2) | deref() # returns ['2', 'c'], instead of [['2'], ['c']] as usual
            ["0123456789"] | cut(2) | deref() # returns ['2']
            ["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
            ["0123456789"] | ~cut()[:7:2] | deref() # returns [['1', '3', '5', '7', '8', '9']]

        In the first example, you can imagine that we're operating on this table::

            0123456789
            abcdefghij

        Then, we want to grab the 5th and 8th column (0-indexed), which forms this table::

            58
            fi

        So, result of that is just ``[['5', '8'], ['f', 'i']]``

        In the ``cut(2)`` examples, if you're only cutting out 1 column, then it
        will just grab that column directly, instead of putting it in a list.

        If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
        automatically use the C-accelerated versions, like this::

            torch.randn(4, 5, 6) | cut(2, 3) # returns tensor of shape (4, 2, 6)
            torch.randn(4, 5, 6) | cut(2) # returns tensor of shape (4, 6)
            torch.randn(4, 5, 6) | ~cut()[2:] # returns tensor of shape (4, 2, 6)

        When inverted, the remaining columns always come out in ascending order.

        .. warning::

            TL;DR: inverted negative indexes are a bad thing when rows don't have the same number of elements

            Everything works fine when all of your rows have the same number of elements. But things might behave a
            little strangely if they don't. For example::

                # returns [['2', '3', '4'], ['2', '3', '4', '5', '6', '7']]. Different number of columns, works just fine
                ["0123456", "0123456789"] | cut()[2:-2] | deref()
                # returns [['0', '1', '8', '9'], ['a', 'b', 'i', 'j']]. Same number of columns, works just fine
                ["0123456789", "abcdefghij"] | ~cut()[2:-2] | deref()
                # returns [['0', '1', '5', '6'], ['0', '1', '5', '6', '7', '8', '9']]. Different number of columns, unsupported invert case
                ["0123456", "0123456789"] | ~cut()[2:-2] | deref()

            Why does this happen? It peeks at the first row, determines that ~[2:-2] is equivalent
            to [:2] and [5:] combined and not [:2] and [-2:] combined. When applied to the second row,
            [-2:] goes from 5->9, hence the result. Another edge case would be::

                # returns [['0', '1', '2', '3', '5', '6'], ['0', '1', '2', '3', '5', '6', '7', '8', '9']]
                ["0123456", "0123456789"] | ~cut(-3) | deref()

            Like before, it peeks the first row and translate ~(-3) into ~4, which is equivalent to [:4] and [5:].
            But when applied to the second row, it now carries the meaning ~4, instead of ~(-3).

            Why don't I just fix these edge cases? Because the run time for it would be completely unacceptable,
            as we'd have to figure out what's the columns to include in the result for every row. This could
            easily be O(n^3). Of course, with more time optimizing, this could be solved, but this is the only
            extreme edge case and I don't feel like putting in the effort to optimize it."""
        super().__init__()
        if len(columns) == 1 and isinstance(columns[0], slice): columns = columns[0]
        self.columns = columns; self.inverted = False # columns: list[int] | slice

    def __ror__(self, it:Table[T]) -> Table[T]:
        """Cuts the requested columns out of every row of ``it``.

        Array-typed inputs are indexed directly (``it[:, columns]``). For
        everything else the first row is peeked to learn the column count,
        then rows are processed lazily."""
        columns = self.columns; inverted = self.inverted
        isArray = isinstance(it, settings.arrayTypes)
        if isArray: nCols = len(it[0]); prs = rs = range(nCols) # range(nColumns). "prs" for padded rs
        else: # carefully peeking first row and get the number of columns
            it = iter(it); sentinel = object(); row = next(it, sentinel)
            if row is sentinel: return []
            row = list(row); it = it | cli.insert(row); nCols = len(row) # put the peeked row back
            rs = range(nCols); prs = range(nCols+20) # 20 extra so rows a bit longer than the first one are still covered when inverted
        if isinstance(columns, slice):
            if not inverted: return it[:,columns] if isArray else (row[columns] for row in (list(row) for row in it))
            columns = list(rs[columns]) # resolve the slice against the first row's width
        columns = [e if e >= 0 else nCols + e for e in columns] # clear negative indices
        if inverted:
            # walk the padded range in order, so the surviving columns are unique and
            # ascending regardless of how a set would happen to iterate
            excluded = set(columns); columns = [e for e in prs if e not in excluded]
        if len(columns) == 1: c = columns[0]; return it[:,c] if isArray else (r[c] for r in (list(row) for row in it) if len(r) > c)
        else: return it[:,columns] if isArray else ([row[c] for c in columns if c < len(row)] for row in (list(row) for row in it))

    def __getitem__(self, idx):
        """``cut()[a:b:c]``: new :class:`cut` over the given slice, keeping the inverted state."""
        answer = cut(idx); answer.inverted = self.inverted; return answer

    def __invert__(self):
        """Returns an inverted copy, leaving this object untouched."""
        answer = cut(); answer.columns = self.columns
        answer.inverted = not self.inverted; return answer
class intersection(BaseCli):
    def __init__(self, column=None):
        """Returns the intersection of multiple streams.
        Example::

            # returns set([2, 4, 5])
            [[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection()
            # returns ['2g', '4h', '5j']
            [["1a", "2b", "3c", "4d", "5e"], ["7f", "2g", "4h", "6i", "5j"]] | intersection(0) | deref()

        :param column: what column to apply the intersection
            on. Defaulted to None"""
        super().__init__()
        self.column = column

    def _typehint(self, inp):
        if self.column is not None: return tAny()
        if isinstance(inp, tArrayTypes): return tSet(inp.child)
        if isinstance(inp, tListIterSet):
            inner = inp.child
            if isinstance(inner, tListIterSet): return tSet(inner.child)
            return tSet(tAny())
        if isinstance(inp, tCollection):
            first = inp.children[0]
            for e in inp.children:
                # every child has to be a list-like with the same element type
                if not isinstance(e, tListIterSet) or e.child != first.child: return tSet(tAny())
            return tSet(first.child)
        return tSet(tAny())

    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        """Without a column, returns the set of elements common to all
        streams. With a column, returns the rows whose value in that column
        appears in every stream; for a repeated value the last row seen wins."""
        c = self.column
        if c is None:
            streams = iter(its); missing = object()
            first = next(streams, missing)
            if first is missing: return set()
            common = set(first)
            for stream in streams: common = common.intersection(stream)
            return common
        its = its | cli.deref(2)
        ids = its | cut(c).all() | intersection() | cli.aS(set)
        byKey = {}
        for table in its:
            for row in table:
                key = row[c]
                if key in ids: byKey[key] = row
        return byKey.values()
class union(BaseCli):
    def __init__(self):
        """Returns the union of multiple streams.
        Example::

            # returns {0, 1, 2, 10, 11, 12, 13, 14}
            [range(3), range(10, 15)] | union()
        """
        super().__init__()

    def _typehint(self, inp):
        # same inference rules as a column-less intersection
        return intersection()._typehint(inp)

    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        """Collects the elements of every stream into a single set."""
        merged = set()
        for stream in its: merged.update(stream)
        return merged
class unique(BaseCli):
    def __init__(self, column:int=None):
        """Filters out non-unique row elements.
        Example::

            # returns [[1, "a"], [2, "a"]]
            [[1, "a"], [2, "a"], [1, "b"]] | unique(0) | deref()
            # returns [0, 1, 2, 3, 4]
            [*range(5), *range(3)] | unique() | deref()

        In the first example, because the 3rd element's first column is
        1, which has already appeared, so it will be filtered out.

        :param column: the column to detect unique elements. Can be
            None, which will behave like converting the input iterator
            into a set, but this cli will maintain the order"""
        super().__init__()
        self.column = column

    def __ror__(self, it:Table[T]) -> Table[T]:
        """Lazily yields each element (or row) the first time its key is seen."""
        col = self.column; seen = set()
        if col is None:
            for elem in it:
                if elem in seen: continue
                yield elem
                seen.add(elem)
            return
        for row in it:
            row = list(row); key = row[col]
            if key in seen: continue
            yield row
            seen.add(key)
class breakIf(BaseCli):
    def __init__(self, f):
        """Breaks the input iterator if a condition is met.
        Example::

            # returns [0, 1, 2, 3, 4, 5]
            [*range(10), 2, 3] | breakIf(lambda x: x > 5) | deref()

        :param f: condition; iteration stops at the first element for which it is truthy"""
        fs = [f]
        super().__init__(fs)
        # read the function back from the list that was handed to the base class
        self.f = fs[0]
        self._fC = fastF(self.f)

    def _typehint(self, inp):
        return tIter(inp.child) if isinstance(inp, tListIterSet) else tIter(tAny())

    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Yields elements until the condition first holds; that element is not yielded."""
        shouldStop = self._fC
        for line in it:
            if shouldStop(line): return
            yield line
class mask(BaseCli):
    def __init__(self, mask:Iterator[bool]):
        """Masks the input stream.
        Example::

            # returns [0, 1, 3]
            range(5) | mask([True, True, False, True, False]) | deref()
            # returns torch.tensor([0, 1, 3])
            torch.tensor(range(5)) | mask([True, True, False, True, False])

        :param mask: iterable of truthy/falsy flags, paired up with the input elements"""
        super().__init__()
        self.mask = mask

    def __ror__(self, it):
        """Array-typed inputs are indexed with the mask as a list; any other
        input is zipped with the mask and filtered lazily."""
        flags = self.mask
        if not isinstance(it, settings.arrayTypes):
            return (e for e, m in zip(it, flags) if m)
        return it[list(flags)]
class tryout(BaseCli):
    end = object() # sentinel: piping this into a tryout() stops it from absorbing further clis
    def __init__(self, result=None):
        """Wraps every cli operation after this in a try-catch block, returning ``result``.
        This can be a little finicky. Example::

            # returns 9
            3 | (tryout("failed") | op()**2)
            # returns "failed", instead of raising an exception
            "3" | (tryout("failed") | op()**2)
            # returns "unsupported operand type(s) for ** or pow(): 'str' and 'int'"
            "3" | (tryout(Exception) | op()**2)

        By default, this ``tryout()`` object will gobble up all clis behind it and wrap
        them inside a try-catch block. This might be undesirable, so you can stop it early::

            # returns "failed"
            3 | (tryout("failed") | op()**2 | aS(str) | op()**2)
            # raises an exception, because errors raised after `tryout.end` are no longer caught
            3 | (tryout("failed") | op()**2 | tryout.end | aS(str) | op()**2)

        :param result: result to return if there is an exception. If passed in the class
            `Exception`, then will return the exception's string instead"""
        super().__init__() # same base-class initialisation the other clis in this module perform
        self.clis = []; self.ser = None; self.result = result; self.absorbing = True

    def __or__(self, it):
        """Absorbs ``it`` into the guarded pipeline while still absorbing;
        after ``tryout.end`` was piped in, composes like a normal cli."""
        if it is tryout.end: self.absorbing = False; return self
        if isinstance(it, BaseCli):
            # drop the cached pipeline so it gets rebuilt with the new cli included
            if self.absorbing: self.clis.append(it); self.ser = None; return self
            else: return super().__or__(it)
        else: raise Exception("Can't pipe tryout() to a non-cli tool")

    def __ror__(self, it):
        """Runs ``it`` through the absorbed clis, returning the fallback result
        (or the exception's message) if anything raises."""
        if self.ser is None:
            # validate before caching, so the mistake is reported on every use and not only the first
            if len(self.clis) == 0: raise Exception("tryout() currently does not wrap around any other cli. You may need to change `data | tryout() | cli1() | cli2()` into `data | (tryout() | cli1() | cli2())`")
            self.ser = cli.serial(*self.clis)
        try: return it | self.ser
        except Exception as e: return str(e) if self.result is Exception else self.result