# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for quick modifiers, think of them as changing formats
"""
__all__ = ["apply", "applyMp", "applySingle", "applyS",
"lstrip", "rstrip", "strip",
"upper", "lower", "replace", "remove", "toFloat", "toInt",
"sort", "sortF", "consume", "randomize"]
from typing import Callable, Iterator, Any, Union, List
from k1lib.bioinfo.cli.init import patchDefaultDelim, BaseCli, settings, T
import k1lib.bioinfo.cli as cli, numpy as np, torch
import concurrent.futures as futures
import multiprocessing as mp
from functools import partial
import dill, pickle
def executeFunc(f, *args, **kwargs):
import dill; return dill.loads(f)(*args, **kwargs)
[docs]class applyMp(BaseCli):
[docs] def __init__(self, f:Callable[[T], T], *args, **kwargs):
"""Like :class:`apply`, but execute ``f(row)`` of each row in
multiple processes. Example::
# returns [3, 2]
["abc", "de"] | applyMp(lambda s: len(s)) | dereference()
# returns [5, 6, 9]
range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | dereference()
Internally, this will continuously spawn new jobs up until 80% of all CPU
cores are utilized. As the new processes will not share the same memory
space as the main process, you should pass all dependencies in the arguments
:param args: arguments to be passed to the function. ``kwargs`` too"""
super().__init__(); self.f = f; self.args = args; self.kwargs = kwargs
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[T]:
super().__ror__(it)
try: f = dill.dumps(self.f)
except TypeError as e: print(f"Error while trying to pickle {self.f}."); raise e
args = self.args; kwargs = self.kwargs
p = mp.Pool(mp.cpu_count()*4//5)
fs = [p.apply_async(executeFunc, [f, line, *args], kwargs) for line in it]
return (r.get() for r in fs)
[docs]class apply(BaseCli):
[docs] def __init__(self, f:Callable[[str], str], column:int=None):
"""Applies a function f to every line
:param column: if not None, then applies the function to that column only"""
super().__init__(); self.f = f; self.column = column
[docs] def __ror__(self, it:Iterator[str]):
super().__ror__(it); f = self.f; c = self.column
if c is None: return (f(line) for line in it)
else: return ([(e if i != c else f(e))
for i, e in enumerate(row)] for row in it)
[docs]class applySingle(BaseCli):
[docs] def __init__(self, f:Callable[[T], T]):
"""Like :class:`apply`, but much simpler, just operating on the entire input
object, essentially"""
super().__init__(); self.f = f
[docs] def __ror__(self, it:T) -> T:
super().__ror__(it); return self.f(it)
applyS = applySingle
[docs]def lstrip(column:int=None, char:str=None):
"""Strips left of every line"""
return apply(lambda e: e.lstrip(char), column)
[docs]def rstrip(column:int=None, char:str=None):
"""Strips right of every line"""
return apply(lambda e: e.rstrip(char), column)
[docs]def strip(column:int=None, char:str=None):
"""Strips both sides of every line"""
return apply(lambda e: e.strip(char), column)
[docs]def upper(column:int=None):
"""Make all characters uppercase"""
return apply(lambda e: e.upper(), column)
[docs]def lower(column:int=None):
"""Make all characters lowercase"""
return apply(lambda e: e.lower(), column)
[docs]def replace(s:str, target:str=None, column:int=None):
"""Replaces substring `s` with `target` for each line."""
t = patchDefaultDelim(target)
return apply(lambda e: e.replace(s, t), column)
[docs]def remove(s:str, column:int=None):
"""Removes a specific substring in each line."""
return replace(s, "", column)
def wrap(f, c): return f if settings["strict"] else cli.isNumeric(c) | f
[docs]def toFloat(*columns:List[int]):
"""Converts every row into a float. Excludes non numbers if not in
:ref:`strict mode <bioinfoSettings>`. Example::
# returns [1, 3, -2.3]
["1", "3", "-2.3"] | toFloat() | dereference()
# returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
[["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | dereference()
:param columns: if nothing, then will convert each row. If available, then
convert all the specified columns"""
if len(columns) > 0:
return cli.init.serial(*(wrap(apply(lambda e: float(e), c), c) for c in columns))
else: return wrap(apply(lambda e: float(e), None), None)
[docs]def toInt(*columns:List[int]):
"""Converts every row into an integer. Excludes non numbers if not in
:ref:`strict mode <bioinfoSettings>`. Example::
# returns [1, 3, -2]
["1", "3", "-2.3"] | toInt() | dereference()
:param columns: if nothing, then will convert each row. If available, then
convert all the specified columns
See also: :meth:`toFloat`
"""
if len(columns) > 0:
return cli.init.serial(wrap(apply(lambda e: int(float(e)), c), c) for c in columns)
else: return wrap(apply(lambda e: int(float(e)), None), None)
[docs]class sort(BaseCli):
[docs] def __init__(self, column:int=0, numeric=True, reverse=False):
"""Sorts all lines based on a specific `column`.
:param column: if None, sort rows based on themselves and not an element
:param numeric: whether to treat column as float
:param reverse: False for smaller to bigger, True for bigger to smaller. Use
:meth:`__invert__` to quickly reverse the order instead of using this param"""
super().__init__()
self.column = column; self.reverse = reverse; self.numeric = numeric
self.filterF = (lambda x: float(x)) if numeric else (lambda x: x)
[docs] def __ror__(self, it:Iterator[str]):
super().__ror__(it)
if (c := self.column) is None:
return it | cli.wrapList() | cli.transpose() | sort(0, self.numeric, self.reverse)
f = self.filterF
rows = list(it | cli.isNumeric(c) if self.numeric else it)
def sortF(row):
if len(row) > c: return f(row[c])
return float("inf")
return iter(sorted(rows, key=sortF, reverse=self.reverse))
[docs] def __invert__(self):
"""Creates a clone that has the opposite sort order"""
return sort(self.column, self.numeric, not self.reverse)
[docs]class sortF(BaseCli):
[docs] def __init__(self, f:Callable[[T], float], reverse=False):
"""Sorts rows using a function.
Example::
# returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | dereference()"""
super().__init__(); self.f = f; self.reverse = reverse
[docs] def __ror__(self, it):
super().__ror__(it)
return iter(sorted(list(it), key=self.f, reverse=self.reverse))
[docs] def __invert__(self):
return sortF(self.f, not self.reverse)
[docs]class consume(BaseCli):
[docs] def __init__(self, f:Union[BaseCli, Callable[[T], None]]):
r"""Consumes the iterator in a side stream. Returns the iterator.
Kinda like the bash command ``tee``. Example::
# prints "0\n1\n2" and returns [0, 1, 2]
range(3) | consume(headOut()) | toList()
# prints "range(0, 3)" and returns [0, 1, 2]
range(3) | consume(lambda it: print(it)) | toList()
This is useful whenever you want to mutate something, but don't want to
include the function result into the main stream."""
super().__init__(); self.f = f
[docs] def __ror__(self, it:T) -> T:
super().__ror__(it); self.f(it); return it
[docs]class randomize(BaseCli):
[docs] def __init__(self, bs=100):
"""Randomize input stream. In order to be efficient, this does not
convert the input iterator to a giant list and yield random values from that.
Instead, this fetches ``bs`` items at a time, randomizes them, returns and
fetch another ``bs`` items. If you want to do the giant list, then just pass
in ``float("inf")``. Example::
# returns [0, 1, 2, 3, 4], effectively no randomize at all
range(5) | randomize(1) | dereference()
# returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
range(10) | randomize(3) | dereference()
# returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
range(10) | randomize(float("inf")) | dereference()"""
super().__init__(); self.bs = bs
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[T]:
super().__ror__(it)
for batch in it | cli.batched(self.bs, True):
perms = torch.randperm(len(batch))
for idx in perms: yield batch[idx]