# Source code for k1lib.cli.filt

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for functions that cut out specific parts of the table
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set, Tuple
from k1lib.cli.init import BaseCli, fastF
import k1lib.cli as cli; import k1lib, os, math, traceback
from k1lib.cli.typehint import *
import numpy as np; from collections import deque
try: import torch; hasTorch = True
except: hasTorch = False
# Public names exported by ``from k1lib.cli.filt import *``
__all__ = ["filt", "filter_", "inSet", "contains", "empty",
           "isNumeric", "instanceOf",
           "head", "tail", "cut", "rows",
           "intersection", "union", "unique", "breakIf", "mask", "tryout", "resume"]
# Shorthand for the cli-wide settings object. Its ``arrayTypes`` attribute is used below to
# detect inputs that get the vectorized array code paths (presumably numpy/torch types).
settings = k1lib.settings.cli
def _isBoolMask(m) -> bool:
    """Returns whether ``m`` (the result of calling a predicate on a whole array) can
    safely be used as a boolean index mask.

    Scalars, 0-d arrays and non-boolean arrays are rejected. Indexing with them would
    add an axis (``it[True]``) or do integer fancy-indexing instead of filtering."""
    if not isinstance(m, settings.arrayTypes) or getattr(m, "ndim", 0) < 1: return False
    if isinstance(m, np.ndarray): return m.dtype == np.bool_
    return hasTorch and isinstance(m, torch.Tensor) and m.dtype == torch.bool

class filt(BaseCli):
    def __init__(self, predicate:Callable[[Any], bool], column:Union[int, List[int]]=None, catchErrors:bool=False):
        """Filters out elements. Examples::

            # returns [2, 6], grabbing all the even elements
            [2, 3, 5, 6] | filt(lambda x: x%2 == 0) | deref()
            # returns [3, 5], grabbing all the odd elements
            [2, 3, 5, 6] | ~filt(lambda x: x%2 == 0) | deref()
            # returns [[2, 'a'], [6, 'c']], grabbing all the even elements in the 1st column
            [[2, "a"], [3, "b"], [5, "a"], [6, "c"]] | filt(lambda x: x%2 == 0, 0) | deref()
            # throws error, because strings can't mod divide
            [1, 2, "b", 8] | filt(lambda x: x % 2 == 0) | deref()
            # returns [2, 8]
            [1, 2, "b", 8] | filt(lambda x: x % 2 == 0, catchErrors=True) | deref()

        You can also pass in :class:`~k1lib.cli.modifier.op` or string, for extra intuitiveness and quickness::

            # returns [2, 6]
            [2, 3, 5, 6] | filt(op() % 2 == 0) | deref()
            # returns ['abc', 'a12']
            ["abc", "def", "a12"] | filt(op().startswith("a")) | deref()
            # returns [3, 4, 5, 6, 7, 8, 9]
            range(100) | filt(3 <= op() < 10) | deref()
            # returns [3, 4, 5, 6, 7, 8, 9]
            range(100) | filt("3 <= x < 10") | deref()

        See :class:`~k1lib.cli.modifier.aS` for more details on string mode.

        If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
        automatically use the C-accelerated versions if possible, like this::

            # returns np.array([2, 3, 4]), instead of iter([2, 3, 4])
            np.array([1, 2, 3, 4]) | filt(lambda x: x>=2) | deref()
            # returns [2, 3, 4], instead of np.array([2, 3, 4]), because `math.exp` can't operate on numpy arrays
            np.array([1, 2, 3, 4]) | filt(lambda x: math.exp(x) >= 3) | deref()

        The accelerated path is taken only when calling the predicate on the whole array
        (or on the whole selected column) succeeds and returns a boolean array. In every
        other case it quietly falls back to applying the predicate element by element. That
        includes the predicate raising, or returning a scalar or a non-boolean array.

        If you need more extensive filtering capabilities involving text, check out :class:`~k1lib.cli.grep.grep`

        If "filt" is too hard to remember, this cli also has an alias :class:`filter_`
        that kinda mimics Python's ``filter()``.

        :param predicate: function that returns True or False
        :param column: if not specified, then filters elements of the input array,
            else filters the specific column only (or columns, just like in
            :class:`~k1lib.cli.modifier.apply`). Negative columns are rejected.
        :param catchErrors: whether to catch errors in the function or not (reject
            elements that raise errors). Runs slower if enabled though"""
        fs = [predicate]; super().__init__(fs)
        if column:
            ex = Exception(f"Filtering using a function on a negative-indexed column ({column}) is not supported")
            if isinstance(column, int):
                if column < 0: raise ex
            else:
                column = list(column)
                if any(c < 0 for c in column): raise ex
        f = fs[0]; _fP = fastF(f); self.column = column
        if catchErrors:
            def g(x):
                try: return _fP(x)
                except Exception: return False # elements that raise are rejected
            self.predicate = g
        else: self.predicate = _fP

    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:
        p = self.predicate; c = self.column
        if c is None:
            if isinstance(it, settings.arrayTypes):
                try:
                    m = p(it)
                    if _isBoolMask(m): return it[m]
                except Exception: pass # predicate isn't vectorizable: fall back to element-wise below
            return (l for l in it if p(l))
        elif isinstance(c, int):
            if isinstance(it, settings.arrayTypes):
                try:
                    m = p(it[:, c])
                    if _isBoolMask(m): return it[m]
                except Exception: pass # same fallback as above
            def gen():
                for es in it:
                    es = list(es)
                    # rows too short to have column c are dropped
                    if c < len(es) and p(es[c]): yield es
            return gen()
        else: # list of ints: filter on each column in turn
            return it | cli.serial(*[filt(p, c_, False) for c_ in c])

    def __invert__(self):
        """Negate the condition"""
        p = self.predicate
        def f(s):
            res = p(s) # for array input this may raise, which makes __ror__ fall back to element-wise mode
            if isinstance(s, settings.arrayTypes) and isinstance(res, settings.arrayTypes): return ~res
            return not res
        return filt(f, self.column)

    def __neg__(self):
        """Also negates the condition"""
        return ~self

    def split(self):
        """Splits the input into positive and negative samples. Example::

            # returns [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
            range(10) | filt(lambda x: x%2 == 0).split() | deref()
            # also returns [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]], exactly like above
            range(10) | filt(lambda x: x%2 == 0) & filt(lambda x: x%2 != 0) | deref()"""
        f = self.predicate; c = self.column; return filt(f, c) & ~filt(f, c)
filter_ = filt # alias of filt, named after Python's built-in ``filter()``
def inSet(values:Set[Any], column:int=None) -> filt:
    """Keeps only the lines that are members of ``values``. Example::

        # returns [2, 3]
        range(5) | inSet([2, 8, 3]) | deref()
        # returns [0, 1, 4]
        range(5) | ~inSet([2, 8, 3]) | deref()

    :param values: allowed values. Sets and dicts are used as-is for membership tests;
        anything else is converted to a set first.
    :param column: if given, test that column of each row instead of the whole line"""
    lookup = values if isinstance(values, (set, dict)) else set(values)
    return filt(lambda elem: elem in lookup, column)
def contains(s:str, column:int=None) -> filt:
    """Keeps only the lines that contain the substring ``s``.

    Sort of similar to :class:`~k1lib.cli.grep.grep`, but simpler, and it can be
    inverted. Example::

        # returns ['abcd', '2bcr']
        ["abcd", "0123", "2bcr"] | contains("bc") | deref()

    :param s: substring to look for (tested with ``in``)
    :param column: if given, test that column of each row instead of the whole line"""
    return filt(lambda line: s in line, column)
class empty(BaseCli):
    def __init__(self, reverse=False):
        """Filters out streams that are not empty. Almost always used inverted, but
        "empty" is a short, sweet name that's easy to remember. Example::

            # returns [[1, 2], ['a']]
            [[], [1, 2], [], ["a"]] | ~empty() | deref()

        Non-empty streams that are kept (``~empty()``) come back as lazy iterators that
        start with the element used to check for emptiness. Empty streams that are kept
        (``empty()``) come back as ``[]``.

        :param reverse: not intended to be used by the end user. Do ``~empty()`` instead."""
        super().__init__(); self.reverse = reverse

    @staticmethod
    def _prepend(first, rest):
        """Yields ``first`` followed by the remaining elements of ``rest``."""
        yield first; yield from rest

    def __ror__(self, streams:Iterator[Iterator[Any]]) -> Iterator[Iterator[Any]]:
        # Peek locally instead of relying on how another cli reports an empty stream.
        keepNonEmpty = self.reverse; missing = object()
        for stream in streams:
            it = iter(stream); first = next(it, missing)
            if first is missing:
                if not keepNonEmpty: yield []
            elif keepNonEmpty: yield empty._prepend(first, it)

    def __invert__(self):
        return empty(not self.reverse)
def isNumeric(column:int=None) -> filt:
    """Filters out a line if that column is not a number. Example::

        # returns [0, 2, '3']
        [0, 2, "3", "a"] | isNumeric() | deref()

    Anything that ``float()`` accepts counts as numeric, so numeric strings like ``"3"``
    pass. Values that ``float()`` rejects with a TypeError are filtered out as well,
    instead of crashing the pipeline. Examples are ``None`` or lists.

    :param column: if given, test that column of each row instead of the whole line"""
    def f(v):
        try: float(v); return True
        except (ValueError, TypeError): return False
    return filt(f, column)
def instanceOf(cls:Union[type, Tuple[type]], column:int=None) -> filt:
    """Keeps only the lines that are instances of ``cls``. Example::

        # returns [2]
        [2, 2.3, "a"] | instanceOf(int) | deref()
        # returns [2, 2.3]
        [2, 2.3, "a"] | instanceOf((int, float)) | deref()

    :param cls: a type, or a tuple (a list is also accepted) of types, as in ``isinstance``
    :param column: if given, test that column of each row instead of the whole line"""
    types = tuple(cls) if isinstance(cls, list) else cls
    return filt(lambda elem: isinstance(elem, types), column)
def sliceable(it):
    """Returns whether ``it`` supports both ``it[0]`` and ``len(it)``.

    Empty sequences report False (``it[0]`` fails), which only means callers take the
    ``list()`` fallback path."""
    try:
        it[0]; len(it)
        return True
    except Exception:
        return False

def _head(n, inverted, it):
    """Generator behind head-like slicing of plain iterables.

    - ``n is None``: everything (nothing when inverted)
    - ``n >= 0``: the first ``n`` elements (everything after them when inverted)
    - ``n < 0``: all but the last ``-n`` elements (only the last ``-n`` when inverted)"""
    it = iter(it)
    if n is None:
        if not inverted: yield from it
        return
    if n >= 0:
        # zip pulls from range() first, so no element of `it` is lost once n is reached
        front = zip(range(n), it)
        if inverted:
            for _ in front: pass
            yield from it
        else:
            for _, line in front: yield line
        return
    if inverted:
        yield from deque(it, -n) # bounded deque keeps only the last -n elements
        return
    window = deque(); k = -n
    for line in it:
        window.append(line)
        if len(window) > k: yield window.popleft()

class headSplit(BaseCli):
    """Splits the input into two parts at position ``n``.

    ``n`` may be None (everything goes into the first part), a non-negative or negative
    int (Python slice semantics), or a float, which is a fraction of the input length."""
    def __init__(self, n, inverted):
        self.n = n; self.inverted = inverted
        self.fixup = n is None or isinstance(n, float) or n < 0 # kept for compatibility; no longer needed by __ror__
        self.sliceable = None # cached result of sliceable() from the first input seen

    @staticmethod
    def _isFraction(n):
        """Whether ``n`` should be read as a fraction of the length rather than an index."""
        return n is not None and (isinstance(n, float) or round(n) != n)

    def _all_array_opt(self, it, level):
        n = self.n; inverted = self.inverted; size = it.shape[level]
        if n is None: n = size # everything goes into the first part, like __ror__
        elif headSplit._isFraction(n): n = int(size*n) # fractional head
        sl = tuple([slice(None)]*level); b = it[(*sl, slice(n, None))]; a = it[(*sl, slice(None, n))]
        return [b, a] if inverted else [a, b]

    def __ror__(self, it):
        # NOTE(review): unlike _all_array_opt, this path ignores self.inverted -- confirm which is intended.
        if self.sliceable is None: self.sliceable = sliceable(it)
        if not self.sliceable: it = list(it)
        n = self.n
        if n is None: return it, []
        if headSplit._isFraction(n): n = int(len(it)*n) # fractional head
        # Python slicing already handles negative and out-of-range n (and empty input),
        # so no modulo normalization is needed here.
        return it[:n], it[n:]
def tail(n:int=10):
    """Basically an inverted :class:`head`. Examples::

        range(10) | tail(3) | deref() # returns [7, 8, 9]
        range(10) | tail(0) | deref() # returns []

    :param n: how many trailing elements to keep"""
    # ``~head(-0)`` is ``~head(0)`` ("skip nothing"), which would return the whole input,
    # so keeping the last 0 elements is special-cased as ``head(0)``.
    if n == 0: return head(0)
    return ~head(-n)
class lazyList:
    """Read-only view over an iterator that pulls elements only when they are indexed.

    ``ll[i]`` consumes the underlying iterator just far enough to reach index ``i``,
    caching everything pulled so far. Negative indices are resolved against the elements
    pulled *so far*, not against the full iterator."""
    def __init__(self, it):
        self.it = iter(it); self.elems = []

    def __getitem__(self, idx):
        elems = self.elems; it = self.it
        try:
            for _ in range(len(elems)-1, idx): elems.append(next(it))
        except StopIteration:
            # A bare StopIteration escaping into a generator turns into RuntimeError (PEP 479);
            # raise the error a sequence would raise instead.
            raise IndexError(f"lazyList index {idx} out of range (only {len(elems)} elements available)") from None
        return elems[idx]
class rows(BaseCli):
    def __init__(self, *rows:List[int]):
        """Selects specific elements given an iterator of indexes.

        When every index is non-negative, the input is only consumed up to the largest
        requested index, and the elements up to that point are buffered. Negative
        indices, slices and inverted selections materialize the whole input into a list.
        Example::

            "0123456789" | rows(2) | toList() # returns ["2"]
            "0123456789" | rows(5, 8) | toList() # returns ["5", "8"]
            "0123456789" | rows()[2:5] | toList() # returns ["2", "3", "4"]
            "0123456789" | ~rows()[2:5] | toList() # returns ["0", "1", "5", "6", "7", "8", "9"]
            "0123456789" | ~rows()[:7:2] | toList() # returns ['1', '3', '5', '7', '8', '9']
            "0123456789" | rows()[:-4] | toList() # returns ['0', '1', '2', '3', '4', '5']
            "0123456789" | ~rows()[:-4] | toList() # returns ['6', '7', '8', '9']

        It's called "rows" because I couldn't find a better name. There was :class:`cut`,
        which is the name of an actual bash cli that selects out columns given indices.
        When I needed a way to do what this cli does, it was in the context of selecting
        out rows, so the name stuck.

        If you want to just pick out the nth item from the iterator, instead of doing this::

            iter(range(10)) | rows(3) | item() # returns 3

        ... you can use the shorthand :class:`~k1lib.cli.utils.rItem` instead::

            iter(range(10)) | rItem(3) # returns 3

        :param rows: ints for the row indices"""
        super().__init__()
        if len(rows) == 1 and isinstance(rows[0], slice):
            self.slice = rows[0]; self.rows = None; self.idxMode = False
        else: self.slice = None; self.rows = rows; self.sortedRows = sorted(rows); self.idxMode = True
        self.inverted = False

    def __getitem__(self, _slice):
        # Simple slices are delegated to cheaper clis. NOTE(review): these shortcuts ignore
        # self.inverted, which only matters for `(~rows())[a:b]`; `~rows()[a:b]` inverts the result.
        start, stop, step = _slice.start, _slice.stop, _slice.step
        if step == None or step == 1:
            if start == None and stop == None: return cli.iden()
            if start == None: return head(stop)
            if stop == None: return ~head(start)
        elif step == 0: return cli.ignore()
        answer = rows(_slice); answer.inverted = self.inverted; return answer

    def _arrayIdx(self):
        """Index to apply to numpy/torch arrays: an int index array (index mode, possibly
        empty) or the slice. An explicit int dtype keeps ``rows()`` from indexing with
        ``None``, which would add an axis instead of selecting nothing."""
        return np.array(self.rows, dtype=int) if self.idxMode else self.slice

    def _all_array_opt(self, it, level:int):
        a = self._arrayIdx(); s = [slice(None, None, None)]*level
        if self.inverted: mask = np.ones(it.shape[level], dtype=bool); mask[a] = False; return it[(*s, mask)]
        return it[(*s, a)]

    def __invert__(self): self.inverted = not self.inverted; return self

    def __ror__(self, it:Iterator[str]):
        idxMode = self.idxMode; inverted = self.inverted; sl = self.slice; rw = self.rows
        def gen(it):
            if not inverted:
                if idxMode:
                    if len(self.sortedRows) == 0: return
                    # negative indices need the full length, otherwise pull lazily
                    it = list(it) if self.sortedRows[0] < 0 else lazyList(it)
                    for idx in rw: yield it[idx]
                else: yield from list(it)[sl]
            else:
                it = list(it); n = len(it)
                idxs = set((e if e >= 0 else n+e) for e in rw) if idxMode else set(range(n)[sl])
                yield from (e for i, e in enumerate(it) if i not in idxs)
        if isinstance(it, settings.arrayTypes):
            a = self._arrayIdx()
            if inverted: mask = np.ones(len(it), dtype=bool); mask[a] = False; return it[mask]
            return it[a]
        return gen(it)
class cut(BaseCli):
    def __init__(self, *columns:List[int]):
        """Cuts out specific columns, sliceable. Examples::

            ["0123456789", "abcdefghij"] | cut(5, 8) | deref() # returns [['5', '8'], ['f', 'i']]
            ["0123456789", "abcdefghij"] | cut(8, 5) | deref() # returns [['8', '5'], ['i', 'f']], demonstrating permutation-safe
            ["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
            ["0123456789"] | cut(8, 5) | deref() # returns [['8', '5']], demonstrating permutation-safe
            ["0123456789", "abcdefghij"] | cut(2) | deref() # returns ['2', 'c'], instead of [['2'], ['c']] as usual
            ["0123456789"] | cut(2) | deref() # returns ['2']
            ["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
            ["0123456789"] | ~cut()[:7:2] | deref() # returns [['1', '3', '5', '7', '8', '9']]

        In the first example, you can imagine that we're operating on this table::

            0123456789
            abcdefghij

        Then, we want to grab the 5th and 8th column (0-indexed), which forms this table::

            58
            fi

        So, result of that is just ``[['5', '8'], ['f', 'i']]``

        In the fifth example, if you're only cutting out 1 column, then it will just grab
        that column directly, instead of putting it in a list. The same happens when an
        inverted cut leaves exactly 1 column.

        Rows too short to contain a requested column are handled leniently. The missing
        cell is skipped, or, when cutting a single column, the whole row is skipped.
        Non-inverted negative indices are resolved against each row's own length.

        If you pass in :class:`numpy.ndarray` or :class:`torch.Tensor`, then it will
        automatically use the C-accelerated versions, like this::

            torch.randn(4, 5, 6) | cut(2, 3) # returns tensor of shape (4, 2, 6)
            torch.randn(4, 5, 6) | cut(2) # returns tensor of shape (4, 6)
            torch.randn(4, 5, 6) | ~cut()[2:] # returns tensor of shape (4, 2, 6)

        .. warning::

            TL;DR: inverted negative indexes are a bad thing when rows don't have the same number of elements

            Everything works fine when all of your rows have the same number of elements.
            But things might behave a little strangely if they don't. For example::

                # returns [['2', '3', '4'], ['2', '3', '4', '5', '6', '7']]. Different number of columns, works just fine
                ["0123456", "0123456789"] | cut()[2:-2] | deref()
                # returns [['0', '1', '8', '9'], ['a', 'b', 'i', 'j']]. Same number of columns, works just fine
                ["0123456789", "abcdefghij"] | ~cut()[2:-2] | deref()
                # returns [['0', '1', '5', '6'], ['0', '1', '5', '6', '7', '8', '9']]. Different number of columns, unsupported invert case
                ["0123456", "0123456789"] | ~cut()[2:-2] | deref()

            Why does this happen? It peeks at the first row and determines that ~[2:-2] is
            equivalent to [:2] and [5:] combined, not [:2] and [-2:] combined. When applied
            to the second row, [5:] now covers columns 5 through 9 instead of just the last
            2, hence the result. Another edge case would be::

                # returns [['0', '1', '2', '3', '5', '6'], ['0', '1', '2', '3', '5', '6', '7', '8', '9']]
                ["0123456", "0123456789"] | ~cut(-3) | deref()

            Like before, it peeks at the first row and translates ~(-3) into ~4, which is
            equivalent to [:4] and [5:]. But when applied to the second row, it still means
            ~4 rather than ~(-3).

            Why don't I just fix these edge cases? Because the run time would be completely
            unacceptable, as we'd have to figure out which columns to include in the result
            for every row. This could easily be O(n^3). With more time spent optimizing this
            could be solved, but this is the only extreme edge case and I don't feel like
            putting in the effort."""
        super().__init__()
        if len(columns) == 1 and isinstance(columns[0], slice): columns = columns[0] # columns is either a slice object, or a list of ints
        self.columns = columns; self.inverted = False # columns: list[int] | slice

    def _all_array_opt(self, it, level):
        c = self.columns; r = rows(c) if isinstance(c, slice) else rows(*c)
        if self.inverted: r = ~r
        it = it | r.all(level+1)
        return (it | cli.item().all(level+1)) if not isinstance(c, slice) and len(c) == 1 else it

    def __ror__(self, it):
        columns = self.columns; inverted = self.inverted
        isArray = isinstance(it, settings.arrayTypes)
        if isArray: nCols = len(it[0]); rs = prs = range(nCols)
        else:
            # peek at the first row to learn the number of columns, then put it back
            it = iter(it); sentinel = object(); first = next(it, sentinel)
            if first is sentinel: return []
            first = list(first); it = it | cli.insert(first); nCols = len(first)
            # "prs" = padded range: inverted cuts keep up to 20 extra columns of rows longer than the first one
            rs = range(nCols); prs = range(nCols + 20)
        if isinstance(columns, slice):
            if not inverted: return it[:, columns] if isArray else (row[columns] for row in (list(row) for row in it))
            columns = rs[columns]
        if inverted:
            # the complement needs absolute indices, resolved against the first row's width
            excluded = {c if c >= 0 else nCols + c for c in columns}
            columns = [c for c in prs if c not in excluded] # ascending, deterministic order
        else: columns = list(columns) # negative indices are resolved per row below
        if len(columns) == 1:
            c = columns[0]
            return it[:, c] if isArray else (r[c] for r in (list(row) for row in it) if -len(r) <= c < len(r))
        return it[:, columns] if isArray else ([r[c] for c in columns if -len(r) <= c < len(r)] for r in (list(row) for row in it))

    def __getitem__(self, idx): answer = cut(idx); answer.inverted = self.inverted; return answer

    def __invert__(self): self.inverted = not self.inverted; return self
class intersection(BaseCli):
    def __init__(self, column=None, full=False):
        """Returns the intersection of multiple streams. Example::

            # returns set([2, 4, 5])
            [[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection()
            # returns ['2g', '4h', '5j']
            [["1a", "2b", "3c", "4d", "5e"], ["7f", "2g", "4h", "6i", "5j"]] | intersection(0) | deref()

        If you want the full distribution, meaning the intersection as well as what's
        left of each stream, you can do something like this::

            # returns [{2, 4, 5}, [1, 3], [7, 6]]
            [[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection(full=True) | deref()

        When ``column`` is given, rows are matched on that column and, for each shared
        key, the last row seen across all streams is returned.

        :param column: what column to apply the intersection on. Defaulted to None
        :param full: if specified, return the full distribution, instead of the intersection alone"""
        super().__init__(); self.column = column
        self.full = full
        self.f = intersection(column, False) if full else None # plain intersection used by full mode

    def _typehint(self, inp):
        if self.column is not None: return tAny()
        if isinstance(inp, tArrayTypes): return tSet(inp.child)
        if isinstance(inp, tListIterSet):
            inner = inp.child
            return tSet(inner.child) if isinstance(inner, tListIterSet) else tSet(tAny())
        if isinstance(inp, tCollection):
            first = inp.children[0]
            for e in inp.children:
                if not isinstance(e, tListIterSet) or e.child != first.child: return tSet(tAny())
            return tSet(first.child)
        return tSet(tAny())

    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        c = self.column
        if self.full:
            if c is not None: raise Exception("intersection(int, True) mode not supported yet, as it's a little ambiguous what's the use case is, and there're many styles of functionality that this can take on")
            its = its | cli.deref(2); inter = its | self.f
            return [inter, *((e for e in it if e not in inter) for it in its)]
        if c is None:
            answer = None
            for it in its:
                answer = set(it) if answer is None else answer.intersection(it)
            return set() if answer is None else answer
        its = its | cli.deref(2)
        ids = its | cut(c).all() | intersection() | cli.aS(set)
        lastRowFor = {}
        for it in its:
            for row in it:
                key = row[c]
                if key in ids: lastRowFor[key] = row
        return lastRowFor.values()
class union(BaseCli):
    def __init__(self):
        """Returns the union of multiple streams. Example::

            # returns {0, 1, 2, 10, 11, 12, 13, 14}
            [range(3), range(10, 15)] | union()
        """
        super().__init__()

    def _typehint(self, inp):
        return intersection()._typehint(inp)

    def __ror__(self, its:Iterator[Iterator[Any]]) -> Set[Any]:
        answer = set()
        # update() grows one set in place; set.union() would copy the accumulator for every stream
        for it in its: answer.update(it)
        return answer
class unique(BaseCli):
    def __init__(self, column:int=None):
        """Filters out non-unique row elements. Example::

            # returns [[1, "a"], [2, "a"]]
            [[1, "a"], [2, "a"], [1, "b"]] | unique(0) | deref()
            # returns [0, 1, 2, 3, 4]
            [*range(5), *range(3)] | unique() | deref()

        In the first example, the 3rd element is filtered out because its first column
        is 1, which has already appeared.

        .. note::

            For :class:`numpy.ndarray` / :class:`torch.Tensor` input with ``column=None``,
            this delegates to ``np.unique`` / ``torch.unique``. Those flatten the array and
            return the unique values *sorted*, not in order of first appearance.

        :param column: the column to detect unique elements. Can be None, which will behave
            like converting the input iterator into a set, but this cli will maintain the
            order (see the note above for arrays)"""
        super().__init__(); self.column = column

    def __ror__(self, it):
        c = self.column
        if c is None and isinstance(it, settings.arrayTypes):
            if isinstance(it, np.ndarray): return np.unique(it)
            if hasTorch and isinstance(it, torch.Tensor): return torch.unique(it)
            # any other registered array type uses the generic, order-preserving path below
        def gen():
            seen = set()
            for row in it:
                if c is None: key = row
                else: row = list(row); key = row[c]
                if key not in seen: yield row
                seen.add(key)
        return gen()
class breakIf(BaseCli):
    def __init__(self, f):
        """Stops iterating as soon as ``f(element)`` is truthy. That element and
        everything after it are dropped. Example::

            # returns [0, 1, 2, 3, 4, 5]
            [*range(10), 2, 3] | breakIf(lambda x: x > 5) | deref()

        :param f: stopping condition, called on each element in order"""
        fs = [f]; super().__init__(fs)
        self.f = fs[0]; self._fC = fastF(self.f)

    def _typehint(self, inp):
        return tIter(inp.child) if isinstance(inp, tListIterSet) else tIter(tAny())

    def __ror__(self, it:Iterator[Any]) -> Iterator[Any]:
        shouldStop = self._fC
        for elem in it:
            if shouldStop(elem): return
            yield elem
class mask(BaseCli):
    def __init__(self, mask:Iterator[bool]):
        """Keeps only the elements whose matching flag in ``mask`` is truthy. Example::

            # returns [0, 1, 3]
            range(5) | mask([True, True, False, True, False]) | deref()
            # returns torch.tensor([0, 1, 3])
            torch.tensor(range(5)) | mask([True, True, False, True, False])

        Plain iterables are walked in lockstep with the mask, like ``zip``, so the output
        stops when either one runs out. Inputs matching ``settings.arrayTypes`` are indexed
        directly with ``list(mask)``. If ``mask`` is a one-shot iterator, the cli can only
        be used once.

        :param mask: iterable of booleans, one per input element"""
        super().__init__(); self.mask = mask

    def __ror__(self, it):
        flags = self.mask
        if not isinstance(it, settings.arrayTypes):
            return (elem for elem, keep in zip(it, flags) if keep)
        return it[list(flags)]
class tryout(BaseCli):
    def __init__(self, result=None, retries=0):
        """Wraps every cli operation after this in a try-catch block, returning ``result``
        if the operation fails. Example::

            # returns 9
            3 | (tryout("failed") | op()**2)
            # returns "failed", instead of raising an exception
            "3" | (tryout("failed") | op()**2)
            # special mode: returns "unsupported operand type(s) for ** or pow(): 'str' and 'int'"
            "3" | (tryout(str) | op()**2)
            # special mode: returns entire trace stack (do `import traceback` first)
            "3" | (tryout(traceback) | op()**2)
            # special mode: returns "3", the input of the tryout() block
            "3" | (tryout(input) | op()**2)

        By default, this ``tryout()`` object will gobble up all clis behind it and wrap them
        inside a try-catch block. This might be undesirable, so you can stop it early::

            # returns "failed"
            3 | (tryout("failed") | op()**2 | aS(str) | op()**2)
            # raises an exception, because it errors out after the tryout()-captured operations
            3 | (tryout("failed") | op()**2) | aS(str) | op()**2

        In the first example, :class:`tryout` will catch any errors happening within
        ``op()``, ``aS(str)`` or the second ``op()**2``. In the second example,
        :class:`tryout` will only catch errors happening within the first ``op()**2``.

        .. admonition:: Array mode

            The above works well for atomic operations, but not for looping operations.
            Let's say we have this function::

                counter = 0
                def f(x):
                    global counter
                    if x > 5:
                        counter += 1
                        if counter < 3: raise Exception(f"random error: {x}")
                    return x**2

            This code will throw an error if x is greater than 5 for the first and second
            time (but runs smoothly after that. It's a really nasty function I know).
            Capturing like this will work::

                counter = 0
                # line below returns [0, 1, 4, 9, 16, 25, 'failed', 'failed', 64, 81]
                range(10) | apply(tryout("failed") | aS(f)) | deref()

            But capturing like this won't work::

                counter = 0
                # line below throws an exception
                range(10) | (tryout("failed") | apply(f)) | deref()

            The reason is that :class:`tryout` only captures errors while the data is being
            passed into ``apply(f)``, not later on. When data is passed to ``apply(f)``,
            nothing has executed yet, because these things are lazily executed. So the
            exception actually happens when you're trying to ``deref()`` it, which lies
            outside of :class:`tryout`'s reach. You can just put a tilde in front to tell it
            to capture errors for individual elements in the iterator::

                counter = 0
                # line below returns [0, 1, 4, 9, 16, 25, 'failed', 'failed', 64, 81]
                range(10) | (~tryout("failed") | apply(f)) | deref()

            This mode has a weird quirk: there has to be a 1-to-1 correspondence between
            the input and output of the block of code that it wraps around. Meaning this
            is okay::

                def g(x):
                    global counter
                    if 40 > x[0] >= 30:
                        counter += 1
                        if counter < 5: raise Exception("random error")
                    return x

                counter = 0
                # returns 50, corrects errors as if it's not even there!
                range(50) | (~tryout(None, 6) | batched(10, True) | apply(g) | joinStreams()) | deref() | shape(0)

            This is okay because 50 elements go in and 50 elements are expected to come
            out of :class:`tryout`. The input can be of infinite length, but there has to
            be a 1-to-1 relationship between the input and output. While this is not okay::

                counter = 0
                # returns 75, data structure corrupted
                range(50) | (~tryout(None, 6) | batched(10, True) | apply(g) | joinStreams() | batched(2, True)) | joinStreams() | deref() | shape(0)

            It's not okay because 25 pairs of elements are expected to come out of
            :class:`tryout`.

        .. admonition:: Retrying

            There's also the ``retries`` parameter, which specifies how many times this
            class should retry the operation before actually returning the predefined result::

                counter = 0
                # line below returns [0, 1, 4, 9, 16, 25, None, None, 64, 81]
                range(10) | (~tryout(retries=0) | apply(f)) | deref()
                counter = 0
                # line below returns [0, 1, 4, 9, 16, 25, None, 49, 64, 81]
                range(10) | (~tryout(retries=1) | apply(f)) | deref()
                counter = 0
                # line below returns [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
                range(10) | (~tryout(retries=2) | apply(f)) | deref()

        :param result: result to return if there is an exception. If passed in the class
            `str`, then will return the exception's string instead. If passed in the
            function `input`, then will return the input instead. If passed in the
            `traceback` module, then will return the formatted traceback
        :param retries: how many times to retry before giving up"""
        super().__init__(capture=True); self.result = result; self.inverted = False; self.retries = retries

    def __ror__(self, it):
        retries = self.retries
        result = self.result
        if len(self.capturedClis) == 0: raise Exception("tryout() currently does not wrap around any other cli. You may need to change `data | tryout() | cli1() | cli2()` into `data | (tryout() | cli1() | cli2())`")
        if not self.inverted: # single mode: retry the whole captured pipeline on the whole input
            while True:
                try: return it | self.capturedSerial
                except Exception as e:
                    # out of retries: map the special `result` markers (str / input / traceback) to their outputs
                    if retries <= 0: return str(e) if result is str else (it if result is input else (traceback.format_exc() if result is traceback else result))
                    retries -= 1
        else: # array mode: errors are handled per element, which needs a 1-to-1 input/output pipeline
            def gen(it):
                # savedInputs buffers inputs fed into the pipeline whose outputs haven't been
                # yielded yet. It's held in a k1lib.Wrapper, presumably so the closure below
                # sees the deque being replaced via `.value`.
                patience = retries; savedInputs = k1lib.Wrapper(deque()); ogInp = None
                def interceptIt(it):
                    for e in it: savedInputs().append(e); yield e
                it = iter(it); ogIt = it; it = interceptIt(it); outIt = it | self.capturedSerial
                while True:
                    # on success, the oldest buffered input is considered answered and patience resets
                    try: e = next(outIt); yield e; savedInputs().popleft(); patience = retries
                    except StopIteration: break
                    except Exception as e:
                        # out of patience: drop the oldest unanswered input, assumed to be the failing one (hence the 1-to-1 requirement)
                        if patience <= 0: ogInp = savedInputs().popleft(); patience = retries # ran out of patience, so gonna just return the canned result instead
                        else: patience -= 1
                        # restart the pipeline: replay the still-buffered inputs, then continue with the rest of the original iterator
                        it = interceptIt([list(savedInputs()), ogIt] | cli.joinStreams())
                        savedInputs.value = deque(); outIt = it | self.capturedSerial
                        # patience was just reset, so an input was dropped: emit the canned result in its place
                        if patience == retries:
                            yield str(e) if result is str else (ogInp if result is input else (traceback.format_exc() if result is traceback else result)) # just reset
            return gen(it)

    def __invert__(self): self.inverted = not self.inverted; return self
def resume(fn):
    """Resumes a long-running operation by skipping the work that's already saved.

    I usually have code that looks like this::

        def f(x): pass # long running, expensive calculation
        ls(".") | applyMp(f) | apply(dill.dumps) | file("somefile.pth") # executing
        cat.pickle("somefile.pth") | aS(list) # getting all of the saved objects

    This reads all the files in the current directory and transforms them using the
    long-running, expensive function, potentially in multiple processes. The results are
    then serialized into bytes and written to an output file.

    What's frustrating is that I do stupid things all the time, so the process usually
    gets interrupted. But I don't want to redo the existing work, and that's where this
    cli comes into play. Now it looks like this instead::

        ls(".") | resume("somefile.pth") | applyMp(f) | apply(dill.dumps) >> file("somefile.pth")

    Note that we're inserting a resume() AND changing the file write mode to append, so
    that the file doesn't get overwritten. Internally, this is just a shorthand for
    ``~head(fn | (tryout(0) | aS(cat.pickle) | shape(0)))``. If the file can't be read,
    ``tryout(0)`` makes the count 0, so nothing is skipped.

    :param fn: file name"""
    # number of objects already pickled into fn (0 if it can't be read)
    alreadyDone = fn | (cli.tryout(0) | cli.aS(cli.cat.pickle) | cli.shape(0))
    return ~cli.head(alreadyDone)