# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for functions that cut out specific parts of the table
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set, Tuple
from k1lib.cli.init import BaseCli, Table, T, fastF
import k1lib.cli as cli; import k1lib, os, torch
import numpy as np; from collections import deque
# Public names of this module, i.e. what a star-import of it exposes.
__all__ = ["filt", "inSet", "contains", "empty",
"isNumeric", "instanceOf", "inRange",
"head", "tail", "columns", "cut", "rows",
"intersection", "union", "unique", "breakIf", "mask"]
# Shortcut to the cli settings object; `mask` reads `settings.arrayTypes` from it.
settings = k1lib.settings.cli
class filt(BaseCli):
    def __init__(self, predicate:Callable[[T], bool], column:int=None):
        """Keeps only the lines for which ``predicate`` is truthy.
        Examples::

            # returns [2, 6]
            [2, 3, 5, 6] | filt(lambda x: x%2 == 0) | deref()
            # returns [3, 5]
            [2, 3, 5, 6] | ~filt(lambda x: x%2 == 0) | deref()
            # returns [[2, 'a'], [6, 'c']]
            [[2, "a"], [3, "b"], [5, "a"], [6, "c"]] | filt(lambda x: x%2 == 0, 0) | deref()

        You can also pass in :class:`~k1lib.cli.modifier.op`, for extra intuitiveness::

            # returns [2, 6]
            [2, 3, 5, 6] | filt(op() % 2 == 0) | deref()
            # returns ['abc', 'a12']
            ["abc", "def", "a12"] | filt(op().startswith("a")) | deref()
            # returns ['abcd', '2bcr']
            ["abcd", "0123", "2bcr"] | filt("bc" in op()) | deref()
            # returns [2, 3]
            range(5) | filt(op() in [2, 8, 3]) | deref()
            # returns [0, 1, 4]. Does not support `filt(op() not in [2, 8, 3])`. Use inverted filt instead!
            range(5) | ~filt(op() in [2, 8, 3]) | deref()

        If you need more extensive filtering capabilities, check out :class:`~k1lib.cli.grep.grep`

        :param predicate: function deciding whether an element is kept
        :param column:
            - if integer, then predicate(row[column])
            - if None, then predicate(row)"""
        # The predicate is handed to BaseCli inside a list and read back from
        # that same list afterwards, so whatever BaseCli stores there is used.
        fs = [predicate]
        super().__init__(fs)
        self.predicate = fs[0]
        self.column = column

    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Lazily yields the elements (or rows) that pass the predicate."""
        keep = fastF(self.predicate)
        col = self.column
        if col is None:
            for line in it:
                if keep(line): yield line
            return
        for row in it:
            row = list(row)
            # rows too short to have the requested column are dropped
            if col < len(row) and keep(row[col]): yield row

    def __invert__(self):
        """Negate the condition"""
        def negated(s): return not self.predicate(s)
        return filt(negated, self.column)

    def __neg__(self):
        """Also negates the condition"""
        return ~self
def inSet(values:Set[Any], column:int=None) -> filt:
    """Keeps only the lines that are members of the given collection.
    Example::

        # returns [2, 3]
        range(5) | inSet([2, 8, 3]) | deref()
        # returns [0, 1, 4]
        range(5) | ~inSet([2, 8, 3]) | deref()

    You can also use :class:`~k1lib.cli.modifier.op` like this, so you don't
    have to remember this cli::

        # returns [2, 3]
        range(5) | filt(op() in [2, 8, 3]) | deref()
        # returns [0, 1, 4]
        range(5) | ~filt(op() in [2, 8, 3]) | deref()

    However, this feature is very experimental

    :param values: accepted values; copied into a set once, up front
    :param column: forwarded to :class:`filt`"""
    allowed = set(values)
    def isMember(l): return l in allowed
    return filt(isMember, column)
def contains(s:str, column:int=None) -> filt:
    """Keeps only the lines that contain the specified substring. Sort of similar
    to :class:`~k1lib.cli.grep.grep`, but this is simpler, and can be inverted.
    Example::

        # returns ['abcd', '2bcr']
        ["abcd", "0123", "2bcr"] | contains("bc") | deref()

    You can also use :class:`~k1lib.cli.modifier.op` like this::

        # returns ['abcd', '2bcr']
        ["abcd", "0123", "2bcr"] | filt("bc" in op()) | deref()

    :param s: value tested with ``s in element``
    :param column: forwarded to :class:`filt`"""
    def hasSubstring(e): return s in e
    return filt(hasSubstring, column)
class empty(BaseCli):
    def __init__(self, reverse=False):
        """Keeps only the streams that are empty. Almost always used inverted,
        but "empty" is a short, sweet name that's easy to remember. Example::

            # returns [[1, 2], ['a']]
            [[], [1, 2], [], ["a"]] | ~empty() | deref()

        :param reverse: not intended to be used by the end user. Do ``~empty()`` instead."""
        super().__init__()
        self.reverse = reverse

    def __ror__(self, streams:Iterator[Iterator[T]]) -> Iterator[Iterator[T]]:
        """Peeks at every stream and yields the ones matching the current mode."""
        wantNonEmpty = self.reverse
        for stream in streams:
            try:
                # the second value returned by peek is compared against [] to
                # decide whether the stream had anything in it
                item, it = stream | cli.peek()
                matched = (it != []) if wantNonEmpty else (it == [])
                if matched: yield it
            except StopIteration:
                pass

    def __invert__(self):
        """Returns a new :class:`empty` with the opposite mode."""
        return empty(not self.reverse)
def isNumeric(column:int=None) -> filt:
    """Filters out a line if that column is not a number.
    Example::

        # returns [0, 2, '3']
        [0, 2, "3", "a"] | isNumeric() | deref()

    A value counts as numeric when ``float(value)`` succeeds.

    :param column: forwarded to :class:`filt`"""
    def f(v):
        try:
            float(v)
        except (ValueError, TypeError):
            # ValueError: unparsable string such as "a".
            # TypeError: objects float() can't take at all (None, lists, ...);
            # these are simply "not numeric" and must not abort the pipeline.
            return False
        return True
    return filt(f, column)
def instanceOf(cls:Union[type, Tuple[type]], column:int=None) -> filt:
    """Keeps only the lines that are an instance of the given type(s).
    Example::

        # returns [2]
        [2, 2.3, "a"] | instanceOf(int) | deref()
        # returns [2, 2.3]
        [2, 2.3, "a"] | instanceOf((int, float)) | deref()

    :param cls: a type, or a tuple/list of types
    :param column: forwarded to :class:`filt`"""
    # isinstance() does not accept a list, so lists are turned into tuples
    accepted = tuple(cls) if isinstance(cls, list) else cls
    def matches(e): return isinstance(e, accepted)
    return filt(matches, column)
def inRange(min:float=float("-inf"), max:float=float("inf"), column:int=None) -> filt:
    """Checks whether a column is in the half-open range ``[min, max)`` or not.
    Example::

        # returns [-2, 3, 6]
        [-2, -8, 3, 6] | inRange(-3, 10) | deref()
        # returns [-8]
        [-2, -8, 3, 6] | ~inRange(-3, 10) | deref()

    If you wish to just check against 1 bound, then use filt directly, like this::

        # returns [3, 4]
        range(5) | filt(op() >= 3) | deref()

    :param min: inclusive lower bound
    :param max: exclusive upper bound
    :param column: forwarded to :class:`filt`"""
    def withinBounds(e):
        return e >= min and e < max
    return filt(withinBounds, column)
def sliceable(it):
    """Returns whether ``it`` supports both slicing and ``len()``.

    The probe uses an empty slice (``it[:0]``) instead of ``it[0]``, so that:

    - empty sequences (``[]``, ``""``) are still recognized as sliceable
    - containers that only support integer/key lookup (``dict``, ``deque``)
      are not mistaken for sliceable ones

    :param it: any object
    :return: True if ``it[:0]`` and ``len(it)`` both succeed, else False"""
    try:
        it[:0]; len(it)
    except Exception:
        # Only ordinary errors mean "not sliceable"; KeyboardInterrupt and
        # SystemExit are allowed to propagate.
        return False
    return True
def _head(n, inverted, it):
it = iter(it)
if n is None:
if not inverted: yield from it
else: return
elif n >= 0:
if not inverted:
for i, line in zip(range(n), it): yield line
else:
for i, line in zip(range(n), it): pass
yield from it
else:
if not inverted: # head to -3
n = abs(n); queue = deque()
for line in it:
queue.append(line)
if len(queue) > n: yield queue.popleft()
else: yield from deque(it, -n) # -3 to end
class head(BaseCli):
    def __init__(self, n=10):
        """Only outputs first ``n`` lines. You can also negate it (like
        ``~head(5)``), which then only outputs after first ``n`` lines. Examples::

            "abcde" | head(2) | deref() # returns ["a", "b"]
            "abcde" | ~head(2) | deref() # returns ["c", "d", "e"]
            "0123456" | head(-3) | deref() # returns ['0', '1', '2', '3']
            "0123456" | ~head(-3) | deref() # returns ['4', '5', '6']
            "012" | head(None) | deref() # returns ['0', '1', '2']
            "012" | ~head(None) | deref() # returns []

        You can also pass in fractional head::

            range(20) | head(0.25) | deref() # returns [0, 1, 2, 3, 4], or the first 25% of samples

        Also works well and fast with :class:`numpy.ndarray`, :class:`torch.Tensor`
        and other sliceable types::

            # returns (10,)
            np.linspace(1, 3) | head(10) | shape()

        :param n: number of lines (negative counts from the end), a fraction
            of the input length, or None for "everything". Whole-valued
            floats such as ``2.0`` are treated as the integer ``2``"""
        super().__init__(); self.n = n; self.inverted = False

    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Applies the cut. Sliceable inputs are sliced directly (keeping their
        type); everything else goes through the lazy generator ``_head``.

        :raises Exception: if ``n`` is fractional and the input isn't sliceable"""
        n = self.n; inverted = self.inverted
        if n is not None:
            if round(n) != n: # fractional head
                if not sliceable(it): raise Exception(f"Can't do fractional head (`head({n})`) if input is not sliceable (aka not a list, tuple, numpy array or pytorch tensors, etc). Convert to a list first by passing through `toList()`")
                i = int(len(it)*n)
                return it[i:] if inverted else it[:i]
            # whole-valued floats (2.0) are not valid slice bounds or range()
            # arguments, so normalize them to int before going further
            n = int(n)
        if inverted and n is None: return [] # special case
        if sliceable(it): return it[n:] if inverted else it[:n]
        return _head(n, inverted, it)

    def __invert__(self):
        """Returns a new :class:`head` with the opposite mode. The receiver is
        left untouched, so a ``head`` object stored in a variable keeps
        behaving the same after ``~`` has been applied to it."""
        answer = head(self.n); answer.inverted = not self.inverted
        return answer
def tail(n:int=10):
    """Basically an inverted :class:`head`.
    Examples::

        range(10) | tail(3) | deref() # returns [7, 8, 9]
        range(10) | tail(0) | deref() # returns []

    :param n: number of lines to keep, counted from the end"""
    # -0 == 0, so `~head(-0)` would mean "skip the first 0 lines", i.e. the
    # whole input. The last 0 lines is nothing, which is what head(0) gives.
    if n == 0: return head(0)
    return ~head(-n)
class lazyList:
    """Index-addressable view over an iterator. Elements are pulled from the
    underlying iterator only as far as the largest index requested so far,
    and are cached so earlier indices can be read again."""
    def __init__(self, it):
        """:param it: any iterable; it is consumed on demand"""
        self.it = iter(it); self.elems = []

    def __getitem__(self, idx):
        """Returns element ``idx``, reading more of the iterator if needed.

        :raises IndexError: if the iterator runs out before reaching ``idx``"""
        elems = self.elems; it = self.it
        try:
            # range(len-1, idx) has exactly as many steps as elements missing
            for _ in range(len(elems)-1, idx): elems.append(next(it))
        except StopIteration:
            # Letting StopIteration escape from __getitem__ is wrong for a
            # sequence, and inside a generator it turns into RuntimeError.
            raise IndexError(f"lazyList index {idx} out of range (source has only {len(elems)} elements)") from None
        return elems[idx]
class rows(BaseCli):
    def __init__(self, *rows:List[int]):
        """Cuts out specific rows. Space complexity O(1) as a list is not
        constructed (unless you're using some really weird slices).

        :param rows: ints for the row indices

        Example::

            "0123456789" | rows(2) | toList() # returns ["2"]
            "0123456789" | rows(5, 8) | toList() # returns ["5", "8"]
            "0123456789" | rows()[2:5] | toList() # returns ["2", "3", "4"]
            "0123456789" | ~rows()[2:5] | toList() # returns ["0", "1", "5", "6", "7", "8", "9"]
            "0123456789" | ~rows()[:7:2] | toList() # returns ['1', '3', '5', '7', '8', '9']
            "0123456789" | rows()[:-4] | toList() # returns ['0', '1', '2', '3', '4', '5']
            "0123456789" | ~rows()[:-4] | toList() # returns ['6', '7', '8', '9']"""
        super().__init__() # like every other cli in this module
        if len(rows) == 1 and isinstance(rows[0], slice):
            self.slice = rows[0]; self.idxMode = False
        else: self.rows = rows; self.sortedRows = sorted(rows); self.idxMode = True
        self.inverted = False

    def __getitem__(self, _slice):
        """Builds the cli for a slice of rows, e.g. ``rows()[2:5]``.

        Simple contiguous slices are delegated to ``cli.iden`` / :class:`head`.
        Those shortcuts know nothing about inversion, so they are only taken
        when this object is not inverted.

        :param _slice: a :class:`slice` object"""
        start, stop, step = _slice.start, _slice.stop, _slice.step
        if step is None or step == 1:
            if not self.inverted:
                if start is None and stop is None: return cli.iden()
                if start is None: return head(stop)
                if stop is None: return ~head(start)
        elif step == 0: return cli.ignore()
        answer = rows(_slice); answer.inverted = self.inverted; return answer

    def __invert__(self):
        """Returns a new :class:`rows` selecting the complement. The receiver
        is left untouched."""
        answer = rows(*self.rows) if self.idxMode else rows(self.slice)
        answer.inverted = not self.inverted
        return answer

    def __ror__(self, it:Iterator[str]):
        """Yields the selected rows (or, when inverted, all the other rows)."""
        if not self.inverted:
            if self.idxMode:
                if len(self.rows) == 0: return # `rows()` selects nothing
                # negative indices need the total length, hence the full list;
                # otherwise read only as far as the largest requested index
                it = list(it) if self.sortedRows[0] < 0 else lazyList(it)
                for idx in self.rows: yield it[idx]
            else: yield from list(it)[self.slice]
        else:
            it = list(it); n = len(it)
            if self.idxMode:
                # normalize negative indices to their positive position
                idxs = set((e if e >= 0 else n+e) for e in self.rows)
            else: idxs = set(range(n)[self.slice])
            yield from (e for i, e in enumerate(it) if i not in idxs)
class columns(BaseCli):
    def __init__(self, *columns:List[int]):
        """Cuts out specific columns, sliceable. Examples::

            ["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
            ["0123456789"] | cut(2) | deref() # returns ['2']
            ["0123456789"] | ~cut()[:7:2] | deref() # returns [['1', '3', '5', '7', '8', '9']]

        If you're selecting only 1 column, then Iterator[T] will be returned, not
        Table[T].

        :param columns: ints for the column indices, or a single slice"""
        super().__init__()
        if len(columns) == 1 and isinstance(columns[0], slice): columns = columns[0]
        self.columns = columns; self.inverted = False

    def __ror__(self, it:Table[T]) -> Table[T]:
        """Selects the configured columns from every row of the table.

        Slices and inversions are resolved against the length of the first
        row plus a headroom of 1000, so later rows may be somewhat longer
        than the first one."""
        columns = self.columns; it = iter(it)
        sentinel = object(); row = next(it, sentinel)
        # identity check: `==` on a numpy row would compare elementwise and
        # make the `if` fail with an ambiguous truth value
        if row is sentinel: return []
        row = list(row); rs = range(len(row)+1000) # 1000 for longer rows below
        it = it | cli.insert(row) # put the peeked row back in front
        if isinstance(columns, slice): columns = set(rs[columns])
        if self.inverted: columns = set(e for e in rs if e not in columns)
        if len(columns) == 1:
            c = list(columns)[0]
            # single column: flat stream of values, rows too short are skipped
            return (r[c] for r in (list(row) for row in it) if len(r) > c)
        else: return ((e for i, e in enumerate(row) if i in columns) for row in it)

    def __getitem__(self, idx):
        """Builds the cli for ``cut()[...]``, carrying over the inverted flag."""
        answer = columns(idx); answer.inverted = self.inverted; return answer

    def __invert__(self):
        """Returns a new :class:`columns` selecting the complement. The
        receiver is left untouched."""
        selection = self.columns
        answer = columns(selection) if isinstance(selection, slice) else columns(*selection)
        answer.inverted = not self.inverted
        return answer
cut = columns
class intersection(BaseCli):
    def __init__(self):
        """Returns the intersection of multiple streams.
        Example::

            # returns set([2, 4, 5])
            [[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection()
            # returns set()
            [] | intersection()"""
        super().__init__()

    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        """Folds all streams into the set of elements common to every one.

        :return: a set; empty if no streams were given at all"""
        answer = None
        for it in its:
            if answer is None: answer = set(it) # first stream seeds the result
            else: answer.intersection_update(it)
        # honour the declared return type even when there were no streams
        return set() if answer is None else answer
class union(BaseCli):
    def __init__(self):
        """Returns the union of multiple streams.
        Example::

            # returns {0, 1, 2, 10, 11, 12, 13, 14}
            [range(3), range(10, 15)] | union()
        """
        super().__init__()

    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        """Collects every element of every stream into a single set.

        :return: a set; empty if no streams were given"""
        answer = set()
        # update in place rather than rebuilding the accumulated set (and a
        # temporary set per stream) on every iteration
        for it in its: answer.update(it)
        return answer
class unique(BaseCli):
    def __init__(self, column:int):
        """Keeps only the first row for each distinct value in a column.
        Example::

            # returns [[1, "a"], [2, "a"]]
            [[1, "a"], [2, "a"], [1, "b"]] | unique(0) | deref()

        :param column: doesn't have the default case of None, because you can always use
            :class:`k1lib.cli.conv.toSet`"""
        super().__init__()
        self.column = column

    def __ror__(self, it:Table[T]) -> Table[T]:
        """Lazily yields rows (as lists) whose key has not been seen before."""
        seen = set()
        col = self.column
        for row in it:
            row = list(row)
            key = row[col]
            if key in seen: continue
            yield row
            seen.add(key)
class breakIf(BaseCli):
    def __init__(self, f):
        """Stops the input iterator as soon as a condition is met.
        Example::

            # returns [0, 1, 2, 3, 4, 5]
            [*range(10), 2, 3] | breakIf(lambda x: x > 5) | deref()

        :param f: condition; the element that triggers it is not emitted"""
        # same hand-over as in filt: pass the function to BaseCli inside a
        # list and read it back from that list afterwards
        fs = [f]
        super().__init__(fs)
        self.f = fs[0]

    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Yields elements until the condition first becomes truthy."""
        shouldStop = self.f
        for line in it:
            if not shouldStop(line): yield line
            else: break
class mask(BaseCli):
    def __init__(self, mask:Iterator[bool]):
        """Masks the input stream.
        Example::

            # returns [0, 1, 3]
            range(5) | mask([True, True, False, True, False]) | deref()
            # returns torch.tensor([0, 1, 3])
            torch.tensor(range(5)) | mask([True, True, False, True, False])

        :param mask: booleans, one per input element; truthy means keep"""
        super().__init__()
        self.mask = mask

    def __ror__(self, it):
        """Applies the mask: by indexing for the types listed in
        ``settings.arrayTypes``, and by lazy pairwise filtering otherwise."""
        flags = self.mask
        if not isinstance(it, settings.arrayTypes):
            return (e for e, m in zip(it, flags) if m)
        return it[list(flags)]