Source code for k1lib.cli.conv

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for all short utilities that convert from one data type to another. They
might feel like they have different styles, as :class:`toFloat` converts an object iterator to
a float iterator, while :class:`toPIL` converts a single image url to a single PIL image,
whereas :class:`toSum` converts a float iterator into a single float value.

The general convention is: if the intended operation sounds simple (convert to floats,
strings, types, ...), then most likely it will convert iterator to iterator, as you
can always use the function directly if you only want to apply it to 1 object.

If it sounds complicated (convert to PIL image, tensor, ...) then most likely it will
convert object to object. Lastly, there are some where it just feels right to take in
an iterator and output a single object (like getting max, min, std, mean values)."""
__all__ = ["toTensor", "toRange", "toList",
           "toSum", "toProd", "toAvg", "toMean", "toStd", "toMax", "toMin", "toArgmin", "toArgmax",
           "toPIL", "toImg", "toRgb", "toRgba", "toGray", "toDict",
           "toFloat", "toInt", "toBytes", "toDataUri", "toAnchor", "toHtml",
           "toAscii", "toHash", "toCsv", "toAudio", "toUnix", "toIso", "toYMD", "toLinks",
           "toMovingAvg", "toCm"]
import re, k1lib, math, os, numpy as np, io, json, base64, unicodedata, inspect
from k1lib.cli.init import BaseCli, T, yieldT; import k1lib.cli as cli, k1lib.cli.init as init
from k1lib.cli.typehint import *; mpl = k1lib.dep("matplotlib"); plt = k1lib.dep("matplotlib.pyplot")
from collections import deque, defaultdict; from typing import Iterator, Any, List, Set, Tuple, Dict, Callable, Union
settings = k1lib.settings.cli
try: import PIL; hasPIL = True
except: hasPIL = False
try: import torch; hasTorch = True
except: torch = k1lib.dep("torch"); hasTorch = False
try: import rdkit; hasRdkit = True
except: hasRdkit = False
try: import graphviz; hasGraphviz = True
except: hasGraphviz = False
try: import plotly; import plotly.express as px; hasPlotly = True
except: hasPlotly = False
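# --- Illustrative sketch (not part of the library): the convention described in the module
# docstring above, restated in plain Python so the iterator-vs-object distinction is concrete.
# `_demo_convention` is a hypothetical helper name and only uses builtins.
def _demo_convention():
    floats = [float(x) for x in ["1", "2.5", "-3"]]  # "simple-sounding" conversion: iterator -> iterator, like toFloat()
    total, biggest = sum(floats), max(floats)        # "reduce-sounding" conversion: iterator -> single value, like toSum()/toMax()
    return floats, total, biggest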
[docs]class toTensor(BaseCli): # toTensor
[docs] def __init__(self, dtype=None): # toTensor """Converts generator to :class:`torch.Tensor`. Essentially ``torch.tensor(list(it))``. Default dtype is float32. Also checks if the input is a PIL Image. If so, turns it into a :class:`torch.Tensor` and returns it.""" # toTensor self.dtype = dtype or torch.float32 # toTensor
[docs] def __ror__(self, it:Iterator[float]) -> "torch.Tensor": # toTensor try: # toTensor import PIL; pic=it # toTensor if isinstance(pic, PIL.Image.Image): # stolen from torchvision ToTensor transform # toTensor mode_to_nptype = {'I': np.int32, 'I;16': np.int16, 'F': np.float32} # toTensor img = torch.from_numpy(np.array(pic, mode_to_nptype.get(pic.mode, np.uint8), copy=True)) # toTensor if pic.mode == '1': img = 255 * img # toTensor img = img.view(pic.size[1], pic.size[0], len(pic.getbands())) # toTensor return img.permute((2, 0, 1)).contiguous().to(self.dtype) # put it from HWC to CHW format # toTensor except: pass # toTensor if isinstance(it, np.ndarray): return torch.tensor(it).to(self.dtype) # toTensor return torch.tensor(list(it)).to(self.dtype) # toTensor
[docs]class toList(BaseCli): # this still exists cause some LLVM optimizations are done on this, and too tired to change that at the moment # toList
[docs] def __init__(self): # toList """Converts generator to list. Example:: # returns [0, 1, 2, 3, 4] range(5) | toList() # returns [0, 1, 2, 3, 4] range(5) | aS(list) So this cli is sort of outdated. It still works fine, nothing wrong with it, but just do ``aS(list)`` instead. It's not removed to avoid breaking old projects.""" # toList super().__init__() # toList
def _typehint(self, inp): # toList if isinstance(inp, tListIterSet): return tList(inp.child) # toList if isinstance(inp, tCollection): return inp # toList return tList(tAny()) # toList
[docs] def __ror__(self, it:Iterator[Any]) -> List[Any]: return list(it) # toList
def _jsF(self, meta): # toList fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toList return f"const {fIdx} = ({dataIdx}) => {dataIdx}", fIdx # toList
def _toRange(it): # _toRange for i, _ in enumerate(it): yield i # _toRange
[docs]class toRange(BaseCli): # toRange
[docs] def __init__(self): # toRange """Returns iter(range(len(it))), effectively. Example:: # returns [0, 1, 2] [3, 2, 5] | toRange() | deref()""" # toRange super().__init__() # toRange
[docs] def __ror__(self, it:Iterator[Any]) -> Iterator[int]: # toRange try: return range(len(it)) # toRange except: return _toRange(it) # toRange
def _jsF(self, meta): # toRange fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toRange return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toRange()", fIdx # toRange
tOpt.addPass(lambda cs, ts, _: [cs[0]], [toRange, toRange]) # toRange settings.add("arrayTypes", (torch.Tensor, np.ndarray) if hasTorch else (np.ndarray,), "default array types used to accelerate clis") # toRange def genericTypeHint(inp): # genericTypeHint if isinstance(inp, tListIterSet): return inp.child # genericTypeHint if isinstance(inp, tCollection): return inp.children[0] # genericTypeHint if isinstance(inp, tArrayTypes): return inp.child # genericTypeHint return tAny() # genericTypeHint
[docs]class toSum(BaseCli): # toSum
[docs] def __init__(self): # toSum """Calculates the sum of a list of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`. Example:: # returns 45 range(10) | toSum()""" # toSum super().__init__() # toSum
def _all_array_opt(self, it, level): # toSum bm = np if isinstance(it, np.ndarray) else (torch if hasTorch and isinstance(it, torch.Tensor) else None) # toSum return NotImplemented if bm is None else bm.sum(it, tuple(range(level, len(it.shape)))) # toSum def _typehint(self, inp): return genericTypeHint(inp) # toSum
[docs] def __ror__(self, it:Iterator[float]): # toSum if isinstance(it, settings.arrayTypes): return it.sum() # toSum return sum(it) # toSum
def _jsF(self, meta): # toSum fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toSum return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toSum()", fIdx # toSum
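# --- Illustrative sketch (hypothetical helper, not part of the library): the reduction clis in
# this module (toSum, toProd, toAvg, toMax, toMin) share the dispatch pattern seen in __ror__
# above: if the input is already an array type, use the backend's vectorized reduction,
# otherwise fall back to plain Python iteration. Sketched here with numpy only.
def _demo_array_dispatch(it):
    import numpy as np
    if isinstance(it, np.ndarray): return it.sum()  # vectorized fast path
    return sum(it)                                  # generic iterator path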
[docs]class toProd(BaseCli): # toProd
[docs] def __init__(self): # toProd """Calculates the product of a list of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`. Example:: # returns 362880 range(1,10) | toProd()""" # toProd super().__init__() # toProd
def _all_array_opt(self, it, level): # toProd if isinstance(it, np.ndarray): return np.prod(it, tuple(range(level, len(it.shape)))) # toProd elif hasTorch and isinstance(it, torch.Tensor): # toProd for i in range(level, len(it.shape)): it = torch.prod(it, level) # toProd return it # toProd return NotImplemented # toProd def _typehint(self, inp): return genericTypeHint(inp) # toProd
[docs] def __ror__(self, it): # toProd if isinstance(it, settings.arrayTypes): return it.prod() # toProd else: return math.prod(it) # toProd
def _jsF(self, meta): # toProd fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toProd return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toProd()", fIdx # toProd
[docs]class toAvg(BaseCli): # toAvg
[docs] def __init__(self): # toAvg """Calculates the average of a list of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`. Example:: # returns 4.5 range(10) | toAvg() # returns nan [] | toAvg()""" # toAvg super().__init__() # toAvg
def _all_array_opt(self, it, level): # toAvg bm = np if isinstance(it, np.ndarray) else (torch if hasTorch and isinstance(it, torch.Tensor) else None) # toAvg return NotImplemented if bm is None else bm.mean(it, tuple(range(level, len(it.shape)))) # toAvg def _typehint(self, inp): # toAvg i = None # toAvg if isinstance(inp, tListIterSet): i = inp.child # toAvg if isinstance(inp, tCollection): i = inp.children[0] # toAvg if isinstance(inp, tArrayTypes): i = inp.child # toAvg if i is not None: return float if i == int else i # toAvg return tAny() # toAvg
[docs] def __ror__(self, it:Iterator[float]): # toAvg if isinstance(it, settings.arrayTypes): return it.mean() # toAvg s = 0; i = -1 # toAvg for i, v in enumerate(it): s += v # toAvg i += 1 # toAvg if not k1lib.settings.cli.strict and i == 0: return float("nan") # toAvg return s / i # toAvg
def _jsF(self, meta): # toAvg fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toAvg return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toAvg()", fIdx # toAvg
if hasTorch: # toAvg torchVer = int(torch.__version__.split(".")[0]) # toAvg if torchVer >= 2: # toAvg def torchStd(it, ddof, dim=None): return torch.std(it, dim, correction=ddof) # toAvg else: # toAvg def torchStd(it, ddof, dim=None): # toAvg if ddof == 0: return torch.std(it, dim, unbiased=False) # toAvg if ddof == 1: return torch.std(it, dim, unbiased=True) # toAvg raise Exception(f"Please install PyTorch 2, as version 1 doesn't support a correction factor of {ddof}") # toAvg else: # toAvg def torchStd(it, ddof): raise Exception("PyTorch not installed") # toAvg
[docs]class toStd(BaseCli): # toStd
[docs] def __init__(self, ddof:int=0): # toStd """Calculates the standard deviation of a list of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray` to be faster. Example:: # returns 2.8722813232690143 range(10) | toStd() # returns nan [] | toStd() :param ddof: "delta degrees of freedom". The divisor used in calculations is ``N - ddof``""" # toStd self.ddof = ddof # toStd
def _all_array_opt(self, it, level): # toStd n = len(it.shape); ddof = self.ddof; dim = tuple(range(level, n)) # toStd if isinstance(it, np.ndarray): return np.std(it, ddof=ddof, axis=dim) # toStd elif hasTorch and isinstance(it, torch.Tensor): return torchStd(it, ddof, dim) # toStd return NotImplemented # toStd
[docs] def __ror__(self, it): # toStd ddof = self.ddof # toStd if isinstance(it, settings.arrayTypes): # toStd if isinstance(it, np.ndarray): return np.std(it, ddof=ddof) # toStd elif hasTorch and isinstance(it, torch.Tensor): return torchStd(it, ddof) # toStd return np.std(np.array(list(it))) # toStd
def _jsF(self, meta): # toStd fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toStd return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toStd()", fIdx # toStd
toMean = toAvg # toStd
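# --- Illustrative sketch (hypothetical helper, not part of the library): what toStd()'s ddof
# parameter changes. The variance divisor is N - ddof, so ddof=0 gives the population std and
# ddof=1 the Bessel-corrected sample std. Plain numpy, no k1lib involved.
def _demo_ddof(xs=(2, 4, 4, 4, 5, 5, 7, 9)):
    import numpy as np
    a = np.array(xs, dtype=float)
    return np.std(a, ddof=0), np.std(a, ddof=1)  # divides by N, then by N - 1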
[docs]class toMax(BaseCli): # toMax
[docs] def __init__(self): # toMax """Calculates the max of a bunch of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`. Example:: # returns 6 [2, 5, 6, 1, 2] | toMax()""" # toMax super().__init__() # toMax
def _all_array_opt(self, it, level): # toMax if isinstance(it, np.ndarray): return np.max(it, tuple(range(level, len(it.shape)))) # toMax elif hasTorch and isinstance(it, torch.Tensor): # toMax for i in range(level, len(it.shape)): it = torch.max(it, level)[0] # toMax return it # toMax return NotImplemented # toMax
[docs] def __ror__(self, it:Iterator[float]) -> float: # toMax if isinstance(it, settings.arrayTypes): return it.max() # toMax return max(it) # toMax
def _jsF(self, meta): # toMax fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toMax return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toMax()", fIdx # toMax
[docs]class toMin(BaseCli): # toMin
[docs] def __init__(self): # toMin """Calculates the min of a bunch of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`. Example:: # returns 1 [2, 5, 6, 1, 2] | toMin()""" # toMin super().__init__() # toMin
def _all_array_opt(self, it, level): # toMin if isinstance(it, np.ndarray): return np.min(it, tuple(range(level, len(it.shape)))) # toMin elif hasTorch and isinstance(it, torch.Tensor): # toMin for i in range(level, len(it.shape)): it = torch.min(it, level)[0] # toMin return it # toMin return NotImplemented # toMin
[docs] def __ror__(self, it:Iterator[float]) -> float: # toMin if isinstance(it, settings.arrayTypes): return it.min() # toMin return min(it) # toMin
def _jsF(self, meta): # toMin fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toMin return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toMin()", fIdx # toMin
[docs]class toArgmin(BaseCli): # toArgmin
[docs] def __init__(self): # toArgmin """Get the input iterator's index of the min value. Example:: [2, 3, 4, 1, 5] | toArgmin() # returns 3 """ # toArgmin pass # toArgmin
[docs] def __ror__(self, it): # toArgmin if isinstance(it, k1lib.settings.cli.arrayTypes): return it.argmin().item() # toArgmin else: # toArgmin try: len(it); return np.array(it) | self # toArgmin except: return np.array(list(it)) | self # toArgmin
[docs]class toArgmax(BaseCli): # toArgmax
[docs] def __init__(self): # toArgmax """Get the input iterator's index of the max value. Example:: [2, 3, 4, 1, 5] | toArgmax() # returns 4 """ # toArgmax pass # toArgmax
[docs] def __ror__(self, it): # toArgmax if isinstance(it, k1lib.settings.cli.arrayTypes): return it.argmax().item() # toArgmax else: # toArgmax try: len(it); return np.array(it) | self # toArgmax except: return np.array(list(it)) | self # toArgmax
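# --- Illustrative sketch (hypothetical helpers, not part of the library): plain-Python
# equivalents of toArgmin()/toArgmax() above for a regular list, avoiding the numpy round-trip.
def _demo_argmin(xs): return min(range(len(xs)), key=xs.__getitem__)
def _demo_argmax(xs): return max(range(len(xs)), key=xs.__getitem__)
# _demo_argmin([2, 3, 4, 1, 5]) == 3; _demo_argmax([2, 3, 4, 1, 5]) == 4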
settings.add("font", None, "default font file. Best to use .ttf files, used by toPIL()") # toArgmax settings.add("chem", k1lib.Settings().add("imgSize", 200, "default image size used in toPIL() when drawing rdkit molecules"), "chemistry-related settings") # toArgmax def cropToContentNp(ogIm, pad=10): # cropToContentNp dim = len(ogIm.shape); im = ogIm # cropToContentNp if dim > 2: im = im.mean(0) # cropToContentNp coords = np.argwhere(im.max()-im); x_min, y_min = coords.min(axis=0); x_max, y_max = coords.max(axis=0) # cropToContentNp return ogIm[x_min-pad:x_max+1+pad, y_min-pad:y_max+1+pad] if dim == 2 else ogIm[:,x_min-pad:x_max+1+pad, y_min-pad:y_max+1+pad] # cropToContentNp def cropToContentPIL(im, pad=0): # cropToContentPIL im = im | toTensor(int) | cli.op().numpy() | cli.aS(cropToContentNp, pad) # cropToContentPIL return torch.from_numpy(im).permute(1, 2, 0) | toImg() if len(im.shape) > 2 else im | toImg() # cropToContentPIL
[docs]class toPIL(BaseCli): # toPIL
[docs] def __init__(self, closeFig=True, crop=True): # toPIL """Converts multiple data types into a PIL image. Example:: # grabs first image in the current folder ls(".") | toPIL().all() | item() # converts from tensor/array to image torch.randn(100, 200) | toPIL() # grabs image, converts to byte stream, and converts back to image "abc.jpg" | toPIL() | toBytes() | toPIL() # converts paragraphs to image ["abc", "def"] | toPIL() # converts SMILES string to molecule, then to image "c1ccc(C)cc1" | toMol() | toImg() # sketches a graphviz plot, converts to svg then renders the svg as an image ["ab", "bc", "ca"] | (kgv.sketch() | kgv.edges()) | toHtml() | toImg() You can also save a matplotlib figure by piping in a :class:`matplotlib.figure.Figure` object:: x = np.linspace(0, 4) plt.plot(x, x**2) plt.gcf() | toPIL() .. note:: If you are working with image tensors, which typically have dimensions of (C, H, W), you have to permute them to PIL's (H, W, C) first before passing them into this cli. Also it's expected that your tensor image ranges from 0-255, and not 0-1. Make sure you renormalize it. :param closeFig: if input is a matplotlib figure, then closes the figure after generating the image :param crop: whether to crop the white space around an image or not""" # toPIL import PIL; self.PIL = PIL; self.closeFig = closeFig; self.crop = crop # toPIL
def _typehint(self, inp): # toPIL return PIL.Image.Image # toPIL
[docs] def __ror__(self, path) -> "PIL.Image.Image": # toPIL if isinstance(path, Svg): # toPIL import tempfile; a = tempfile.NamedTemporaryFile() # toPIL import cairosvg; cairosvg.svg2png(bytestring=path,write_to=a.name); im = a.name | toImg() # toPIL return im # toPIL if isinstance(path, str): # toPIL return self.PIL.Image.open(os.path.expanduser(path)) # toPIL if isinstance(path, bytes): # toPIL return self.PIL.Image.open(io.BytesIO(path)) # toPIL if isinstance(path, torch.Tensor): path = path.numpy() # toPIL if isinstance(path, np.ndarray): # toPIL return self.PIL.Image.fromarray(path.astype("uint8")) # toPIL if isinstance(path, mpl.figure.Figure): # toPIL canvas = path.canvas; canvas.draw() # toPIL img = self.PIL.Image.frombytes('RGB', canvas.get_width_height(), canvas.tostring_rgb()) # toPIL if self.closeFig: plt.close(path) # toPIL return img | cli.aS(cropToContentPIL) # toPIL if hasGraphviz and isinstance(path, graphviz.Digraph): # toPIL import tempfile; a = tempfile.NamedTemporaryFile() # toPIL path.render(a.name, format="jpeg"); # toPIL fn = f"{a.name}.jpeg"; im = fn | toImg() # toPIL try: os.remove(fn) # toPIL except: pass # toPIL return im # toPIL if hasRdkit and isinstance(path, rdkit.Chem.rdchem.Mol): # toPIL sz = settings.chem.imgSize # toPIL return self.__ror__(rdkit.Chem.Draw.MolsToGridImage([path], subImgSize=[sz, sz]).data) | (cli.aS(cropToContentPIL) if self.crop else cli.iden()) # toPIL path = path | cli.deref() # toPIL if len(path) > 0 and isinstance(path[0], str): # toPIL from PIL import ImageDraw # toPIL h = path | cli.shape(0); w = path | cli.shape(0).all() | cli.aS(max) # toPIL image = self.PIL.Image.new("L", ((w+1)*20, (h+1)*60), 255) # toPIL font = PIL.ImageFont.truetype(settings.font, 18) if settings.font else None # toPIL ImageDraw.Draw(image).text((20, 20), path | cli.join("\n"), 0, font=font) # toPIL return np.array(image)/255 | (cli.aS(cropToContentNp) if self.crop else iden()) | cli.op()*255 | toImg() # toPIL return NotImplemented # toPIL
toImg = toPIL # toPIL
[docs]class toRgb(BaseCli): # toRgb
[docs] def __init__(self): # toRgb """Converts greyscale/rgb PIL image to rgb image. Example:: # reads image file and converts it to rgb "a.png" | toPIL() | toRgb()""" # toRgb import PIL; self.PIL = PIL # toRgb
def _typehint(self, inp): return inp # toRgb
[docs] def __ror__(self, i): # toRgb if i.getbands() == ("R", "G", "B"): return i # toRgb rgbI = self.PIL.Image.new("RGB", i.size) # toRgb rgbI.paste(i); return rgbI # toRgb
[docs]class toRgba(BaseCli): # toRgba
[docs] def __init__(self): # toRgba """Converts any PIL image to an rgba image. Example:: # reads image file and converts it to rgba "a.png" | toPIL() | toRgba()""" # toRgba import PIL; self.PIL = PIL # toRgba
def _typehint(self, inp): return inp # toRgba
[docs] def __ror__(self, i): # toRgba if i.getbands() == ("R", "G", "B", "A"): return i # toRgba rgbI = self.PIL.Image.new("RGBA", i.size) # toRgba rgbI.paste(i); return rgbI # toRgba
[docs]class toGray(BaseCli): # toGray
[docs] def __init__(self): # toGray """Converts any PIL image to a grayscale image. Example:: # reads image file and converts it to grayscale "a.png" | toPIL() | toGray()""" # toGray import PIL; self.PIL = PIL # toGray
def _typehint(self, inp): return inp # toGray
[docs] def __ror__(self, i): # toGray if i.getbands() == ("L",): return i # toGray return self.PIL.ImageOps.grayscale(i) # toGray
[docs]class toDict(BaseCli): # toDict
[docs] def __init__(self, rows=True, f=None): # toDict """Converts 2 iterators, 1 of keys and 1 of values, into a dictionary. Example:: # returns {1: 3, 2: 4} [[1, 3], [2, 4]] | toDict() # returns {1: 3, 2: 4} [[1, 2], [3, 4]] | toDict(False) If ``rows`` is a string, then it will build a dictionary from key-value pairs delimited by this character. For example:: ['gene_id "ENSG00000290825.1"', 'transcript_id "ENST00000456328.2"', 'gene_type "lncRNA"', 'gene_name "DDX11L2"', 'transcript_type "lncRNA"', 'transcript_name "DDX11L2-202"', 'level 2', 'transcript_support_level "1"', 'tag "basic"', 'tag "Ensembl_canonical"', 'havana_transcript "OTTHUMT00000362751.1"'] | toDict(" ") That returns:: {'gene_id': '"ENSG00000290825.1"', 'transcript_id': '"ENST00000456328.2"', 'gene_type': '"lncRNA"', 'gene_name': '"DDX11L2"', 'transcript_type': '"lncRNA"', 'transcript_name': '"DDX11L2-202"', 'level': '2', 'transcript_support_level': '"1"', 'tag': '"Ensembl_canonical"', 'havana_transcript': '"OTTHUMT00000362751.1"'} :param rows: if True, reads the input row by row, else reads it in as a list of columns :param f: if specified, returns a defaultdict that uses this function as its default factory""" # toDict self.rows = rows # toDict if f is not None: self.f = lambda d: defaultdict(f, d) # toDict else: self.f = lambda x: x # toDict
[docs] def __ror__(self, it:Tuple[Iterator[T], Iterator[T]]) -> dict: # toDict r = self.rows; f = self.f # toDict if r: # toDict if isinstance(r, str): return it | cli.apply(cli.aS(lambda x: x.split(" ")) | cli.head(1).split() | cli.item() + cli.join(" ")) | toDict() # toDict return f({_k:_v for _k, _v in it}) # toDict return f({_k:_v for _k, _v in zip(*it)}) # toDict
def _jsF(self, meta): # toDict fIdx = init._jsFAuto(); dataIdx = init._jsDAuto() # toDict if not self.rows: raise Exception("toDict._jsF() doesn't support .rows=False yet") # toDict return f"const {fIdx} = ({dataIdx}) => {dataIdx}.toDict()", fIdx # toDict
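# --- Illustrative sketch (hypothetical helper, not part of the library): what toDict(" ") above
# effectively does with the GTF-style attribute strings from the docstring - split each line
# once on the delimiter and use the two halves as key and value.
def _demo_toDict_delim(lines, delim=" "):
    return {k: v for k, v in (line.split(delim, 1) for line in lines)}
# _demo_toDict_delim(['level 2', 'tag "basic"']) == {'level': '2', 'tag': '"basic"'}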
def _toop(toOp, c, force, defaultValue): # _toop return cli.apply(toOp, c) | (cli.apply(lambda x: x or defaultValue, c) if force else cli.filt(cli.op() != None, c)) # _toop def _toFloat(e) -> Union[float, None]: # _toFloat try: return float(e) # _toFloat except: return None # _toFloat
[docs]class toFloat(BaseCli): # toFloat
[docs] def __init__(self, *columns, mode=2): # toFloat """Converts every row into a float. Example:: # returns [1, 3, -2.3] ["1", "3", "-2.3"] | toFloat() | deref() # returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']] [["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref() With weird rows:: # returns [[1.0, 'a'], [8.0, 'c']] [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref() # returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']] [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, force=True) | deref() This also works well with :class:`torch.Tensor` and :class:`numpy.ndarray`, as they will not be broken up into an iterator:: # returns a numpy array, instead of an iterator np.array(range(10)) | toFloat() :param columns: if nothing, then will convert each row. If available, then convert all the specified columns :param mode: different conversion styles - 0: simple ``float()`` function, fastest, but will throw errors if it can't be parsed - 1: if there are errors, then replace it with zero - 2: if there are errors, then eliminate the row""" # toFloat self.columns = columns; self.mode = mode # toFloat
[docs] def __ror__(self, it): # toFloat columns = self.columns; mode = self.mode # toFloat if len(columns) == 0: # toFloat if isinstance(it, np.ndarray): return it.astype(float) # toFloat if isinstance(it, torch.Tensor): return it.float() # toFloat if mode == 0: return (float(e) for e in it) # toFloat return it | _toop(_toFloat, None, mode == 1, 0.0) # toFloat else: return it | cli.init.serial(*(_toop(_toFloat, c, mode == 1, 0.0) for c in columns)) # toFloat
def _jsF(self, meta): # toFloat fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); cols = self.columns; mode = self.mode # toFloat if len(cols) == 0: # toFloat if mode == 0: return f"const {fIdx} = ({dataIdx}) => {dataIdx}.map((v) => parseFloat(v))", fIdx # toFloat if mode == 1: return f"const {fIdx} = ({dataIdx}) => {dataIdx}.map((v) => {{ const a = parseFloat(v); return a === a ? a : 0 }})", fIdx # toFloat if mode == 2: return f"const {fIdx} = ({dataIdx}) => {{ const ans = []; for (const v of {dataIdx}) {{ const a = parseFloat(v); if (a === a) ans.push(a); }}; return ans; }}", fIdx # toFloat else: return f"""\ const {fIdx} = ({dataIdx}) => {{ const ans = []; for (const row of {dataIdx}) {{ {'ans.push(row.map(parseFloat));' if mode == 0 else ''} {'ans.push(row.map(parseFloat).map((v) => (v === v ? v : 0)));' if mode == 1 else ''} {'const rowp = row.map(parseFloat);if (rowp.map((v) => v === v).every((v) => v)) ans.push(rowp);' if mode == 2 else ''} }} return ans; }}""", fIdx # toFloat
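# --- Illustrative sketch (hypothetical helper, not part of the library): the three `mode`
# behaviors of toFloat() above (and toInt() below), written out for a flat list of values.
# mode 0 raises on unparseable values, mode 1 substitutes 0.0, mode 2 drops them.
def _demo_toFloat_modes(xs, mode=2):
    out = []
    for x in xs:
        try: out.append(float(x))
        except (TypeError, ValueError):
            if mode == 0: raise
            if mode == 1: out.append(0.0)
            # mode 2: silently skip the value
    return out
# _demo_toFloat_modes(["1", "c", "8"], mode=1) == [1.0, 0.0, 8.0]
# _demo_toFloat_modes(["1", "c", "8"], mode=2) == [1.0, 8.0]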
def _toInt(e) -> Union[int, None]: # _toInt try: return int(float(e)) # _toInt except: return None # _toInt
[docs]class toInt(BaseCli): # toInt
[docs] def __init__(self, *columns, mode=2): # toInt """Converts every row into an integer. Example:: # returns [1, 3, -2] ["1", "3", "-2.3"] | toInt() | deref() :param columns: if nothing, then will convert each row. If available, then convert all the specified columns :param mode: different conversion styles - 0: simple ``int()`` function, fastest, but will throw errors if it can't be parsed - 1: if there are errors, then replace it with zero - 2: if there are errors, then eliminate the row See also: :meth:`toFloat`""" # toInt self.columns = columns; self.mode = mode; # toInt
[docs] def __ror__(self, it): # toInt columns = self.columns; mode = self.mode # toInt if len(columns) == 0: # toInt if isinstance(it, np.ndarray): return it.astype(int) # toInt if isinstance(it, torch.Tensor): return it.int() # toInt if mode == 0: return (int(e) for e in it) # toInt return it | _toop(_toInt, None, mode == 1, 0) # toInt else: return it | cli.init.serial(*(_toop(_toInt, c, mode == 1, 0.0) for c in columns)) # toInt
def _jsF(self, meta): # toInt fIdx = init._jsFAuto(); dataIdx = init._jsDAuto(); cols = self.columns; mode = self.mode # toInt if len(cols) == 0: # toInt if mode == 0: return f"const {fIdx} = ({dataIdx}) => {dataIdx}.map((v) => parseInt(v))", fIdx # toInt if mode == 1: return f"const {fIdx} = ({dataIdx}) => {dataIdx}.map((v) => {{ const a = parseInt(v); return a === a ? a : 0 }})", fIdx # toInt if mode == 2: return f"const {fIdx} = ({dataIdx}) => {{ const ans = []; for (const v of {dataIdx}) {{ const a = parseInt(v); if (a === a) ans.push(a); }}; return ans; }}", fIdx # toInt else: return f"""\ const {fIdx} = ({dataIdx}) => {{ const ans = []; for (const row of {dataIdx}) {{ {'ans.push(row.map(parseInt));' if mode == 0 else ''} {'ans.push(row.map(parseInt).map((v) => (v === v ? v : 0)));' if mode == 1 else ''} {'const rowp = row.map(parseInt);if (rowp.map((v) => v === v).every((v) => v)) ans.push(rowp);' if mode == 2 else ''} }} return ans; }}""", fIdx # toInt
[docs]class toBytes(BaseCli): # toBytes
[docs] def __init__(self, dataType=None): # toBytes """Converts several object types to bytes. Example:: # converts string to bytes "abc" | toBytes() # converts image to bytes in jpg format torch.randn(200, 100) | toImg() | toBytes() # converts image to bytes in png format torch.randn(200, 100) | toImg() | toBytes("PNG") "some_file.mp3" | toAudio() | toBytes("mp3") .. admonition:: Custom datatype It is possible to build objects that can interoperate with this cli, like this:: class custom1: def __init__(self, config=None): ... def _toBytes(self): return b"abc" class custom2: def __init__(self, config=None): ... def _toBytes(self, dataType): if dataType == "png": return b"123" else: return b"456" custom1() | toBytes() # returns b"abc" custom2() | toBytes() # returns b"456" custom2() | toBytes("png") # returns b"123" When called upon, :class:`toBytes` will detect that the input has the ``_toBytes`` method, which will prompt it to execute that method of the complex object. Of course, this means that you can return anything, not necessarily bytes, but to maintain intuitiveness, you should return either bytes or iterator of bytes :param dataType: depending on input. If it's an image then this can be png, jpg. If it's a sound then this can be mp3, wav or things like that""" # toBytes self.dataType = dataType # toBytes
[docs] def __ror__(self, it): # toBytes if isinstance(it, str): return it.encode() # toBytes if hasPIL: # toBytes if isinstance(it, PIL.Image.Image): # toBytes it = it | toRgb(); buffered = io.BytesIO() # toBytes it.save(buffered, format=(self.dataType or "JPEG")); return buffered.getvalue() # toBytes if hasattr(it, "_toBytes"): # toBytes n = len(inspect.getfullargspec(it._toBytes).args[1:]) # toBytes if n == 0: return it._toBytes() # toBytes elif n == 1: return it._toBytes(self.dataType) # toBytes else: raise Exception(f"{it.__class__.__name__} has 2 or more arguments, which is unsupported") # toBytes import dill; return dill.dumps(it) # toBytes
mpld3 = k1lib.dep("mpld3") # toBytes class Svg(str): pass # Svg class DataUri: # DataUri def __init__(self, uri:str): # DataUri self.uri = uri # "data:image/png;base64, ..." # DataUri self.mime = uri.split(";")[0].split(":")[-1] # "image/png" # DataUri self.mimeBase = self.mime.split("/")[0] # "image" # DataUri def _repr_html_(self): # DataUri if self.mimeBase == "image": return f"<img src=\"{self.uri}\"/>" # DataUri if self.mime == "text/html": return base64.b64decode(self.uri.split("base64,")[-1]).decode() # DataUri def __repr__(self): # DataUri uri = self.uri # DataUri return f"<DataUri mime='{self.mime}', self.uri='{(uri[:75] + '...') if len(uri) > 75 else uri}'>" # DataUri def _dataUriHtml(it): return DataUri(f"data:text/html;base64, {base64.b64encode(it.encode()).decode()}") # _dataUriHtml
[docs]class toDataUri(BaseCli): # toDataUri
[docs] def __init__(self): # toDataUri """Converts an incoming object into the data uri scheme. Data uris are the things that look like "data:image/png;base64, ...", or "data:text/html;base64, ...". This is a convenience tool mainly for other tools, and not quite useful directly. Example:: randomImg = cat("https://mlexps.com/ergun.png", False) | toImg() # returns PIL image randomImg | toDataUri() # returns k1lib.cli.conv.DataUri object with .mime field "image/png" and .uri field "data:image/png;base64, ..." randomImg | toDataUri() | toHtml() # returns html string `<img src="data:image/png;base64, ..."/>` randomImg | toHtml() # same as above. toHtml() actually calls toDataUri() behind the scenes randomImg | toDataUri() | toAnchor() # creates anchor tag (aka link elements "<a></a>") that, when clicked, displays the image in a new tab randomImg | toAnchor() # same as above. toAnchor() actually calls toDataUri() behind the scenes """ # toDataUri self.throw = False # can be configured by outside clis, like toHtml() # toDataUri
[docs] def __ror__(self, it): # toDataUri if isinstance(it, str): return _dataUriHtml(it) # toDataUri if isinstance(it, DataUri): return it # toDataUri if hasPIL and isinstance(it, PIL.Image.Image): # toDataUri it = it | toBytes(dataType="PNG") | cli.aS(base64.b64encode) | cli.op().decode() # toDataUri return DataUri(f"data:image/png;base64, {it}") # toDataUri try: return DataUri(it._toDataUri()) # toDataUri except Exception as e: # toDataUri if self.throw: raise Exception(f"toDataUri() called on an unfamiliar object, and the object doesn't implement _toDataUri(). Error: {e}") # toDataUri return _dataUriHtml(it | toHtml()) # toDataUri
[docs]class toAnchor(BaseCli): # toAnchor
[docs] def __init__(self, text:str="click here"): # toAnchor """Converts incoming object into a html anchor tag that, when clicked, displays the incoming object's html in another tab. Example:: randomImg = cat("https://mlexps.com/ergun.png", False) | toImg() # returns PIL image randomImg | toAnchor() # returns html string `<a href="data:image/png;base64, ..."></a>` On some browsers, there's sort of a weird bug where a new tab would open, but there's nothing displayed on that tab. If you see this is happening, just press F5 or Ctrl+R to refresh the page and it should display everything nicely :param text: text to display inside of the anchor""" # toAnchor self.text = text # toAnchor
[docs] def __ror__(self, it:str): # toAnchor s = it | toDataUri() | cli.op().uri # toAnchor return f"<a href=\"{s}\" target=\"_blank\">{self.text}</a>" # toAnchor
[docs]class toHtml(BaseCli): # toHtml
[docs] def __init__(self): # toHtml """Converts several object types to html. Example:: # converts PIL image to html <img> tag torch.randn(200, 100) | toImg() | toHtml() # converts graphviz graph to svg text (which is essentially html) g = k1.digraph(); g(*"abc"); g(*"bcd"); g | toHtml() # converts plotly graphs to html import plotly.express as px; import pandas as pd df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [10, 11, 12, 14, 15]}) fig = px.line(df, x='x', y='y', title='Simple Line Chart') fig | toHtml() # converts matplotlib plot to image, and then to html. Do this if you want a static plot x = np.linspace(-2, 2); y = x**2 plt.plot(x, x**2); plt.gcf() | toImg() | toHtml() # converts matplotlib plot to D3.js html sketch plt.plot(x, x**2); plt.gcf() | toHtml() """ # toHtml pass # toHtml
[docs] def __ror__(self, it): # toHtml if isinstance(it, str): return it # toHtml if hasPlotly and isinstance(it, plotly.graph_objs._figure.Figure): # toHtml out = io.StringIO(); it.write_html(out); out.seek(0); return out.read() # toHtml if isinstance(it, mpl.figure.Figure): res = mpld3.fig_to_html(it); plt.close(it); return res # toHtml if hasGraphviz and isinstance(it, graphviz.Digraph): # toHtml import tempfile; a = tempfile.NamedTemporaryFile() # toHtml it.render(a.name, format="svg"); # toHtml fn = f"{a.name}.svg"; im = cli.cat(fn) | cli.join("") # toHtml try: os.remove(fn) # toHtml except: pass # toHtml return Svg(im) # toHtml try: # toHtml res = it._repr_html_() # toHtml if res: return res # toHtml except: pass # toHtml try: # toHtml res = it._toHtml() # toHtml if res: return res # toHtml except: pass # toHtml try: # toHtml f = toDataUri(); f.throw = True # toHtml res = (it | f)._repr_html_() # toHtml if res: return res # toHtml except: pass # toHtml return it.__repr__() # toHtml
try: # toHtml from rdkit import Chem # toHtml from rdkit.Chem import Draw # toHtml from rdkit.Chem import AllChem # toHtml from rdkit.Chem.Draw import IPythonConsole # toHtml IPythonConsole.drawOptions.addAtomIndices = True # toHtml __all__ = [*__all__, "toMol", "toSmiles"] # toHtml def toMol(): # toHtml """Smiles to molecule. Example:: "c1ccc(C)cc1" | toMol()""" # toHtml return cli.aS(Chem.MolFromSmiles) # toHtml def toSmiles(): # toHtml """Molecule to smiles. Example:: "c1ccc(C)cc1" | toMol() | toSmiles()""" # toHtml return cli.aS(Chem.MolToSmiles) # toHtml except: pass # toHtml import unicodedata, hashlib # toHtml
[docs]def toAscii(): # toAscii """Converts complex unicode text to its base ascii form. Example:: "hà nội" | toAscii() # returns "ha noi" Taken from https://stackoverflow.com/questions/2365411/convert-unicode-to-ascii-without-errors-in-python""" # toAscii return cli.aS(lambda word: unicodedata.normalize('NFKD', word).encode('ascii', 'ignore')) # toAscii
[docs]def toHash() -> str: # toHash """Converts some string into some hash string. Example:: "abc" | toHash() # returns 'gASVJAAAAAAAAABDILp4Fr+PAc/qQUFA3l2uIiOwA2Gjlhd6nLQQ/2HyABWtlC4=' Why not just use the builtin function ``hash("abc")``? Because it generates different hashes for different interpreter sessions, and that breaks many of my applications that need the hash value to stay constant forever.""" # toHash def hashF(msg:str) -> str: m = hashlib.sha256(); m.update(f"{msg}".encode()); return k1lib.encode(m.digest()) # toHash return cli.aS(hashF) # toHash
import csv; pd = k1lib.dep("pandas") # toHash
[docs]class toCsv(BaseCli): # toCsv
[docs] def __init__(self, allSheets=False): # toCsv """Converts a csv file name into a table. Example:: "abc.csv" | toCsv() # returns table of values "def.xlsx" | toCsv() # returns table of values in the first sheet "def.xlsx" | toCsv(True) # returns List[Sheet name (str), table of values] .. warning:: Note that this is pretty slow compared to just splitting by commas. If your dataset doesn't have anything complicated like commas in quotes, then just do ``op().split(",").all()`` If your dataset does have complicated quotes, then I'd suggest reading the csv using this cli, then converting it to a tsv file (tab-separated values). Then you can always just split the string using tab characters :param allSheets: if input is an Excel sheet, whether to read in all sheets or just the first sheet. No effect if input is a normal csv file""" # toCsv self.allSheets = allSheets # toCsv
[docs] def __ror__(self, fn:str): # toCsv fn = os.path.expanduser(fn) # toCsv if fn.endswith(".xls") or fn.endswith(".xlsx"): # toCsv if self.allSheets: return [[k, v.values] for k,v in pd.read_excel(fn, sheet_name=None).items()] # toCsv else: return pd.read_excel(fn).values # toCsv def gen(): # toCsv with open(fn) as f: yield from csv.reader(f) # toCsv return gen() # toCsv
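# --- Illustrative sketch (hypothetical helper, not part of the library): why the warning above
# matters. A naive `line.split(",")` breaks quoted fields containing commas, while the stdlib
# csv module (which toCsv() uses internally) parses them correctly; converting to tab-separated
# output sidesteps the issue entirely.
def _demo_quoted_commas():
    import csv, io
    line = 'a,"hello, world",c'
    naive = line.split(",")                       # ['a', '"hello', ' world"', 'c'] - wrong
    proper = next(csv.reader(io.StringIO(line)))  # ['a', 'hello, world', 'c']
    return naive, proper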
import validators, shutil, html, io, os; pydub = k1lib.dep("pydub") # toCsv class Audio: # Audio def __init__(self, raw:"pydub.audio_segment.AudioSegment"): self.raw = raw # Audio def resample(self, rate) -> "self": # Audio """Resamples the audio""" # Audio if rate: # Audio self.raw = self.raw.set_frame_rate(rate) # Audio self.data = np.array(self.raw.get_array_of_samples())/2.15e9 # Audio self.rate = self.raw.frame_rate # Audio return self # Audio def _toBytes(self, dataType) -> bytes: f = io.BytesIO(); self.raw.export(f, format=(dataType or "wav")); return f.read() # Audio def __repr__(self): return f"<Audio duration={k1lib.fmt.time(self.raw.duration_seconds)} rate={self.raw.frame_rate}>" # Audio def __len__(self): return int(self.raw.frame_count()) # Audio def __getitem__(self, slice_): # Audio if not isinstance(slice_, slice): return None # Audio data = np.array(self.raw.get_array_of_samples()) | cli.batched(self.raw.channels) | cli.op()[slice_] # Audio return Audio(pydub.AudioSegment(data.tobytes(), frame_rate=self.raw.frame_rate, sample_width=self.raw.sample_width, channels=self.raw.channels)) # Audio def _repr_html_(self): # plays a short sample, first 10s or sth like that # Audio return f"{html.escape(self.__repr__())}<br>{self.raw[:10000]._repr_html_()}" # Audio
[docs]class toAudio(BaseCli): # toAudio
[docs] def __init__(self, rate=None): # toAudio """Reads audio from either a file, a URL, or bytes directly. Example:: au = "some_file.wav" | toAudio() # can display in a notebook, which will preview the first 10 seconds au | toBytes() # exports audio as .wav file au | toBytes("mp3") # exports audio as .mp3 file au.resample(16000) # resamples audio to the new rate au | head(0.1) # returns new Audio that has the first 10% of the audio only au | splitW(8, 2) # splits Audio into 2 Audios, first one covering 80% and second one covering 20% of the track au.raw # internal pydub.AudioSegment object. If displayed in a notebook, will play the whole thing You can also use this on any Youtube video or random mp3 links online and on raw bytes:: "https://www.youtube.com/watch?v=FtutLA63Cp8" | toAudio() # grab Bad Apple song from internet cat("some_file.wav", False) | toAudio() # grab from raw bytes of mp3 or wav, etc. """ # toAudio self.rate = rate # toAudio
[docs] def __ror__(self, it:"str|byte") -> Audio: # toAudio if isinstance(it, str): # toAudio if os.path.exists(os.path.expanduser(it)): fn = os.path.expanduser(it); tmp = False # toAudio elif validators.url(it): # toAudio if not shutil.which("yt-dlp"): raise Exception(f"'{it}' looks like a link, but the required 'yt-dlp' binary is not found. Please install it by doing `pip install yt-dlp`") # toAudio fn = None | cli.cmd(f"yt-dlp -o - -x {it}", mode=0, text=False) | cli.item() | cli.file(); tmp = True # toAudio else: raise Exception(f"The file '{it}' does not exist, and it doesn't look like a URL") # toAudio elif isinstance(it, bytes): fn = it | cli.file(); tmp = True # toAudio else: raise Exception(f"Unknown {type(it)} audio type") # toAudio res = Audio(pydub.AudioSegment.from_file(fn)).resample(self.rate) # toAudio if tmp: os.remove(fn) # toAudio return res # toAudio
dateutil = k1lib.dep("dateutil") # toAudio
[docs]class toUnix(BaseCli): # toUnix
[docs] def __init__(self, tz:"str | dateutil.tz.tz.tzfile"=None): # toUnix """Tries to convert anything piped in into a unix timestamp. If it can't convert, it returns None. Example:: Local time zone independent:: "2023" | toUnix() # returns 2023, or 2023 seconds after unix epoch. Might be undesirable, but has to support raw ints/floats "2023-11-01T00Z" | toUnix() # midnight Nov 1st 2023 GMT "2023-11-01T00:00:00-04:00" | toUnix() # midnight Nov 1st 2023 EST "2023-11-01" | toUnix("US/Pacific") # midnight Nov 1st 2023 PST "2023-11-01" | toUnix("UTC") # midnight Nov 1st 2023 UTC Local time zone dependent (assumes EST):: "2023-11" | toUnix() # if today's Nov 2nd EST, then this would be 1698897600, or midnight Nov 2nd 2023 EST "2023-11-04" | toUnix() # midnight Nov 4th 2023 EST Feel free to experiment more, but in general, this is pretty versatile in what it can convert. With more effort, I'd probably make this so that every example given will not depend on local time, but since I just use this to calculate time differences, I don't really care. :param tz: Timezone, like "US/Eastern", "US/Pacific". If not specified, then assumes local timezone""" # toUnix if tz: self.tz = tz if isinstance(tz, dateutil.tz.tz.tzfile) else dateutil.tz.gettz(tz) # toUnix else: self.tz = None # toUnix
[docs] def __ror__(self, t): # toUnix try: return float(t) # toUnix except: # toUnix try: # toUnix a = dateutil.parser.parse(t) # toUnix if self.tz: a = a.replace(tzinfo=self.tz) # toUnix return a.timestamp() # toUnix except: return None # toUnix
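# --- Illustrative sketch (hypothetical helper, not part of the library): the core of toUnix()
# above without the cli plumbing - parse with dateutil, optionally pin a timezone, then take
# .timestamp(). Assumes python-dateutil is installed.
def _demo_to_unix(s, tzName=None):
    import dateutil.parser, dateutil.tz
    d = dateutil.parser.parse(s)
    if tzName: d = d.replace(tzinfo=dateutil.tz.gettz(tzName))
    return d.timestamp()
# _demo_to_unix("2023-11-01T00Z") == 1698796800.0 (midnight Nov 1st 2023 UTC)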
from datetime import datetime as dt # toUnix
[docs]class toIso(BaseCli): # toIso
[docs] def __init__(self): # toIso """Converts unix timestamp into ISO 8601 string format. Example:: 1701382420 | toIso() # returns '2023-11-30T17:13:40', which is correct in EST time 1701382420 | toIso() | toUnix() # returns 1701382420, the input timestamp, showing it's correct 1701382420.123456789 | toIso() # returns '2023-11-30T17:13:40.123457' As you might have noticed, this cli depends on the timezone of the host computer """ # toIso pass # toIso
[docs] def __ror__(self, it): # toIso return dt.fromtimestamp(it).isoformat() # toIso
[docs]class toYMD(BaseCli): # toYMD
[docs] def __init__(self, idx=None, mode=int): # toYMD """Converts unix timestamp into tuple (year, month, day, hour, minute, second). Example:: 1701382420 | toYMD() # returns [2023, 11, 30, 17, 13, 40] in EST timezone 1701382420 | toYMD(0) # returns 2023 1701382420 | toYMD(1) # returns 11 1701382395 | toYMD(mode=str) # returns ['2023', '11', '30', '17', '13', '15'] :param idx: if specified, take the desired element only. If 0, then take year, 1, then month, etc. :param mode: either int or str. If str, then returns nicely adjusted numbers""" # toYMD self.idx = idx; self.mode = mode # toYMD
[docs] def __ror__(self, it): # toYMD d = dt.fromtimestamp(it) # toYMD if self.mode == int: res = [d.year, d.month, d.day, d.hour, d.minute, d.second] # toYMD else: res = [f"{d.year}", f"{d.month}".rjust(2,"0"), f"{d.day}".rjust(2,"0"), # toYMD f"{d.hour}".rjust(2,"0"), f"{d.minute}".rjust(2,"0"), f"{d.second}".rjust(2,"0")] # toYMD return res if self.idx is None else res[self.idx] # toYMD
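# --- Illustrative sketch (hypothetical helper, not part of the library): what toIso()/toYMD()
# above compute, using only the stdlib. Both interpret the unix timestamp in the host machine's
# local timezone.
def _demo_iso_ymd(ts=1701382420):
    from datetime import datetime
    d = datetime.fromtimestamp(ts)
    return d.isoformat(), (d.year, d.month, d.day, d.hour, d.minute, d.second)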
settings.add("toLinks", k1lib.Settings()\ .add("splitChars", ["<br>", "<div ", *"\n\t<> ,;"], "characters/strings to split the lines by, so that each link has the opportunity to be on a separate line, so that the first instance in a line doesn't overshadow everything after it")\ .add("protocols", ["http", "https", "ftp"], "list of recognized protocols to search for links, like 'http' and so on"), "conv.toLinks() settings"); # toYMD
[docs]class toMovingAvg(BaseCli): # toMovingAvg
[docs] def __init__(self, col:int=None, alpha=0.9, debias=True, v:float=0, dt:float=1): # toMovingAvg """Smoothes out sequential data using momentum. Example:: # returns [4.8, 4.62, 4.458]. 4.8 because 0.9*5 + 0.1*3 = 4.8, and so on [3, 3, 3] | toMovingAvg(v=5, debias=False) | deref() Sometimes you want to ignore the initial value; then you can turn on debias mode:: x = np.linspace(0, 10, 100); y = np.cos(x) plt.plot(x, y) plt.plot(x, y | toMovingAvg(debias=False) | deref()) plt.plot(x, y | toMovingAvg(debias=False, alpha=0.95) | deref()) plt.plot(x, y | toMovingAvg(debias=True) | deref()) plt.plot(x, y | toMovingAvg(debias=True, alpha=0.95) | deref()) plt.legend(["Signal", "Normal - 0.9 alpha", "Normal - 0.95 alpha", "Debiased - 0.9 alpha", "Debiased - 0.95 alpha"], framealpha=0.3) plt.grid(True) .. image:: ../images/movingAvg.png As you can see, normal mode still has the influence of the initial value at 0 and can't rise up fast, whereas the debias mode will ignore the initial value and immediately snap to the first value. Also, the 2 graphs with 0.9 alpha snap together quicker than the 2 graphs with 0.95 alpha. Here's the effect of several alpha values: .. image:: ../images/movingAvg-alphas.png :param col: column to apply moving average to :param alpha: momentum term :param debias: whether to turn on debias mode or not :param v: initial value, doesn't matter in debias mode :param dt: pretty much never used, hard to describe, belongs to debias mode, check out the source code for details""" # toMovingAvg self.col = col; self.initV = v; self.alpha = alpha; self.debias = debias; self.dt = dt # toMovingAvg if debias and v != 0: raise Exception("Debias mode activated! This means that the initial value doesn't matter, yet you've specified one") # toMovingAvg if alpha > 1 or alpha < 0: raise Exception("Alpha is outside the [0, 1] range, which does not make sense") # toMovingAvg
[docs] def __ror__(self, it): # toMovingAvg m = value = self.initV; alpha = self.alpha; col = self.col # toMovingAvg if self.debias: # toMovingAvg dt = self.dt; t = 1; tooSmall = False # toMovingAvg if col is None: # toMovingAvg for v in it: # toMovingAvg m = m * alpha + v * (1 - alpha) # toMovingAvg if tooSmall: yield m # skips complex exponential calculation once it's small enough to speed things up # toMovingAvg else: # toMovingAvg exp = alpha**t; value = m / (1 - exp) # toMovingAvg tooSmall = 10*exp < (1-alpha); t += dt; yield value # toMovingAvg else: # toMovingAvg for row in it: # toMovingAvg m = m * alpha + row[col] * (1 - alpha) # toMovingAvg if tooSmall: yield [*row[:col], m, *row[col+1:]] # toMovingAvg else: # toMovingAvg exp = alpha**t; value = m / (1 - exp) # toMovingAvg tooSmall = 10*exp < (1-alpha); t += dt; yield [*row[:col], value, *row[col+1:]] # toMovingAvg else: # toMovingAvg if col is None: # toMovingAvg for v in it: m = m * alpha + v * (1 - alpha); yield m # toMovingAvg else: # toMovingAvg for row in it: # toMovingAvg m = m * alpha + row[col] * (1 - alpha) # toMovingAvg yield [*row[:col], m, *row[col+1:]] # toMovingAvg
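# --- Illustrative sketch (hypothetical helper, not part of the library): the math inside
# toMovingAvg() above. The running momentum is m_t = alpha*m_{t-1} + (1-alpha)*x_t, and debias
# mode divides by (1 - alpha**t) (the same bias correction used by the Adam optimizer) so early
# outputs aren't dragged toward the initial value.
def _demo_moving_avg(xs, alpha=0.9, debias=True):
    m = 0.0; out = []
    for t, x in enumerate(xs, 1):
        m = alpha*m + (1 - alpha)*x
        out.append(m / (1 - alpha**t) if debias else m)
    return out
# _demo_moving_avg([3, 3, 3], debias=True) == [3.0, 3.0, 3.0] - snaps straight to the signal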
cm = k1lib.dep("matplotlib.cm") # toMovingAvg
[docs]class toCm(BaseCli): # toCm
[docs] def __init__(self, col:int, cmap=None, title:str=None): # toCm """Converts the specified column to a bunch of color values, and adds a colorbar automatically. "cm" = "color map". Example:: import matplotlib.cm as cm exps = [1, 2, 3, 4, 5] x = np.linspace(-2, 2) data = exps | apply(lambda exp: [exp, x, x**exp]) | deref() # without toCm(), plots fine, demonstrates underlying mechanism, but doesn't allow plotting a separate colorbar data | normalize(0, mode=1) | apply(cm.viridis, 0) | ~apply(lambda c,x,y: plt.plot(x, y, color=c)) | ignore() # with toCm(), draws a colorbar automatically data | toCm(0, cm.viridis, "Exponential") | ~apply(lambda c,x,y: plt.plot(x, y, color=c)) | ignore() .. image:: ../images/toCm.png Functionality is kind of niche, but I need this over and over again, so have to make it :param col: column to convert float/int to color (tuple of 4 floats) :param cmap: colormap to use. If not specified, defaults to ``cm.viridis`` :param title: title of the colorbar, optional""" # toCm self.col = col; self.cmap = cmap or cm.viridis; self.title = title # toCm
[docs] def __ror__(self, it): # toCm col = self.col; cmap = self.cmap; title = self.title # toCm if col is None: # toCm if not isinstance(it, k1lib.settings.cli.arrayTypes): it = list(it) # toCm plt.colorbar(cm.ScalarMappable(norm=plt.Normalize(*it | cli.toMin() & cli.toMax()), cmap=cmap), ax=plt.gca(), label=title) # toCm return it | cli.normalize(None, 1) | cli.apply(cmap) # toCm else: # toCm it = it | cli.deref(2) # toCm plt.colorbar(cm.ScalarMappable(norm=plt.Normalize(*it | cli.cut(col) | cli.toMin() & cli.toMax()), cmap=cmap), ax=plt.gca(), label=title) # toCm return it | cli.normalize(col, 1) | cli.apply(cmap, col) # toCm
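# --- Illustrative sketch (hypothetical helper, not part of the library): the core of toCm()
# above - rescale values to [0, 1], map them through a matplotlib colormap to RGBA tuples, and
# register a ScalarMappable so plt.colorbar() can draw the matching colorbar.
def _demo_to_colors(values, cmap=None, title=None):
    import numpy as np, matplotlib.pyplot as plt, matplotlib.cm as mcm
    cmap = cmap or mcm.viridis
    v = np.asarray(values, dtype=float)
    norm = plt.Normalize(v.min(), v.max())
    plt.colorbar(mcm.ScalarMappable(norm=norm, cmap=cmap), ax=plt.gca(), label=title)
    return [cmap(x) for x in norm(v)]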