# Source code for k1lib.bioinfo.cli.filt

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for functions that cut out specific parts of the table
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set
from k1lib.bioinfo.cli.init import BaseCli, settings, Table, T
import k1lib.bioinfo.cli as cli
import k1lib
from collections import deque
# Public names of this module, grouped by theme (order is unchanged)
__all__ = [
    # generic predicates
    "filt", "isValue", "inSet", "contains", "nonEmptyStream",
    # string prefix / suffix
    "startswith", "endswith",
    # numeric checks
    "isNumeric", "inRange",
    # structural cuts
    "head", "columns", "cut", "rows", "intersection",
]
class filt(BaseCli):
    """Keeps only the items for which a predicate returns a truthy value."""

    def __init__(self, predicate:Callable[[str], bool], column:int=None):
        """Filters out lines.

        :param predicate: function that decides whether an item is kept
        :param column:
            - if integer, then predicate(row[column])
            - if None, then predicate(line)"""
        self.predicate = predicate
        self.column = column

    def __ror__(self, it:Iterator[str]):
        """Lazily yields the items of ``it`` that satisfy the predicate.

        If ``column`` is None, each item is passed to the predicate and
        yielded unchanged. Otherwise each item is converted to a list, the
        predicate is applied to ``row[column]`` and the list is yielded.
        Rows that do not have the requested column are skipped, for both
        positive and negative column indices."""
        predicate = self.predicate
        column = self.column
        if column is None:
            yield from (line for line in it if predicate(line))
            return
        for row in it:
            row = list(row)
            # Bounds-check both directions, so that a negative column on a
            # short row is skipped instead of raising IndexError.
            if -len(row) <= column < len(row) and predicate(row[column]):
                yield row

    def __invert__(self):
        """Negate the condition"""
        return filt(lambda s: not self.predicate(s), self.column)
def isValue(value, column:int=None):
    """Keeps only the lines that are equal to the given value.

    :param value: value each line (or column entry) is compared against
    :param column: forwarded to :class:`filt`"""
    def sameAsValue(candidate):
        return candidate == value
    return filt(sameAsValue, column)
def inSet(values:Set[Any], column:int=None):
    """Keeps only the lines that are members of the specified set.

    :param values: collection used for the membership test
    :param column: forwarded to :class:`filt`"""
    def isMember(candidate):
        return candidate in values
    return filt(isMember, column)
def contains(s:str, column:int=None):
    """Keeps only the lines that contain the specified substring.

    :param s: substring to look for
    :param column: forwarded to :class:`filt`"""
    def hasSubstring(candidate):
        return s in candidate
    return filt(hasSubstring, column)
class nonEmptyStream(BaseCli):
    """Filters out streams that have no rows"""

    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Iterator[Any]]:
        """Lazily yields a replacement iterator for every non-empty stream.

        The first element of each stream has to be consumed to find out
        whether the stream is empty, so each yielded iterator replays that
        element and then continues with the rest of the stream.

        :param streams: iterable of iterables
        :return: generator of iterators, one per non-empty input stream"""
        def rejoin(first, rest):
            # The values arrive as arguments, so each generator owns its own
            # pair. Reading them from the enclosing loop instead would make
            # every not-yet-started generator see the variables of whichever
            # stream the loop reached last.
            yield first
            yield from rest

        empty = object()  # sentinel: distinguishes "no element" from any real value
        for stream in streams:
            rest = iter(stream)
            first = next(rest, empty)
            if first is empty:
                continue
            yield rejoin(first, rest)
def startswith(s:str, column:int=None):
    """Keeps only the lines that start with `s`.

    :param s: required prefix
    :param column: forwarded to :class:`filt`"""
    def hasPrefix(candidate):
        return candidate.startswith(s)
    return filt(hasPrefix, column)
def endswith(s:str, column:int=None):
    """Keeps only the lines that end with `s`.

    :param s: required suffix
    :param column: forwarded to :class:`filt`"""
    def hasSuffix(candidate):
        return candidate.endswith(s)
    return filt(hasSuffix, column)
def isNumeric(column:int=None):
    """Filters out a line if that column is not a number.

    A value counts as a number when ``float(value)`` succeeds. Values that
    ``float`` cannot parse (``ValueError``) or cannot accept at all, such as
    ``None`` or a list (``TypeError``), are filtered out.

    :param column: forwarded to :class:`filt`"""
    def f(v):
        try:
            float(v)
        except (ValueError, TypeError):
            # TypeError covers non-string, non-number values such as None
            return False
        return True
    return filt(f, column)
def inRange(min:float=None, max:float=None, column:int=None):
    """Keeps only the values that lie in the half-open range ``[min, max)``.

    :param min: inclusive lower bound, negative infinity if None
    :param max: exclusive upper bound, positive infinity if None
    :param column: forwarded to :class:`filt`"""
    lower = float("-inf") if min is None else min
    upper = float("inf") if max is None else max
    def withinBounds(e):
        return e >= lower and e < upper
    return filt(withinBounds, column)
class rowsList(BaseCli):
    """Space-expensive implementation for :class:`rows`, without a lot of
    flexibility. Just used for slices with negative start/stop really. Don't
    use this directly, use :class:`rows` instead"""

    def __init__(self, _slice):
        """:param _slice: slice object applied to the row indices"""
        self._slice = _slice
        self.inverted = False

    def __ror__(self, it:Iterator[str]):
        """Yields the rows picked by the slice, or all other rows if inverted.

        The whole input is loaded into a list first, so that the slice can
        be resolved against the real number of rows."""
        it = list(it)
        full = range(len(it))
        rows = full[self._slice]
        if self.inverted:
            rows = [e for e in full if e not in rows]
        for row in rows:
            yield it[row]

    def __invert__(self):
        """Toggles between selecting the sliced rows and all other rows."""
        # Toggle instead of forcing True, so that inverting twice restores
        # the original selection, just like rows and columns do.
        self.inverted = not self.inverted
        return self
class rows(BaseCli):
    def __init__(self, *rows:List[int]):
        """Cuts out specific rows. Space complexity O(1) as a list is not
        constructed (unless you're using some really weird slices).

        :param rows: ints for the row indices

        Example::

            "0123456789" | rows(2) | dereference() # returns ["2"]
            "0123456789" | rows(5, 8) | dereference() # returns ["5", "8"]
            "0123456789" | rows()[2:5] | dereference() # returns ["2", "3", "4"]
            "0123456789" | ~rows()[2:5] | dereference() # returns ["0", "1", "5", "6", "7", "8", "9"]
            "0123456789" | ~rows()[:7:2] | dereference() # returns ['1', '3', '5', '7', '8', '9']
            "0123456789" | rows()[:-4] | dereference() # returns ['0', '1', '2', '3', '4', '5']
            "0123456789" | ~rows()[:-4] | dereference() # returns ['6', '7', '8', '9']"""
        givenSlice = len(rows) == 1 and isinstance(rows[0], slice)
        if not givenSlice:
            self.domain = k1lib.Domain.fromInts(*rows)
            self.every = 1
        else:
            sl = rows[0]
            # open-ended bounds become infinities
            lower = float("-inf") if sl.start is None else sl.start
            upper = float("inf") if sl.stop is None else sl.stop
            self.domain = k1lib.Domain([lower, upper])
            self.every = sl.step or 1 # only used for slices really
        self.inverted = False

    def _every(self, every):
        """Sets the step between selected rows and returns this object."""
        self.every = every
        return self

    def __getitem__(self, _slice):
        """Returns an object selecting the rows described by ``_slice``.

        Slices without negative bounds produce a new :class:`rows` that
        carries over this object's ``inverted`` flag. Slices with a negative
        bound are delegated to ``head`` when the step is 1, and to
        :class:`rowsList` otherwise."""
        start, stop = _slice.start, _slice.stop
        step = _slice.step or 1
        negativeStart = (start or 0) < 0
        negativeStop = (stop or 0) < 0
        if not (negativeStart or negativeStop):
            answer = rows(_slice)
            answer.inverted = self.inverted
            return answer
        # from here on, at least 1 bound is negative
        if step == 1:
            if negativeStop:
                if start is None:
                    return head(stop) # [None, -3]
                return ~head(start) | head(stop) # [5, -3]
            if negativeStart and stop is None:
                return ~head(start) # [-3, None]
            # remaining case is [-10, 6], which is weird, so just stick to the long one
        return rowsList(_slice) # worst case scenario

    def __invert__(self):
        """Toggles between selecting the given rows and all other rows."""
        self.inverted = not self.inverted
        return self

    def __ror__(self, it:Iterator[str]):
        """Returns a generator over the selected rows of ``it``.

        An endless stream of booleans (one per row position) is zipped with
        the input, and only rows paired with a truthy flag are kept."""
        if self.inverted:
            chosen, skipped = False, True
        else:
            chosen, skipped = True, False

        def gates():
            position = 0
            for index in self.domain.intIter(self.every):
                # positions before the next selected index are not chosen
                while position < index:
                    yield skipped
                    position += 1
                yield chosen
                position += 1
            # no more selected indices, everything afterwards is not chosen
            while True:
                yield skipped

        return (row for gate, row in zip(gates(), it) if gate)
class columns(BaseCli):
    def __init__(self, *columns:List[int]):
        """Cuts out specific columns, sliceable. Examples::

            ["0123456789"] | cut(5, 8) | dereference() # returns [['5', '8']]
            ["0123456789"] | cut(2) | dereference() # returns ['2']
            ["0123456789"] | ~cut()[:7:2] | dereference() # returns [['1', '3', '5', '7', '8', '9']]

        If you're selecting only 1 column, then Iterator[T] will be returned, not Table[T].

        :param columns: column indices, or a single slice object"""
        if len(columns) == 1 and isinstance(columns[0], slice):
            columns = columns[0]
        self.columns = columns
        self.inverted = False

    def __ror__(self, it:Table[T]) -> Table[T]:
        """Returns a generator over the selected columns of every row.

        The first row is sampled to learn the number of columns, which is
        needed to resolve slices and inverted selections."""
        wanted = self.columns
        firstRow, it = iter(it) | cli.sample()
        if firstRow is None:
            return iter(range(0))
        indices = range(len(list(firstRow)))
        if isinstance(wanted, slice):
            wanted = set(indices[wanted])
        if self.inverted:
            wanted = set(i for i in indices if i not in wanted)
        if len(wanted) == 1:
            # single column: hand back the bare elements instead of 1-element rows
            only = list(wanted)[0]
            return (row[only] for row in it)
        return ((e for i, e in enumerate(row) if i in wanted) for row in it)

    def __getitem__(self, idx):
        """Returns a new :class:`columns` for ``idx`` that keeps this object's ``inverted`` flag."""
        answer = columns(idx)
        answer.inverted = self.inverted
        return answer

    def __invert__(self):
        """Toggles between selecting the given columns and all other columns."""
        self.inverted = not self.inverted
        return self
# Alias for `columns`; this is the name used in the docstring examples of `columns`
cut = columns
class intersection(BaseCli):
    """Returns the intersection of multiple streams. Example::

        [[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection() # will return set([2, 4, 5])
    """

    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        """Computes the set of elements that are present in every stream.

        :param its: iterable of iterables with hashable elements
        :return: set of common elements; an empty set if ``its`` has no streams"""
        answer = None
        for it in its:
            if answer is None:
                answer = set(it) # first stream seeds the result
            else:
                answer = answer.intersection(it)
        # With no streams at all, return an empty set rather than None, so
        # the result always matches the declared return type.
        return set() if answer is None else answer