# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for all short utilities that convert from one data type to another. They
might feel like they have different styles, as :class:`toFloat` converts an object iterator to
a float iterator, while :class:`toPIL` converts a single image url to a single PIL image,
whereas :class:`toSum` converts a float iterator into a single float value.
The general convention is: if the intended operation sounds simple (convert to floats,
strings, types, ...), then most likely it will convert iterator to iterator, as you
can always use the function directly if you only want to apply it to 1 object.
If it sounds complicated (convert to PIL image, tensor, ...) then most likely it will
convert object to object. Lastly, there are some that just feel right taking in
an iterator and outputting a single object (like getting max, min, std, mean values)."""
__all__ = ["toTensor", "toRange", "toList",
"toSum", "toProd", "toAvg", "toMean", "toStd", "toMax", "toMin", "toArgmin", "toArgmax",
"toPIL", "toImg", "toRgb", "toRgba", "toGray", "toDict",
"toFloat", "toInt", "toBytes", "toDataUri", "toAnchor", "toHtml",
"toAscii", "toHash", "toCsv", "toAudio", "toUnix", "toIso", "toYMD", "toLinks",
"toMovingAvg", "toCm"]
import re, k1lib, math, os, numpy as np, io, json, base64, unicodedata, inspect
from k1lib.cli.init import BaseCli, T, yieldT; import k1lib.cli as cli, k1lib.cli.init as init
from k1lib.cli.typehint import *; mpl = k1lib.dep("matplotlib"); plt = k1lib.dep("matplotlib.pyplot")
from collections import deque, defaultdict; from typing import Iterator, Any, List, Set, Tuple, Dict, Callable, Union
settings = k1lib.settings.cli
try: import PIL; hasPIL = True
except: hasPIL = False
try: import torch; hasTorch = True
except: torch = k1lib.dep("torch"); hasTorch = False
try: import rdkit; hasRdkit = True
except: hasRdkit = False
try: import graphviz; hasGraphviz = True
except: hasGraphviz = False
try: import plotly; import plotly.express as px; hasPlotly = True
except: hasPlotly = False
[docs]class toTensor(BaseCli): # toTensor
[docs] def __init__(self, dtype=None): # toTensor
"""Converts generator to :class:`torch.Tensor`. Essentially
``torch.tensor(list(it))``. Default dtype is float32
Also checks if input is a PIL Image. If yes, turn it into a :class:`torch.Tensor`
and return.""" # toTensor
self.dtype = dtype or torch.float32 # toTensor
[docs] def __ror__(self, it:Iterator[float]) -> "torch.Tensor": # toTensor
try: # toTensor
import PIL; pic=it # toTensor
if isinstance(pic, PIL.Image.Image): # stolen from torchvision ToTensor transform # toTensor
mode_to_nptype = {'I': np.int32, 'I;16': np.int16, 'F': np.float32} # toTensor
img = torch.from_numpy(np.array(pic, mode_to_nptype.get(pic.mode, np.uint8), copy=True)) # toTensor
if pic.mode == '1': img = 255 * img # toTensor
img = img.view(pic.size[1], pic.size[0], len(pic.getbands())) # toTensor
return img.permute((2, 0, 1)).contiguous().to(self.dtype) # put it from HWC to CHW format # toTensor
except: pass # toTensor
if isinstance(it, np.ndarray): return torch.tensor(it).to(self.dtype) # toTensor
return torch.tensor(list(it)).to(self.dtype) # toTensor
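# The PIL branch above mirrors torchvision's ToTensor: pixel data is read as an
# (H, W, C) array, then permuted to the (C, H, W) layout most models expect.
# A minimal standalone sketch of that reshaping, assuming an 8-bit image and that
# torch/numpy are installed; `_sketch_pil_to_chw` is a hypothetical helper for
# illustration only, not part of the library's API.
def _sketch_pil_to_chw(pic: "PIL.Image.Image") -> "torch.Tensor":
    import numpy as np, torch                         # local imports so the sketch is self-contained
    arr = np.array(pic, dtype=np.uint8, copy=True)    # (H, W) or (H, W, C) uint8 array
    if arr.ndim == 2: arr = arr[:, :, None]           # promote grayscale to (H, W, 1)
    t = torch.from_numpy(arr)                         # still HWC at this point
    return t.permute(2, 0, 1).contiguous().float()    # -> (C, H, W), float32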
[docs]class toList(BaseCli): # this still exists cause some LLVM optimizations are done on this, and too tired to change that at the moment # toList
[docs] def __init__(self): # toList
"""Converts generator to list.
Example::
# returns [0, 1, 2, 3, 4]
range(5) | toList()
# returns [0, 1, 2, 3, 4]
range(5) | aS(list)
So this cli is sort of outdated. It still works fine, nothing wrong
with it, but just do ``aS(list)`` instead. It's not removed to
avoid breaking old projects.""" # toList
super().__init__() # toList
def _typehint(self, inp): # toList
if isinstance(inp, tListIterSet): return tList(inp.child) # toList
if isinstance(inp, tCollection): return inp # toList
return tList(tAny()) # toList
[docs] def __ror__(self, it:Iterator[Any]) -> List[Any]: return list(it) # toList
def _jsF(self, meta): # toList
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toList
return f"const {fIdx} = ({dataIdx}) => {dataIdx}", fIdx # toList
def _toRange(it): # _toRange
for i, _ in enumerate(it): yield i # _toRange
[docs]class toRange(BaseCli): # toRange
[docs] def __init__(self): # toRange
"""Returns iter(range(len(it))), effectively.
Example::
# returns [0, 1, 2]
[3, 2, 5] | toRange() | deref()""" # toRange
super().__init__() # toRange
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[int]: # toRange
try: return range(len(it)) # toRange
except: return _toRange(it) # toRange
def _jsF(self, meta): # toRange
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toRange
return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toRange()", fIdx # toRange
tOpt.addPass(lambda cs, ts, _: [cs[0]], [toRange, toRange]) # toRange
settings.add("arrayTypes", (torch.Tensor, np.ndarray) if hasTorch else (np.ndarray,), "default array types used to accelerate clis") # toRange
def genericTypeHint(inp): # genericTypeHint
if isinstance(inp, tListIterSet): return inp.child # genericTypeHint
if isinstance(inp, tCollection): return inp.children[0] # genericTypeHint
if isinstance(inp, tArrayTypes): return inp.child # genericTypeHint
return tAny() # genericTypeHint
[docs]class toSum(BaseCli): # toSum
[docs] def __init__(self): # toSum
"""Calculates the sum of list of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`.
Example::
# returns 45
range(10) | toSum()""" # toSum
super().__init__() # toSum
def _all_array_opt(self, it, level): # toSum
bm = np if isinstance(it, np.ndarray) else (torch if hasTorch and isinstance(it, torch.Tensor) else None) # toSum
return NotImplemented if bm is None else bm.sum(it, tuple(range(level, len(it.shape)))) # toSum
def _typehint(self, inp): return genericTypeHint(inp) # toSum
[docs] def __ror__(self, it:Iterator[float]): # toSum
if isinstance(it, settings.arrayTypes): return it.sum() # toSum
return sum(it) # toSum
def _jsF(self, meta): # toSum
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toSum
return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toSum()", fIdx # toSum
[docs]class toProd(BaseCli): # toProd
[docs] def __init__(self): # toProd
"""Calculates the product of a list of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`.
Example::
# returns 362880
range(1,10) | toProd()""" # toProd
super().__init__() # toProd
def _all_array_opt(self, it, level): # toProd
if isinstance(it, np.ndarray): return np.prod(it, tuple(range(level, len(it.shape)))) # toProd
elif hasTorch and isinstance(it, torch.Tensor): # toProd
for i in range(level, len(it.shape)): it = torch.prod(it, level) # toProd
return it # toProd
return NotImplemented # toProd
def _typehint(self, inp): return genericTypeHint(inp) # toProd
[docs] def __ror__(self, it): # toProd
if isinstance(it, settings.arrayTypes): return it.prod() # toProd
else: return math.prod(it) # toProd
def _jsF(self, meta): # toProd
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toProd
return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toProd()", fIdx # toProd
[docs]class toAvg(BaseCli): # toAvg
[docs] def __init__(self): # toAvg
"""Calculates average of list of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`.
Example::
# returns 4.5
range(10) | toAvg()
# returns nan
[] | toAvg()""" # toAvg
super().__init__() # toAvg
def _all_array_opt(self, it, level): # toAvg
bm = np if isinstance(it, np.ndarray) else (torch if hasTorch and isinstance(it, torch.Tensor) else None) # toAvg
return NotImplemented if bm is None else bm.mean(it, tuple(range(level, len(it.shape)))) # toAvg
def _typehint(self, inp): # toAvg
i = None # toAvg
if isinstance(inp, tListIterSet): i = inp.child # toAvg
if isinstance(inp, tCollection): i = inp.children[0] # toAvg
if isinstance(inp, tArrayTypes): i = inp.child # toAvg
if i is not None: return float if i == int else i # toAvg
return tAny() # toAvg
[docs] def __ror__(self, it:Iterator[float]): # toAvg
if isinstance(it, settings.arrayTypes): return it.mean() # toAvg
s = 0; i = -1 # toAvg
for i, v in enumerate(it): s += v # toAvg
i += 1 # toAvg
if not k1lib.settings.cli.strict and i == 0: return float("nan") # toAvg
return s / i # toAvg
def _jsF(self, meta): # toAvg
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toAvg
return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toAvg()", fIdx # toAvg
if hasTorch: # toAvg
torchVer = int(torch.__version__.split(".")[0]) # toAvg
if torchVer >= 2: # toAvg
def torchStd(it, ddof, dim=None): return torch.std(it, dim, correction=ddof) # toAvg
else: # toAvg
def torchStd(it, ddof, dim=None): # toAvg
if ddof == 0: return torch.std(it, dim, unbiased=False) # toAvg
if ddof == 1: return torch.std(it, dim, unbiased=True) # toAvg
raise Exception(f"Please install PyTorch 2, as version 1 don't support correction factor of {ddof}") # toAvg
else: # toAvg
def torchStd(it, ddof): raise Exception("PyTorch not installed") # toAvg
[docs]class toStd(BaseCli): # toStd
[docs] def __init__(self, ddof:int=0): # toStd
"""Calculates standard deviation of list of numbers. Can pipe in :class:`torch.Tensor`
or :class:`numpy.ndarray` to be faster. Example::
# returns 2.8722813232690143
range(10) | toStd()
# returns nan
[] | toStd()
:param ddof: "delta degree of freedom". The divisor used in calculations is ``N - ddof``""" # toStd
self.ddof = ddof # toStd
def _all_array_opt(self, it, level): # toStd
n = len(it.shape); ddof = self.ddof; dim = tuple(range(level, n)) # toStd
if isinstance(it, np.ndarray): return np.std(it, ddof=ddof, axis=dim) # toStd
elif hasTorch and isinstance(it, torch.Tensor): return torchStd(it, ddof, dim) # toStd
return NotImplemented # toStd
[docs] def __ror__(self, it): # toStd
ddof = self.ddof # toStd
if isinstance(it, settings.arrayTypes): # toStd
if isinstance(it, np.ndarray): return np.std(it, ddof=ddof) # toStd
elif hasTorch and isinstance(it, torch.Tensor): return torchStd(it, ddof) # toStd
return np.std(np.array(list(it))) # toStd
def _jsF(self, meta): # toStd
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toStd
return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toStd()", fIdx # toStd
toMean = toAvg # toStd
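# torchStd() above only papers over the API difference between torch 1 (unbiased=...)
# and torch 2 (correction=...). The quantity itself is the same in numpy and torch:
# std = sqrt(sum((x - mean)^2) / (N - ddof)). A small sketch checking that against
# numpy, purely for illustration; `_sketch_std_ddof` is not part of the library's API.
def _sketch_std_ddof(xs=(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0), ddof=0):
    import math, numpy as np
    xs = list(xs); n = len(xs); mean = sum(xs) / n
    manual = math.sqrt(sum((x - mean) ** 2 for x in xs) / (n - ddof))
    assert abs(manual - float(np.std(np.array(xs), ddof=ddof))) < 1e-9
    return manual   # 2.0 for the default inputs with ddof=0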
[docs]class toMax(BaseCli): # toMax
[docs] def __init__(self): # toMax
"""Calculates the max of a bunch of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`.
Example::
# returns 6
[2, 5, 6, 1, 2] | toMax()""" # toMax
super().__init__() # toMax
def _all_array_opt(self, it, level): # toMax
if isinstance(it, np.ndarray): return np.max(it, tuple(range(level, len(it.shape)))) # toMax
elif hasTorch and isinstance(it, torch.Tensor): # toMax
for i in range(level, len(it.shape)): it = torch.max(it, level)[0] # toMax
return it # toMax
return NotImplemented # toMax
[docs] def __ror__(self, it:Iterator[float]) -> float: # toMax
if isinstance(it, settings.arrayTypes): return it.max() # toMax
return max(it) # toMax
def _jsF(self, meta): # toMax
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toMax
return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toMax()", fIdx # toMax
[docs]class toMin(BaseCli): # toMin
[docs] def __init__(self): # toMin
"""Calculates the min of a bunch of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`.
Example::
# returns 1
[2, 5, 6, 1, 2] | toMin()""" # toMin
super().__init__() # toMin
def _all_array_opt(self, it, level): # toMin
if isinstance(it, np.ndarray): return np.min(it, tuple(range(level, len(it.shape)))) # toMin
elif hasTorch and isinstance(it, torch.Tensor): # toMin
for i in range(level, len(it.shape)): it = torch.min(it, level)[0] # toMin
return it # toMin
return NotImplemented # toMin
[docs] def __ror__(self, it:Iterator[float]) -> float: # toMin
if isinstance(it, settings.arrayTypes): return it.min() # toMin
return min(it) # toMin
def _jsF(self, meta): # toMin
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toMin
return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toMin()", fIdx # toMin
[docs]class toArgmin(BaseCli): # toArgmin
[docs] def __init__(self): # toArgmin
"""Get the input iterator's index of the min value.
Example::
[2, 3, 4, 1, 5] | toArgmin() # returns 3
""" # toArgmin
pass # toArgmin
[docs] def __ror__(self, it): # toArgmin
if isinstance(it, k1lib.settings.cli.arrayTypes): return it.argmin().item() # toArgmin
else: # toArgmin
try: len(it); return np.array(it) | self # toArgmin
except: return np.array(list(it)) | self # toArgmin
[docs]class toArgmax(BaseCli): # toArgmax
[docs] def __init__(self): # toArgmax
"""Get the input iterator's index of the max value.
Example::
[2, 3, 4, 1, 5] | toArgmax() # returns 4
""" # toArgmax
pass # toArgmax
[docs] def __ror__(self, it): # toArgmax
if isinstance(it, k1lib.settings.cli.arrayTypes): return it.argmax().item() # toArgmax
else: # toArgmax
try: len(it); return np.array(it) | self # toArgmax
except: return np.array(list(it)) | self # toArgmax
settings.add("font", None, "default font file. Best to use .ttf files, used by toPIL()") # toArgmax
settings.add("chem", k1lib.Settings().add("imgSize", 200, "default image size used in toPIL() when drawing rdkit molecules"), "chemistry-related settings") # toArgmax
def cropToContentNp(ogIm, pad=10): # cropToContentNp
dim = len(ogIm.shape); im = ogIm # cropToContentNp
if dim > 2: im = im.mean(0) # cropToContentNp
coords = np.argwhere(im.max()-im); x_min, y_min = coords.min(axis=0); x_max, y_max = coords.max(axis=0) # cropToContentNp
return ogIm[x_min-pad:x_max+1+pad, y_min-pad:y_max+1+pad] if dim == 2 else ogIm[:,x_min-pad:x_max+1+pad, y_min-pad:y_max+1+pad] # cropToContentNp
def cropToContentPIL(im, pad=0): # cropToContentPIL
im = im | toTensor(int) | cli.op().numpy() | cli.aS(cropToContentNp, pad) # cropToContentPIL
return torch.from_numpy(im).permute(1, 2, 0) | toImg() if len(im.shape) > 2 else im | toImg() # cropToContentPIL
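# cropToContentNp() above treats the brightest value as "background" and keeps the
# bounding box of everything else (im.max() - im is nonzero exactly on content pixels).
# A minimal sketch of that bounding-box step on a synthetic image; `_sketch_crop_bbox`
# is illustrative only and not part of the library's API.
def _sketch_crop_bbox(pad=1):
    import numpy as np
    im = np.full((10, 10), 255.0); im[3:6, 4:8] = 0.0         # white canvas with a dark block
    coords = np.argwhere(im.max() - im)                        # indices of non-background pixels
    (r0, c0), (r1, c1) = coords.min(axis=0), coords.max(axis=0)
    return im[max(r0 - pad, 0):r1 + 1 + pad, max(c0 - pad, 0):c1 + 1 + pad].shape  # (5, 6) here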
[docs]class toPIL(BaseCli): # toPIL
[docs] def __init__(self, closeFig=True, crop=True): # toPIL
"""Converts multiple data types into a PIL image.
Example::
# grabs first image in the current folder
ls(".") | toPIL().all() | item()
# converts from tensor/array to image
torch.randn(100, 200) | toPIL()
# grabs image, converts to byte stream, and converts back to image
"abc.jpg" | toPIL() | toBytes() | toPIL()
# converts paragraphs to image
["abc", "def"] | toPIL()
# converts SMILES string to molecule, then to image
"c1ccc(C)cc1" | toMol() | toImg()
# sketches a graphviz plot, converts to svg then renders the svg as an image
["ab", "bc", "ca"] | (kgv.sketch() | kgv.edges()) | toHtml() | toImg()
You can also save a matplotlib figure by piping in a :class:`matplotlib.figure.Figure` object::
x = np.linspace(0, 4)
plt.plot(x, x**2)
plt.gcf() | toPIL()
.. note::
If you are working with image tensors, which typically have
dimensions of (C, H, W), you have to permute them to PIL's (H, W, C)
layout first before passing them into this cli.
Also, it's expected that
your tensor image ranges from 0 to 255, and not from 0 to 1. Make sure you
renormalize it first
:param closeFig: if input is a matplotlib figure, then closes the figure after generating the image
:param crop: whether to crop white spaces around an image or not""" # toPIL
import PIL; self.PIL = PIL; self.closeFig = closeFig; self.crop = crop # toPIL
def _typehint(self, inp): # toPIL
return PIL.Image.Image # toPIL
[docs] def __ror__(self, path) -> "PIL.Image.Image": # toPIL
if isinstance(path, Svg): # toPIL
import tempfile; a = tempfile.NamedTemporaryFile() # toPIL
import cairosvg; cairosvg.svg2png(bytestring=path,write_to=a.name); im = a.name | toImg() # toPIL
return im # toPIL
if isinstance(path, str): # toPIL
return self.PIL.Image.open(os.path.expanduser(path)) # toPIL
if isinstance(path, bytes): # toPIL
return self.PIL.Image.open(io.BytesIO(path)) # toPIL
if hasTorch and isinstance(path, torch.Tensor): path = path.numpy() # toPIL
if isinstance(path, np.ndarray): # toPIL
return self.PIL.Image.fromarray(path.astype("uint8")) # toPIL
if isinstance(path, mpl.figure.Figure): # toPIL
canvas = path.canvas; canvas.draw() # toPIL
img = self.PIL.Image.frombytes('RGB', canvas.get_width_height(), canvas.tostring_rgb()) # toPIL
if self.closeFig: plt.close(path) # toPIL
return img | cli.aS(cropToContentPIL) # toPIL
if hasGraphviz and isinstance(path, graphviz.Digraph): # toPIL
import tempfile; a = tempfile.NamedTemporaryFile() # toPIL
path.render(a.name, format="jpeg"); # toPIL
fn = f"{a.name}.jpeg"; im = fn | toImg() # toPIL
try: os.remove(fn) # toPIL
except: pass # toPIL
return im # toPIL
if hasRdkit and isinstance(path, rdkit.Chem.rdchem.Mol): # toPIL
sz = settings.chem.imgSize # toPIL
return self.__ror__(rdkit.Chem.Draw.MolsToGridImage([path], subImgSize=[sz, sz]).data) | (cli.aS(cropToContentPIL) if self.crop else cli.iden()) # toPIL
path = path | cli.deref() # toPIL
if len(path) > 0 and isinstance(path[0], str): # toPIL
from PIL import ImageDraw # toPIL
h = path | cli.shape(0); w = path | cli.shape(0).all() | cli.aS(max) # toPIL
image = self.PIL.Image.new("L", ((w+1)*20, (h+1)*60), 255) # toPIL
font = PIL.ImageFont.truetype(settings.font, 18) if settings.font else None # toPIL
ImageDraw.Draw(image).text((20, 20), path | cli.join("\n"), 0, font=font) # toPIL
return np.array(image)/255 | (cli.aS(cropToContentNp) if self.crop else cli.iden()) | cli.op()*255 | toImg() # toPIL
return NotImplemented # toPIL
toImg = toPIL # toPIL
[docs]class toRgb(BaseCli): # toRgb
[docs] def __init__(self): # toRgb
"""Converts greyscale/rgb PIL image to rgb image.
Example::
# reads image file and converts it to rgb
"a.png" | toPIL() | toRgb()""" # toRgb
import PIL; self.PIL = PIL # toRgb
def _typehint(self, inp): return inp # toRgb
[docs] def __ror__(self, i): # toRgb
if i.getbands() == ("R", "G", "B"): return i # toRgb
rgbI = self.PIL.Image.new("RGB", i.size) # toRgb
rgbI.paste(i); return rgbI # toRgb
[docs]class toRgba(BaseCli): # toRgba
[docs] def __init__(self): # toRgba
"""Converts random PIL image to rgba image.
Example::
# reads image file and converts it to rgba
"a.png" | toPIL() | toRgba()""" # toRgba
import PIL; self.PIL = PIL # toRgba
def _typehint(self, inp): return inp # toRgba
[docs] def __ror__(self, i): # toRgba
if i.getbands() == ("R", "G", "B", "A"): return i # toRgba
rgbI = self.PIL.Image.new("RGBA", i.size) # toRgba
rgbI.paste(i); return rgbI # toRgba
[docs]class toGray(BaseCli): # toGray
[docs] def __init__(self): # toGray
"""Converts random PIL image to a grayscale image.
Example::
# reads image file and converts it to rgba
"a.png" | toPIL() | toGray()""" # toGray
import PIL; self.PIL = PIL # toGray
def _typehint(self, inp): return inp # toGray
[docs] def __ror__(self, i): # toGray
if i.getbands() == ("L"): return i # toGray
return self.PIL.ImageOps.grayscale(i) # toGray
[docs]class toDict(BaseCli): # toDict
[docs] def __init__(self, rows=True, f=None): # toDict
"""Converts 2 Iterators, 1 key, 1 value into a dictionary.
Example::
# returns {1: 3, 2: 4}
[[1, 3], [2, 4]] | toDict()
# returns {1: 3, 2: 4}
[[1, 2], [3, 4]] | toDict(False)
If ``rows`` is a string, then it will build a dictionary from key-value
pairs delimited by this character. For example::
['gene_id "ENSG00000290825.1"',
'transcript_id "ENST00000456328.2"',
'gene_type "lncRNA"',
'gene_name "DDX11L2"',
'transcript_type "lncRNA"',
'transcript_name "DDX11L2-202"',
'level 2',
'transcript_support_level "1"',
'tag "basic"',
'tag "Ensembl_canonical"',
'havana_transcript "OTTHUMT00000362751.1"'] | toDict(" ")
That returns::
{'gene_id': '"ENSG00000290825.1"',
'transcript_id': '"ENST00000456328.2"',
'gene_type': '"lncRNA"',
'gene_name': '"DDX11L2"',
'transcript_type': '"lncRNA"',
'transcript_name': '"DDX11L2-202"',
'level': '2',
'transcript_support_level': '"1"',
'tag': '"Ensembl_canonical"',
'havana_transcript': '"OTTHUMT00000362751.1"'}
:param rows: if True, reads the input row by row, else reads
it in as a list of columns
:param f: if specified, return a defaultdict that uses this function as its generator""" # toDict
self.rows = rows # toDict
if f is not None: self.f = lambda d: defaultdict(f, d) # toDict
else: self.f = lambda x: x # toDict
[docs] def __ror__(self, it:Tuple[Iterator[T], Iterator[T]]) -> dict: # toDict
r = self.rows; f = self.f # toDict
if r: # toDict
if isinstance(r, str): return it | cli.apply(cli.aS(lambda x: x.split(r)) | cli.head(1).split() | cli.item() + cli.join(r)) | toDict() # toDict
return f({_k:_v for _k, _v in it}) # toDict
return f({_k:_v for _k, _v in zip(*it)}) # toDict
def _jsF(self, meta): # toDict
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toDict
if not self.rows: raise Exception("toDict._jsF() doesn't support .rows=False yet") # toDict
return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toDict()", fIdx # toDict
def _toop(toOp, c, force, defaultValue): # _toop
return cli.apply(toOp, c) | (cli.apply(lambda x: x or defaultValue, c) if force else cli.filt(cli.op() != None, c)) # _toop
def _toFloat(e) -> Union[float, None]: # _toFloat
try: return float(e) # _toFloat
except: return None # _toFloat
[docs]class toFloat(BaseCli): # toFloat
[docs] def __init__(self, *columns, mode=2): # toFloat
"""Converts every row into a float. Example::
# returns [1, 3, -2.3]
["1", "3", "-2.3"] | toFloat() | deref()
# returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
[["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()
With weird rows::
# returns [[1.0, 'a'], [8.0, 'c']]
[["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
# returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
[["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, force=True) | deref()
This also works well with :class:`torch.Tensor` and :class:`numpy.ndarray`,
as they will not be broken up into an iterator::
# returns a numpy array, instead of an iterator
np.array(range(10)) | toFloat()
:param columns: if nothing, then will convert each row. If available, then
convert all the specified columns
:param mode: different conversion styles
- 0: simple ``float()`` function, fastest, but will throw errors if it can't be parsed
- 1: if there are errors, then replace it with zero
- 2: if there are errors, then eliminate the row""" # toFloat
self.columns = columns; self.mode = mode # toFloat
[docs] def __ror__(self, it): # toFloat
columns = self.columns; mode = self.mode # toFloat
if len(columns) == 0: # toFloat
if isinstance(it, np.ndarray): return it.astype(float) # toFloat
if hasTorch and isinstance(it, torch.Tensor): return it.float() # toFloat
if mode == 0: return (float(e) for e in it) # toFloat
return it | _toop(_toFloat, None, mode == 1, 0.0) # toFloat
else: return it | cli.init.serial(*(_toop(_toFloat, c, mode == 1, 0.0) for c in columns)) # toFloat
def _jsF(self, meta): # toFloat
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); cols = self.columns; mode = self.mode # toFloat
if len(cols) == 0: # toFloat
if mode == 0: return f"const {fIdx} = ({dataIdx}) => {dataIdx}.map((v) => parseFloat(v))", fIdx # toFloat
if mode == 1: return f"const {fIdx} = ({dataIdx}) => {dataIdx}.map((v) => {{ const a = parseFloat(v); return a === a ? a : 0 }})", fIdx # toFloat
if mode == 2: return f"const {fIdx} = ({dataIdx}) => {{ const ans = []; for (const v of {dataIdx}) {{ const a = parseFloat(v); if (a === a) ans.push(a); }}; return ans; }}", fIdx # toFloat
else: return f"""\
const {fIdx} = ({dataIdx}) => {{
const ans = [];
for (const row of {dataIdx}) {{
{'ans.push(row.map(parseFloat));' if mode == 0 else ''}
{'ans.push(row.map(parseFloat).map((v) => (v === v ? v : 0)));' if mode == 1 else ''}
{'const rowp = row.map(parseFloat);if (rowp.map((v) => v === v).every((v) => v)) ans.push(rowp);' if mode == 2 else ''}
}}
return ans;
}}""", fIdx # toFloat
def _toInt(e) -> Union[int, None]: # _toInt
try: return int(float(e)) # _toInt
except: return None # _toInt
[docs]class toInt(BaseCli): # toInt
[docs] def __init__(self, *columns, mode=2): # toInt
"""Converts every row into an integer. Example::
# returns [1, 3, -2]
["1", "3", "-2.3"] | toInt() | deref()
:param columns: if nothing, then will convert each row. If available, then
convert all the specified columns
:param mode: different conversion styles
- 0: simple ``int()`` function, fastest, but will throw errors if it can't be parsed
- 1: if there are errors, then replace it with zero
- 2: if there are errors, then eliminate the row
See also: :meth:`toFloat`""" # toInt
self.columns = columns; self.mode = mode; # toInt
[docs] def __ror__(self, it): # toInt
columns = self.columns; mode = self.mode # toInt
if len(columns) == 0: # toInt
if isinstance(it, np.ndarray): return it.astype(int) # toInt
if hasTorch and isinstance(it, torch.Tensor): return it.int() # toInt
if mode == 0: return (int(e) for e in it) # toInt
return it | _toop(_toInt, None, mode == 1, 0) # toInt
else: return it | cli.init.serial(*(_toop(_toInt, c, mode == 1, 0) for c in columns)) # toInt
def _jsF(self, meta): # toInt
fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); cols = self.columns; mode = self.mode # toInt
if len(cols) == 0: # toInt
if mode == 0: return f"const {fIdx} = ({dataIdx}) => {dataIdx}.map((v) => parseInt(v))", fIdx # toInt
if mode == 1: return f"const {fIdx} = ({dataIdx}) => {dataIdx}.map((v) => {{ const a = parseInt(v); return a === a ? a : 0 }})", fIdx # toInt
if mode == 2: return f"const {fIdx} = ({dataIdx}) => {{ const ans = []; for (const v of {dataIdx}) {{ const a = parseInt(v); if (a === a) ans.push(a); }}; return ans; }}", fIdx # toInt
else: return f"""\
const {fIdx} = ({dataIdx}) => {{
const ans = [];
for (const row of {dataIdx}) {{
{'ans.push(row.map(parseInt));' if mode == 0 else ''}
{'ans.push(row.map(parseInt).map((v) => (v === v ? v : 0)));' if mode == 1 else ''}
{'const rowp = row.map(parseInt);if (rowp.map((v) => v === v).every((v) => v)) ans.push(rowp);' if mode == 2 else ''}
}}
return ans;
}}""", fIdx # toInt
[docs]class toBytes(BaseCli): # toBytes
[docs] def __init__(self, dataType=None): # toBytes
"""Converts several object types to bytes.
Example::
# converts string to bytes
"abc" | toBytes()
# converts image to bytes in jpg format
torch.randn(200, 100) | toImg() | toBytes()
# converts image to bytes in png format
torch.randn(200, 100) | toImg() | toBytes("PNG")
"some_file.mp3" | toAudio() | toBytes("mp3")
.. admonition:: Custom datatype
It is possible to build objects that can interoperate with this cli,
like this::
class custom1:
def __init__(self, config=None): ...
def _toBytes(self): return b"abc"
class custom2:
def __init__(self, config=None): ...
def _toBytes(self, dataType):
if dataType == "png": return b"123"
else: return b"456"
custom1() | toBytes() # returns b"abc"
custom2() | toBytes() # returns b"456"
custom2() | toBytes("png") # returns b"123"
When called upon, :class:`toBytes` will detect that the input has the ``_toBytes``
method, which will prompt it to execute that method of the complex object. Of
course, this means that you can return anything, not necessarily bytes, but to
maintain intuitiveness, you should return either bytes or iterator of bytes
:param dataType: depending on input. If it's an image then this can be png, jpg. If
it's a sound then this can be mp3, wav or things like that""" # toBytes
self.dataType = dataType # toBytes
[docs] def __ror__(self, it): # toBytes
if isinstance(it, str): return it.encode() # toBytes
if hasPIL: # toBytes
if isinstance(it, PIL.Image.Image): # toBytes
it = it | toRgb(); buffered = io.BytesIO() # toBytes
it.save(buffered, format=(self.dataType or "JPEG")); return buffered.getvalue() # toBytes
if hasattr(it, "_toBytes"): # toBytes
n = len(inspect.getfullargspec(it._toBytes).args[1:]) # toBytes
if n == 0: return it._toBytes() # toBytes
elif n == 1: return it._toBytes(self.dataType) # toBytes
else: raise Exception(f"{it.__class__.__name__}._toBytes() has 2 or more arguments, which is unsupported") # toBytes
import dill; return dill.dumps(it) # toBytes
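# toBytes() above uses inspect.getfullargspec to count _toBytes()'s parameters and
# decide whether to pass dataType along. A tiny sketch of that dispatch logic with
# two hypothetical classes (names are made up for illustration, not library API):
def _sketch_toBytes_dispatch(dataType="png"):
    import inspect
    class NoArg:
        def _toBytes(self): return b"abc"
    class OneArg:
        def _toBytes(self, dataType): return b"123" if dataType == "png" else b"456"
    out = []
    for obj in (NoArg(), OneArg()):
        n = len(inspect.getfullargspec(obj._toBytes).args[1:])   # params besides self
        out.append(obj._toBytes() if n == 0 else obj._toBytes(dataType))
    return out   # [b'abc', b'123']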
mpld3 = k1lib.dep("mpld3") # toBytes
class Svg(str): pass # Svg
class DataUri: # DataUri
def __init__(self, uri:str): # DataUri
self.uri = uri # "data:image/png;base64, ..." # DataUri
self.mime = uri.split(";")[0].split(":")[-1] # "image/png" # DataUri
self.mimeBase = self.mime.split("/")[0] # "image" # DataUri
def _repr_html_(self): # DataUri
if self.mimeBase == "image": return f"<img src=\"{self.uri}\"/>" # DataUri
if self.mime == "text/html": return base64.b64decode(self.uri.split("base64,")[-1]).decode() # DataUri
def __repr__(self): # DataUri
uri = self.uri # DataUri
return f"<DataUri mime='{self.mime}', self.uri='{(uri[:75] + '...') if len(uri) > 75 else uri}'>" # DataUri
def _dataUriHtml(it): return DataUri(f"data:text/html;base64, {base64.b64encode(it.encode()).decode()}") # _dataUriHtml
[docs]class toDataUri(BaseCli): # toDataUri
[docs] def __init__(self): # toDataUri
"""Converts incoming object into data uri scheme.
Data uris are the things that look like "data:image/png;base64, ...",
or "data:text/html;base64, ...". This is a convenience tool mainly
for other tools, and not quite useful directly. Example::
randomImg = cat("https://mlexps.com/ergun.png", False) | toImg() # returns PIL image
randomImg | toDataUri() # returns k1lib.cli.conv.DataUri object with .mime field "image/png" and .uri field "data:image/png;base64, ..."
randomImg | toDataUri() | toHtml() # returns html string `<img src="data:image/png;base64, ..."/>`
randomImg | toHtml() # same like above. toHtml() actually calls toDataUri() behind the scenes
randomImg | toDataUri() | toAnchor() # creates anchor tag (aka link elements "<a></a>") that, when clicked, displays the image in a new tab
randomImg | toAnchor() # same as above. toAnchor() actually calls toDataUri() behind the scenes
""" # toDataUri
self.throw = False # can be configured by outside clis, like toHtml() # toDataUri
[docs] def __ror__(self, it): # toDataUri
if isinstance(it, str): return _dataUriHtml(it) # toDataUri
if isinstance(it, DataUri): return it # toDataUri
if hasPIL and isinstance(it, PIL.Image.Image): # toDataUri
it = it | toBytes(dataType="PNG") | cli.aS(base64.b64encode) | cli.op().decode() # toDataUri
return DataUri(f"data:image/png;base64, {it}") # toDataUri
try: return DataUri(it._toDataUri()) # toDataUri
except Exception as e: # toDataUri
if self.throw: raise Exception(f"toDataUri() called on an unfamiliar object, and the object doesn't implement _toDataUri(). Error: {e}") # toDataUri
return _dataUriHtml(it | toHtml()) # toDataUri
[docs]class toAnchor(BaseCli): # toAnchor
[docs] def __init__(self, text:str="click here"): # toAnchor
"""Converts incoming object into a html anchor tag that, when clicked,
displays the incoming object's html in another tab. Example::
randomImg = cat("https://mlexps.com/ergun.png", False) | toImg() # returns PIL image
randomImg | toAnchor() # returns html string `<a href="data:image/png;base64, ..."></a>`
On some browsers, there's sort of a weird bug where a new tab would open, but
there's nothing displayed on that tab. If you see this is happening, just press
F5 or Ctrl+R to refresh the page and it should display everything nicely
:param text: text to display inside of the anchor""" # toAnchor
self.text = text # toAnchor
[docs] def __ror__(self, it:str): # toAnchor
s = it | toDataUri() | cli.op().uri # toAnchor
return f"<a href=\"{s}\" target=\"_blank\">{self.text}</a>" # toAnchor
[docs]class toHtml(BaseCli): # toHtml
[docs] def __init__(self): # toHtml
"""Converts several object types to bytes.
Example::
# converts PIL image to html <img> tag
torch.randn(200, 100) | toImg() | toHtml()
# converts graphviz graph to svg text (which is essentially html)
g = k1.digraph(); g(*"abc"); g(*"bcd"); g | toHtml()
# converts plotly graphs to html
import plotly.express as px; import pandas as pd
df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [10, 11, 12, 14, 15]})
fig = px.line(df, x='x', y='y', title='Simple Line Chart')
fig | toHtml()
# converts matplotlib plot to image, and then to html. Do this if you want a static plot
x = np.linspace(-2, 2); y = x**2
plt.plot(x, x**2); plt.gcf() | toImg() | toHtml()
# converts matplotlib plot to D3.js html sketch
plt.plot(x, x**2); plt.gcf() | toHtml()
""" # toHtml
pass # toHtml
[docs] def __ror__(self, it): # toHtml
if isinstance(it, str): return it # toHtml
if hasPlotly and isinstance(it, plotly.graph_objs._figure.Figure): # toHtml
out = io.StringIO(); it.write_html(out); out.seek(0); return out.read() # toHtml
if isinstance(it, mpl.figure.Figure): res = mpld3.fig_to_html(it); plt.close(it); return res # toHtml
if hasGraphviz and isinstance(it, graphviz.Digraph): # toHtml
import tempfile; a = tempfile.NamedTemporaryFile() # toHtml
it.render(a.name, format="svg"); # toHtml
fn = f"{a.name}.svg"; im = cli.cat(fn) | cli.join("") # toHtml
try: os.remove(fn) # toHtml
except: pass # toHtml
return Svg(im) # toHtml
try: # toHtml
res = it._repr_html_() # toHtml
if res: return res # toHtml
except: pass # toHtml
try: # toHtml
res = it._toHtml() # toHtml
if res: return res # toHtml
except: pass # toHtml
try: # toHtml
f = toDataUri(); f.throw = True # toHtml
res = (it | f)._repr_html_() # toHtml
if res: return res # toHtml
except: pass # toHtml
return it.__repr__() # toHtml
try: # toHtml
from rdkit import Chem # toHtml
from rdkit.Chem import Draw # toHtml
from rdkit.Chem import AllChem # toHtml
from rdkit.Chem.Draw import IPythonConsole # toHtml
IPythonConsole.drawOptions.addAtomIndices = True # toHtml
__all__ = [*__all__, "toMol", "toSmiles"] # toHtml
def toMol(): # toHtml
"""Smiles to molecule.
Example::
"c1ccc(C)cc1" | toMol()""" # toHtml
return cli.aS(Chem.MolFromSmiles) # toHtml
def toSmiles(): # toHtml
"""Molecule to smiles.
Example::
"c1ccc(C)cc1" | toMol() | toSmiles()""" # toHtml
return cli.aS(Chem.MolToSmiles) # toHtml
except: pass # toHtml
import unicodedata, hashlib # toHtml
[docs]def toAscii(): # toAscii
"""Converts complex unicode text to its base ascii form.
Example::
"hà nội" | toAscii() # returns "ha noi"
Taken from https://stackoverflow.com/questions/2365411/convert-unicode-to-ascii-without-errors-in-python""" # toAscii
return cli.aS(lambda word: unicodedata.normalize('NFKD', word).encode('ascii', 'ignore')) # toAscii
[docs]def toHash() -> str: # toHash
"""Converts some string into some hash string.
Example::
"abc" | toHash() # returns 'gASVJAAAAAAAAABDILp4Fr+PAc/qQUFA3l2uIiOwA2Gjlhd6nLQQ/2HyABWtlC4='
Why not just use the builtin function ``hash("abc")``? Because it generates different
hashes for different interpreter sessions, and that breaks many of my applications that
need the hash value to stay constant forever.""" # toHash
def hashF(msg:str) -> str: m = hashlib.sha256(); m.update(f"{msg}".encode()); return k1lib.encode(m.digest()) # toHash
return cli.aS(hashF) # toHash
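# toHash() above is stable across interpreter sessions because it hashes with sha256
# instead of the salted builtin hash(). A sketch of the same idea using plain base64
# for the final encoding (k1lib.encode's exact output format differs; this is only
# meant to show why the result is deterministic, and is not part of the library's API):
def _sketch_stable_hash(msg="abc"):
    import hashlib, base64
    digest = hashlib.sha256(f"{msg}".encode()).digest()
    return base64.b64encode(digest).decode()   # same string every run, unlike hash("abc")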
import csv; pd = k1lib.dep("pandas") # toHash
[docs]class toCsv(BaseCli): # toCsv
[docs] def __init__(self, allSheets=False): # toCsv
"""Converts a csv file name into a table.
Example::
"abc.csv" | toCsv() # returns table of values
"def.xlsx" | toCsv() # returns table of values in the first sheet
"def.xlsx" | toCsv(True) # returns List[Sheet name (str), table of values]
.. warning::
Note that this is pretty slow compared to just splitting by commas. If your
dataset doesn't have anything complicated like commas inside quotes, then just
do ``op().split(",").all()``
If your dataset does have complicated quotes, then I'd suggest reading the csv
using this cli, then convert it to a tsv file (tab-separated value). Then you can
always just split the string using tab characters
:param allSheets: if input is an Excel sheet, whether to read in all sheets or
just the first sheet. No effect if input is a normal csv file""" # toCsv
self.allSheets = allSheets # toCsv
[docs] def __ror__(self, fn:str): # toCsv
fn = os.path.expanduser(fn) # toCsv
if fn.endswith(".xls") or fn.endswith(".xlsx"): # toCsv
if self.allSheets: return [[k, v.values] for k,v in pd.read_excel(fn, sheet_name=None).items()] # toCsv
else: return pd.read_excel(fn).values # toCsv
def gen(): # toCsv
with open(fn) as f: yield from csv.reader(f) # toCsv
return gen() # toCsv
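# The warning above exists because a bare str.split(",") breaks on quoted fields,
# which is exactly what csv.reader handles. A small sketch of the difference,
# using an inline string instead of a file; purely illustrative, not library API:
def _sketch_csv_vs_split(line='a,"b,c",d'):
    import csv, io
    naive = line.split(",")                       # ['a', '"b', 'c"', 'd'] - wrong
    proper = next(csv.reader(io.StringIO(line)))  # ['a', 'b,c', 'd']      - right
    return naive, proper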
import validators, shutil, html, io, os; pydub = k1lib.dep("pydub") # toCsv
class Audio: # Audio
def __init__(self, raw:"pydub.audio_segment.AudioSegment"): self.raw = raw # Audio
def resample(self, rate) -> "self": # Audio
"""Resamples the audio""" # Audio
if rate: # Audio
self.raw = self.raw.set_frame_rate(rate) # Audio
self.data = np.array(self.raw.get_array_of_samples())/2.15e9 # Audio
self.rate = self.raw.frame_rate # Audio
return self # Audio
def _toBytes(self, dataType) -> bytes: f = io.BytesIO(); self.raw.export(f, format=(dataType or "wav")); return f.read() # Audio
def __repr__(self): return f"<Audio duration={k1lib.fmt.time(self.raw.duration_seconds)} rate={self.raw.frame_rate}>" # Audio
def __len__(self): return int(self.raw.frame_count()) # Audio
def __getitem__(self, slice_): # Audio
if not isinstance(slice_, slice): return None # Audio
data = np.array(self.raw.get_array_of_samples()) | cli.batched(self.raw.channels) | cli.op()[slice_] # Audio
return Audio(pydub.AudioSegment(data.tobytes(), frame_rate=self.raw.frame_rate, sample_width=self.raw.sample_width, channels=self.raw.channels)) # Audio
def _repr_html_(self): # plays a short sample, first 10s or sth like that # Audio
return f"{html.escape(self.__repr__())}<br>{self.raw[:10000]._repr_html_()}" # Audio
[docs]class toAudio(BaseCli): # toAudio
[docs] def __init__(self, rate=None): # toAudio
"""Reads audio from either a file or a URL or from bytes directly.
Example::
au = "some_file.wav" | toAudio() # can display in a notebook, which will preview the first 10 second
au | toBytes() # exports audio as .wav file
au | toBytes("mp3") # exports audio as .mp3 file
au.resample(16000) # resamples audio to new rate
au | head(0.1) # returns new Audio that has the first 10% of the audio only
au | splitW(8, 2) # splits Audio into 2 Audios, first one covering 80% and second one covering 20% of the track
au.raw # internal pydub.AudioSegment object. If displayed in a notebook, will play the whole thing
You can also use this on any Youtube video or random mp3 links online and on raw bytes::
"https://www.youtube.com/watch?v=FtutLA63Cp8" | toAudio() # grab Bad Apple song from internet
cat("some_file.wav", False) | toAudio() # grab from raw bytes of mp3 or wav, etc.
""" # toAudio
self.rate = rate # toAudio
[docs] def __ror__(self, it:"str|byte") -> Audio: # toAudio
if isinstance(it, str): # toAudio
if os.path.exists(os.path.expanduser(it)): fn = os.path.expanduser(it); tmp = False # toAudio
elif validators.url(it): # toAudio
if not shutil.which("yt-dlp"): raise Exception(f"'{it}' looks like a link, but the required 'yt-dlp' binary is not found. Please install it by doing `pip install yt-dlp`") # toAudio
fn = None | cli.cmd(f"yt-dlp -o - -x {it}", mode=0, text=False) | cli.item() | cli.file(); tmp = True # toAudio
else: raise Exception(f"The file '{it}' does not exist, and it doesn't look like a URL") # toAudio
elif isinstance(it, bytes): fn = it | cli.file(); tmp = True # toAudio
else: raise Exception(f"Unknown {type(it)} audio type") # toAudio
res = Audio(pydub.AudioSegment.from_file(fn)).resample(self.rate) # toAudio
if tmp: os.remove(fn) # toAudio
return res # toAudio
dateutil = k1lib.dep("dateutil") # toAudio
[docs]class toUnix(BaseCli): # toUnix
[docs] def __init__(self, tz:"str | dateutil.tz.tz.tzfile"=None): # toUnix
"""Tries anything piped in into a unix timestamp. If can't convert
then return None. Example::
Local time zone independent::
"2023" | toUnix() # returns 2023, or 2023 seconds after unix epoch. Might be undesirable, but has to support raw ints/floats
"2023-11-01T00Z" | toUnix() # midnight Nov 1st 2023 GMT
"2023-11-01T00:00:00-04:00" | toUnix() # midnight Nov 1st 2023 EST
"2023-11-01" | toUnix("US/Pacific") # midnight Nov 1st 2023 PST
"2023-11-01" | toUnix("UTC") # midnight Nov 1st 2023 UTC
Local time zone dependent (assumes EST)::
"2023-11" | toUnix() # if today's Nov 2nd EST, then this would be 1698897600, or midnight Nov 2nd 2023 EST
"2023-11-04" | toUnix() # midnight Nov 4th 2023 EST
Feel free to experiment more, but in general, this is pretty versatile in what it can
convert. With more effort, I'd probably make this so that every example given will not
depend on local time, but since I just use this to calculate time differences, I don't
really care.
:param tz: Timezone, like "US/Eastern", "US/Pacific". If not specified, then assumes local timezone""" # toUnix
if tz: self.tz = tz if isinstance(tz, dateutil.tz.tz.tzfile) else dateutil.tz.gettz(tz) # toUnix
else: self.tz = None # toUnix
[docs] def __ror__(self, t): # toUnix
try: return float(t) # toUnix
except: # toUnix
try: # toUnix
a = dateutil.parser.parse(t) # toUnix
if self.tz: a = a.replace(tzinfo=self.tz) # toUnix
return a.timestamp() # toUnix
except: return None # toUnix
from datetime import datetime as dt # toUnix
[docs]class toIso(BaseCli): # toIso
[docs] def __init__(self): # toIso
"""Converts unix timestamp into ISO 8601 string format.
Example::
1701382420 | toIso() # returns '2023-11-30T17:13:40', which is correct in EST time
1701382420 | toIso() | toUnix() # returns 1701382420, the input timestamp, showing it's correct
1701382420.123456789 | toIso() # returns '2023-11-30T17:13:40.123457'
As you might have noticed, this cli depends on the timezone of the host computer
""" # toIso
pass # toIso
[docs] def __ror__(self, it): # toIso
return dt.fromtimestamp(it).isoformat() # toIso
[docs]class toYMD(BaseCli): # toYMD
[docs] def __init__(self, idx=None, mode=int): # toYMD
"""Converts unix timestamp into tuple (year, month, day, hour, minute, second).
Example::
1701382420 | toYMD() # returns [2023, 11, 30, 17, 13, 40] in EST timezone
1701382420 | toYMD(0) # returns 2023
1701382420 | toYMD(1) # returns 11
1701382395 | toYMD(mode=str) # returns ['2023', '11', '30', '17', '13', '15']
:param idx: if specified, take the desired element only. If 0, then take year, 1, then month, etc.
:param mode: either int or str. If str, then returns nicely adjusted numbers""" # toYMD
self.idx = idx; self.mode = mode # toYMD
[docs] def __ror__(self, it): # toYMD
d = dt.fromtimestamp(it) # toYMD
if self.mode == int: res = [d.year, d.month, d.day, d.hour, d.minute, d.second] # toYMD
else: res = [f"{d.year}", f"{d.month}".rjust(2,"0"), f"{d.day}".rjust(2,"0"), # toYMD
f"{d.hour}".rjust(2,"0"), f"{d.minute}".rjust(2,"0"), f"{d.second}".rjust(2,"0")] # toYMD
return res if self.idx is None else res[self.idx] # toYMD
settings.add("toLinks", k1lib.Settings()\
.add("splitChars", ["<br>", "<div ", *"\n\t<> ,;"], "characters/strings to split the lines by, so that each link has the opportunity to be on a separate line, so that the first instance in a line don't overshadow everything after it")\
.add("protocols", ["http", "https", "ftp"], "list of recognized protocols to search for links, like 'http' and so on"), "conv.toLinks() settings"); # toYMD
[docs]class toLinks(BaseCli): # toLinks
[docs] def __init__(self, f=None): # toLinks
"""Extracts links and urls from a paragraph.
Example::
paragraph = [
"http://a.c",
"http://a2.c some other text in between <a href='http://b.d'>some link</a> fdvb"
]
# returns {'http://a.c', 'http://a2.c', 'http://b.d'}
paragraph | toLinks() | deref()
If the input is a string instead of an iterator of strings, then
it will :meth:`~k1lib.cli.inp.cat` it first, then look for links
inside the result. For example::
"https://en.wikipedia.org/wiki/Cheese" | toLinks()
At the time of writing, that returns a lot of links::
{'/wiki/Rind-washed_cheese',
'#cite_ref-online_5-7',
'https://web.archive.org/web/20160609031000/http://www.theguardian.com/lifeandstyle/wordofmouth/2012/jun/27/how-eat-cheese-and-biscuits',
'https://is.wikipedia.org/wiki/Ostur',
'/wiki/Meat_and_milk',
'/wiki/Wayback_Machine',
'/wiki/File:WikiCheese_-_Saint-Julien_aux_noix_01.jpg',
'https://gv.wikipedia.org/wiki/Caashey',
'/wiki/Eyes_(cheese)',
'/wiki/Template_talk:Condiments',
'#Pasteurization',
'/wiki/Tuscan_dialect',
'#cite_note-23',
'#cite_note-aha2017-48',
...}
So, keep in mind that lots of different things can be considered a
link. That includes absolute links ('https://gv.wikipedia.org/wiki/Caashey'),
relative links within that particular site ('/wiki/Tuscan_dialect'), and
relative links within the page ('#Pasteurization').
How it works underneath is that it's looking for a string like "https://..."
and a string like "href='...'", which usually have a link inside. For the
first detection style, you can specify extra protocols that you want to
search for using ``settings.cli.toLinks.protocols = [...]``.
Also, this will detect links nested within each other multiple times.
For example, the link 'https://web.archive.org/web/20160609031000/http://www.theguardian.com/lifeandstyle/wordofmouth/2012/jun/27/how-eat-cheese-and-biscuits'
will appear twice in the result, once as itself, but also 'https://www.theguardian.com/lifeandstyle/wordofmouth/2012/jun/27/how-eat-cheese-and-biscuits'
Note that if you really try, you will be able to find an example where this won't
work, so don't expect 100% reliability. But for most use cases, this should perform
splendidly.""" # toLinks
self.f = f or cli.iden() # toLinks
chars = " \t,;" # random characters to split, so that the first instance in a line doesn't overshadow the ones after # toLinks
self.preprocess = cli.serial(*[(cli.op().split(ch).all() | cli.joinSt()) for ch in settings.toLinks.splitChars]) # toLinks
protocols = "|".join([f"({p})" for p in settings.toLinks.protocols]) # toLinks
self.g = cli.grep(f"(?P<g>({protocols})" + "://[^\(\)\[\]\<\>\{\}\'\" ]*)", extract="g") # toLinks
self.href = cli.grep('href="(?P<g>.+)"', extract="g") & cli.grep("href='(?P<g>.+)'", extract="g") | cli.joinSt() # toLinks
self.post = cli.joinSt() | cli.aS(set) # toLinks
[docs] def __ror__(self, it): # toLinks
if hasattr(it, "_toLinks"): return it._toLinks(self.f) if len(inspect.getfullargspec(it._toLinks).args) == 2 else it._toLinks() # toLinks
host = "" # toLinks
if isinstance(it, str): host = it; it = cli.cat(it) # reads the website first # toLinks
it = it | self.preprocess | cli.aS(list) # toLinks
return it | self.href & self.g | self.post | self.f | cli.aS(set) # toLinks
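# toLinks() above combines two detection styles: bare "protocol://..." substrings and
# href="..."/href='...' attributes. A compact re-based sketch of both, without the
# cli pipeline machinery; the protocol list and test string are illustrative only,
# and `_sketch_extract_links` is not part of the library's API.
def _sketch_extract_links(text='see http://a.c and <a href="http://b.d">x</a>'):
    import re
    bare = re.findall(r'(?:https?|ftp)://[^\s"\'<>]+', text)   # protocol-style detection
    href = re.findall(r'href=["\']([^"\']+)["\']', text)       # href-attribute detection
    return set(bare) | set(href)   # {'http://a.c', 'http://b.d'}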
[docs]class toMovingAvg(BaseCli): # toMovingAvg
[docs] def __init__(self, col:int=None, alpha=0.9, debias=True, v:float=0, dt:float=1): # toMovingAvg
"""Smoothes out sequential data using momentum.
Example::
# returns [4.8, 4.62, 4.458]. 4.8 because 0.9*5 + 0.1*3 = 4.8, and so on
[3, 3, 3] | toMovingAvg(v=5, debias=False) | deref()
Sometimes you want to ignore the initial value, then you can turn on debias mode::
x = np.linspace(0, 10, 100); y = np.cos(x)
plt.plot(x, y)
plt.plot(x, y | toMovingAvg(debias=False) | deref())
plt.plot(x, y | toMovingAvg(debias=False, alpha=0.95) | deref())
plt.plot(x, y | toMovingAvg(debias=True) | deref())
plt.plot(x, y | toMovingAvg(debias=True, alpha=0.95) | deref())
plt.legend(["Signal", "Normal - 0.9 alpha", "Normal - 0.95 alpha", "Debiased - 0.9 alpha", "Debiased - 0.95 alpha"], framealpha=0.3)
plt.grid(True)
.. image:: ../images/movingAvg.png
As you can see, normal mode still has the influence of the initial value at
0 and can't rise up fast, whereas the debias mode will ignore the initial
value and immediately snaps to the first value.
Also, the 2 graphs with 0.9 alpha snap together quicker than the 2 graphs
with 0.95 alpha. Here's the effect of several alpha values:
.. image:: ../images/movingAvg-alphas.png
:param col: column to apply moving average to
:param alpha: momentum term
:param debias: whether to turn on debias mode or not
:param v: initial value, doesn't matter in debias mode
:param dt: pretty much never used, hard to describe, belongs to debias mode, checkout source code for details""" # toMovingAvg
self.col = col; self.initV = v; self.alpha = alpha; self.debias = debias; self.dt = dt # toMovingAvg
if debias and v != 0: raise Exception("Debias mode activated! This means that the initial value doesn't matter, yet you've specified one") # toMovingAvg
if alpha > 1 or alpha < 0: raise Exception("Alpha is outside the [0, 1] range, which does not make sense") # toMovingAvg
[docs] def __ror__(self, it): # toMovingAvg
m = value = self.initV; alpha = self.alpha; col = self.col # toMovingAvg
if self.debias: # toMovingAvg
dt = self.dt; t = 1; tooSmall = False # toMovingAvg
if col is None: # toMovingAvg
for v in it: # toMovingAvg
m = m * alpha + v * (1 - alpha) # toMovingAvg
if tooSmall: yield m # skips complex exponential calculation once it's small enough to speed things up # toMovingAvg
else: # toMovingAvg
exp = alpha**t; value = m / (1 - exp) # toMovingAvg
tooSmall = 10*exp < (1-alpha); t += dt; yield value # toMovingAvg
else: # toMovingAvg
for row in it: # toMovingAvg
m = m * alpha + row[col] * (1 - alpha) # toMovingAvg
if tooSmall: yield [*row[:col], m, *row[col+1:]] # toMovingAvg
else: # toMovingAvg
exp = alpha**t; value = m / (1 - exp) # toMovingAvg
tooSmall = 10*exp < (1-alpha); t += dt; yield [*row[:col], value, *row[col+1:]] # toMovingAvg
else: # toMovingAvg
if col is None: # toMovingAvg
for v in it: m = m * alpha + v * (1 - alpha); yield m # toMovingAvg
else: # toMovingAvg
for row in it: # toMovingAvg
m = m * alpha + row[col] * (1 - alpha) # toMovingAvg
yield [*row[:col], m, *row[col+1:]] # toMovingAvg
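# The debias branch above divides the running momentum m_t = alpha*m_{t-1} + (1-alpha)*x_t
# by (1 - alpha^t), so early outputs snap to the data instead of being dragged toward the
# initial value. A minimal sketch of that formula without the tooSmall shortcut; illustrative
# only, not part of the library's API.
def _sketch_debiased_ema(xs=(1.0, 1.0, 1.0), alpha=0.9):
    m = 0.0; out = []
    for t, x in enumerate(xs, 1):
        m = alpha * m + (1 - alpha) * x
        out.append(m / (1 - alpha ** t))   # debiased estimate; stays at 1.0 for constant input
    return out   # [1.0, 1.0, 1.0] (up to floating point) for the defaults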
cm = k1lib.dep("matplotlib.cm") # toMovingAvg
[docs]class toCm(BaseCli): # toCm
[docs] def __init__(self, col:int, cmap=None, title:str=None): # toCm
"""Converts the specified column to a bunch of color
values, and adds a colorbar automatically. "cm" = "color map". Example::
import matplotlib.cm as cm
exps = [1, 2, 3, 4, 5]
x = np.linspace(-2, 2)
data = exps | apply(lambda exp: [exp, x, x**exp]) | deref()
# without toCm(), plots fine, demonstrates underlying mechanism, but doesn't allow plotting a separate colorbar
data | normalize(0, mode=1) | apply(cm.viridis, 0) | ~apply(lambda c,x,y: plt.plot(x, y, color=c)) | ignore()
# with toCm(), draws a colorbar automatically
data | toCm(0, cm.viridis, "Exponential") | ~apply(lambda c,x,y: plt.plot(x, y, color=c)) | ignore()
.. image:: ../images/toCm.png
Functionality is kind of niche, but I need this over and over
again, so I had to make it
:param col: column to convert float/int to color (tuple of 4 floats)
:param cmap: colormap to use. If not specified, defaults to ``cm.viridis``
:param title: title of the colorbar, optional""" # toCm
self.col = col; self.cmap = cmap or cm.viridis; self.title = title # toCm
[docs] def __ror__(self, it): # toCm
col = self.col; cmap = self.cmap; title = self.title # toCm
if col is None: # toCm
if not isinstance(it, k1lib.settings.cli.arrayTypes): it = list(it) # toCm
plt.colorbar(cm.ScalarMappable(norm=plt.Normalize(*it | cli.toMin() & cli.toMax()), cmap=cmap), ax=plt.gca(), label=title) # toCm
return it | cli.normalize(None, 1) | cli.apply(cmap) # toCm
else: # toCm
it = it | cli.deref(2) # toCm
plt.colorbar(cm.ScalarMappable(norm=plt.Normalize(*it | cli.cut(col) | cli.toMin() & cli.toMax()), cmap=cmap), ax=plt.gca(), label=title) # toCm
return it | cli.normalize(col, 1) | cli.apply(cmap, col) # toCm