# AUTOGENERATED FILE! PLEASE DON'T EDIT
import k1lib
from k1lib.cli import *
__all__ = ["trace"]
traceIdxAuto = k1lib.AutoIncrement()
class TraceData:
def __init__(self, _cli, inS, outS, name=None):
"""
:param inS: in and out strings to be displayed in the edges"""
self.idx = f"{traceIdxAuto()}"
self.inS = inS; self.outS = outS; self.cli = _cli
self.name = name or _cli.__class__.__name__
def __str__(self):
return f"<TraceData idx='{self.idx}' inS='{self.inS}' outS='{self.outS}' name='{self.name}' cli='{self.cli}'>"
def isMTM(c):
if not isinstance(c, BaseCli): return False
if isinstance(c, manyToMany): return True
if isinstance(c, applyMp): return True
if isinstance(c, apply) and isinstance(c.f, BaseCli) and c.column is None: return True
return False
class TraceException(Exception): pass
clusterAuto = k1lib.AutoIncrement()
emptyInputSentinel = object()
class _trace(BaseCli): # "I" for extra internal. Can't use double underscores due to convention
def __init__(self, inp, f, g=None, depth=None):
"""
Some notes. startTd will always tries to grab the first thing, lastTd will only
grab the last thing at the end of __ror__, hence "last" and not "end".
:param inp: initial input to pipe into other cli tools
:param f: function to display result of cli tools, default just shows the shape of the stream
:param env: :class:`graphviz.dot.Digraph` to use (hence subgraph, hence no "start" and "end")"""
if depth is None: depth = k1lib.MaxDepth(float("inf"), 0)
self.inp = inp # will change constantly as new clis are being piped into by trace
self.f = f; self.depth = depth; self._reprRO = k1lib.RunOnce()
if g is None:
self.lastTd = TraceData(None, None, None, "\\<start\\>")
self.g = k1lib.digraph(); self._formNode(self.lastTd)
else: self.g = g; self.lastTd = None
self.firstTime = True # every other time other than the first should not record any data. It should just pass data through
def _formNode(self, td:TraceData, g=None): (g or self.g).node(td.idx, td.name)
def _formEdge(self, td1:TraceData, td2:TraceData, g=None):
if td1 is None or td2 is None: return
(g or self.g).edge(td1.idx, td2.idx, label=f" {td2.inS or td1.outS}")
def _run(self, c, inp):
"""Takes in cli tool and input, runs it, and get trace data and output"""
if isinstance(c, op): c.op_solidify()
out = c(inp) | deref() # why not "inp | c"? Cause we want to serve plain old functions inside apply too
return TraceData(c, f"{self.f(inp)}", f"{self.f(out)}", None), out
def __repr__(self):
try: from IPython.core import display as dis
except: raise RuntimeError("You have to install IPython/execute in a notebook first!")
if not self._reprRO():
td = TraceData(None, self.lastTd.outS, None, "\\<end\\>")
self._formNode(td); self._formEdge(self.lastTd, td)
dis.display(dis.SVG(k1lib.scaleSvg(self.g._repr_svg_(), cliSettings["svgScale"]))); return "<trace object>"
def __ror__(self, it):
"""Alternative way to specify input."""
#if self.inp != emptyInputSentinel: raise TraceException("Input to trace has already been set, but it's being set again (possibly due to `.all()`). Check last trace using ``trace.last``")
if self.inp != emptyInputSentinel: self.firstTime = False
self.inp = it | deref(); return self
def __iter__(self): return self.inp
@k1lib.patch(_trace)
def __or__(self, c):
if self.inp is emptyInputSentinel: return super(_trace, self).__or__(c)
if not isinstance(c, BaseCli): return NotImplemented
if self._reprRO.value: raise RuntimeError("Can't pipe this trace() into another cli tool, as it is used! Make a new trace instead.")
td, out = self._run(c, self.inp) # runs through the entire thing, then decides whether to go into the details or not
if not self.firstTime: return out
if not hasattr(self, "startTd"): self.startTd = td; startTdSet = True
else: startTdSet = False # whether startTd is set lately
def bypass(): # default connection case, don't go into clis and explore
self._formNode(td); self._formEdge(self.lastTd, td); self.lastTd = td
if self.depth and isinstance(c, serial):
with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:
subG.attr(label="|, serial")
t = _trace(self.inp, self.f, subG, self.depth.enter())
for _c in c.clis: t = t | _c
self._formEdge(self.lastTd, t.startTd); self.lastTd = t.lastTd
if startTdSet: self.startTd = t.startTd
elif self.depth and isMTM(c):
if isinstance(c, (apply, applyMp)): _c = c.f
elif isinstance(c, manyToMany): _c = c.cli
try: singleInp = self.inp | item()
except StopIteration: bypass() # no items at all, can't trace!
else:
with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:
subG.attr(label=".all(), manyToMany, apply")
t = _trace(self.inp | item(), self.f, subG, self.depth.enter())
o1Td = TraceData(None, self.f(self.inp), None, "*"); self._formNode(o1Td, g=subG); t = t | _c
o2Td = TraceData(None, self.f(t.inp), None, "*"); self._formNode(o2Td, g=subG)
t._formEdge(o1Td, t.startTd); t._formEdge(t.lastTd, o2Td); o2Td.outS = self.f(out)
self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td
if startTdSet: self.startTd = o1Td
elif self.depth and isinstance(c, oneToMany):
with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:
subG.attr(label="&, oneToMany")
o1Td = TraceData(None, self.f(self.inp), None, "*"); self._formNode(o1Td, g=subG)
o2Td = TraceData(None, None, None, "*"); self._formNode(o2Td, g=subG)
for _c in c.clis:
t = _trace(self.inp, self.f, subG, self.depth.enter()) | _c
self._formEdge(o1Td, t.startTd); self._formEdge(t.lastTd, o2Td)
self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td; o2Td.outS = self.f(out)
if startTdSet: self.startTd = o1Td
elif self.depth and isinstance(c, manyToManySpecific):
with self.g.subgraph(name=f"cluster_{clusterAuto()}") as subG:
subG.attr(label="+, manyToManySpecific")
o1Td = TraceData(None, self.f(self.inp), None, "*"); self._formNode(o1Td, g=subG)
o2Td = TraceData(None, None, self.f(out), "*"); self._formNode(o2Td, g=subG)
for _c, _it in zip(c.clis, self.inp):
t = _trace(_it, self.f, subG, self.depth.enter()) | _c
self._formEdge(o1Td, t.startTd); self._formEdge(t.lastTd, o2Td)
self._formEdge(self.lastTd, o1Td); self.lastTd = o2Td; o2Td.outS = self.f(out)
if startTdSet: self.startTd = o1Td
else: bypass()
self.inp = out; return self
[docs]class trace(_trace):
last = None
"""Last instantiated trace object. Access this to view the previous (possibly nested) trace."""
[docs] def __init__(self, f=shape(), maxDepth=float("inf")):
"""Traces out how the data stream is transformed through complex cli tools.
Example::
# returns [1, 4, 9, 16], normal command
range(1, 5) | apply(lambda x: x**2) | deref()
# traced command, will display how the shapes evolve through cli tools
range(1, 5) | trace() | apply(lambda x: x**2) | deref()
Essentially, this :class:`~k1lib.cli.utils.deref` every stream before and after every
cli tool, and then displays the clis and streams in a graph for visualization.
There're a lot more instructions and code examples over the tutorial section. Go check it out!
:param f: function to display the data stream. Defaulted to :class:`~k1lib.cli.utils.shape`,
and to :class:`~k1lib.cli.utils.iden` if is None."""
f = f or iden()
g = lambda x: f"{f(x)}".split("\n")[:2] | apply(lambda s: f"{s[:50]}..." if len(s) > 50 else s) | join("\n")
super().__init__(emptyInputSentinel, g, depth=k1lib.MaxDepth(maxDepth))
trace.last = self