Source code for k1lib.bioinfo.cli.structural

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for functions that change the table structure in a
dramatic way. They're the core transformations.
"""
from typing import List, Union, Iterator, Callable, Any, Tuple, Dict
from collections import defaultdict, Counter
from k1lib.bioinfo.cli.init import patchDefaultDelim, BaseCli, oneToMany, T, Table
import k1lib.bioinfo.cli as cli
import itertools
# Names exported by this module when it is star-imported.
__all__ = ["joinColumns", "transpose", "splitColumns", "joinList", "joinStreams",
           "insertRow", "insertColumn", "insertIdColumn",
           "toDict", "split", "table", "stitch", "listToTable",
           "count", "accumulate", "AA_", "infinite"]
class joinColumns(BaseCli):
    def __init__(self, fillValue=None):
        """Zips the incoming columns together and iterates over the
resulting rows, i.e. transposes the table.

:param fillValue: when None, rows stop at the shortest column. Otherwise
    shorter columns are padded with this value up to the longest one

Example::

    # returns [[1, 4], [2, 5], [3, 6]]
    [[1, 2, 3], [4, 5, 6]] | joinColumns() | dereference()
    # returns [[1, 4], [2, 5], [3, 6], [0, 7]]
    [[1, 2, 3], [4, 5, 6, 7]] | joinColumns(0) | dereference()"""
        self.fillValue = fillValue

    def __ror__(self, it:Iterator[Iterator[T]]) -> Table[T]:
        """Lazily yields the transposed rows of ``it`` as tuples."""
        fill = self.fillValue
        if fill is not None:
            # pad short columns instead of truncating
            yield from itertools.zip_longest(*it, fillvalue=fill)
            return
        yield from zip(*it)


# Transposing rows and columns is symmetric, so all three names share
# the same implementation.
transpose = joinColumns
splitColumns = joinColumns
class joinList(BaseCli):
    def __init__(self, element=None, begin=True):
        """Merges a single element into a list.

:param element: the element to insert. If None, then the input is
    expected to look like [e, [...]] and ``e`` is the element to insert.
    Otherwise the input is the plain list [...]
:param begin: if True, the element goes in front of the list, else it
    is appended after the last item

Example::

    # returns [5, 2, 6, 8]
    [5, [2, 6, 8]] | joinList()
    # also returns [5, 2, 6, 8]
    [2, 6, 8] | joinList(5)
"""
        self.element = element; self.begin = begin

    def __ror__(self, it:Tuple[T, Iterator[T]]) -> Iterator[T]:
        """Lazily yields the list with the element placed at the chosen end."""
        stream = iter(it)
        if self.element is not None:
            # element was given up front, the input is the list itself
            if self.begin:
                yield self.element
                yield from stream
            else:
                yield from stream
                yield self.element
            return
        # input is [element, list]: pull both pieces off the stream
        head = next(stream)
        if self.begin:
            yield head
            yield from next(stream)
        else:
            yield from next(stream)
            yield head
class joinStreams(BaseCli):
    """Flattens a stream of streams into one stream, one level deep.

Example::

    # returns [1, 2, 3, 4, 5]
    [[1, 2, 3], [4, 5]] | joinStreams() | dereference()
"""

    def __ror__(self, streams:Iterator[Iterator[T]]) -> Iterator[T]:
        """Lazily yields every item of every inner stream, in order."""
        yield from itertools.chain.from_iterable(streams)
def insertRow(*row:List[T]):
    """Creates a cli that puts a new row in front of all existing rows.

The positional arguments are collected into a single tuple, which becomes
the inserted row. See also: :meth:`joinList`."""
    return joinList(element=row)
def insertColumn(*column, begin=True):
    """Creates a cli that adds a column to a table.

:param column: values of the new column, one per row
:param begin: if True the column becomes the first one, else the last one

Example::

    # returns [['a', 1, 2], ['b', 3, 4]]
    [[1, 2], [3, 4]] | insertColumn("a", "b") | dereference()
"""
    # transpose so columns look like rows, add the new "row", transpose back
    toColumns = transpose()
    withColumn = toColumns | joinList(column, begin)
    return withColumn | transpose()
def insertIdColumn(table=False, begin=True):
    """Creates a cli that adds an id column at the beginning (or end).

:param table: if False, then the input is treated as an Iterator[str] and
    is turned into a one-column table first, else the input is treated as
    a full fledged table
:param begin: forwarded to :class:`joinList` to pick which end of the
    table the id column goes to

Example::

    # returns [[0, 'a', 2], [1, 'b', 4]]
    [["a", 2], ["b", 4]] | insertIdColumn(True) | dereference()
    # returns [[0, 'a'], [1, 'b']]
    "ab" | insertIdColumn()"""
    # pairs an id range with the transposed table (exact semantics of
    # `toRange` and `&` live elsewhere in the cli package)
    ids = cli.toRange() & transpose()
    f = ids | joinList(begin=begin) | transpose()
    if not table:
        # wrap the plain stream into a single column before adding ids
        return cli.wrapList() | transpose() | f
    return f
class toDict(BaseCli):
    def __init__(self, keyF:Callable[[Any], str]=None, valueF:Callable[[Any], Any]=None):
        """Builds a dict out of an incoming stream. Every element is passed
through ``keyF`` to get its key and through ``valueF`` to get its value.

:param keyF: maps an element to its key. Defaults to the element itself
:param valueF: maps an element to its value. Defaults to the element itself

Example::

    names = ["wanda", "vision", "loki", "mobius"]
    names | toDict(valueF=lambda s: len(s)) # will return {"wanda": 5, "vision": 6, ...}
    names | toDict(lambda s: s.title(), lambda s: len(s)) # will return {"Wanda": 5, "Vision": 6, ...}
"""
        self.keyF = keyF if keyF else (lambda s: s)
        self.valueF = valueF if valueF else (lambda s: s)

    def __ror__(self, keys:Iterator[Any]) -> Dict[Any, Any]:
        """Consumes ``keys`` and returns the resulting dict. Later elements
overwrite earlier ones that map to the same key."""
        toKey = self.keyF; toValue = self.valueF
        result = {}
        for item in keys:
            k = toKey(item)  # key is computed before the value
            result[k] = toValue(item)
        return result
class split(BaseCli):
    def __init__(self, delim:str=None, idx:int=None):
        """Splits each line using a delimiter, and outputs the parts as
separate lines.

:param delim: delimiter to split on. It is passed through
    ``patchDefaultDelim`` first (presumably to substitute the package
    default when None; see ``k1lib.bioinfo.cli.init``)
:param idx: if available, only outputs the element at that index for each
    line. Negative indexes count from the end. Lines that don't have an
    element at that index output None instead"""
        self.delim = patchDefaultDelim(delim); self.idx = idx

    def __ror__(self, it:Iterator[str]):
        """Lazily yields the parts of every line (or a single part per
line if ``idx`` was given)."""
        delim = self.delim; idx = self.idx
        if idx is None:
            for line in it:
                yield from line.split(delim)
        else:
            for line in it:
                elems = line.split(delim)
                n = len(elems)
                # bounds check covers both directions, so an out-of-range
                # negative index gives None just like a positive one
                yield elems[idx] if -n <= idx < n else None
class table(BaseCli):
    def __init__(self, delim:str=None):
        """Turns lines into rows (List[str]) by splitting each of them
with a delimiter.

:param delim: delimiter to split on, passed through ``patchDefaultDelim``

Example::

    # returns [['a', 'bd'], ['1', '2', '3']]
    ["a|bd", "1|2|3"] | table("|") | dereference()
"""
        self.delim = patchDefaultDelim(delim)

    def __ror__(self, it:Iterator[str]) -> Table[str]:
        """Returns a generator with one list of fields per input line."""
        lines = it
        rows = (text.split(self.delim) for text in lines)
        return rows
class stitch(BaseCli):
    def __init__(self, delim:str=None):
        """Joins the elements of every row with a delimiter, so each row
becomes a simple string.

:param delim: delimiter placed between elements, passed through
    ``patchDefaultDelim``

See also: :class:`k1lib.bioinfo.cli.output.pretty`"""
        self.delim = patchDefaultDelim(delim)

    def __ror__(self, it:Table[str]) -> Iterator[str]:
        """Lazily yields one joined string per row."""
        sep = self.delim
        yield from (sep.join(cells) for cells in it)
def listToTable():
    """Creates a cli that turns Iterator[T] into Table[T], by wrapping
the stream in a list and transposing it."""
    wrapper = cli.wrapList()
    return wrapper | transpose()
class count(BaseCli):
    """Tallies unique elements and outputs a table with
[frequency, value, percent] columns. The percentage is rounded to a whole
number and formatted as a string.

Example::

    # returns [[1, 'a', '33%'], [2, 'b', '67%']]
    ['a', 'b', 'b'] | count() | dereference()
"""

    def __ror__(self, it:Iterator[str]):
        """Consumes the whole stream, then lazily yields one row per
unique value."""
        freqs = Counter(it)
        total = sum(freqs.values())
        for value, freq in freqs.items():
            percent = f"{round(100*freq/total)}%"
            yield [freq, value, percent]
class accumulate(BaseCli):
    def __init__(self, columnIdx:int=0, avg=False):
        """Groups lines that have the same row[columnIdx], and adds together
all other columns, assuming they're numbers. Values that can't be
converted to float are not summed; the last such value seen is kept.

:param columnIdx: common column index to accumulate
:param avg: calculate average values (sum divided by the number of rows
    in the group) instead of sum"""
        self.columnIdx = columnIdx; self.avg = avg
        # kept for backward compatibility: holds the accumulator of the most
        # recent run. It is replaced at the start of every run.
        self.dict = defaultdict(lambda: defaultdict(lambda: 0))

    def __ror__(self, it:Iterator[str]):
        """Consumes the whole table, then lazily yields one row of strings
per group, with the group key put back at ``columnIdx``."""
        columnIdx = self.columnIdx
        # fresh accumulator per run, so reusing this cli on another stream
        # doesn't mix in totals from the previous one
        groups = self.dict = defaultdict(lambda: defaultdict(lambda: 0))
        rowCounts = Counter()  # number of rows seen per key, for averaging
        for row in it:
            row = list(row); key = row.pop(columnIdx)
            rowCounts[key] += 1
            for i, e in enumerate(row):
                try: groups[key][i] += float(e)
                # not a number (or the slot already holds a non-number):
                # keep the raw value instead
                except (TypeError, ValueError): groups[key][i] = e
        for key, acc in groups.items():
            n = len(acc)  # number of accumulated (non-key) columns
            if self.avg:
                nRows = rowCounts[key]
                for i in range(n):
                    if isinstance(acc[i], (int, float)): acc[i] /= nRows
            elems = [str(acc[i]) for i in range(n)]
            elems.insert(columnIdx, key)
            yield elems
class AA_(BaseCli):
    def __init__(self, *idxs:List[int], wraps=False):
        r"""Returns 2 streams, one that has the selected element, and the
other the rest. Example::

    [1, 5, 6, 3, 7] | AA_(1) # will return [5, [1, 6, 3, 7]]

You can also put multiple indexes through::

    [1, 5, 6] | AA_(0, 2) # will return [[1, [5, 6]], [6, [1, 5]]]

If you put None in, then all indexes will be sliced::

    [1, 5, 6] | AA_(None)
    # will return:
    # [[1, [5, 6]],
    #  [5, [1, 6]],
    #  [6, [1, 5]]]

Negative indexes count from the end, like regular list indexing.

As for why the strange name, think of this operation as "AĀ". In statistics,
say you have a set "A", then "not A" is commonly written as A with an overline
"Ā". So "AA\_" represents "AĀ", and that it first returns the selection A first.

:param idxs: indexes to select, or a single None to select every index
:param wraps: if True, then the first example will return [[5, [1, 6, 3, 7]]]
    instead, so that A has the same signature as Ā
"""
        self.idxs = idxs; self.wraps = wraps

    def __ror__(self, it:List[Any]) -> List[List[List[Any]]]:
        """Materialises ``it`` and returns either a single [A, Ā] pair (one
explicit index, ``wraps`` False) or a list of such pairs.

:raises IndexError: if an index is out of range"""
        it = list(it); n = len(it); idxs = self.idxs
        sliceAll = len(idxs) == 1 and idxs[0] is None
        if sliceAll: idxs = range(n)
        def gen(idx):
            selected = it[idx]  # raises IndexError when out of range
            # normalize negative indexes so the selected element is actually
            # excluded from the rest
            pos = idx + n if idx < 0 else idx
            return [selected, [v for i, v in enumerate(it) if i != pos]]
        # only unwrap for a single *explicit* index; AA_(None) always
        # returns a list of pairs, even when the input has just 1 element
        if not self.wraps and not sliceAll and len(idxs) == 1: return gen(idxs[0])
        return [gen(idx) for idx in idxs]
class infinite(BaseCli):
    """Yields an infinite amount of the passed in object. If the object is
iterable, it is converted to a list once up front, and that same list
object is yielded every time.

Example:

.. code-block::

    # returns [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
    [1, 2, 3] | infinite() | head(3) | toList()
"""

    def __ror__(self, o:T) -> Iterator[T]:
        """Lazily yields ``o`` (or its list form) forever."""
        # best effort: objects that can't be turned into a list are yielded
        # as is. `Exception` rather than a bare except, so KeyboardInterrupt
        # and SystemExit still propagate.
        try: o = list(o)
        except Exception: pass
        while True: yield o