# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for functions that cuts out specific parts of the table
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set, Tuple
from k1lib.cli.init import BaseCli, settings, Table, T
import k1lib.cli as cli
import k1lib, os
from collections import deque
# Names exported by ``from <this module> import *``. ``rowsList`` is defined
# further down but is deliberately left out of this list.
__all__ = ["filt", "isValue", "isFile", "inSet", "contains", "empty",
"startswith", "endswith",
"isNumeric", "instanceOf", "inRange",
"head", "columns", "cut", "rows",
"intersection", "union", "unique", "breakIf", "mask"]
class filt(BaseCli):
    def __init__(self, predicate:Callable[[T], bool], column:int=None):
        """Keeps only the elements for which ``predicate`` is truthy.
        Examples::

            # returns [2, 6]
            [2, 3, 5, 6] | filt(lambda x: x%2 == 0) | deref()
            # returns [3, 5]
            [2, 3, 5, 6] | ~filt(lambda x: x%2 == 0) | deref()
            # returns [[2, 'a'], [6, 'c']]
            [[2, "a"], [3, "b"], [5, "a"], [6, "c"]] | filt(lambda x: x%2 == 0, 0) | deref()

        :param predicate: function deciding whether an element is kept
        :param column:
            - if integer, then predicate(row[column])
            - if None, then predicate(row)"""
        super().__init__(fs=[predicate])
        self.predicate = predicate
        self.column = column

    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Lazily yields the elements of ``it`` that pass the predicate.

        When a column is set, every row is turned into a list first and that
        list is what gets yielded. Rows that do not reach index ``column`` are
        dropped without calling the predicate."""
        super().__ror__(it)
        keep, col = self.predicate, self.column
        if col is None:
            # whole-element mode: the predicate sees each element as is
            for element in it:
                if keep(element): yield element
            return
        for row in it:
            cells = list(row)
            if col >= len(cells): continue # row too short to have that column
            if keep(cells[col]): yield cells

    def __invert__(self):
        """Negate the condition. Returns a new :class:`filt` on the same
        column; rows too short to have that column are still dropped."""
        def negated(s):
            return not self.predicate(s)
        return filt(negated, self.column)
def isValue(value, column:int=None) -> filt:
    """Keeps only the lines that are equal (``==``) to ``value``.
    Example::

        # returns [2, 2]
        [1, 2, 3, 2, 1] | isValue(2) | deref()
        # returns [1, 3, 1]
        [1, 2, 3, 2, 1] | ~isValue(2) | deref()
        # returns [[1, 2]]
        [[1, 2], [2, 1], [3, 4]] | isValue(2, 1) | deref()

    :param value: value every kept element must compare equal to
    :param column: passed straight to :class:`filt`"""
    def matches(candidate):
        return candidate == value
    return filt(matches, column)
def isFile() -> filt:
    """Keeps only the paths for which :func:`os.path.isfile` is true.
    Example::

        # returns ["a.py", "b.py"], if those files really do exist
        ["a.py", "hg/", "b.py"] | isFile()"""
    def pointsToFile(path):
        return os.path.isfile(path)
    return filt(pointsToFile)
def inSet(values:Set[Any], column:int=None) -> filt:
    """Keeps only the lines that are members of ``values``.
    Example::

        # returns [2, 3]
        range(5) | inSet([2, 8, 3]) | deref()
        # returns [0, 1, 4]
        range(5) | ~inSet([2, 8, 3]) | deref()

    :param values: any iterable; it is copied into a set once, up front
    :param column: passed straight to :class:`filt`"""
    allowed = set(values)
    def isMember(candidate):
        return candidate in allowed
    return filt(isMember, column)
def contains(s:str, column:int=None) -> filt:
    """Keeps only the lines that contain ``s`` (tested with ``s in line``).
    Sort of similar to :class:`~k1lib.cli.grep.grep`, but this is simpler, and
    can be inverted.
    Example::

        # returns ['abcd', '2bcr']
        ["abcd", "0123", "2bcr"] | contains("bc") | deref()

    :param s: the piece to look for
    :param column: passed straight to :class:`filt`"""
    def hasPiece(line):
        return s in line
    return filt(hasPiece, column)
class empty(BaseCli):
    def __init__(self, reverse=False):
        """Lets through only the streams that are empty. Almost always used
        inverted, but "empty" is a short, sweet name easy to remember.
        Example::

            # returns [[1, 2], ['a']]
            [[], [1, 2], [], ["a"]] | ~empty() | deref()

        :param reverse: not intended to be used by the end user. Do ``~empty()`` instead."""
        super().__init__()
        self.reverse = reverse

    def __ror__(self, streams:Iterator[Iterator[T]]) -> Iterator[Iterator[T]]:
        """Yields the second value returned by ``stream | cli.peek()`` for
        every stream that matches the (possibly reversed) emptiness test."""
        super().__ror__(streams)
        wantNonEmpty = self.reverse
        for stream in streams:
            try:
                # NOTE(review): relies on cli.peek() giving back a
                # (first item, stream) pair whose second part equals [] when
                # the stream had nothing in it - confirm against cli.peek
                _first, rest = stream | cli.peek()
                if wantNonEmpty: selected = rest != []
                else: selected = rest == []
                if selected: yield rest
            except StopIteration: continue

    def __invert__(self):
        """Returns a new :class:`empty` with the opposite ``reverse`` flag."""
        return empty(not self.reverse)
def startswith(s:str, column:int=None) -> filt:
    """Keeps only the lines whose ``.startswith(s)`` is true.
    Example::

        # returns ['ab', 'ac']
        ["ab", "cd", "ac"] | startswith("a") | deref()
        # returns ['cd']
        ["ab", "cd", "ac"] | ~startswith("a") | deref()

    :param s: required prefix
    :param column: passed straight to :class:`filt`"""
    def hasPrefix(line):
        return line.startswith(s)
    return filt(hasPrefix, column)
def endswith(s:str, column:int=None) -> filt:
    """Keeps only the lines whose ``.endswith(s)`` is true. See also:
    :meth:`startswith`

    :param s: required suffix
    :param column: passed straight to :class:`filt`"""
    def hasSuffix(line):
        return line.endswith(s)
    return filt(hasSuffix, column)
def isNumeric(column:int=None) -> filt:
    """Filters out a line if that column is not a number, meaning that
    ``float(value)`` does not succeed.
    Example::

        # returns [0, 2, '3']
        [0, 2, "3", "a"] | isNumeric() | deref()
        # returns [1], as None and lists can't be converted either
        [1, None, [2]] | isNumeric() | deref()

    :param column: passed straight to :class:`filt`"""
    def f(v):
        try: float(v)
        # float() raises ValueError for unparsable strings, but TypeError for
        # objects it can't convert at all (None, lists, dicts, ...). Both
        # simply mean "not a number" here, and must not abort the stream.
        except (ValueError, TypeError): return False
        return True
    return filt(f, column)
def instanceOf(cls:Union[type, Tuple[type]], column:int=None) -> filt:
    """Keeps only the lines that are instances of the given type(s).
    Example::

        # returns [2]
        [2, 2.3, "a"] | instanceOf(int) | deref()
        # returns [2, 2.3]
        [2, 2.3, "a"] | instanceOf((int, float)) | deref()

    :param cls: a type or a tuple of types; a list of types is accepted too
        and converted to a tuple, since :func:`isinstance` won't take a list
    :param column: passed straight to :class:`filt`"""
    accepted = tuple(cls) if isinstance(cls, list) else cls
    def hasType(e):
        return isinstance(e, accepted)
    return filt(hasType, column)
def inRange(min:float=float("-inf"), max:float=float("inf"), column:int=None) -> filt:
    """Keeps only the values ``e`` with ``min <= e < max`` (lower bound
    inclusive, upper bound exclusive).
    Example::

        # returns [-2, 3, 6]
        [-2, -8, 3, 6] | inRange(min=-3) | deref()
        # returns [-8]
        [-2, -8, 3, 6] | ~inRange(min=-3) | deref()

    :param min: inclusive lower bound
    :param max: exclusive upper bound
    :param column: passed straight to :class:`filt`"""
    def withinBounds(e):
        return (e >= min) and (e < max)
    return filt(withinBounds, column)
class head(BaseCli):
    def __init__(self, n:int=10):
        """Only outputs first ``n`` lines. You can also negate it (like
        ``~head(5)``), which then only outputs after first ``n`` lines.
        A negative ``n`` counts from the end of the stream. Examples::

            "abcde" | head(2) | deref() # returns ["a", "b"]
            "abcde" | ~head(2) | deref() # returns ["c", "d", "e"]
            "0123456" | head(-3) | deref() # returns ['0', '1', '2', '3']
            "0123456" | ~head(-3) | deref() # returns ['4', '5', '6']

        :param n: number of lines; negative values are measured from the end"""
        super().__init__()
        self.n = n
        self.inverted = False

    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Lazily yields the selected part of ``it``."""
        super().__ror__(it)
        n = self.n
        if n >= 0:
            if self.inverted:
                # drop the first n lines, pass everything after them
                for position, line in enumerate(it):
                    if position >= n: yield line
            else:
                # pass the first n lines, stop when line number n is reached
                for position, line in enumerate(it):
                    if position >= n: return
                    yield line
            return
        # Negative n: a line can only be classified once |n| more lines have
        # been seen after it, so keep a sliding window of pending lines.
        lag = abs(n); pending = deque()
        if self.inverted: # only the last |n| lines
            for line in it:
                pending.append(line)
                if len(pending) > lag: pending.popleft()
            yield from pending
        else: # everything except the last |n| lines
            for line in it:
                pending.append(line)
                if len(pending) > lag: yield pending.popleft()

    def __invert__(self):
        """Toggles the inverted flag in place and returns this same object."""
        self.inverted = not self.inverted
        return self
class rowsList(BaseCli):
    """Space-expensive implementation for :class:`rows`, without a lot of
    flexibility. Just used for slices with negative start/stop really. Don't use
    this directly, use :class:`rows` instead"""
    def __init__(self, _slice):
        """:param _slice: :class:`slice` object applied to the row indices"""
        super().__init__(); self._slice = _slice; self.inverted = False

    def __ror__(self, it:Iterator[str]):
        """Reads the whole input into a list, then yields the rows picked by
        the slice (or, when inverted, all the rows not picked by it)."""
        super().__ror__(it)
        it = list(it); full = range(len(it))
        # slicing a range resolves negative/None bounds against the real length
        rows = full[self._slice]
        if self.inverted: rows = [e for e in full if e not in rows]
        for row in rows: yield it[row]

    def __invert__(self):
        """Toggles the inverted flag in place and returns this same object.
        Inverting twice gives back the original selection, the same as for
        :class:`head`, :class:`rows` and :class:`columns`."""
        self.inverted = not self.inverted; return self
class rows(BaseCli):
    def __init__(self, *rows:List[int]):
        """Cuts out specific rows. Rows are selected lazily while the input is
        being iterated; slices with a negative bound are handed over to
        :class:`head` or to the list-based :class:`rowsList` instead.

        :param rows: ints for the row indices

        Example::

            "0123456789" | rows(2) | deref() # returns ["2"]
            "0123456789" | rows(5, 8) | deref() # returns ["5", "8"]
            "0123456789" | rows()[2:5] | deref() # returns ["2", "3", "4"]
            "0123456789" | ~rows()[2:5] | deref() # returns ["0", "1", "5", "6", "7", "8", "9"]
            "0123456789" | ~rows()[:7:2] | deref() # returns ['1', '3', '5', '7', '8', '9']
            "0123456789" | rows()[:-4] | deref() # returns ['0', '1', '2', '3', '4', '5']
            "0123456789" | ~rows()[:-4] | deref() # returns ['6', '7', '8', '9']"""
        super().__init__()
        givenSlice = len(rows) == 1 and isinstance(rows[0], slice)
        if givenSlice:
            sl = rows[0]
            # open-ended bounds become infinities
            lower = float("-inf") if sl.start is None else sl.start
            upper = float("inf") if sl.stop is None else sl.stop
            self.domain = k1lib.Domain([lower, upper])
            self.every = sl.step or 1 # only used for slices really
        else:
            self.domain = k1lib.Domain.fromInts(*rows)
            self.every = 1
        self.inverted = False

    def _every(self, every):
        """Sets the step between selected rows and returns this same object."""
        self.every = every
        return self

    def __getitem__(self, _slice):
        """Builds the cli for a slice. Non-negative bounds give a new
        :class:`rows` carrying over the inverted flag; negative bounds give
        :class:`head` based clis where possible and :class:`rowsList`
        otherwise (the inverted flag is not carried over in those cases)."""
        begin, end = _slice.start, _slice.stop
        negativeBegin = (begin or 0) < 0
        negativeEnd = (end or 0) < 0
        stride = _slice.step or 1
        if negativeBegin or negativeEnd: # at least 1 is negative
            if stride == 1:
                if negativeEnd:
                    if begin is None: return head(end) # [None, -3]
                    return ~head(begin) | head(end) # [5, -3]
                if negativeBegin and end is None: return ~head(begin) # [-3, None]
                # else case is [-10, 6], which is weird, so just stick to the long one
            return rowsList(_slice) # worst case scenario
        answer = rows(_slice)
        answer.inverted = self.inverted
        return answer

    def __invert__(self):
        """Toggles the inverted flag in place and returns this same object."""
        self.inverted = not self.inverted
        return self

    def __ror__(self, it:Iterator[str]):
        """Returns a generator over the rows of ``it`` whose gate is open."""
        super().__ror__(it)
        # gate value for rows outside the domain; rows inside get the opposite
        miss = bool(self.inverted); hit = not miss
        def gates():
            # One gate value per input row. Walks the wanted indices and
            # fills the gaps between them with `miss`.
            # NOTE(review): assumes domain.intIter(step) gives the wanted row
            # indices in increasing order - confirm against k1lib.Domain
            position = 0
            for wanted in self.domain.intIter(self.every):
                while position < wanted: yield miss; position += 1
                yield hit; position += 1
            while True: yield miss # past the last wanted index
        gated = zip(gates(), it)
        return (row for isOpen, row in gated if isOpen)
class columns(BaseCli):
    def __init__(self, *columns:List[int]):
        """Cuts out specific columns, sliceable. Examples::

            ["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
            ["0123456789"] | cut(2) | deref() # returns ['2']
            ["0123456789"] | ~cut()[:7:2] | deref() # returns [['1', '3', '5', '7', '8', '9']]

        If you're selecting only 1 column, then Iterator[T] will be returned, not
        Table[T].

        :param columns: ints for the column indices, or a single :class:`slice`"""
        super().__init__()
        if len(columns) == 1 and isinstance(columns[0], slice): columns = columns[0]
        self.columns = columns; self.inverted = False

    def __ror__(self, it:Table[T]) -> Table[T]:
        """Selects the configured columns from every row of ``it``.

        The first row is read right away, since its length is what slices and
        inverted selections get resolved against. Returns ``[]`` when ``it``
        has no rows at all."""
        super().__ror__(it); columns = self.columns; it = iter(it)
        sentinel = object(); row = next(it, sentinel)
        # Identity test, not ==: a row is arbitrary user data and may overload
        # == (array-like rows compare element-wise), which would make the
        # truth value of `row == sentinel` unreliable or raise.
        if row is sentinel: return []
        row = list(row); rs = range(len(row))
        # NOTE(review): presumably puts the row read above back in front of
        # the remaining rows - confirm against cli.joinList
        it = it | cli.joinList(row)
        if isinstance(columns, slice): columns = set(rs[columns])
        if self.inverted: columns = set(e for e in rs if e not in columns)
        # exactly one column selected: give back bare values instead of rows
        if len(columns) == 1: c = list(columns)[0]; return (list(row)[c] for row in it)
        else: return ((e for i, e in enumerate(row) if i in columns) for row in it)

    def __getitem__(self, idx):
        """Returns a new :class:`columns` for ``idx`` (typically a slice),
        carrying over the inverted flag."""
        answer = columns(idx); answer.inverted = self.inverted; return answer

    def __invert__(self):
        """Toggles the inverted flag in place and returns this same object."""
        self.inverted = not self.inverted; return self
cut = columns
class intersection(BaseCli):
    """Returns the intersection of multiple streams, as a set.
    Example::

        # returns set([2, 4, 5])
        [[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection()"""
    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        """Folds all streams into the set of elements present in every one of
        them. Returns None if ``its`` has no streams at all."""
        common = None
        for stream in its:
            # the first stream seeds the result, later ones narrow it down
            common = set(stream) if common is None else common.intersection(stream)
        return common
class union(BaseCli):
    """Returns the union of multiple streams, as a set.
    Example::

        # returns {0, 1, 2, 10, 11, 12, 13, 14}
        [range(3), range(10, 15)] | union()
    """
    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        """Collects the elements of every stream into one set. Returns an
        empty set if ``its`` has no streams."""
        merged = set()
        for stream in its:
            merged.update(stream)
        return merged
class unique(BaseCli):
    def __init__(self, column:int):
        """Filters out non-unique row elements: of all the rows that share the
        same value in ``column``, only the first one is kept.
        Example::

            # returns [[1, "a"], [2, "a"]]
            [[1, "a"], [2, "a"], [1, "b"]] | unique(0) | deref()

        :param column: doesn't have the default case of None, because you can always use
            :class:`k1lib.cli.utils.toSet`"""
        super().__init__(); self.column = column

    def __ror__(self, it:Table[T]) -> Table[T]:
        """Lazily yields each row (as a list) whose value in the chosen column
        has not been seen before. Those values are stored in a set, so they
        have to be hashable."""
        # Was `self.__ror__(it)`, which only built and threw away another
        # generator of this very method. The base class hook is what every
        # other cli in this module calls here.
        super().__ror__(it)
        seen = set(); c = self.column
        for row in it:
            row = list(row); key = row[c]
            if key in seen: continue
            seen.add(key)
            yield row
class breakIf(BaseCli):
    def __init__(self, f):
        """Stops the input iterator as soon as a condition is met. The element
        that triggers the condition is not passed on.
        Example::

            # returns [0, 1, 2, 3, 4, 5]
            [*range(10), 2, 3] | breakIf(lambda x: x > 5) | deref()

        :param f: condition, called with each element"""
        super().__init__()
        self.f = f

    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Lazily yields elements of ``it`` up to, but not including, the first
        one for which the condition is truthy."""
        super().__ror__(it)
        shouldStop = self.f
        for line in it:
            if shouldStop(line): return
            yield line
class mask(BaseCli):
    def __init__(self, mask:Iterator[bool]):
        """Masks the input stream: an element is let through when the mask
        value at the same position is truthy.
        Example::

            # returns [0, 1, 3]
            range(5) | mask([True, True, False, True, False]) | deref()

        :param mask: iterable of flags, paired up with the input elements"""
        super().__init__()
        self.mask = mask

    def __ror__(self, it):
        """Lazily yields the elements of ``it`` whose flag is truthy. Stops as
        soon as either the input or the mask runs out, as both are paired up
        with :func:`zip`."""
        yield from (element for element, flag in zip(it, self.mask) if flag)