# Source code for k1lib.bioinfo.cli.modifier

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for quick modifiers, think of them as changing formats
"""
__all__ = ["apply", "applyMp", "applyS",
           "lstrip", "rstrip", "strip",
           "upper", "lower", "replace", "remove", "toFloat", "toInt",
           "sort", "sortF", "consume", "randomize"]
from typing import Callable, Iterator, Any, Union, List
from k1lib.bioinfo.cli.init import patchDefaultDelim, BaseCli, settings, T
import k1lib.bioinfo.cli as cli, numpy as np, torch
import concurrent.futures as futures
import multiprocessing as mp
from functools import partial
import dill, pickle, k1lib
def executeFunc(f, line, args, kwargs):
    """Worker-side entry point that :class:`applyMp` submits to its pool.

    Every argument arrives as a ``dill``-serialized byte string. They are
    deserialized here and the function is invoked on the row.

    :param f: serialized callable
    :param line: serialized row, passed to the callable as first argument
    :param args: serialized tuple of extra positional arguments
    :param kwargs: serialized dict of extra keyword arguments
    :return: whatever ``f(line, *args, **kwargs)`` returns"""
    # Imported locally; presumably so this function is self-contained inside
    # a freshly started worker process - confirm before removing.
    import dill
    func, row, extraArgs, extraKwargs = [dill.loads(blob) for blob in (f, line, args, kwargs)]
    return func(row, *extraArgs, **extraKwargs)
class applyMp(BaseCli):
    def __init__(self, f:Callable[[T], T], *args, **kwargs):
        """Like :class:`apply`, but execute ``f(row)`` of each row in
        multiple processes. Example::

            # returns [3, 2]
            ["abc", "de"] | applyMp(lambda s: len(s)) | deref()
            # returns [5, 6, 9]
            range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref()
            # returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
            someList = [1, 2, 3]
            ["abc", "de"] | applyMp(lambda s: someList) | deref()

        Internally, this creates a process pool sized at 80% of all CPU cores
        (rounded down, but never fewer than 1 process) and submits one job
        per row. On posix systems, the default multiprocessing start method is
        ``fork()``. This sort of means that all the variables in memory will be
        copied over. This might be expensive (might also not, with
        copy-on-write), so you might have to think about that. On windows and
        macos, the default start method is ``spawn``, meaning each child
        process is a completely new interpreter, so you have to pass in all
        required variables and reimport every dependencies. Read more at
        https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

        .. note::
            Remember that every :class:`~k1lib.bioinfo.cli.init.BaseCli` is
            also a function, meaning that you can do stuff like::

                # returns [['ab', 'ac']]
                [["ab", "cd", "ac"]] | applyMp(startswith("a") | deref()) | deref()

            Also remember that the return result of ``f`` should not be a
            generator. That's why in the example above, there's a ``deref()``
            inside f.

        :param f: function applied to each row; it is serialized with ``dill``
        :param args: extra arguments to be passed to the function. ``kwargs`` too"""
        super().__init__(); self.f = f; self.args = args; self.kwargs = kwargs
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Submits one job per row of ``it`` and returns a generator that
        yields the results in input order.

        All jobs are submitted eagerly (``it`` is fully consumed here); only
        fetching the results is lazy.

        :raises TypeError: re-raised, after printing a hint, if ``self.f``
            can't be serialized"""
        super().__ror__(it)
        try: f = dill.dumps(self.f)
        except TypeError:
            # bare ``raise`` keeps the original traceback intact
            print(f"Error while trying to pickle {self.f}."); raise
        args = dill.dumps(self.args); kwargs = dill.dumps(self.kwargs)
        # 80% of the cores, but at least 1: on a single-core machine
        # ``1*4//5 == 0`` and ``mp.Pool(0)`` raises ValueError.
        nProcesses = max(1, mp.cpu_count()*4//5)
        p = mp.Pool(nProcesses)
        try:
            fs = [p.apply_async(executeFunc, [f, dill.dumps(line), args, kwargs]) for line in it]
        finally:
            # No more jobs will be submitted. Closing lets the workers exit
            # once the queued jobs finish instead of lingering forever;
            # results that are already queued can still be fetched.
            p.close()
        return (r.get() for r in fs)
class apply(BaseCli):
    def __init__(self, f:Callable[[str], str], column:int=None):
        """Runs a function ``f`` over every line of the input. Example::

            # returns [0, 1, 4, 9, 16]
            range(5) | apply(lambda x: x**2) | deref()
            # returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]]
            torch.ones(2, 3) | apply(lambda x: x+2, 0) | cli.deref()

        It also works as a decorator::

            @apply
            def f(x):
                return x**2
            # returns [0, 1, 4, 9, 16]
            range(5) | f | deref()

        :param f: function to apply
        :param column: if not None, then applies the function to that column only"""
        super().__init__()
        self.f = f
        self.column = column
    def __ror__(self, it:Iterator[str]):
        """Returns a lazy generator over the transformed lines. With a column
        set, each row is rebuilt as a list in which only that column has been
        passed through ``f``."""
        super().__ror__(it)
        func, col = self.f, self.column
        if col is None:
            return (func(row) for row in it)
        def transformRow(row):
            # only the element at index ``col`` is changed, the rest is copied over
            return [func(elem) if idx == col else elem for idx, elem in enumerate(row)]
        return (transformRow(row) for row in it)
class applyS(BaseCli):
    def __init__(self, f:Callable[[T], T]):
        """Simpler sibling of :class:`apply`: the function receives the whole
        input object instead of each line. The "S" stands for "single".
        Example::

            # returns 5
            3 | applyS(lambda x: x+2)

        Like :class:`apply`, this can be used as a decorator too::

            @applyS
            def f(x):
                return x+2
            # returns 5
            3 | f

        :param f: function called once with the entire input"""
        super().__init__()
        self.f = f
    def __ror__(self, it:T) -> T:
        """Returns ``f(it)``."""
        # the base class hook is only invoked when the "useCtx" setting is on
        if settings["useCtx"]:
            super().__ror__(it)
        func = self.f
        return func(it)
def lstrip(column:int=None, char:str=None):
    """Strips the left side of every line. Example::

        # returns ['12 ', '34']
        [" 12 ", " 34"] | lstrip() | deref()

    :param column: if not None, only strip that column of each row
    :param char: characters to strip, forwarded to :meth:`str.lstrip`"""
    def stripLeft(text):
        return text.lstrip(char)
    return apply(stripLeft, column)
def rstrip(column:int=None, char:str=None):
    """Strips the right side of every line.

    :param column: if not None, only strip that column of each row
    :param char: characters to strip, forwarded to :meth:`str.rstrip`"""
    def stripRight(text):
        return text.rstrip(char)
    return apply(stripRight, column)
def strip(column:int=None, char:str=None):
    """Strips both sides of every line.

    :param column: if not None, only strip that column of each row
    :param char: characters to strip, forwarded to :meth:`str.strip`"""
    def stripBoth(text):
        return text.strip(char)
    return apply(stripBoth, column)
def upper(column:int=None):
    """Turns every character into uppercase. Example::

        # returns ['ABCDE', '123R']
        ["abcde", "123r"] | upper() | deref()

    :param column: if not None, only convert that column of each row"""
    def toUpper(text):
        return text.upper()
    return apply(toUpper, column)
def lower(column:int=None):
    """Turns every character into lowercase.

    :param column: if not None, only convert that column of each row"""
    def toLower(text):
        return text.lower()
    return apply(toLower, column)
def replace(s:str, target:str=None, column:int=None):
    """Replaces substring `s` with `target` in each line. Example::

        # returns ['104', 'ab0c']
        ["1234", "ab23c"] | replace("23", "0") | deref()

    :param s: substring to look for
    :param target: if not specified, then use the default delimiter
        specified in bioinfo settings
    :param column: if not None, only operate on that column of each row"""
    # resolved once up front, not once per line
    replacement = patchDefaultDelim(target)
    def substitute(text):
        return text.replace(s, replacement)
    return apply(substitute, column)
def remove(s:str, column:int=None):
    """Deletes every occurrence of substring `s` in each line, by replacing
    it with the empty string via :meth:`replace`.

    :param s: substring to delete
    :param column: if not None, only operate on that column of each row"""
    return replace(s, target="", column=column)
def _op(toOp, c, force, defaultValue):
    """Builds the conversion pipeline shared by :meth:`toFloat` and
    :meth:`toInt`.

    :param toOp: converter returning None for values it can't convert
    :param c: column to convert, or None to convert each row itself
    :param force: if True, replace failed conversions (None) with
        ``defaultValue``; otherwise chain ``~cli.isValue(None, c)`` after the
        conversion, which (per the public docs of the callers) drops the
        rows that failed to convert
    :param defaultValue: replacement used when ``force`` is True"""
    converted = apply(toOp, c)
    if force:
        # Test for None explicitly, so only failed conversions are replaced.
        # ``x or defaultValue`` would also replace any falsy converted value.
        return converted | apply(lambda x: defaultValue if x is None else x, c)
    return converted | (~cli.isValue(None, c))
def _toFloat(e) -> Union[float, None]:
    """Best-effort conversion to float; returns None if ``float(e)`` fails."""
    try: return float(e)
    # ``Exception`` rather than a bare ``except``, so KeyboardInterrupt and
    # SystemExit are not swallowed while converting a long stream
    except Exception: return None
def toFloat(*columns:List[int], force=False):
    """Converts every row into a float. Example::

        # returns [1, 3, -2.3]
        ["1", "3", "-2.3"] | toFloat() | deref()
        # returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
        [["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()

    With weird rows::

        # returns [[1.0, 'a'], [8.0, 'c']]
        [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
        # returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
        [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, force=True) | deref()

    :param columns: if nothing, then will convert each row. If available, then
        convert all the specified columns
    :param force: if True, forces weird values to 0.0, else filters out all
        weird rows"""
    if not columns:
        return _op(_toFloat, None, force, 0.0)
    # one conversion stage per requested column, chained one after another
    stages = [_op(_toFloat, c, force, 0.0) for c in columns]
    return cli.init.serial(*stages)
def _toInt(e) -> Union[int, None]:
    """Best-effort conversion to int; returns None if the conversion fails.

    The value goes through ``float`` first, so strings like ``"-2.3"`` are
    accepted and truncated towards zero."""
    try: return int(float(e))
    # ``Exception`` rather than a bare ``except``, so KeyboardInterrupt and
    # SystemExit are not swallowed while converting a long stream
    except Exception: return None
def toInt(*columns:List[int], force=False):
    """Converts every row into an integer. Example::

        # returns [1, 3, -2]
        ["1", "3", "-2.3"] | toInt() | deref()

    :param columns: if nothing, then will convert each row. If available, then
        convert all the specified columns
    :param force: if True, forces weird values to 0, else filters out all
        weird rows

    See also: :meth:`toFloat`"""
    if not columns:
        return _op(_toInt, None, force, 0)
    # one conversion stage per requested column, chained one after another
    stages = [_op(_toInt, c, force, 0) for c in columns]
    return cli.init.serial(*stages)
class sort(BaseCli):
    def __init__(self, column:int=0, numeric=True, reverse=False):
        """Sorts all lines by the value found in a specific `column`.
        Example::

            # returns [[5, 'a'], [1, 'b']]
            [[1, "b"], [5, "a"]] | ~sort(0) | deref()
            # returns [[2, 3]]
            [[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref()
            # errors out, as you can't really compare str with int
            [[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref()

        :param column: if None, sort rows based on themselves and not an element
        :param numeric: whether to convert column to float
        :param reverse: False for smaller to bigger, True for bigger to
            smaller. Use :meth:`__invert__` to quickly reverse the order
            instead of using this param"""
        super().__init__()
        self.column = column
        self.numeric = numeric
        self.reverse = reverse
        # converts a cell into its sort key
        if numeric:
            self.filterF = lambda x: float(x)
        else:
            self.filterF = lambda x: x
    def __ror__(self, it:Iterator[str]):
        """Reads the whole input into memory and returns an iterator over the
        sorted rows."""
        super().__ror__(it)
        col = self.column
        if col is None:
            # wrap each element into its own row, then sort on column 0
            return it | cli.wrapList() | cli.transpose() | sort(0, self.numeric, self.reverse)
        toKey = self.filterF
        if self.numeric:
            # in numeric mode the stream first goes through cli.isNumeric
            it = it | cli.isNumeric(col)
        rows = list(it)
        def keyOf(row):
            # rows too short to have the column get an infinite key
            return toKey(row[col]) if len(row) > col else float("inf")
        rows.sort(key=keyOf, reverse=self.reverse)
        return iter(rows)
    def __invert__(self):
        """Creates a clone that has the opposite sort order"""
        flipped = not self.reverse
        return sort(self.column, self.numeric, flipped)
class sortF(BaseCli):
    def __init__(self, f:Callable[[T], float], reverse=False):
        """Sorts rows by the key a function computes for each of them.
        Example::

            # returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
            ["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref()
            # returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a']
            ["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()

        :param f: key function, called once per row
        :param reverse: True to sort from bigger to smaller"""
        super().__init__()
        self.f = f
        self.reverse = reverse
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Reads the whole input into memory and returns an iterator over the
        sorted rows."""
        super().__ror__(it)
        ordered = list(it)
        ordered.sort(key=self.f, reverse=self.reverse)
        return iter(ordered)
    def __invert__(self) -> "sortF":
        """Creates a clone that has the opposite sort order"""
        flipped = not self.reverse
        return sortF(self.f, flipped)
class consume(BaseCli):
    def __init__(self, f:Union[BaseCli, Callable[[T], None]]):
        r"""Feeds the input to ``f`` as a side stream, then passes the input
        on unchanged. Kinda like the bash command ``tee``. Example::

            # prints "0\n1\n2" and returns [0, 1, 2]
            range(3) | consume(headOut()) | toList()
            # prints "range(0, 3)" and returns [0, 1, 2]
            range(3) | consume(lambda it: print(it)) | toList()

        This is useful whenever you want to mutate something, but don't want
        to include the function result into the main stream.

        :param f: cli tool or function that receives the input; its return
            value is discarded"""
        super().__init__()
        self.f = f
    def __ror__(self, it:T) -> T:
        """Calls ``f(it)`` for its side effects and returns ``it`` itself."""
        super().__ror__(it)
        sideEffect = self.f
        sideEffect(it)
        return it
class randomize(BaseCli):
    def __init__(self, bs=100):
        """Randomize input stream. In order to be efficient, this does not
        convert the input iterator to a giant list and yield random values
        from that. Instead, this fetches ``bs`` items at a time, randomizes
        them, returns and fetch another ``bs`` items. If you want to do the
        giant list, then just pass in ``float("inf")``, or ``None``.
        Example::

            # returns [0, 1, 2, 3, 4], effectively no randomize at all
            range(5) | randomize(1) | deref()
            # returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
            range(10) | randomize(3) | deref()
            # returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
            range(10) | randomize(float("inf")) | deref()
            # same as above
            range(10) | randomize(None) | deref()

        :param bs: batch size; ``None`` is treated as ``float("inf")``"""
        super().__init__()
        # identity test: ``None`` is a singleton, and ``!=`` can be overridden
        self.bs = float("inf") if bs is None else bs
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Yields the items of ``it``, shuffled within consecutive batches of
        ``self.bs`` items. This is a generator, so nothing runs until the
        result is iterated."""
        super().__ror__(it)
        # NOTE(review): the second argument of cli.batched presumably keeps the
        # final, shorter batch - confirm against batched's definition.
        for batch in it | cli.batched(self.bs, True):
            batch = list(batch)
            # .tolist() gives plain ints, avoiding one 0-d tensor per item
            for idx in torch.randperm(len(batch)).tolist():
                yield batch[idx]