# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for functions that change the table structure in a
dramatic way. They are the core transformations.
"""
from typing import List, Union, Iterator, Callable, Any
from collections import defaultdict, Counter
from k1lib.bioinfo.cli.init import patchDefaultDelim, BaseCli, oneToMany
import k1lib.bioinfo.cli as cli
# Public names exported by this module
__all__ = [
    "joinColumns", "joinRows", "joinStreams",
    "splitColumns", "insertRow", "insertIdColumn",
    "toDict", "split", "count", "permute", "accumulate", "AA_",
    "infinite",
]
class joinColumns(BaseCli):
    def __init__(self, delim:str=None, sep:bool=False):
        """Zips multiple column streams together and loops through the resulting rows.

        :param delim: delimiter placed between the elements of a row when they
            are joined into one string; passed through ``patchDefaultDelim``
        :param sep: if True, don't join the row elements into a string, and
            yield each row as a tuple instead
        """
        self.delim = patchDefaultDelim(delim)
        self.sep = sep

    def __ror__(self, it:Iterator[Iterator[str]]):
        rows = zip(*it)  # transposes: one tuple per row, one element per column
        if self.sep:
            # caller wants the raw tuples
            for row in rows:
                yield row
            return
        for row in rows:
            # elements are piped through cli.toStr() before being joined
            yield self.delim.join(row | cli.toStr())
class joinRows(BaseCli):
    """Chains multiple streams of rows into one single stream"""

    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Any]:
        # exhaust each incoming stream in turn, in the order received
        for rows in streams:
            yield from rows


# Alias: same operation under a second name
joinStreams = joinRows
class splitColumns(BaseCli):
    def __init__(self, delim:str=None):
        """Splits lines into multiple columns, and returns the columns individually.

        :param delim: delimiter used to split each line; passed through
            ``patchDefaultDelim``
        """
        self.delim = patchDefaultDelim(delim)
        # Kept for backward compatibility: refers to the columns collected by
        # the most recent run.
        self.lists = defaultdict(list)

    def __ror__(self, it):
        """Consumes all lines and returns a list of columns (each a list of str).

        Column ``i`` holds the ``i``-th element of every line that has at least
        ``i + 1`` elements, so ragged input produces columns of unequal length.
        """
        # Fresh accumulator on every call, so that reusing the same instance
        # on a second stream does not mix in elements from the first one.
        columns = defaultdict(list)
        for line in it:
            for i, elem in enumerate(line.split(self.delim)):
                columns[i].append(elem)
        self.lists = columns
        return list(columns.values())
class insertRow(BaseCli):
    def __init__(self, *columns:Union[List[str], str], delim:str=None):
        """Puts one extra row in front of the incoming stream.

        The cells can be passed as separate arguments, or as one single
        list/tuple holding all of them.

        :param delim: delimiter used to join the cells into a line; passed
            through ``patchDefaultDelim``
        """
        # a lone list/tuple argument is taken to be the full set of cells
        if len(columns) == 1 and isinstance(columns[0], (list, tuple)):
            columns = columns[0]
        self.delim = patchDefaultDelim(delim)
        self.columns = columns

    def __ror__(self, it:Iterator[str]):
        header = self.delim.join(self.columns)
        yield header
        # then pass every original line through untouched
        for line in it:
            yield line
def insertIdColumn(begin=True, delim:str=None):
    """Builds a cli that adds an id column to every line.

    :param begin: if True, the id column comes first, else it comes last
    :param delim: delimiter forwarded to :class:`joinColumns`
    """
    if begin:
        streams = cli.toRange() & cli.identity()
    else:
        streams = cli.identity() & cli.toRange()
    return streams | cli.joinColumns(delim)
class toDict(BaseCli):
    def __init__(self, keyF:Callable[[Any], str]=None, valueF:Callable[[Any], Any]=None):
        """Turns an incoming stream into a dict, deriving both the key and
        the value of each entry from the stream element. Example::

            names = ["wanda", "vision", "loki", "mobius"]
            names | toDict(valueF=lambda s: len(s)) # will return {"wanda": 5, "vision": 6, ...}
            names | toDict(lambda s: s.title(), lambda s: len(s)) # will return {"Wanda": 5, "Vision": 6, ...}

        :param keyF: maps an element to its key. Defaults to the element itself
        :param valueF: maps an element to its value. Defaults to the element itself
        """
        passthrough = lambda s: s
        self.keyF = keyF if keyF else passthrough
        self.valueF = valueF if valueF else passthrough

    def __ror__(self, keys:Iterator[str]):
        toKey, toValue = self.keyF, self.valueF
        result = {}
        for item in keys:
            # key function runs before the value function for every element
            k = toKey(item)
            result[k] = toValue(item)
        return result
class split(BaseCli):
    def __init__(self, delim:str=None, idx:int=None):
        """Splits each line using a delimiter, and outputs the
        parts as separate lines.

        :param delim: delimiter used to split each line; passed through
            ``patchDefaultDelim``
        :param idx: if available, only outputs the element at that index.
            Negative indexes count from the end of the line. Lines that don't
            have an element at that index output None instead
        """
        self.delim = patchDefaultDelim(delim); self.idx = idx

    def __ror__(self, it:Iterator[str]):
        idx = self.idx
        if idx is None:
            # no index requested: flatten every part of every line
            for line in it:
                yield from line.split(self.delim)
        else:
            for line in it:
                elems = line.split(self.delim)
                # Bounds check covers both directions, so an out-of-range
                # negative index yields None just like a too-large positive one
                # instead of raising IndexError.
                yield elems[idx] if -len(elems) <= idx < len(elems) else None
class count(BaseCli):
    def __init__(self, delim:str=None):
        """Tallies identical elements. For every unique element, yields a line
        made of the number of occurrences, the element itself and the rounded
        percentage of the whole stream it accounts for (with a trailing "%"),
        joined by the delimiter.

        :param delim: delimiter between the 3 output fields; passed through
            ``patchDefaultDelim``
        """
        self.delim = patchDefaultDelim(delim)

    def __ror__(self, it:Iterator[str]):
        tally = Counter(it)
        total = sum(tally.values())
        # elements come out in the order they were first seen
        for element, occurrences in tally.items():
            percent = round(100*occurrences/total)
            yield f"{occurrences}{self.delim}{element}{self.delim}{percent}%"
class permute(BaseCli):
    def __init__(self, permutations:List[int], delim:str=None):
        """Reorders the columns of every line. Acts like torch.permute(...)

        :param permutations: for each output position, the index of the input
            column to place there
        :param delim: delimiter used both to split and to re-join each line;
            passed through ``patchDefaultDelim``
        """
        self.delim = patchDefaultDelim(delim)
        self.permutations = permutations

    def __ror__(self, it:Iterator[str]):
        for line in it:
            cells = line.split(self.delim)
            picked = (cells[position] for position in self.permutations)
            yield self.delim.join(picked)
class accumulate(BaseCli):
    def __init__(self, column:int=0, avg=False, delim:str=None):
        """Groups lines that have the same line.split(delim)[column], and
        adds together all other columns, assuming they're floats. Columns that
        can't be parsed as floats keep the last value seen for that group.

        :param column: common column to accumulate
        :param avg: calculate average values (sum divided by the number of
            lines in the group) instead of sum
        :param delim: specify delimiter between columns
        """
        self.column = column; self.avg = avg
        self.delim = patchDefaultDelim(delim)
        # Kept for backward compatibility: refers to the per-group sums of the
        # most recently started run.
        self.dict = defaultdict(lambda: defaultdict(lambda: 0))

    def __ror__(self, it:Iterator[str]):
        column = self.column; delim = self.delim
        # Per-run state, so reusing this instance on another stream doesn't
        # carry sums over from the previous one.
        totals = defaultdict(lambda: defaultdict(lambda: 0))
        rowCounts = Counter()  # number of lines seen for each group key
        self.dict = totals
        for line in it:
            elems = line.split(delim); key = elems.pop(column)
            rowCounts[key] += 1
            for i, elem in enumerate(elems):
                # ValueError: elem is not a number. TypeError: this column
                # already holds a non-numeric string from an earlier line.
                try: totals[key][i] += float(elem)
                except (ValueError, TypeError): totals[key][i] = elem
        for key, values in totals.items():
            cells = [values[i] for i in range(len(values))]
            if self.avg:
                # An average divides by how many lines were summed for this
                # group, not by how many columns the group has.
                rows = rowCounts[key]
                cells = [v / rows if isinstance(v, float) else v for v in cells]
            elems = [str(v) for v in cells]
            elems.insert(column, key)
            yield delim.join(elems)
class AA_(BaseCli):
    def __init__(self, *idxs:Union[int, None], wraps=False):
        r"""Returns 2 streams, one that has the selected element, and the other
        the rest. Example::

            [1, 5, 6, 3, 7] | AA_(1) # will return [5, [1, 6, 3, 7]]

        You can also put multiple indexes through::

            [1, 5, 6] | AA_(0, 2) # will return [[1, [5, 6]], [6, [1, 5]]]

        If you put None in, then all indexes will be sliced::

            [1, 5, 6] | AA_(None)
            # will return:
            # [[1, [5, 6]],
            #  [5, [1, 6]],
            #  [6, [1, 5]]]

        Negative indexes count from the end, like normal list indexing.

        As for why the strange name, think of this operation as "AĀ". In statistics,
        say you have a set "A", then "not A" is commonly written as A with an overline
        "Ā". So "AA\_" represents "AĀ", and that it first returns the selection A
        first.

        :param idxs: indexes of the elements to select, or a single None to
            select every index in turn
        :param wraps: if True, then the first example will return [[5, [1, 6, 3, 7]]]
            instead, so that A has the same signature as Ā
        """
        self.idxs = idxs; self.wraps = wraps

    def __ror__(self, it:List[Any]) -> List[List[List[Any]]]:
        idxs = self.idxs; it = list(it)
        sliceAll = len(idxs) == 1 and idxs[0] is None
        if sliceAll: idxs = range(len(it))
        def gen(idx):
            selected = it[idx]  # raises IndexError if idx is out of range
            # Normalize negative indexes so that the selected element is
            # actually left out of the "rest" list.
            pos = idx % len(it)
            return [selected, [v for i, v in enumerate(it) if i != pos]]
        # Only an explicitly given single index is returned unwrapped. The
        # "all indexes" mode always returns a list of pairs, even when the
        # input happens to have exactly one element.
        if not self.wraps and not sliceAll and len(idxs) == 1: return gen(idxs[0])
        return [gen(idx) for idx in idxs]
class infinite(BaseCli):
    """Materializes the incoming stream once, then yields that same list
    over and over, without end. Example:

    .. code-block::

        # returns [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
        [1, 2, 3] | infinite() | head(3) | toList()
    """

    def __ror__(self, it:Iterator[Any]) -> Iterator[Iterator[Any]]:
        snapshot = list(it)
        # never terminates on its own; the consumer decides when to stop
        while True:
            yield snapshot