Source code for k1lib.bioinfo.cli.modifier

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for quick modifiers, think of them as changing formats
"""
# Names exported by ``from k1lib.bioinfo.cli.modifier import *``. Helpers that
# are defined in this module but not listed here (``executeFunc``, ``wrap``)
# are not part of the star-import surface.
__all__ = ["apply", "applyMp", "applySingle", "applyS",
           "lstrip", "rstrip", "strip",
           "upper", "lower", "replace", "remove", "toFloat", "toInt",
           "sort", "sortF", "consume", "randomize"]
from typing import Callable, Iterator, Any, Union, List
from k1lib.bioinfo.cli.init import patchDefaultDelim, BaseCli, settings, T
import k1lib.bioinfo.cli as cli, numpy as np, torch
import concurrent.futures as futures
import multiprocessing as mp
from functools import partial
import dill, pickle
def executeFunc(f, *args, **kwargs):
    """Rebuild a dill-serialized callable and invoke it.

    :param f: bytes produced by ``dill.dumps`` of the callable to run
    :param args: positional arguments forwarded to the callable
    :param kwargs: keyword arguments forwarded to the callable
    :return: whatever the deserialized callable returns"""
    # Imported locally so the name is resolved inside whichever process
    # ends up executing this function.
    import dill
    func = dill.loads(f)
    return func(*args, **kwargs)
class applyMp(BaseCli):
    def __init__(self, f:Callable[[T], T], *args, **kwargs):
        """Like :class:`apply`, but execute ``f(row)`` of each row in multiple
        processes. Example::

            # returns [3, 2]
            ["abc", "de"] | applyMp(lambda s: len(s)) | dereference()
            # returns [5, 6, 9]
            range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | dereference()

        Internally, this uses a process pool sized at 80% of all CPU cores
        (rounded down, but never fewer than 1 worker). As the new processes
        will not share the same memory space as the main process, you should
        pass all dependencies in the arguments.

        :param f: function to run on every row. It is serialized with ``dill``
        :param args: arguments to be passed to the function. ``kwargs`` too"""
        super().__init__(); self.f = f; self.args = args; self.kwargs = kwargs
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Submits one job per row of ``it`` and returns a generator over the
        results, in input order.

        All jobs are submitted eagerly, before this method returns. The pool
        is shut down once the returned generator is exhausted, closed, or
        fails while fetching a result.

        :raises TypeError: if ``dill`` can't serialize the function"""
        super().__ror__(it)
        try: f = dill.dumps(self.f)
        except TypeError:
            print(f"Error while trying to pickle {self.f}."); raise
        args = self.args; kwargs = self.kwargs
        # cpu_count()*4//5 is 0 on a single-core machine and Pool(0) raises
        # ValueError, so always ask for at least one worker
        nWorkers = max(1, mp.cpu_count()*4//5)
        pool = mp.Pool(nWorkers)
        try:
            jobs = [pool.apply_async(executeFunc, [f, line, *args], kwargs) for line in it]
        except BaseException:
            # iterating the input failed half way: don't leave workers behind
            pool.terminate(); raise
        # nothing else will be submitted, so idle workers may exit when done
        pool.close()
        def results():
            # holds a reference to the pool so it stays alive while results
            # are still being fetched
            try:
                for job in jobs: yield job.get()
            finally: pool.terminate()
        return results()
class apply(BaseCli):
    def __init__(self, f:Callable[[str], str], column:int=None):
        """Maps a function over the input stream.

        :param f: function to call on each line, or on a single element of
            each row when ``column`` is given
        :param column: if not None, then applies the function to that column
            only, leaving every other element of the row as is"""
        super().__init__()
        self.f = f
        self.column = column
    def __ror__(self, it:Iterator[str]):
        """Returns a lazy generator over the transformed lines/rows."""
        super().__ror__(it)
        func, col = self.f, self.column
        if col is not None:
            # rebuild each row as a list, swapping in func(elem) at index col
            return ([func(elem) if idx == col else elem
                     for idx, elem in enumerate(row)] for row in it)
        return (func(line) for line in it)
class applySingle(BaseCli):
    def __init__(self, f:Callable[[T], T]):
        """Like :class:`apply`, but much simpler: ``f`` is called once, on the
        entire input object, instead of once per element.

        :param f: function that receives the whole piped-in object"""
        super().__init__()
        self.f = f
    def __ror__(self, it:T) -> T:
        """Returns ``f(it)``."""
        super().__ror__(it)
        func = self.f
        return func(it)
# short alias for :class:`applySingle`
applyS = applySingle
def lstrip(column:int=None, char:str=None):
    """Strips the left side of every line.

    :param column: if not None, only strips that column of each row
    :param char: forwarded to the element's ``.lstrip()`` method"""
    def stripLeft(e): return e.lstrip(char)
    return apply(stripLeft, column)
def rstrip(column:int=None, char:str=None):
    """Strips the right side of every line.

    :param column: if not None, only strips that column of each row
    :param char: forwarded to the element's ``.rstrip()`` method"""
    def stripRight(e): return e.rstrip(char)
    return apply(stripRight, column)
def strip(column:int=None, char:str=None):
    """Strips both sides of every line.

    :param column: if not None, only strips that column of each row
    :param char: forwarded to the element's ``.strip()`` method"""
    def stripBoth(e): return e.strip(char)
    return apply(stripBoth, column)
def upper(column:int=None):
    """Makes all characters uppercase.

    :param column: if not None, only converts that column of each row"""
    def toUpper(e): return e.upper()
    return apply(toUpper, column)
def lower(column:int=None):
    """Makes all characters lowercase.

    :param column: if not None, only converts that column of each row"""
    def toLower(e): return e.lower()
    return apply(toLower, column)
def replace(s:str, target:str=None, column:int=None):
    """Replaces substring `s` with `target` for each line.

    :param s: substring to look for
    :param target: replacement text. It is first passed through
        ``patchDefaultDelim`` (presumably that fills in the default delimiter
        when this is None; confirm against its definition)
    :param column: if not None, only touches that column of each row"""
    replacement = patchDefaultDelim(target)
    def substitute(e): return e.replace(s, replacement)
    return apply(substitute, column)
def remove(s:str, column:int=None):
    """Removes a specific substring in each line, by delegating to
    :meth:`replace` with an empty replacement string.

    :param s: substring to delete
    :param column: if not None, only touches that column of each row"""
    return replace(s, target="", column=column)
def wrap(f, c):
    """Returns ``f`` untouched when ``settings["strict"]`` is truthy, else
    returns ``cli.isNumeric(c) | f``.

    :param f: cli to (optionally) guard
    :param c: column argument handed to ``cli.isNumeric``"""
    if settings["strict"]:
        return f
    return cli.isNumeric(c) | f
def toFloat(*columns:List[int]):
    """Converts every row into a float. Excludes non numbers if not in
    :ref:`strict mode <bioinfoSettings>`. Example::

        # returns [1, 3, -2.3]
        ["1", "3", "-2.3"] | toFloat() | dereference()
        # returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
        [["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | dereference()

    :param columns: if nothing, then will convert each row. If available,
        then convert all the specified columns"""
    if not columns:
        return wrap(apply(float, None), None)
    # one guarded converter per requested column, chained one after another
    converters = [wrap(apply(float, c), c) for c in columns]
    return cli.init.serial(*converters)
def toInt(*columns:List[int]):
    """Converts every row into an integer. Excludes non numbers if not in
    :ref:`strict mode <bioinfoSettings>`. Example::

        # returns [1, 3, -2]
        ["1", "3", "-2.3"] | toInt() | dereference()

    Each value goes through ``float`` first and then ``int``, which is why a
    string such as ``"-2.3"`` is accepted and ends up as ``-2``.

    :param columns: if nothing, then will convert each row. If available,
        then convert all the specified columns

    See also: :meth:`toFloat`
    """
    def toInteger(e): return int(float(e))
    if len(columns) > 0:
        # The per-column converters are unpacked into separate arguments,
        # matching what toFloat does. Previously a single generator object
        # was handed to serial() instead of the individual clis.
        return cli.init.serial(*(wrap(apply(toInteger, c), c) for c in columns))
    return wrap(apply(toInteger, None), None)
class sort(BaseCli):
    def __init__(self, column:int=0, numeric=True, reverse=False):
        """Sorts all lines based on a specific `column`.

        :param column: if None, sort rows based on themselves and not an element
        :param numeric: whether to treat column as float
        :param reverse: False for smaller to bigger, True for bigger to
            smaller. Use :meth:`__invert__` to quickly reverse the order
            instead of using this param"""
        super().__init__()
        self.column = column
        self.numeric = numeric
        self.reverse = reverse
        # converts a cell into the value that actually gets compared
        if numeric: self.filterF = lambda x: float(x)
        else: self.filterF = lambda x: x
    def __ror__(self, it:Iterator[str]):
        """Reads the whole input into memory and returns an iterator over the
        sorted rows."""
        super().__ror__(it)
        col = self.column
        if col is None:
            # no column: turn every element into a 1-element row and sort
            # those rows by their column 0
            return it | cli.wrapList() | cli.transpose() | sort(0, self.numeric, self.reverse)
        convert = self.filterF
        if self.numeric:
            it = it | cli.isNumeric(col)
        rows = list(it)
        def rowKey(row):
            # rows too short to have this column get +inf as their key
            return convert(row[col]) if len(row) > col else float("inf")
        rows.sort(key=rowKey, reverse=self.reverse)
        return iter(rows)
    def __invert__(self):
        """Creates a clone that has the opposite sort order"""
        flipped = not self.reverse
        return sort(self.column, self.numeric, flipped)
class sortF(BaseCli):
    def __init__(self, f:Callable[[T], float], reverse=False):
        """Sorts rows using a function. Example::

            # returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
            ["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | dereference()

        :param f: key function, called once on every row
        :param reverse: if True, sorts from bigger to smaller"""
        super().__init__()
        self.f = f
        self.reverse = reverse
    def __ror__(self, it):
        """Reads the whole input into memory and returns an iterator over the
        sorted rows."""
        super().__ror__(it)
        rows = list(it)
        rows.sort(key=self.f, reverse=self.reverse)
        return iter(rows)
    def __invert__(self):
        """Creates a clone that has the opposite sort order"""
        flipped = not self.reverse
        return sortF(self.f, flipped)
class consume(BaseCli):
    def __init__(self, f:Union[BaseCli, Callable[[T], None]]):
        r"""Consumes the iterator in a side stream. Returns the iterator.
        Kinda like the bash command ``tee``. Example::

            # prints "0\n1\n2" and returns [0, 1, 2]
            range(3) | consume(headOut()) | toList()
            # prints "range(0, 3)" and returns [0, 1, 2]
            range(3) | consume(lambda it: print(it)) | toList()

        This is useful whenever you want to mutate something, but don't want
        to include the function result into the main stream.

        :param f: cli or function that is called with the piped-in object.
            Its return value is thrown away"""
        super().__init__()
        self.f = f
    def __ror__(self, it:T) -> T:
        """Calls ``f(it)`` and then returns ``it``.

        The very same object is handed to ``f`` and then returned, so if
        ``it`` is a one-shot iterator and ``f`` drains it, the caller gets
        back an exhausted iterator."""
        super().__ror__(it)
        sideStream = self.f
        sideStream(it)
        return it
class randomize(BaseCli):
    def __init__(self, bs=100):
        """Randomize input stream. In order to be efficient, this does not
        convert the input iterator to a giant list and yield random values
        from that. Instead, this fetches ``bs`` items at a time, randomizes
        them, returns and fetch another ``bs`` items. If you want to do the
        giant list, then just pass in ``float("inf")``. Example::

            # returns [0, 1, 2, 3, 4], effectively no randomize at all
            range(5) | randomize(1) | dereference()
            # returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
            range(10) | randomize(3) | dereference()
            # returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
            range(10) | randomize(float("inf")) | dereference()

        :param bs: number of items that get shuffled together"""
        super().__init__()
        self.bs = bs
    def __ror__(self, it:Iterator[T]) -> Iterator[T]:
        """Yields the items of ``it``, shuffled within each batch."""
        super().__ror__(it)
        # NOTE(review): the second argument of batched presumably asks for the
        # trailing incomplete batch to be kept; confirm against cli.batched
        batches = it | cli.batched(self.bs, True)
        for chunk in batches:
            # a fresh random ordering of the indices for every batch
            order = torch.randperm(len(chunk))
            yield from (chunk[i] for i in order)