# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib as k1; import k1lib.cli as cli
from k1lib.cli import BaseCli
import os
__all__ = ["create", "apply", "filt", "select", "flatten", "pretty", "display", "search", "walk"]
settings = k1.settings.cli
kSett = k1.Settings().add("tok", "\ue000", "Special token for internal processing")
settings.add("ktree", kSett, "cli.ktree module settings");
def _mktree(depth): # _mktree
if depth <= 0: return cli.iden() # _mktree
return cli.groupBy(0, True) | cli.apply(_mktree(depth-1), 1) # _mktree
class mktree1(BaseCli): # mktree1
def __ror__(self, it): it = it | cli.deref(2); return (it | _mktree(len(it[0])))# if len(it) > 0 else [] # mktree1
def prune1(x): # eliminating empty elements. For some reason, _mktree() creates an empty element at each leaf. No idea why and too lazy to dig in further # prune1
for elem in x: # prune1
if len(elem) == 0: continue # prune1
s, children = elem; yield [s, prune1(children)] # prune1
def prune2(x) -> "[children, hasToken]": # trying to grab whether there is a branch that ends at the parent or not # prune2
hasToken = False; ans = [] # prune2
for s, children in x: # prune2
children, hT = prune2(children) # prune2
hasToken = hasToken or s == kSett.tok # prune2
if len(children) == 0: hT = 1 # prune2
ans.append([s, children, hT+0]) # prune2
return ans, hasToken # prune2
def prune3(x, depth=0): # trying to eliminate the "virtual" elements tagged with the token # prune3
ans = [] # prune3
for s, children, num in x: # prune3
if s == kSett.tok: continue # prune3
ans.append([s, prune3(children, depth+1), num, depth]) # prune3
return ans # prune3
def prune(x): return prune3(prune2(prune1(x))[0]) # prune
def prune_makeup(x): # prune_makeup
for elem in x: # prune_makeup
yield [elem[0], prune_makeup(elem[1]), 0] if len(elem) > 1 else [elem[0], [], 0] # last column for extra metadata slot # prune_makeup
[docs]class create(BaseCli): # create
[docs] def __init__(self): # create
"""Creates a tree structure from a flat table.
Example::
flat = [["a", "b"],
["a", "b", "2"],
["a", "b", "3"],
["a", "c", "4"],
["a", "c", "5"],
["a", "d"]]
flat | ktree.create()
That will return this::
[['a',
[['b', [
['2', [], 1, 2],
['3', [], 1, 2]],
1, 1],
['c', [
['4', [], 1, 2],
['5', [], 1, 2]],
0, 1],
['d', [], 1, 1]],
0, 0]]
Each element is represented by a 3-tuple: ``[name, children, ends here?, depth]``.
Children is just a list of elements, and "ends here?" means whether there
is a row in the flat table that exists for this particular element. Most
has it, like "a/b/2" and "a/b/3", but some don't, like "a/c", so "ends here?"
of "c" is 0.
Everything in the :mod:`ktree` module is relatively unoptimized, because my use
cases don't really require super high speed, so right now it can only handle
18k rows/second.
.. warning::
Internally, this makes use of the special character "\\ue000", configurable
at ``settings.cli.ktree.tok`` to create the tree. This is in Unicode's private
use area, so it shouldn't appear anywhere in normal text. However, if your
application uses this character, it's best to change it to something else
""" # create
[docs] def __ror__(self, it): return prune(it | cli.transpose(fill=kSett.tok) | cli.transpose() | mktree1()) # create
def _applyRet(x, f) -> "ch": # _applyRet
ans = [] # _applyRet
for s, ch, *other in x: ans.append(f(s, _applyRet(ch, f), *other)) # _applyRet
return ans # _applyRet
[docs]class apply(BaseCli): # apply
[docs] def __init__(self, f): # apply
"""Transforms the tree recursively.
Example::
a = flat | ktree.create() | ktree.apply(lambda s,ch,e,d: [s,ch,(s | (tryout(0) | aS(int))) + (ch | cut(2) | toSum())])
That returns this::
[['a',
[['b', [['2', [], 2], ['3', [], 3]], 5],
['c', [['4', [], 4], ['5', [], 5]], 9],
['d', [], 0]],
14]]
Extracting the final result::
a | cut(2) | item() # returns 14
Here, I'm trying to add up all elements that can be converted into an
integer, but do that at every node in the tree. The ``s | (tryout(0) | aS(int))``
part is trying to convert it into an integer, returning 0 if it can't do it.
Then, the ``ch | cut(2) | toSum()`` tries to add up the result of all children
underneath it. The results naturally trickle upwards and you get the result "14"
at the top level, at which point it's trivial to extract.
If you're using a lot of cli tools inside the lambda function and the performance
is terrible, you can try to initialize the cli tools outside the lambda, and then
just use it inside, to eliminate cli initialization cost, like this::
def getVal(x):
try: return int(x)
except: return 0
sumF = cut(2) | toSum()
flat | ktree.create() | ktree.apply(lambda s,ch,e,d: [s,ch,getVal(s) + sumF(ch)])
This should give you enough performance for your purpose, but if you want even more
perf, don't use cli tools inside the lambda at all.""" # apply
self.f = f # apply
[docs] def __ror__(self, it): return _applyRet(it, self.f) # apply
def _filter(x, f) -> "ch": # _filter
ans = [] # _filter
for s, ch, *other in x: # _filter
ch = _filter(ch, f) # _filter
res = f(s, ch, *other) # _filter
if res: ans.append([s, ch, *other]) # _filter
return ans # _filter
[docs]class filt(BaseCli): # filt
[docs] def __init__(self, f): # filt
"""Filters out elements that don't pass a certain predicate.
Example::
# returns [['a', [['b', [], 1, 1], ['c', [], 0, 1], ['d', [], 1, 1]], 0, 0]]
flat | ktree.create() | ktree.filt(lambda s,ch,e,d: d<2)
Essentially, this filters out any elements that are of depth 2 or more.""" # filt
self.f = f # filt
[docs] def __ror__(self, it): return _filter(it, self.f) # filt
[docs]class select(BaseCli): # select
[docs] def __init__(self, term:str, depth:int, exact:bool=True): # select
"""Selects for a single element at a particular depth.
Example::
# returns [['a', [['b', [['2', [], 1, 2], ['3', [], 1, 2]], 1, 1]], 0, 0]]. It selects for "/*/b"
flat | ktree.create() | ktree.select("b", 1)
# returns [['a', [['b', [], 1, 1], ['c', [], 0, 1], ['d', [], 1, 1]], 0, 0]]. It selects for "/*/*/b", which does not exist. But it still returns all elements in depth 1, as depth 1 is not selected for
flat | ktree.create() | ktree.select("b", 2)
# returns [['a', [['b', [['2', [], 1, 2], ['3', [], 1, 2]], 1, 1]], 0, 0]]. It selects for "/a/b"
flat | ktree.create() | ktree.select("a", 0) | ktree.select("b", 1)
# returns []. It selects for "/c/b", which does not exist
flat | ktree.create() | ktree.select("c", 0) | ktree.select("b", 1)
:param term: term to select
:param depth: depth at which to select term
:param exact: if True, select for term exactly, else select for elements that contains term""" # select
self.term = term; self.depth = depth; self.exact = exact # select
[docs] def __ror__(self, it): # select
term = self.term; depth = self.depth; exact = self.exact # select
if exact: return it | filt(lambda s,ch,e,d: (d != depth) or (d == depth and term == s)) # select
else: return it | filt(lambda s,ch,e,d: (d != depth) or (d == depth and term in s)) # select
def _flatten(x, joinF, prefix=""): # _flatten
for s,ch,*other in x: pre = joinF(prefix, s); yield [pre,*other]; yield from _flatten(ch, joinF, prefix=pre) # _flatten
[docs]class flatten(BaseCli): # flatten
[docs] def __init__(self, joinF=None): # flatten
"""Flattens the tree.
Example::
flat | ktree.create() | ktree.flatten() | deref()
This returns::
[['/a', 0, 0],
['/a/b', 1, 1],
['/a/b/2', 1, 2],
['/a/b/3', 1, 2],
['/a/c', 0, 1],
['/a/c/4', 1, 2],
['/a/c/5', 1, 2],
['/a/d', 1, 1]]
:param joinF: a function that takes in 2 parameters: prefix (like "/a/b") and element
name (like "2"), to form a new prefix ("/a/b/2"). Defaulted to ``lambda p,s: f"{p}/{s}"``
""" # flatten
self.joinF = joinF or (lambda p,s: f"{p}/{s}") # flatten
[docs] def __ror__(self, it): return _flatten(it, self.joinF) # flatten
def _prettyDepth(x, depth=0): # _prettyDepth
for s,ch,*other in x: # _prettyDepth
yield f"{depth}".ljust(3) + f" "*(depth+1) + ([s, *other] | cli.join(' - ')) # _prettyDepth
yield from _prettyDepth(ch, depth+1) # _prettyDepth
def _prettyNoDepth(x, depth=0): # _prettyNoDepth
for s,ch,*other in x: # _prettyNoDepth
yield f" "*depth + f"{s} - {other | cli.join(' - ')}" # _prettyNoDepth
yield from _prettyNoDepth(ch, depth+1) # _prettyNoDepth
[docs]class pretty(BaseCli): # pretty
[docs] def __init__(self, depth=True): # pretty
"""Converts tree into a string iterator, indented
with the correct level of depth. See also: :class:`display`
:param depth: whether to display the depth at the start of each line or not""" # pretty
self.depth = depth # pretty
[docs] def __ror__(self, it): return _prettyDepth(it) if self.depth else _prettyNoDepth(it) # pretty
[docs]class display(BaseCli): # display
[docs] def __init__(self, lines=10, depth=True): # display
"""Displays the tree.
Example::
flat | ktree.create() | ktree.display()
# equivalent to the above
flat | ktree.create() | ktree.pretty() | stdout()
:param lines: line limit. Put "None" to display all
:param depth: whether to display the depth at the start of each line or not""" # display
self.lines = lines; self.depth = depth # display
[docs] def __ror__(self, it): it | pretty(self.depth) | cli.head(self.lines) | cli.stdout() # display
[docs]def search(term:str, exact=False): # search
"""Searches for a specific term in the name of all elements.
Example::
# returns [['a', [['b', [['2', [], 1, 2]], 1, 1]], 0, 0]]
flat | ktree.create() | ktree.search("2")
:param term: term to search for
:param exact: if True, searches for that exact term, else searches
for the term inside the string""" # search
if exact: st1 = apply(lambda s,ch,*o: [s,ch,(term == s) or any(c[2] for c in ch),*o]) # search
else: st1 = apply(lambda s,ch,*o: [s,ch,(term in s) or any(c[2] for c in ch),*o]) # search
return st1 | filt(lambda s,ch,e,*o: e) | apply(lambda s,ch,e,*o: [s,ch,*o]) # search
_walk = lambda f: cli.aS(lambda fn: os.walk(os.path.expanduser(fn)) | cli.cut(0, 2) | cli.ungroup() | cli.join(os.sep).all() | f | cli.op().split(os.sep).all() | create()) # search
[docs]def walk(fn=None, f=None): # walk
"""Walks the file system and generates a tree of all files and folders.
Example::
"/home/ubuntu" | ktree.walk() # returns a tree
ktree.walk("/home/ubundu") # same as above
ktree.walk(".") # can also do relative paths
"/home/ubuntu" | ktree.walk(f=~grep("git")) # returns a tree with no hidden git-related files
ktree.walk("/home/ubuntu", ~grep("git")) # same as above
:param fn: folder name
:param f: optional function if you want to filter out paths (Iterator[str])""" # walk
f = f or cli.iden(); return _walk(f) if fn is None else fn | _walk(f) # walk