# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This includes helper clis that make it quick to graph graphviz plots."""
__all__ = ["sketch", "edges"]
import re, k1lib, math, os, numpy as np, io, json, base64, unicodedata, inspect
from k1lib.cli.init import BaseCli; import k1lib.cli as cli, k1lib.cli.init as init
graphviz = k1lib.dep("graphviz")
from collections import deque, defaultdict
settings = k1lib.settings.cli
_svgAutoInc = k1lib.AutoIncrement(prefix="_k1_svg_")
_preAutoInc = k1lib.AutoIncrement(prefix="_k1_svg_pre_")
_clusterAuto = k1lib.AutoIncrement(prefix="cluster_")
_idxAuto = k1lib.AutoIncrement(prefix="_idx_")
[docs]class sketch(BaseCli): # sketch
_g = None; _name2Idx = None; ctxIdx = None # sketch
[docs] def __init__(self, **kwargs): # sketch
"""Similar to :class:`~utils.sketch`, which makes it easier to plot multiple graphs quickly,
this makes it easier to plot node graphs a lot quicker than I have before. This cli
configures the graph in general, but doesn't dive too much into the specifics. Example::
["ab", "bc", "ca"] | (kgv.sketch(engine="sfdp") | kgv.edges())
Most of the complexities are in :class:`edges`, so check that class out for more comprehensive examples
:param kwargs: keyword arguments passed into :class:`graphviz.Digraph`""" # sketch
super().__init__(capture=True); self.kwargs = kwargs # sketch
@staticmethod # sketch
def _guard(): # sketch
if sketch._g is None: raise Exception("Context has not been setup yet, can't proceed to plot the edges/nodes. This could be because you're doing `data | kgv.edges()` directly. Instead, do `data | (kgv.sketch() | kgv.edges())`. The operation `kgv.sketch()` will initialize the graph") # sketch
[docs] def __ror__(self, it): # sketch
sketch._g = g = graphviz.Digraph(**self.kwargs); sketch._idx2Popup = idx2Popup = {} # sketch
sketch._name2Idx = name2Idx = defaultdict(lambda: defaultdict(lambda: _idxAuto())) # sketch
it | self.capturedSerial | cli.deref() # sketch
sketch._g = None; sketch._idx2Popup = None; sketch._name2Idx = None; return Graph(g, name2Idx, idx2Popup) # sketch
def _jsF(self, meta): # sketch
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); ctxIdx = init._jsDAuto(); sketch.ctxIdx = ctxIdx # sketch
header, _fIdx, _async = k1lib.kast.asyncGuard(self.capturedSerial._jsF(meta)) # sketch
res = f"""\
let {ctxIdx} = null;\n{header}
const {fIdx} = async ({dataIdx}) => {{
{ctxIdx} = [];
const out = {'await ' if _async else ''}{_fIdx}({dataIdx});
const res = (await (await fetch("https://local.mlexps.com/routeServer/kapi_10-graphviz", {{
method: "POST",
body: JSON.stringify({{ "obj": {ctxIdx} }}),
headers: {{ "Content-Type": "application/json" }}
}})).json());
if (!res.success) throw new Error(res.reason);
return atob(res.data);
}}""", fIdx # sketch
sketch.ctxIdx = None; return res # sketch
class nodes(BaseCli): # nodes
def __init__(self): # nodes
pass # nodes
def __ror__(self, _it) -> "5-column input": # nodes
if sketch._g is None: return _it | (sketch() | self) # nodes
sketch._guard(); it = [] # nodes
for row in _it: # nodes
n = len(row) # nodes
if n == 1: it.append(["", row[0], {}]) # nodes
elif n == 2: it.append([*row, {}]) # nodes
elif n == 3: it.append(row) # nodes
else: raise Exception(f"kgv.nodes() can only accept tables from 1 to 3 columns. Detected {n} columns instead") # nodes
g = sketch._g; name2Idx = sketch._name2Idx # nodes
for s1,n1,kw in it: # nodes
idx = name2Idx[s1][n1] # nodes
if "popup" in kw: sketch._idx2Popup[idx] = kw["popup"]; kw = dict(kw); del kw["popup"] # nodes
g.node(idx, **kw) # nodes
return it # nodes
def _jsF(self, meta): # nodes
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); ctxIdx = sketch.ctxIdx; # nodes
if ctxIdx is None: return (sketch() | self)._jsF(meta) # nodes
return f"""\
const {fIdx} = ({dataIdx}) => {{
const it = [];
for (const row of {dataIdx}) {{
const n = row.length;
if (n === 1) it.push(["", row[0], {{}}])
else if (n === 2) it.push([...row, {{}}])
else if (n === 3) it.push(row)
else throw new Error(`kgv.nodes() can only accept tables from 1 to 3 columns. Detected ${{n}} columns instead`)
}}
{ctxIdx}.push(["nodes", {{it, args: []}}]); return it;
}}
""", fIdx # nodes
def drawSimple(g, names, data, name2Idx): # data here is List[[name1, name2, kw]], names is List[name] # drawSimple
for name in set(names): g.node(name2Idx[name], name); # drawSimple
for name1, name2, kw in data: g.edge(name2Idx[name1], name2Idx[name2], **kw) # drawSimple
[docs]class edges(BaseCli): # edges
[docs] def __init__(self): # edges
"""Plots out edges of the graph.
Example 1::
["ab", "bc", "ca"] | kgv.edges()
Result:
.. image:: ../images/kgv_edges_1.png
If you need to customize the graph on initialization, then you can use :class:`sketch` to
capture related operations (like :class:`edges`), and inject your params in :class:`sketch`::
["ab", "bc", "ca"] | (kgv.sketch(engine="sfdp") | kgv.edges())
Example 2::
[["a", "b", {"label": "3%"}], ["b", "c", {"color": "red"}], "ac", "cb"] | kgv.edges()
Result:
.. image:: ../images/kgv_edges_2.png
Example 3::
[["group1", "a", "group1", "b", {"label": "3%"}],
"ec",
["group1", "b", "", "c", {"color": "red"}],
["group1", "a", "", "c"],
["", "c", "group1", "b"],
["", "c", "group2", "d", {"color": "green"}]
] | kgv.edges()
Result:
.. image:: ../images/kgv_edges_3.png
So the idea is, each row describes a single edge on the graph. Each row
can be multiple different lengths, but only these configurations are allowed:
- [name1, name2]
- [name1, name2, kwargs]
- [group1, name1, group2, name2]
- [group1, name1, group2, name2, kwargs]
So if you don't need the complexity and just want to plot something out, you
can just use the one at the top, but if you do want fancy features, then you
can add those in the kwargs.
Check out a gallery of more examples at `kapi/10-graphviz <https://mlexps.com/kapi/10-graphviz/>`_.
""" # edges
pass # edges
[docs] def __ror__(self, _it) -> "5-column input": # edges
if sketch._g is None: return _it | (sketch() | self) # edges
sketch._guard(); it = [] # edges
for row in _it: # edges
n = len(row) # edges
if n == 2: it.append(["", row[0], "", row[1], {}]) # edges
elif n == 3: it.append(["", row[0], "", row[1], row[2]]) # edges
elif n == 4: it.append([*row, {}]) # edges
elif n == 5: it.append(row) # edges
else: raise Exception(f"kgv.edges() can only accept tables from 2 to 5 columns. Detected {n} columns instead") # edges
g = sketch._g; name2Idx = sketch._name2Idx # edges
# grouping by segments and drawing their internals first # edges
for segN, names in it | cli.batched(2).all() | cli.joinSt() | cli.groupBy(0, True) | cli.apply(cli.joinSt(), 1) | cli.deref(): # edges
if segN: # edges
with g.subgraph(name=_clusterAuto()) as subG: # edges
subG.attr(label=f"{segN}"); drawSimple(subG, names, it | cli.filt(cli.op() == segN, [0, 2]) | cli.cut(1, 3, 4) | cli.deref(), name2Idx[segN]) # edges
else: drawSimple(g, names, it | cli.filt(cli.op() == segN, [0, 2]) | cli.cut(1, 3, 4) | cli.deref(), name2Idx[""]) # edges
# then draw external edges # edges
for s1, n1, s2, n2, kw in it | cli.filt(lambda x: x[0] != x[2]): g.edge(name2Idx[s1][n1], name2Idx[s2][n2], **kw) # edges
return it # edges
def _jsF(self, meta): # edges
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); ctxIdx = sketch.ctxIdx; # edges
if ctxIdx is None: return (sketch() | self)._jsF(meta) # edges
return f"""\
const {fIdx} = ({dataIdx}) => {{
// why not just pass dataIdx in directly? Well, in Python, the interface is that __ror__ should return a 5-column input, so here, gotta honor that, in case the user has some operation downstream of this
const it = [];
for (const row of {dataIdx}) {{
const n = row.length;
if (n === 2) it.push(["", row[0], "", row[1], {{}}])
else if (n === 3) it.push(["", row[0], "", row[1], row[2]])
else if (n === 4) it.push([...row, {{}}])
else if (n === 5) it.push(row)
else throw new Error(`kgv.edges() can only accept tables from 2 to 5 columns. Detected ${{n}} columns instead`)
}}
{ctxIdx}.push(["edges", {{it, args: []}}]); return it;
}}
""", fIdx # edges
class Graph: # Graph
def __init__(self, g, name2Idx, idx2Popup): # Graph
"""Wrapper around a :class:`graphviz.Graph` or :class:`graphviz.Digraph`. Internal graph
object is available at ``self.g``. Not instantiated by end user, instead, returned by :class:`sketch`.""" # Graph
self.g = g; self.name2Idx = name2Idx; self.idx2Popup = idx2Popup # Graph
def _repr_mimebundle_(self, *args, **kwargs): return self.g._repr_mimebundle_(*args, **kwargs) # Graph
def _repr_html_(self): return self._toHtml() # Graph
def _toHtml(self): # Graph
idx2Popup = self.idx2Popup; name2Idx = self.name2Idx; s = self.g | cli.toHtml() | cli.op().replace("><", ">\n<") # Graph
# these ids are: List[(auto generated id, unique id we're replacing it with)] # Graph
# nodeIds = s.split("\n") | grep('<g id="(?P<g>node[0-9]+)"', extract="g") | apply(lambda x: [x, _svgAutoInc()]) | deref() # Graph
nodeIds = s.split("\n") | cli.grep('class="node"', after=1) | cli.batched(2) | cli.apply(cli.op().split("title>")[1].strip("</"), 1) | cli.grep('<g id="(?P<g>node[0-9]+)', col=0, extract="g") | cli.deref() # Graph
edgeIds = s.split("\n") | cli.grep('<g id="(?P<g>edge[0-9]+)"', extract="g") | cli.apply(lambda x: [x, _svgAutoInc()]) | cli.deref() # Graph
graphIds = s.split("\n") | cli.grep('<g id="(?P<g>graph[0-9]+)"', extract="g") | cli.apply(lambda x: [x, _svgAutoInc()]) | cli.deref() # Graph
for x, y in [nodeIds, edgeIds, graphIds] | cli.joinSt(): s = s.replace(f'id="{x}"', f'id="{y}"') # Graph
a = nodeIds | cli.cut(1) | cli.apply(lambda idx: [idx, idx2Popup.get(idx, None)]) | cli.deref(); pre = _preAutoInc() # Graph
inside = f"rect.x <= {pre}_mouseX && {pre}_mouseX < rect.x+rect.width && rect.y <= {pre}_mouseY && {pre}_mouseY < rect.y+rect.height" # Graph
return f"""
<div id="{pre}_wrapper" style="position: relative">{s}<div id="{pre}_popup" style="position: absolute; display: none; background: white; padding: 8px 12px; border-radius: 6px; box-shadow: 0 3px 5px rgb(0 0 0 / 0.3); z-index: 1000000"></div></div>
<script>
const {pre}_nodeId_node_popup = ({json.dumps(a)}).map(([x,y]) => [x, document.querySelector(`#${{x}}`), y]);
const {pre}_nodes = {pre}_nodeId_node_popup.map(([x,n,y]) => n); let {pre}_activeNode = null;
const {pre}_popup = document.querySelector("#{pre}_popup"); const {pre}_wrapper = document.querySelector("#{pre}_wrapper");
const {pre}_nodeId2popup = {{}}; for (const [x,n,y] of {pre}_nodeId_node_popup) {{ {pre}_nodeId2popup[x] = y; }};
let {pre}_mouseX = 0; let {pre}_mouseY = 0; {pre}_wrapper.onmousemove = (e) => {{ {pre}_mouseX = e.clientX; {pre}_mouseY = e.clientY; }};
setInterval(() => {{
if ({pre}_activeNode) {{
const rect = {pre}_activeNode.getBoundingClientRect();
if (!({inside})) {{ {pre}_activeNode = null; {pre}_popup.innerHTML = ""; {pre}_popup.style.display = "none"; }}
}}
if (!{pre}_activeNode) {{ // can't just do `if (activeNode) ... else ...` btw. Separated out for a reason
const wRect = {pre}_wrapper.getBoundingClientRect();
for (const node of {pre}_nodes) {{
const rect = node.getBoundingClientRect();
if ({inside}) {{
const popup = {pre}_nodeId2popup[node.id];
{pre}_activeNode = node;
if (popup) {{
{pre}_popup.style.left = rect.x + rect.width/2 + 10 - wRect.x + "px";
{pre}_popup.style.top = 0; {pre}_popup.innerHTML = popup; {pre}_popup.style.display = "block";
const pRect = {pre}_popup.getBoundingClientRect();
const t1 = rect.y + rect.height/2 + 10 - wRect.y; // "t" for "top"
const t2 = wRect.height - pRect.height;
{pre}_popup.style.top = ((t2 < 0) ? 0 : Math.min(t1, t2)) + "px";
}}
break;
}}
}}
}}
}}, 30);
</script>""" # Graph
return s # Graph