Source code for k1lib.cli.ktree

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib as k1; import k1lib.cli as cli
from k1lib.cli import BaseCli
import os
# Names exported by ``from k1lib.cli.ktree import *``.
# NOTE(review): "search" is listed here, but no definition with that name is visible in this view of the file - confirm it exists
__all__ = ["create", "apply", "filt", "select", "flatten", "pretty", "display", "search", "walk"]
settings = k1.settings.cli
# "tok" is the placeholder string that prune2()/prune3() compare node names against, and that create() passes as the transpose fill value
kSett = k1.Settings().add("tok", "\ue000", "Special token for internal processing")
# Attach the settings object above to k1.settings.cli under the name "ktree"
settings.add("ktree", kSett, "cli.ktree module settings");
def _mktree(depth):
    """Builds the cli pipeline used by :class:`mktree1` to nest a table.

    For ``depth <= 0`` the pipeline is just ``cli.iden()``. Otherwise it is
    ``cli.groupBy(0, True)`` followed by ``cli.apply(<pipeline for depth-1>, 1)``,
    so the construction recurses once per level.

    :param depth: number of grouping levels still to add"""
    if depth <= 0:
        return cli.iden()
    # presumably yields [key, remaining rows] pairs, with the nested pipeline
    # then applied to column 1 of each pair - TODO confirm against cli.groupBy / cli.apply
    grouping = cli.groupBy(0, True)
    nested = cli.apply(_mktree(depth - 1), 1)
    return grouping | nested
class mktree1(BaseCli):
    """Internal cli that turns a table into the raw nested structure consumed by :func:`prune`."""
    def __ror__(self, it):
        """Nests ``it`` as many levels deep as the first row has columns.

        :param it: table (iterable of rows) to nest"""
        table = it | cli.deref(2)
        # NOTE(review): row 0 is indexed unconditionally, so a table without rows
        # fails here. The original carried a disabled guard: ``if len(it) > 0 else []``
        width = len(table[0])
        return table | _mktree(width)
def prune1(x):
    """First pruning pass: lazily drops zero-length elements at every level.

    Per the original author's note, ``_mktree()`` leaves an empty element at
    each leaf (the cause was never investigated), and this pass removes them.

    :param x: iterable of elements, each either empty or a ``[name, children]`` pair
    :return: generator of ``[name, generator over pruned children]``"""
    for node in x:
        if len(node) != 0:
            name, kids = node
            yield [name, prune1(kids)]
def prune2(x) -> "[children, hasToken]":
    """Second pruning pass: tags every node with whether a branch ends at it.

    Each ``[name, children]`` pair becomes ``[name, children, flag]``. The flag
    is 1 when the node has no children, otherwise it is the integer form of
    "one of the direct children is named ``kSett.tok``".

    :param x: iterable of ``[name, children]`` pairs
    :return: ``(tagged nodes, whether any node in x is named kSett.tok)``"""
    tagged = []
    sawToken = False
    for name, kids in x:
        kids, endsHere = prune2(kids)
        sawToken = sawToken or name == kSett.tok
        # a node without children always counts as an end point; "+ 0" turns the bool into an int
        flag = 1 if len(kids) == 0 else endsHere + 0
        tagged.append([name, kids, flag])
    return tagged, sawToken
def prune3(x, depth=0):
    """Third pruning pass: removes the "virtual" token nodes and records depth.

    :param x: iterable of ``[name, children, num]`` triples, as built by :func:`prune2`
    :param depth: depth assigned to the nodes of ``x``; children get ``depth+1``
    :return: list of ``[name, children, num, depth]``, without nodes named ``kSett.tok``"""
    return [
        [name, prune3(kids, depth + 1), num, depth]
        for name, kids, num in x
        if name != kSett.tok
    ]
def prune(x):
    """Runs the three pruning passes in order and returns the finished tree.

    :param x: raw nested structure (the output of :class:`mktree1`)
    :return: list of ``[name, children, ends here?, depth]`` nodes"""
    withoutEmpties = prune1(x)
    tagged, _ = prune2(withoutEmpties)  # the top-level hasToken flag is not used
    return prune3(tagged)
def prune_makeup(x):
    """Lazily normalizes elements into ``[name, children, 0]`` triples.

    Elements with more than one item have their second item processed
    recursively; single-item elements get an empty children list. The trailing
    ``0`` is a slot reserved for extra metadata.

    :param x: iterable of ``[name]`` or ``[name, children, ...]`` elements"""
    for node in x:
        if len(node) > 1:
            yield [node[0], prune_makeup(node[1]), 0]
        else:
            yield [node[0], [], 0]
class create(BaseCli):
    def __init__(self):
        """Creates a tree structure from a flat table. Example::

            flat = [["a", "b"], ["a", "b", "2"], ["a", "b", "3"], ["a", "c", "4"], ["a", "c", "5"], ["a", "d"]]
            flat | ktree.create()

        That will return this::

            [['a', [['b', [['2', [], 1, 2],
                           ['3', [], 1, 2]], 1, 1],
                    ['c', [['4', [], 1, 2],
                           ['5', [], 1, 2]], 0, 1],
                    ['d', [], 1, 1]], 0, 0]]

        Each element is a list with 4 fields: ``[name, children, ends here?, depth]``.
        Children is just a list of elements, and "ends here?" says whether the
        flat table has a row that stops exactly at this element. Most elements
        have one, like "a/b/2" and "a/b/3", but some don't, like "a/c", so
        "ends here?" of "c" is 0.

        Everything in the :mod:`ktree` module is relatively unoptimized, because
        my use cases don't really require super high speed, so right now it can
        only handle 18k rows/second.

        .. warning::

            Internally, this makes use of the special character "\\ue000",
            configurable at ``settings.cli.ktree.tok`` to create the tree. This
            is in Unicode's private use area, so it shouldn't appear anywhere in
            normal text. However, if your application uses this character, it's
            best to change it to something else"""

    def __ror__(self, it):
        """Builds the tree from the flat table ``it``.

        :param it: iterable of rows, each row being the path to one element"""
        # transposing with the token as fill value and transposing back presumably
        # pads every row to the same length - TODO confirm against cli.transpose
        padded = it | cli.transpose(fill=kSett.tok) | cli.transpose()
        raw = padded | mktree1()
        return prune(raw)
def _applyRet(x, f) -> "ch": # _applyRet ans = [] # _applyRet for s, ch, *other in x: ans.append(f(s, _applyRet(ch, f), *other)) # _applyRet return ans # _applyRet
class apply(BaseCli):
    def __init__(self, f):
        """Transforms the tree recursively. Example::

            a = flat | ktree.create() | ktree.apply(lambda s,ch,e,d: [s,ch,(s | (tryout(0) | aS(int))) + (ch | cut(2) | toSum())])

        That returns this::

            [['a', [['b', [['2', [], 2],
                           ['3', [], 3]], 5],
                    ['c', [['4', [], 4],
                           ['5', [], 5]], 9],
                    ['d', [], 0]], 14]]

        Extracting the final result::

            a | cut(2) | item() # returns 14

        The example adds up every element name that can be converted into an
        integer, and does so at every node of the tree. The
        ``s | (tryout(0) | aS(int))`` part tries the integer conversion and
        falls back to 0 when it fails. The ``ch | cut(2) | toSum()`` part then
        sums the results of all children underneath. Results trickle upwards,
        so the top-level node ends up with "14", which is trivial to extract.

        If you use a lot of cli tools inside the lambda and performance is
        terrible, initialize the cli tools once outside of the lambda and only
        call them inside, to avoid paying the cli initialization cost on every
        node, like this::

            def getVal(x):
                try: return int(x)
                except: return 0
            sumF = cut(2) | toSum()
            flat | ktree.create() | ktree.apply(lambda s,ch,e,d: [s,ch,getVal(s) + sumF(ch)])

        This should give you enough performance for your purpose, but if you
        want even more perf, don't use cli tools inside the lambda at all.

        :param f: function called with each node's fields (name, transformed
            children, then the remaining fields); its return value becomes the new node"""
        self.f = f

    def __ror__(self, it):
        """Applies the stored function to every node of the tree ``it``."""
        transform = self.f
        return _applyRet(it, transform)
def _filter(x, f) -> "ch": # _filter ans = [] # _filter for s, ch, *other in x: # _filter ch = _filter(ch, f) # _filter res = f(s, ch, *other) # _filter if res: ans.append([s, ch, *other]) # _filter return ans # _filter
class filt(BaseCli):
    def __init__(self, f):
        """Filters out elements that don't pass a certain predicate. Example::

            # returns [['a', [['b', [], 1, 1], ['c', [], 0, 1], ['d', [], 1, 1]], 0, 0]]
            flat | ktree.create() | ktree.filt(lambda s,ch,e,d: d<2)

        Essentially, this filters out any elements that are of depth 2 or more.

        :param f: predicate called with each node's fields (name, filtered
            children, then the remaining fields); nodes with a falsy result are removed"""
        self.f = f

    def __ror__(self, it):
        """Returns the tree ``it`` with all nodes rejected by the predicate removed."""
        predicate = self.f
        return _filter(it, predicate)
class select(BaseCli):
    def __init__(self, term:str, depth:int, exact:bool=True):
        """Selects for a single element at a particular depth. Example::

            # returns [['a', [['b', [['2', [], 1, 2], ['3', [], 1, 2]], 1, 1]], 0, 0]]. It selects for "/*/b"
            flat | ktree.create() | ktree.select("b", 1)
            # returns [['a', [['b', [], 1, 1], ['c', [], 0, 1], ['d', [], 1, 1]], 0, 0]]. It selects for "/*/*/b", which does not exist. But it still returns all elements in depth 1, as depth 1 is not selected for
            flat | ktree.create() | ktree.select("b", 2)
            # returns [['a', [['b', [['2', [], 1, 2], ['3', [], 1, 2]], 1, 1]], 0, 0]]. It selects for "/a/b"
            flat | ktree.create() | ktree.select("a", 0) | ktree.select("b", 1)
            # returns []. It selects for "/c/b", which does not exist
            flat | ktree.create() | ktree.select("c", 0) | ktree.select("b", 1)

        :param term: term to select
        :param depth: depth at which to select term
        :param exact: if True, select for term exactly, else select for elements that contain term"""
        self.term = term
        self.depth = depth
        self.exact = exact

    def __ror__(self, it):
        """Keeps every node that is not at ``self.depth``, plus the nodes at that depth whose name matches."""
        term, depth = self.term, self.depth
        if self.exact:
            matches = lambda s: term == s
        else:
            matches = lambda s: term in s
        # nodes at other depths always pass; only the selected depth is tested against the term
        return it | filt(lambda s,ch,e,d: d != depth or matches(s))
def _flatten(x, joinF, prefix=""): # _flatten for s,ch,*other in x: pre = joinF(prefix, s); yield [pre,*other]; yield from _flatten(ch, joinF, prefix=pre) # _flatten
class flatten(BaseCli):
    def __init__(self, joinF=None):
        """Flattens the tree. Example::

            flat | ktree.create() | ktree.flatten() | deref()

        This returns::

            [['/a', 0, 0],
             ['/a/b', 1, 1],
             ['/a/b/2', 1, 2],
             ['/a/b/3', 1, 2],
             ['/a/c', 0, 1],
             ['/a/c/4', 1, 2],
             ['/a/c/5', 1, 2],
             ['/a/d', 1, 1]]

        :param joinF: a function that takes in 2 parameters: prefix (like "/a/b")
            and element name (like "2"), to form a new prefix ("/a/b/2").
            Defaulted to ``lambda p,s: f"{p}/{s}"`` """
        self.joinF = joinF if joinF else (lambda p,s: f"{p}/{s}")

    def __ror__(self, it):
        """Returns a generator over the flattened rows of the tree ``it``."""
        joiner = self.joinF
        return _flatten(it, joiner)
def _prettyDepth(x, depth=0):
    """Lazily renders the tree as text lines, each prefixed with its depth.

    A line is the depth left-justified to 3 characters, then ``depth+1``
    spaces, then the node's name and remaining fields piped through
    ``cli.join(' - ')``.

    :param x: iterable of nodes ``[name, children, *other]``
    :param depth: depth of the nodes in ``x``"""
    for name, kids, *extra in x:
        gutter = str(depth).ljust(3)
        indent = " " * (depth + 1)
        yield gutter + indent + ([name, *extra] | cli.join(' - '))
        yield from _prettyDepth(kids, depth + 1)


def _prettyNoDepth(x, depth=0):
    """Lazily renders the tree as text lines, without the depth column.

    A line is ``depth`` spaces, the node's name, ``" - "``, and then the
    remaining fields piped through ``cli.join(' - ')``.

    :param x: iterable of nodes ``[name, children, *other]``
    :param depth: depth of the nodes in ``x``; only used for indentation"""
    for name, kids, *extra in x:
        indent = " " * depth
        yield indent + f"{name} - {extra | cli.join(' - ')}"
        yield from _prettyNoDepth(kids, depth + 1)
class pretty(BaseCli):
    def __init__(self, depth=True):
        """Converts tree into a string iterator, indented with the correct
        level of depth. See also: :class:`display`

        :param depth: whether to display the depth at the start of each line or not"""
        self.depth = depth

    def __ror__(self, it):
        """Returns a generator over the rendered lines of the tree ``it``."""
        if self.depth:
            return _prettyDepth(it)
        return _prettyNoDepth(it)
class display(BaseCli):
    def __init__(self, lines=10, depth=True):
        """Displays the tree. Example::

            flat | ktree.create() | ktree.display()
            # equivalent to the above
            flat | ktree.create() | ktree.pretty() | stdout()

        :param lines: line limit. Put "None" to display all
        :param depth: whether to display the depth at the start of each line or not"""
        self.lines = lines
        self.depth = depth

    def __ror__(self, it):
        """Renders the tree ``it`` with :class:`pretty` and pipes it through
        ``cli.head(self.lines)`` into ``cli.stdout()``. Returns nothing."""
        rendered = it | pretty(self.depth)
        rendered | cli.head(self.lines) | cli.stdout()
def _walk(f):
    """Returns a cli (``cli.aS`` wrapper) that turns a folder name into a file tree.

    :param f: cli/function that the joined path strings are piped through
        before they are split again and handed to :class:`create`"""
    def build(fn):
        # os.walk yields (dirpath, dirnames, filenames). Presumably cut(0, 2) keeps
        # dirpath + filenames and ungroup() expands them into one row per file -
        # TODO confirm against cli.cut / cli.ungroup
        entries = os.walk(os.path.expanduser(fn)) | cli.cut(0, 2) | cli.ungroup()
        paths = entries | cli.join(os.sep).all() | f
        return paths | cli.op().split(os.sep).all() | create()
    return cli.aS(build)
def walk(fn=None, f=None):
    """Walks the file system and generates a tree of all files and folders. Example::

        "/home/ubuntu" | ktree.walk() # returns a tree
        ktree.walk("/home/ubuntu")    # same as above
        ktree.walk(".")               # can also do relative paths

        "/home/ubuntu" | ktree.walk(f=~grep("git")) # returns a tree with no hidden git-related files
        ktree.walk("/home/ubuntu", ~grep("git"))    # same as above

    :param fn: folder name. If omitted, a cli is returned that expects the folder name to be piped in
    :param f: optional function if you want to filter out paths (Iterator[str])"""
    f = f or cli.iden()
    walker = _walk(f)
    if fn is None:
        return walker
    return fn | walker