# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for quick modifiers, think of them as changing formats
"""
__all__ = ["apply", "applyMp", "applyS",
"lstrip", "rstrip", "strip",
"upper", "lower", "replace", "remove", "toFloat", "toInt",
"sort", "sortF", "consume", "randomize"]
from typing import Callable, Iterator, Any, Union, List
from k1lib.bioinfo.cli.init import patchDefaultDelim, BaseCli, settings, T
import k1lib.bioinfo.cli as cli, numpy as np, torch
import concurrent.futures as futures
import multiprocessing as mp
from functools import partial
import dill, pickle, k1lib
def executeFunc(f, line, args, kwargs):
import dill
f = dill.loads(f)
line = dill.loads(line)
args = dill.loads(args)
kwargs = dill.loads(kwargs)
return f(line, *args, **kwargs)
[docs]class applyMp(BaseCli):
[docs] def __init__(self, f:Callable[[T], T], *args, **kwargs):
"""Like :class:`apply`, but execute ``f(row)`` of each row in
multiple processes. Example::
# returns [3, 2]
["abc", "de"] | applyMp(lambda s: len(s)) | deref()
# returns [5, 6, 9]
range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref()
# returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
someList = [1, 2, 3]
["abc", "de"] | applyMp(lambda s: someList) | deref()
Internally, this will continuously spawn new jobs up until 80% of all CPU
cores are utilized. On posix systems, the default multiprocessing start method is
``fork()``. This sort of means that all the variables in memory will be copied
over. This might be expensive (might also not, with copy-on-write), so you might
have to think about that. On windows and macos, the default start method is
``spawn``, meaning each child process is a completely new interpreter, so you have
to pass in all required variables and reimport every dependencies. Read more at
https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
.. note::
Remember that every :class:`~k1lib.bioinfo.cli.init.BaseCli` is also a
function, meaning that you can do stuff like::
# returns [['ab', 'ac']]
[["ab", "cd", "ac"]] | applyMp(startswith("a") | deref()) | deref()
Also remember that the return result of ``f`` should not be a generator.
That's why in the example above, there's a ``deref()`` inside f.
:param args: extra arguments to be passed to the function. ``kwargs`` too"""
super().__init__(); self.f = f; self.args = args; self.kwargs = kwargs
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[T]:
super().__ror__(it)
try: f = dill.dumps(self.f)
except TypeError as e: print(f"Error while trying to pickle {self.f}."); raise e
args = dill.dumps(self.args); kwargs = dill.dumps(self.kwargs)
p = mp.Pool(mp.cpu_count()*4//5)
fs = [p.apply_async(executeFunc, [f, dill.dumps(line), args, kwargs]) for line in it]
return (r.get() for r in fs)
[docs]class apply(BaseCli):
[docs] def __init__(self, f:Callable[[str], str], column:int=None):
"""Applies a function f to every line.
Example::
# returns [0, 1, 4, 9, 16]
range(5) | apply(lambda x: x**2) | deref()
# returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]]
torch.ones(2, 3) | apply(lambda x: x+2, 0) | cli.deref()
You can also use this as a decorator, like this::
@apply
def f(x):
return x**2
# returns [0, 1, 4, 9, 16]
range(5) | f | deref()
:param column: if not None, then applies the function to that column only"""
super().__init__(); self.f = f; self.column = column
[docs] def __ror__(self, it:Iterator[str]):
super().__ror__(it); f = self.f; c = self.column
if c is None: return (f(line) for line in it)
else: return ([(e if i != c else f(e))
for i, e in enumerate(row)] for row in it)
[docs]class applyS(BaseCli):
[docs] def __init__(self, f:Callable[[T], T]):
"""Like :class:`apply`, but much simpler, just operating on the entire input
object, essentially. The "S" stands for "single". Example::
# returns 5
3 | applyS(lambda x: x+2)
Like :class:`apply`, you can also use this as a decorator like this::
@applyS
def f(x):
return x+2
# returns 5
3 | f
"""
super().__init__(); self.f = f
[docs] def __ror__(self, it:T) -> T:
if settings["useCtx"]: super().__ror__(it)
return self.f(it)
[docs]def lstrip(column:int=None, char:str=None):
"""Strips left of every line.
Example::
# returns ['12 ', '34']
[" 12 ", " 34"] | lstrip() | deref()"""
return apply(lambda e: e.lstrip(char), column)
[docs]def rstrip(column:int=None, char:str=None):
"""Strips right of every line"""
return apply(lambda e: e.rstrip(char), column)
[docs]def strip(column:int=None, char:str=None):
"""Strips both sides of every line"""
return apply(lambda e: e.strip(char), column)
[docs]def upper(column:int=None):
"""Makes all characters uppercase.
Example::
# returns ['ABCDE', '123R']
["abcde", "123r"] | upper() | deref()"""
return apply(lambda e: e.upper(), column)
[docs]def lower(column:int=None):
"""Makes all characters lowercase"""
return apply(lambda e: e.lower(), column)
[docs]def replace(s:str, target:str=None, column:int=None):
"""Replaces substring `s` with `target` for each line.
Example::
# returns ['104', 'ab0c']
["1234", "ab23c"] | replace("23", "0") | deref()
:param target: if not specified, then use the default delimiter specified
in bioinfo settings"""
t = patchDefaultDelim(target)
return apply(lambda e: e.replace(s, t), column)
[docs]def remove(s:str, column:int=None):
"""Removes a specific substring in each line."""
return replace(s, "", column)
def _op(toOp, c, force, defaultValue):
return apply(toOp, c) | (apply(lambda x: x or defaultValue, c) if force else (~cli.isValue(None, c)))
def _toFloat(e) -> Union[float, None]:
try: return float(e)
except: return None
[docs]def toFloat(*columns:List[int], force=False):
"""Converts every row into a float. Example::
# returns [1, 3, -2.3]
["1", "3", "-2.3"] | toFloat() | deref()
# returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
[["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()
With weird rows::
# returns [[1.0, 'a'], [8.0, 'c']]
[["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
# returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
[["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, force=True) | deref()
:param columns: if nothing, then will convert each row. If available, then
convert all the specified columns
:param force: if True, forces weird values to 0.0, else filters out all weird rows"""
if len(columns) > 0:
return cli.init.serial(*(_op(_toFloat, c, force, 0.0) for c in columns))
else: return _op(_toFloat, None, force, 0.0)
def _toInt(e) -> Union[int, None]:
try: return int(float(e))
except: return None
[docs]def toInt(*columns:List[int], force=False):
"""Converts every row into an integer. Example::
# returns [1, 3, -2]
["1", "3", "-2.3"] | toInt() | deref()
:param columns: if nothing, then will convert each row. If available, then
convert all the specified columns
:param force: if True, forces weird values to 0, else filters out all weird rows
See also: :meth:`toFloat`"""
if len(columns) > 0:
return cli.init.serial(*(_op(_toInt, c, force, 0) for c in columns))
else: return _op(_toInt, None, force, 0)
[docs]class sort(BaseCli):
[docs] def __init__(self, column:int=0, numeric=True, reverse=False):
"""Sorts all lines based on a specific `column`.
Example::
# returns [[5, 'a'], [1, 'b']]
[[1, "b"], [5, "a"]] | ~sort(0) | deref()
# returns [[2, 3]]
[[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref()
# errors out, as you can't really compare str with int
[[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref()
:param column: if None, sort rows based on themselves and not an element
:param numeric: whether to convert column to float
:param reverse: False for smaller to bigger, True for bigger to smaller. Use
:meth:`__invert__` to quickly reverse the order instead of using this param"""
super().__init__()
self.column = column; self.reverse = reverse; self.numeric = numeric
self.filterF = (lambda x: float(x)) if numeric else (lambda x: x)
[docs] def __ror__(self, it:Iterator[str]):
super().__ror__(it); c = self.column
if c is None:
return it | cli.wrapList() | cli.transpose() | sort(0, self.numeric, self.reverse)
f = self.filterF
rows = list(it | cli.isNumeric(c) if self.numeric else it)
def sortF(row):
if len(row) > c: return f(row[c])
return float("inf")
return iter(sorted(rows, key=sortF, reverse=self.reverse))
[docs] def __invert__(self):
"""Creates a clone that has the opposite sort order"""
return sort(self.column, self.numeric, not self.reverse)
[docs]class sortF(BaseCli):
[docs] def __init__(self, f:Callable[[T], float], reverse=False):
"""Sorts rows using a function.
Example::
# returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref()
# returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a']
["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()"""
super().__init__(); self.f = f; self.reverse = reverse
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[T]:
super().__ror__(it)
return iter(sorted(list(it), key=self.f, reverse=self.reverse))
[docs] def __invert__(self) -> "sortF":
return sortF(self.f, not self.reverse)
[docs]class consume(BaseCli):
[docs] def __init__(self, f:Union[BaseCli, Callable[[T], None]]):
r"""Consumes the iterator in a side stream. Returns the iterator.
Kinda like the bash command ``tee``. Example::
# prints "0\n1\n2" and returns [0, 1, 2]
range(3) | consume(headOut()) | toList()
# prints "range(0, 3)" and returns [0, 1, 2]
range(3) | consume(lambda it: print(it)) | toList()
This is useful whenever you want to mutate something, but don't want to
include the function result into the main stream."""
super().__init__(); self.f = f
[docs] def __ror__(self, it:T) -> T:
super().__ror__(it); self.f(it); return it
[docs]class randomize(BaseCli):
[docs] def __init__(self, bs=100):
"""Randomize input stream. In order to be efficient, this does not
convert the input iterator to a giant list and yield random values from that.
Instead, this fetches ``bs`` items at a time, randomizes them, returns and
fetch another ``bs`` items. If you want to do the giant list, then just pass
in ``float("inf")``, or ``None``. Example::
# returns [0, 1, 2, 3, 4], effectively no randomize at all
range(5) | randomize(1) | deref()
# returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
range(10) | randomize(3) | deref()
# returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
range(10) | randomize(float("inf")) | deref()
# same as above
range(10) | randomize(None) | deref()"""
super().__init__(); self.bs = bs if bs != None else float("inf")
[docs] def __ror__(self, it:Iterator[T]) -> Iterator[T]:
super().__ror__(it)
for batch in it | cli.batched(self.bs, True):
batch = list(batch); perms = torch.randperm(len(batch))
for idx in perms: yield batch[idx]