# Source code for k1lib.cli.kgv

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This includes helper clis that make it quick to graph graphviz plots."""
__all__ = ["sketch", "edges"]
import re, k1lib, math, os, numpy as np, io, json, base64, unicodedata, inspect
from k1lib.cli.init import BaseCli; import k1lib.cli as cli, k1lib.cli.init as init
graphviz = k1lib.dep("graphviz")
from collections import deque, defaultdict
# Alias for k1lib's cli settings namespace
settings = k1lib.settings.cli
# Unique replacement ids for the SVG edge/graph <g> elements rewritten in Graph._toHtml
_svgAutoInc = k1lib.AutoIncrement(prefix="_k1_svg_")
# Unique per-render prefix for the wrapper/popup element ids and JS variable names emitted by Graph._toHtml
_preAutoInc = k1lib.AutoIncrement(prefix="_k1_svg_pre_")
# Names for the subgraphs created in edges.__ror__; Graphviz draws subgraphs whose name starts with "cluster" as boxed clusters
_clusterAuto = k1lib.AutoIncrement(prefix="cluster_")
# Internal graphviz node identifiers, handed out lazily by the name2Idx mapping built in sketch.__ror__
_idxAuto = k1lib.AutoIncrement(prefix="_idx_")
class sketch(BaseCli):
    # Drawing context shared (at class level) with nodes/edges while a sketch is
    # executing. All of these are None whenever no sketch is currently running.
    _g = None          # the graphviz.Digraph being built
    _name2Idx = None   # group name -> node name -> internal node id
    _idx2Popup = None  # internal node id -> popup html
    ctxIdx = None      # JS variable name of the context array while transpiling
    def __init__(self, **kwargs):
        """Similar to :class:`~utils.sketch`, which makes it easier to plot multiple
graphs quickly, this makes it easier to plot node graphs a lot quicker than
I have before. This cli configures the graph in general, but doesn't dive
too much into the specifics. Example::

    ["ab", "bc", "ca"] | (kgv.sketch(engine="sfdp") | kgv.edges())

Most of the complexities are in :class:`edges`, so check that class out
for more comprehensive examples

:param kwargs: keyword arguments passed into :class:`graphviz.Digraph`"""
        super().__init__(capture=True); self.kwargs = kwargs
    @staticmethod
    def _guard():
        """Raises an Exception if no sketch context is currently active."""
        if sketch._g is None: raise Exception("Context has not been setup yet, can't proceed to plot the edges/nodes. This could be because you're doing `data | kgv.edges()` directly. Instead, do `data | (kgv.sketch() | kgv.edges())`. The operation `kgv.sketch()` will initialize the graph")
    def __ror__(self, it):
        """Runs the captured clis against ``it`` with a fresh graph as the active
context, then returns the result wrapped in a :class:`Graph`.

The shared context is always restored afterwards, even if the captured
pipeline raises. Otherwise a failed run would leave a stale graph behind and
a later bare ``data | kgv.edges()`` would silently draw into it."""
        previous = (sketch._g, sketch._name2Idx, sketch._idx2Popup)
        g = graphviz.Digraph(**self.kwargs); idx2Popup = {}
        # 2-level lazy mapping: a new internal id is allocated the first time a (group, name) pair is looked up
        name2Idx = defaultdict(lambda: defaultdict(lambda: _idxAuto()))
        sketch._g = g; sketch._name2Idx = name2Idx; sketch._idx2Popup = idx2Popup
        try: it | self.capturedSerial | cli.deref() # deref() forces the pipeline to actually execute
        finally: sketch._g, sketch._name2Idx, sketch._idx2Popup = previous # previous is all-None unless sketches are nested
        return Graph(g, name2Idx, idx2Popup)
    def _jsF(self, meta):
        """Transpiles this cli to JS. Returns a ``(source, function name)`` tuple.

The generated function runs the captured clis (which push their drawing
commands into a shared context array), posts that array to the rendering
endpoint and returns the decoded response."""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); ctxIdx = init._jsDAuto()
        previous = sketch.ctxIdx; sketch.ctxIdx = ctxIdx # lets nodes/edges know which JS variable to push into
        try: header, _fIdx, _async = k1lib.kast.asyncGuard(self.capturedSerial._jsF(meta))
        finally: sketch.ctxIdx = previous # never leave a stale context name behind if transpiling fails
        return f"""\
let {ctxIdx} = null;\n{header}
const {fIdx} = async ({dataIdx}) => {{
    {ctxIdx} = [];
    const out = {'await ' if _async else ''}{_fIdx}({dataIdx});
    const res = (await (await fetch("https://local.mlexps.com/routeServer/kapi_10-graphviz", {{
        method: "POST",
        body: JSON.stringify({{ "obj": {ctxIdx} }}),
        headers: {{ "Content-Type": "application/json" }}
    }})).json());
    if (!res.success) throw new Error(res.reason);
    return atob(res.data);
}}""", fIdx
class nodes(BaseCli):
    def __init__(self):
        pass
    def __ror__(self, _it) -> "3-column input":
        """Declares nodes on the active graph and returns the normalized rows.

Accepted row shapes, each normalized to ``[group, name, kwargs]``:

- [name]
- [group, name]
- [group, name, kwargs]

A ``"popup"`` entry in kwargs is recorded for the node and removed before the
remaining kwargs are forwarded to ``g.node``. If no sketch context is active,
the input is re-run through a default :class:`sketch` wrapping this cli.

:raises Exception: if a row does not have 1 to 3 columns"""
        if sketch._g is None: return _it | (sketch() | self)
        sketch._guard(); it = []
        for row in _it:
            n = len(row)
            if n == 1: it.append(["", row[0], {}])
            elif n == 2: it.append([*row, {}])
            elif n == 3: it.append(row)
            else: raise Exception(f"kgv.nodes() can only accept tables from 1 to 3 columns. Detected {n} columns instead")
        g = sketch._g; name2Idx = sketch._name2Idx
        for s1, n1, kw in it:
            idx = name2Idx[s1][n1]
            # copy before deleting so the caller's dict (also part of the returned rows) is left untouched
            if "popup" in kw: sketch._idx2Popup[idx] = kw["popup"]; kw = dict(kw); del kw["popup"]
            # NOTE(review): no label is passed here, so unless kw has "label" (or edges() later
            # declares the same node with its name) the node shows its internal id - confirm intended
            g.node(idx, **kw)
        return it
    def _jsF(self, meta):
        """Transpiles this cli to JS. Returns a ``(source, function name)`` tuple.

The generated function normalizes rows like :meth:`__ror__` and pushes a
``"nodes"`` command into the active sketch's context array."""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); ctxIdx = sketch.ctxIdx
        if ctxIdx is None: return (sketch() | self)._jsF(meta)
        return f"""\
const {fIdx} = ({dataIdx}) => {{
    const it = [];
    for (const row of {dataIdx}) {{
        const n = row.length;
        if (n === 1) it.push(["", row[0], {{}}])
        else if (n === 2) it.push([...row, {{}}])
        else if (n === 3) it.push(row)
        else throw new Error(`kgv.nodes() can only accept tables from 1 to 3 columns. Detected ${{n}} columns instead`)
    }}
    {ctxIdx}.push(["nodes", {{it, args: []}}]);
    return it;
}}
    """, fIdx
def drawSimple(g, names, data, name2Idx): # data here is List[[name1, name2, kw]], names is List[name]
    """Draws every name in ``names`` as a node labelled with that name, then
every row of ``data`` as an edge, onto ``g`` (a graph or a subgraph).

:param name2Idx: mapping from node name to the internal node id, for a single group"""
    # dict.fromkeys dedupes while keeping first-seen order. Iterating a set here made the
    # order of node creation (and of lazily allocated ids) vary between runs for string names
    for name in dict.fromkeys(names): g.node(name2Idx[name], name)
    for name1, name2, kw in data: g.edge(name2Idx[name1], name2Idx[name2], **kw)
class edges(BaseCli):
    def __init__(self):
        """Plots out edges of the graph. Example 1::

    ["ab", "bc", "ca"] | kgv.edges()

Result:

.. image:: ../images/kgv_edges_1.png

If you need to customize the graph on initialization, then you can use
:class:`sketch` to capture related operations (like :class:`edges`), and
inject your params in :class:`sketch`::

    ["ab", "bc", "ca"] | (kgv.sketch(engine="sfdp") | kgv.edges())

Example 2::

    [["a", "b", {"label": "3%"}], ["b", "c", {"color": "red"}], "ac", "cb"] | kgv.edges()

Result:

.. image:: ../images/kgv_edges_2.png

Example 3::

    [["group1", "a", "group1", "b", {"label": "3%"}],
     "ec",
     ["group1", "b", "", "c", {"color": "red"}],
     ["group1", "a", "", "c"],
     ["", "c", "group1", "b"],
     ["", "c", "group2", "d", {"color": "green"}]
    ] | kgv.edges()

Result:

.. image:: ../images/kgv_edges_3.png

So the idea is, each row describes a single edge on the graph. Each row can
be multiple different lengths, but only these configurations are allowed:

- [name1, name2]
- [name1, name2, kwargs]
- [group1, name1, group2, name2]
- [group1, name1, group2, name2, kwargs]

So if you don't need the complexity and just want to plot something out,
you can just use the one at the top, but if you do want fancy features, then
you can add those in the kwargs. Check out a gallery of more examples at
`kapi/10-graphviz <https://mlexps.com/kapi/10-graphviz/>`_.
"""
        pass
    def __ror__(self, _it) -> "5-column input":
        """Draws the incoming edges onto the active graph and returns the rows,
each normalized to ``[group1, name1, group2, name2, kwargs]``.

:raises Exception: if a row does not have 2 to 5 columns"""
        # no active context: wrap this cli in a default sketch and run again
        if sketch._g is None: return _it | (sketch() | self)
        sketch._guard()
        rows = []
        for row in _it:
            n = len(row)
            if n < 2 or n > 5: raise Exception(f"kgv.edges() can only accept tables from 2 to 5 columns. Detected {n} columns instead")
            if n == 5: rows.append(row)
            elif n == 4: rows.append([*row, {}])
            else: rows.append(["", row[0], "", row[1], row[2] if n == 3 else {}]) # ungrouped: both endpoints in group ""
        graph = sketch._g; name2Idx = sketch._name2Idx
        # step 1: per group, draw its nodes plus the edges that stay inside that group
        segments = rows | cli.batched(2).all() | cli.joinSt() | cli.groupBy(0, True) | cli.apply(cli.joinSt(), 1) | cli.deref()
        for segN, names in segments:
            internal = rows | cli.filt(cli.op() == segN, [0, 2]) | cli.cut(1, 3, 4) | cli.deref()
            if not segN: drawSimple(graph, names, internal, name2Idx[""]); continue
            with graph.subgraph(name=_clusterAuto()) as subG:
                subG.attr(label=f"{segN}")
                drawSimple(subG, names, internal, name2Idx[segN])
        # step 2: edges whose endpoints live in different groups go on the top-level graph
        for s1, n1, s2, n2, kw in rows | cli.filt(lambda x: x[0] != x[2]):
            graph.edge(name2Idx[s1][n1], name2Idx[s2][n2], **kw)
        return rows
    def _jsF(self, meta):
        """Transpiles this cli to JS. Returns a ``(source, function name)`` tuple.

The generated function normalizes rows like :meth:`__ror__` and pushes an
``"edges"`` command into the active sketch's context array."""
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); ctxIdx = sketch.ctxIdx
        if ctxIdx is None: return (sketch() | self)._jsF(meta)
        return f"""\
const {fIdx} = ({dataIdx}) => {{
    // why not just pass dataIdx in directly? Well, in Python, the interface is that __ror__ should return a 5-column input, so here, gotta honor that, in case the user has some operation downstream of this
    const it = [];
    for (const row of {dataIdx}) {{
        const n = row.length;
        if (n === 2) it.push(["", row[0], "", row[1], {{}}])
        else if (n === 3) it.push(["", row[0], "", row[1], row[2]])
        else if (n === 4) it.push([...row, {{}}])
        else if (n === 5) it.push(row)
        else throw new Error(`kgv.edges() can only accept tables from 2 to 5 columns. Detected ${{n}} columns instead`)
    }}
    {ctxIdx}.push(["edges", {{it, args: []}}]);
    return it;
}}
    """, fIdx
class Graph:
    def __init__(self, g, name2Idx, idx2Popup):
        """Wrapper around a :class:`graphviz.Graph` or :class:`graphviz.Digraph`.
Internal graph object is available at ``self.g``. Not instantiated
by end user, instead, returned by :class:`sketch`."""
        self.g = g; self.name2Idx = name2Idx; self.idx2Popup = idx2Popup
    def _repr_mimebundle_(self, *args, **kwargs): return self.g._repr_mimebundle_(*args, **kwargs)
    def _repr_html_(self): return self._toHtml()
    def _toHtml(self):
        """Renders the graph to html: the svg wrapped in a positioned div, plus a
script that shows a node's popup (if it has one) while the mouse is over it."""
        idx2Popup = self.idx2Popup
        s = self.g | cli.toHtml() | cli.op().replace("><", ">\n<") # one tag per line, so the line-based greps below work
        # these ids are: List[(auto generated id, unique id we're replacing it with)]
        # for nodes, the replacement is the text of the <title> on the line after the node's <g> tag
        nodeIds = s.split("\n") | cli.grep('class="node"', after=1) | cli.batched(2) | cli.apply(cli.op().split("title>")[1].strip("</"), 1) | cli.grep('<g id="(?P<g>node[0-9]+)', col=0, extract="g") | cli.deref()
        edgeIds = s.split("\n") | cli.grep('<g id="(?P<g>edge[0-9]+)"', extract="g") | cli.apply(lambda x: [x, _svgAutoInc()]) | cli.deref()
        graphIds = s.split("\n") | cli.grep('<g id="(?P<g>graph[0-9]+)"', extract="g") | cli.apply(lambda x: [x, _svgAutoInc()]) | cli.deref()
        for x, y in [nodeIds, edgeIds, graphIds] | cli.joinSt(): s = s.replace(f'id="{x}"', f'id="{y}"')
        a = nodeIds | cli.cut(1) | cli.apply(lambda idx: [idx, idx2Popup.get(idx, None)]) | cli.deref(); pre = _preAutoInc()
        # The JSON literal is inlined into a <script> element, so any "</" inside a popup (e.g. "</script>")
        # would end the script early. "<\/" is the same string to JS but is not seen as a closing tag by the html parser
        popupJson = json.dumps(a).replace("</", "<\\/")
        inside = f"rect.x <= {pre}_mouseX && {pre}_mouseX < rect.x+rect.width && rect.y <= {pre}_mouseY && {pre}_mouseY < rect.y+rect.height"
        return f"""
<div id="{pre}_wrapper" style="position: relative">{s}<div id="{pre}_popup" style="position: absolute; display: none; background: white; padding: 8px 12px; border-radius: 6px; box-shadow: 0 3px 5px rgb(0 0 0 / 0.3); z-index: 1000000"></div></div>
<script>
    const {pre}_nodeId_node_popup = ({popupJson}).map(([x,y]) => [x, document.querySelector(`#${{x}}`), y]);
    const {pre}_nodes = {pre}_nodeId_node_popup.map(([x,n,y]) => n);
    let {pre}_activeNode = null;
    const {pre}_popup = document.querySelector("#{pre}_popup");
    const {pre}_wrapper = document.querySelector("#{pre}_wrapper");
    const {pre}_nodeId2popup = {{}};
    for (const [x,n,y] of {pre}_nodeId_node_popup) {{ {pre}_nodeId2popup[x] = y; }};
    let {pre}_mouseX = 0; let {pre}_mouseY = 0;
    {pre}_wrapper.onmousemove = (e) => {{
        {pre}_mouseX = e.clientX;
        {pre}_mouseY = e.clientY;
    }};
    setInterval(() => {{
        if ({pre}_activeNode) {{
            const rect = {pre}_activeNode.getBoundingClientRect();
            if (!({inside})) {{
                {pre}_activeNode = null;
                {pre}_popup.innerHTML = "";
                {pre}_popup.style.display = "none";
            }}
        }}
        if (!{pre}_activeNode) {{ // can't just do `if (activeNode) ... else ...` btw. Separated out for a reason
            const wRect = {pre}_wrapper.getBoundingClientRect();
            for (const node of {pre}_nodes) {{
                const rect = node.getBoundingClientRect();
                if ({inside}) {{
                    const popup = {pre}_nodeId2popup[node.id];
                    {pre}_activeNode = node;
                    if (popup) {{
                        {pre}_popup.style.left = rect.x + rect.width/2 + 10 - wRect.x + "px";
                        {pre}_popup.style.top = 0;
                        {pre}_popup.innerHTML = popup;
                        {pre}_popup.style.display = "block";
                        const pRect = {pre}_popup.getBoundingClientRect();
                        const t1 = rect.y + rect.height/2 + 10 - wRect.y; // "t" for "top"
                        const t2 = wRect.height - pRect.height;
                        {pre}_popup.style.top = ((t2 < 0) ? 0 : Math.min(t1, t2)) + "px";
                    }}
                    break;
                }}
            }}
        }}
    }}, 30);
</script>"""