# AUTOGENERATED FILE! PLEASE DON'T EDIT
import k1lib, os, dill, time, inspect, json, base64; k1 = k1lib
from typing import List
import k1lib.cli as cli; from k1lib.cli import *
from collections import defaultdict
try: import PIL.Image, PIL; hasPIL = True
except: PIL = k1.dep("PIL"); hasPIL = False
__all__ = ["FromNotebook", "FromPythonFile", "BuildPythonFile", "BuildDashFile", "StartServer", "GenerateHtml", "commonCbs", "serve",
"text", "slider", "html", "analyze", "webToPy", "pyToWeb"]
basePath = os.path.dirname(inspect.getabsfile(k1lib)) + os.sep + "serve" + os.sep
[docs]class FromNotebook(k1.Callback):
[docs] def __init__(self, fileName, tagName="serve"):
"""Grabs source code from a Jupyter notebook. Will grab cells with the comment
like ``# serve`` in the first line.
:param tagName: tag name on the first line of the cell to pull out from"""
super().__init__(); self.fileName = fileName; self.tagName = tagName
[docs] def fetchSource(self): self.l["sourceCode"] = cli.nb.cells(self.fileName) | cli.nb.pretty(whitelist=[self.tagName]) | cli.filt(cli.op()["cell_type"] == "code") | (cli.op()["source"] | ~cli.head(1)).all() | cli.joinStreams() | cli.deref()
[docs]class FromPythonFile(k1.Callback):
[docs] def __init__(self, fileName):
"""Grabs source code from a python file."""
super().__init__(); self.fileName = fileName
[docs] def fetchSource(self): self.l["sourceCode"] = cli.cat(self.fileName) | cli.deref()
[docs]class BuildPythonFile(k1.Callback):
[docs] def __init__(self, port=None):
"""Builds the output Python file, ready to be served on localhost.
:param port: which port to run on localhost. If not given, then a port will
be picked at random, and will be available at ``cbs.l['port']``"""
super().__init__(); self.port = port; self.suffix = "suffix.py"
[docs] def buildPythonFile(self):
# simple prefix
self.l["pythonFile"] = ["from k1lib.imports import *", *self.l["sourceCode"]] | cli.file()
# grabs random free port if one is not available
if self.port is None: import socket; sock = socket.socket(); sock.bind(('', 0)); self.l["port"] = port = sock.getsockname()[1]; sock.close()
else: port = self.port
# grabs temp meta file for communication
self.l["metaFile"] = metaFile = "" | cli.file(); os.remove(metaFile)
# actually has enough info to build the server
(cli.cat(f"{basePath}{self.suffix}") | cli.op().replace("SOCKET_PORT", f"{port}").replace("META_FILE", metaFile).all()) >> cli.file(self.l["pythonFile"])
[docs]class BuildDashFile(BuildPythonFile):
[docs] def __init__(self):
"""Builds the output Python file for a Dash app, ready to be served on localhost"""
super().__init__()
self.suffix = "suffix-dash.py"
[docs]class StartServer(k1.Callback):
[docs] def __init__(self, initTime=10):
"""Starts the server, verify that it starts okay and dumps meta information (including
function signatures) to ``cbs.l``
:param initTime: time to wait in seconds until the server is online before declaring it's unsuccessful"""
super().__init__(); self.initTime = initTime
[docs] def startServer(self):
None | cli.cmd(f"python {self.l['pythonFile']} &"); count = 0; initTime = self.initTime
while not os.path.exists(self.l["metaFile"]):
if count > initTime/0.1: raise Exception(f"Tried to start server up, but no responses yet. Port: {self.l['port']}, pythonFile: {self.l['pythonFile']}, metaFile: {self.l['metaFile']}")
count += 1; time.sleep(0.1)
self.l["meta"] = meta = self.l["metaFile"] | cli.cat(text=False) | cli.aS(dill.loads)
[docs]class GenerateHtml(k1.Callback):
[docs] def __init__(self, serverPrefix=None, htmlFile=None, title="Interactive demo"):
"""Generates a html file that communicates with the server.
:param serverPrefix: prefix of server for back and forth requests, like "https://example.com/proj1". If
empty, tries to grab ``cbs.l["serverPrefix"]``, which you can deposit from your own callback. If
that's not available then it will fallback to ``localhost:port``
:param htmlFile: path of the target html file. If not specified then a temporary file
will be created and made available in ``cbs.l["htmlFile"]``
:param title: title of html page"""
super().__init__(); self.serverPrefix = serverPrefix; self.htmlFile = htmlFile; self.title = title
[docs] def generateHtml(self):
meta = dict(self.l["meta"]); meta = meta | cli.aS(json.dumps)
replaces = cli.op().replace("META_JSON", meta)\
.replace("SERVER_PREFIX", self.serverPrefix or self.l["serverPrefix"] or f"http://localhost:{self.l['port']}")\
.replace("TITLE", self.title)
self.l["htmlFile"] = cli.cat(f"{basePath}main.html") | replaces.all() | cli.file(self.htmlFile)
[docs]def commonCbs():
"""Grabs common callbacks, including :class:`BuildPythonFile` and :class:`StartServer`"""
return k1.Callbacks().add(BuildPythonFile()).add(StartServer());
[docs]def serve(cbs):
"""Runs the serving pipeline."""
import flask, flask_cors
cbs.l = defaultdict(lambda: None)
cbs("begin")
cbs("fetchSource") # fetches cells
cbs("beforeBuildPythonFile"); cbs("buildPythonFile") # builds python server file
cbs("beforeStartServer"); cbs("startServer") # starts serving the model on localhost and add more meta info
cbs("beforeGenerateHtml"); cbs("generateHtml") # produces a standalone html file that provides interactive functionalities
cbs("end")
return cbs
class baseType:
def __init__(self):
"""Base type for all widget types"""
pass
def getConfig(self): return NotImplemented
[docs]class text(baseType):
[docs] def __init__(self, multiline:bool=True, password:bool=False):
"""Represents text, either on single or multiple lines.
If `password` is true, then will set multiline to false automatically,
and creates a text box that blurs out the contents"""
super().__init__(); self.multiline = multiline if not password else False; self.password = password
def __repr__(self): return f"<text multiline={self.multiline}>"
[docs]class html(baseType):
def __init__(self): super().__init__()
def __repr__(self): return f"<html>"
[docs]class slider(baseType):
[docs] def __init__(self, start:float, stop:float, intervals:int=100):
"""Represents a slider from `start` to `stop` with a bunch of
intervals in the middle. If `defValue` is not specified, uses the
middle point between start and stop"""
super().__init__(); self.start = start; self.stop = stop; self.intervals = intervals; self.dt = (stop-start)/intervals
def __repr__(self): return f"<slider {self.start}---{self.intervals}-->{self.stop}>"
def refine(param:str, anno:baseType, default): # anno is not necessarily baseType, can be other types like "int"
if anno == int: return [param, "int", [default, False]]
if anno == float: return [param, "float", [default, False]]
multiline = lambda s: len(s.split("\n")) > 1 or len(s) > 100
if anno == bool: return [param, "checkbox", default]
if anno == str: return [param, "text", [default, multiline(default or "")]]
if isinstance(anno, text): return [param, "text", [default, anno.multiline, anno.password]]
if isinstance(anno, slider): return [param, "slider", [default, anno.start, anno.stop, anno.dt]]
if isinstance(anno, range): return [param, "slider", [default, anno.start, anno.stop, anno.step]]
byte2Str = aS(base64.b64encode) | op().decode("ascii")
if hasPIL and anno == PIL.Image.Image: return [param, "image", (default | toBytes() | byte2Str) if default is not None else None]
if anno == bytes: return [param, "bytes", (default | byte2Str) if default is not None else None]
if isinstance(anno, list): anno | apply(str) | deref(); return [param, "dropdown", [anno.index(default), anno]]
if isinstance(anno, html): return [param, "html", [default]]
raise Exception(f"Unknown type {anno}")
[docs]def analyze(f):
spec = inspect.getfullargspec(f); args = spec.args; n = len(args)
annos = spec.annotations; defaults = spec.defaults or ()
docs = (f.__doc__ or "").split("\n") | grep(":param", sep=True).till() | filt(op().ab_len() > 0) | op().strip().all(2) | (join(" ") | op().split(":") | ~aS(lambda x, y, *z: [y.split(" ")[1], ":".join(z).strip()])).all() | toDict()
mainDoc = (f.__doc__ or " ").split("\n") | grep(".", sep=True).till(":param") | breakIf(op()[0].startswith(":param")) | join(" ").all() | join(" ")
if len(annos) != n + 1: raise Exception(f"Please annotate all of your arguments ({n} args + 1 return != {len(annos)} annos). Args: {args}, annos: {annos}")
if len(defaults) != n: raise Exception(f"Please specify default values for all of your arguments ({n} args != {len(defaults)} default values)")
a = [args, args | lookup(annos), defaults] | transpose() | ~apply(refine) | deref()
ret = refine("return", annos["return"], None)[1]; defaults = a | cut(0, 2) | toDict()
annos = a | cut(0, 1) | toDict(); annos["return"] = ret
if ret == "slider": raise Exception(f"Return value is a slider, which doesn't really make sense. Return float, str or sth like that")
# args:list, annos:dict, defaults:list, docs:dict
return {"args": args, "annos": annos, "defaults": defaults, "docs": docs,
"mainDoc": mainDoc, "source": inspect.getsource(f), "pid": os.getpid()}
return args, annos, defaults, docs, mainDoc, d
[docs]def webToPy(o:str, klass:baseType):
o = str(o)
if klass == "int": return int(float(o))
if klass == "float": return float(o)
if klass == "slider": o = float(o); return int(o) if round(o) == o else o
if klass == "text" or klass == "dropdown": return o
if klass == "checkbox": return o.lower() == "true"
if klass == "bytes": return o | aS(base64.b64decode)
if klass == "image": return o | aS(base64.b64decode) | toImg()
return NotImplemented
[docs]def pyToWeb(o, klass:baseType) -> str:
if klass in ("int", "float", "text", "checkbox"): return f"{o}"
if klass == "slider": return NotImplemented
if klass == "bytes": return o | aS(base64.b64encode) | op().decode()
if klass == "image": return o | toBytes() | aS(base64.b64encode)
if klass == "dropdown": return o;
if klass == "html": return o.encode() | aS(base64.b64encode) | op().decode()
return NotImplemented