# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for functions that cut out specific parts of the table
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set
from k1lib.bioinfo.cli.init import BaseCli, settings, Table, T
import k1lib.bioinfo.cli as cli
__all__ = ["filt", "isValue", "inSet", "contains", "nonEmptyStream",
"startswith", "endswith",
"isNumeric", "inRange",
"head", "nhead",
"columns", "cut", "rows", "every", "intersection"]
class filt(BaseCli):
    def __init__(self, predicate:Callable[[str], bool], column:int=None):
        """Keeps only the lines (or rows) accepted by ``predicate``.

        :param predicate: called with a whole line, or with a single cell when
            ``column`` is given; elements with a truthy result are kept
        :param column:
            - if integer, the test is ``predicate(row[column])``
            - if None, the test is ``predicate(line)``"""
        self.predicate = predicate
        self.column = column

    def __ror__(self, it:Iterator[str]):
        """Lazily yields the elements of ``it`` that pass the predicate."""
        keep, col = self.predicate, self.column
        if col is None:
            for line in it:
                if keep(line):
                    yield line
            return
        for row in it:
            # Rows that have no cell at index `col` are dropped silently.
            if col < len(row) and keep(row[col]):
                yield row

    def __invert__(self):
        """Returns a new filter that keeps what this one would reject."""
        def negated(value):
            return not self.predicate(value)
        return filt(negated, self.column)
def isValue(value, column:int=None):
    """Keeps only the lines (or cells of ``column``) that equal ``value``.

    :param value: the value each candidate is compared against with ``==``
    :param column: forwarded to :class:`filt`"""
    def matches(candidate):
        return candidate == value
    return filt(matches, column)
def inSet(values:Set[Any], column:int=None):
    """Keeps only the lines (or cells of ``column``) found in ``values``.

    :param values: container tested with the ``in`` operator
    :param column: forwarded to :class:`filt`"""
    def isMember(candidate):
        return candidate in values
    return filt(isMember, column)
def contains(s:str, column:int=None):
    """Keeps only the lines (or cells of ``column``) that contain ``s``.

    :param s: value looked up in each candidate with the ``in`` operator
    :param column: forwarded to :class:`filt`"""
    def hasIt(candidate):
        return s in candidate
    return filt(hasIt, column)
class nonEmptyStream(BaseCli):
    """Filters out streams that have no rows"""
    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Iterator[Any]]:
        """Yields, for every non-empty stream, an iterator over all its rows.

        Each stream is probed by pulling its first element; streams that are
        exhausted immediately are skipped. The yielded iterators are
        independent of each other, so they may be consumed in any order and
        at any time."""
        def prepend(first, rest):
            # `first` and `rest` are bound per call. A closure over the loop
            # variables would see whatever values the loop holds by the time
            # the yielded iterator is finally consumed.
            yield first
            yield from rest

        for stream in streams:
            rest = iter(stream)
            try:
                first = next(rest)
            except StopIteration:
                continue  # empty stream: drop it
            yield prepend(first, rest)
def startswith(s:str, column:int=None):
    """Keeps only the lines (or cells of ``column``) that begin with ``s``.

    :param s: prefix passed to each candidate's ``startswith`` method
    :param column: forwarded to :class:`filt`"""
    def hasPrefix(candidate):
        return candidate.startswith(s)
    return filt(hasPrefix, column)
def endswith(s:str, column:int=None):
    """Keeps only the lines (or cells of ``column``) that finish with ``s``.

    :param s: suffix passed to each candidate's ``endswith`` method
    :param column: forwarded to :class:`filt`"""
    def hasSuffix(candidate):
        return candidate.endswith(s)
    return filt(hasSuffix, column)
def isNumeric(column:int=None):
    """Filters out a line if that column is not a number.

    A value counts as a number when ``float(value)`` succeeds. Values that
    ``float`` cannot even attempt to convert (``None``, lists, ...) are
    treated as non-numeric instead of aborting the whole stream.

    :param column: forwarded to :class:`filt`; if None the whole line is tested"""
    def parses(v):
        try:
            float(v)
        except (ValueError, TypeError):
            # ValueError: unparsable text; TypeError: unsupported type
            return False
        return True
    return filt(parses, column)
def inRange(min:float=None, max:float=None, column:int=None):
    """Keeps only the values lying in the half-open interval ``[min, max)``.

    :param min: inclusive lower bound; None means no lower bound
    :param max: exclusive upper bound; None means no upper bound
    :param column: forwarded to :class:`filt`"""
    lower = float("-inf") if min is None else min
    upper = float("inf") if max is None else max
    def within(e):
        return e >= lower and e < upper
    return filt(within, column)
class head(BaseCli):
    def __init__(self, n:int=10):
        """Only outputs first {n} lines, preferable over rows()[:n] because
        the input is streamed instead of being read fully into a list.

        :param n: number of leading lines to output; values <= 0 output nothing"""
        self.n = n

    def __ror__(self, it:Iterator[str]):
        """Lazily yields at most ``n`` leading elements of ``it``.

        Stops right after the n-th element is yielded, so no further element
        is pulled from ``it``."""
        n = self.n
        if n <= 0:
            return  # nothing requested: do not touch the source at all
        for count, line in enumerate(it, 1):
            yield line
            if count >= n:
                return
class nhead(BaseCli):
    def __init__(self, n:int=1):
        """Skips the first {n} lines and outputs everything after them.

        :param n: number of leading lines to drop"""
        self.n = n

    def __ror__(self, it:Iterator[str]):
        """Lazily yields every element of ``it`` whose position is at least ``n``."""
        for position, line in enumerate(it):
            if position >= self.n:
                yield line
class columns(BaseCli):
    def __init__(self, *columns:Union[int, slice, List[int]]):
        """Cuts out specific columns, sliceable.

        Accepts column indexes as separate arguments (``columns(0, 2)``), as a
        single list/tuple (``columns([0, 2])``) or as a slice, usually written
        ``columns()[1:3]``.

        :param columns: indexes of the columns to keep"""
        if len(columns) == 1 and isinstance(columns[0], (list, tuple, slice)):
            columns = columns[0]
        self.columns = columns

    def __ror__(self, it:Table[T]) -> Table[T]:
        """Lazily yields the selected part of every row of ``it``.

        If exactly one column is selected, the bare cell is yielded for each
        row. Otherwise a generator over the selected cells is yielded, in the
        row's own column order."""
        spec = self.columns
        if isinstance(spec, slice):
            wanted = None  # needs a row length; resolved on the first row
        elif isinstance(spec, int):
            wanted = {spec}
        else:
            wanted = set(spec)
        for elems in it:
            if wanted is None:
                wanted = set(range(len(elems))[spec])
            if len(wanted) == 1:
                # A set cannot be indexed, so fetch its only member this way.
                yield elems[next(iter(wanted))]
            else:
                yield (e for j, e in enumerate(elems) if j in wanted)

    def __getitem__(self, idx):
        """Returns a new :class:`columns` selecting ``idx`` (int, tuple or slice)."""
        return columns(idx)

# Alias for `columns`.
cut = columns
class rows(BaseCli):
    def __init__(self, *rows):
        """Cuts out specific rows. Can do `rows()[5:10]` to get rows 5 to 10.

        :param rows: indexes of the rows to output, given either as separate
            arguments or as a single list/tuple; rows come out in this order"""
        if len(rows) == 1 and isinstance(rows[0], (list, tuple)):
            rows = rows[0]
        self.rows = rows

    def __getitem__(self, _slice):
        """Returns a new :class:`rows` whose selection is ``_slice``."""
        answer = rows()
        answer.rows = _slice
        return answer

    def __ror__(self, it:Iterator[str]):
        """Reads all of ``it`` into a list, then yields the selected rows.

        The selection stored on the instance is never modified, so the same
        object can be applied to inputs of different lengths."""
        data = list(it)
        selection = self.rows
        if isinstance(selection, slice):
            yield from data[selection]
            return
        if isinstance(selection, int):
            selection = (selection,)  # e.g. `rows()[3]`
        for index in selection:
            yield data[index]
class every(BaseCli):
    def __init__(self, length:int, offset:int=0):
        """Picks one line out of every `length` lines, aligned on `offset`.

        :param length: distance between two picked lines
        :param offset: position of a picked line; positions whose distance
            from it is a multiple of `length` are picked"""
        self.length = length
        self.offset = offset

    def __ror__(self, it:Iterator[str]):
        """Lazily yields the elements of ``it`` that fall on the picking grid."""
        step, start = self.length, self.offset
        yield from (line for position, line in enumerate(it)
                    if (position - start) % step == 0)
class intersection(BaseCli):
    """Computes the elements common to all given streams. Example::

        [[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection() # will return set([2, 4, 5])
    """
    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        """Returns the set of elements present in every stream of ``its``.

        Returns None when ``its`` contains no stream at all."""
        streams = iter(its)
        try:
            first = next(streams)
        except StopIteration:
            return None
        common = set(first)
        for stream in streams:
            common.intersection_update(stream)
        return common