Source code for k1lib.cli.conv

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for all short utilities that convert from 1 data type to another. They
might feel like they have different styles, as :class:`toFloat` converts object iterator to
float iterator, while :class:`toPIL` converts single image url to single PIL image,
whereas :class:`toSum` converts float iterator into a single float value.

The general convention is, if the intended operation sounds simple (convert to floats,
strings, types, ...), then most likely it will convert iterator to iterator, as you
can always use the function directly if you only want to apply it on 1 object.

If it sounds complicated (convert to PIL image, tensor, ...) then most likely it will
convert object to object. Lastly, there are some where it just feels right to input
an iterator and output a single object (like getting max, min, std, mean values)."""
__all__ = ["toTensor", "toRange", "toList",
           "toSum", "toProd", "toAvg", "toMean", "toMax", "toMin", "toPIL", "toImg",
           "toRgb", "toRgba", "toGray", "toDict",
           "toFloat", "toInt", "toBytes", "toHtml", "toAscii"]
import re, k1lib, math, os, numpy as np, io, base64, unicodedata
from k1lib.cli.init import BaseCli, Table, Row, T, yieldT; import k1lib.cli as cli
from k1lib.cli.typehint import *; import matplotlib as mpl; import matplotlib.pyplot as plt
from collections import deque; from typing import Iterator, Any, List, Set, Tuple, Dict, Callable, Union
settings = k1lib.settings.cli
try: import PIL; hasPIL = True
except: hasPIL = False
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = False
try: import rdkit; hasRdkit = True
except: hasRdkit = False
try: import graphviz; hasGraphviz = True
except: hasGraphviz = False
try: import plotly; import plotly.express as px; hasPlotly = True
except: hasPlotly = False
class toTensor(BaseCli):
    def __init__(self, dtype=torch.float32):
        """Converts the piped-in object to a :class:`torch.Tensor`.

        For a generic iterable this is essentially ``torch.tensor(list(it))``.
        PIL images and numpy arrays get dedicated handling, see :meth:`__ror__`.

        :param dtype: dtype the resulting tensor is cast to"""
        self.dtype = dtype
    def __ror__(self, it:Iterator[float]) -> torch.Tensor:
        """Builds the tensor.

        - PIL image: converted to a tensor and permuted from HWC to CHW layout
        - numpy array: passed straight to ``torch.tensor``
        - anything else: materialized with ``list`` first

        Any error raised while handling the PIL case is swallowed and the input
        falls through to the generic paths below."""
        try:
            import PIL
            if isinstance(it, PIL.Image.Image):
                # adapted from torchvision's ToTensor transform
                npType = {'I': np.int32, 'I;16': np.int16, 'F': np.float32}.get(it.mode, np.uint8)
                pixels = torch.from_numpy(np.array(it, npType, copy=True))
                if it.mode == '1':
                    pixels = 255 * pixels
                nBands = len(it.getbands())
                hwc = pixels.view(it.size[1], it.size[0], nBands)
                # put it from HWC to CHW format
                return hwc.permute((2, 0, 1)).contiguous().to(self.dtype)
        except:
            pass
        if isinstance(it, np.ndarray):
            source = it
        else:
            source = list(it)
        return torch.tensor(source).to(self.dtype)
class toList(BaseCli): # this still exists cause some LLVM optimizations are done on this, and too tired to change that at the moment
    def __init__(self):
        """Materializes the incoming iterator into a list. Example::

            # returns [0, 1, 2, 3, 4]
            range(5) | toList()
            # returns [0, 1, 2, 3, 4]
            range(5) | aS(list)

        This cli is somewhat outdated: it works fine, but ``aS(list)`` does the
        same thing. It is kept so that old projects don't break."""
        super().__init__()
    def _typehint(self, inp):
        """Type hint of the output, derived from the type hint of the input."""
        if isinstance(inp, tListIterSet):
            hint = tList(inp.child)
        elif isinstance(inp, tCollection):
            hint = inp
        else:
            hint = tList(tAny())
        return hint
    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        """Returns a new list holding every element of ``it``."""
        return [*it]
def _toRange(it): for i, _ in enumerate(it): yield i
class toRange(BaseCli):
    def __init__(self):
        """Replaces every element with its index, effectively ``range(len(it))``. Example::

            # returns [0, 1, 2]
            [3, 2, 5] | toRange() | deref()"""
        super().__init__()
    def __ror__(self, it:Iterator[Any]) -> Iterator[int]:
        """Returns ``range(len(it))`` when ``len(it)`` works, otherwise a
        generator that counts the elements as they are consumed."""
        try:
            size = len(it)
        except:
            # no usable length: count lazily instead
            return _toRange(it)
        return range(size)
# NOTE(review): looks like an optimizer pass that replaces 2 consecutive toRange with the first one - confirm against tOpt
tOpt.addPass(lambda cs, ts, _: [cs[0]], [toRange, toRange])
settings.add("arrayTypes", (torch.Tensor, np.ndarray), "default array types used to accelerate clis")
def genericTypeHint(inp):
    """Type hint of a single element of ``inp``, shared by the reducing clis
    (see the ``_typehint`` methods of :class:`toSum` and :class:`toProd`).

    The first matching category wins; :class:`tAny` is returned when none match."""
    if isinstance(inp, tListIterSet):
        elem = inp.child
    elif isinstance(inp, tCollection):
        elem = inp.children[0]
    elif isinstance(inp, tArrayTypes):
        elem = inp.child
    else:
        elem = tAny()
    return elem
class toSum(BaseCli):
    def __init__(self):
        """Sums up a bunch of numbers. Instances of ``settings.arrayTypes``
        (:class:`torch.Tensor` and :class:`numpy.ndarray` by default) are summed
        with their own ``.sum()``. Example::

            # returns 45
            range(10) | toSum()"""
        super().__init__()
    def _typehint(self, inp):
        """Output type hint: the element type of the input."""
        return genericTypeHint(inp)
    def __ror__(self, it:Iterator[float]):
        """Returns ``it.sum()`` for array types, else the builtin ``sum(it)``."""
        isArray = isinstance(it, settings.arrayTypes)
        return it.sum() if isArray else sum(it)
class toProd(BaseCli):
    def __init__(self):
        """Multiplies a bunch of numbers together. Instances of
        ``settings.arrayTypes`` (:class:`torch.Tensor` and :class:`numpy.ndarray`
        by default) use their own ``.prod()``. Example::

            # returns 362880
            range(1,10) | toProd()"""
        super().__init__()
    def _typehint(self, inp):
        """Output type hint: the element type of the input."""
        return genericTypeHint(inp)
    def __ror__(self, it):
        """Returns ``it.prod()`` for array types, else ``math.prod(it)``."""
        isArray = isinstance(it, settings.arrayTypes)
        return it.prod() if isArray else math.prod(it)
class toAvg(BaseCli):
    def __init__(self):
        """Averages a bunch of numbers. Instances of ``settings.arrayTypes``
        (:class:`torch.Tensor` and :class:`numpy.ndarray` by default) use their
        own ``.mean()``. Example::

            # returns 4.5
            range(10) | toAvg()
            # returns nan
            [] | toAvg()

        The ``nan`` result for empty input only applies while
        ``k1lib.settings.cli.strict`` is off; with it on, the division by zero
        is left to raise."""
        super().__init__()
    def _typehint(self, inp):
        """Output type hint: the element type of the input, with ``int``
        promoted to ``float``."""
        elemType = None
        # the checks are independent, so a later match overrides an earlier one
        if isinstance(inp, tListIterSet): elemType = inp.child
        if isinstance(inp, tCollection): elemType = inp.children[0]
        if isinstance(inp, tArrayTypes): elemType = inp.child
        if elemType is None:
            return tAny()
        return float if elemType == int else elemType
    def __ror__(self, it:Iterator[float]):
        """Returns the mean of ``it``, consuming it in a single pass."""
        if isinstance(it, settings.arrayTypes):
            return it.mean()
        total = 0; count = 0
        for count, value in enumerate(it, 1):
            total += value
        if count == 0 and not k1lib.settings.cli.strict:
            return float("nan")
        return total / count
toMean = toAvg
class toMax(BaseCli):
    def __init__(self):
        """Finds the largest of a bunch of numbers. Instances of
        ``settings.arrayTypes`` (:class:`torch.Tensor` and :class:`numpy.ndarray`
        by default) use their own ``.max()``. Example::

            # returns 6
            [2, 5, 6, 1, 2] | toMax()"""
        super().__init__()
    def __ror__(self, it:Iterator[float]) -> float:
        """Returns ``it.max()`` for array types, else the builtin ``max(it)``."""
        isArray = isinstance(it, settings.arrayTypes)
        return it.max() if isArray else max(it)
class toMin(BaseCli):
    def __init__(self):
        """Finds the smallest of a bunch of numbers. Instances of
        ``settings.arrayTypes`` (:class:`torch.Tensor` and :class:`numpy.ndarray`
        by default) use their own ``.min()``. Example::

            # returns 1
            [2, 5, 6, 1, 2] | toMin()"""
        super().__init__()
    def __ror__(self, it:Iterator[float]) -> float:
        """Returns ``it.min()`` for array types, else the builtin ``min(it)``."""
        isArray = isinstance(it, settings.arrayTypes)
        return it.min() if isArray else min(it)
settings.add("font", None, "default font file. Best to use .ttf files, used by toPIL()")
settings.add("chem", k1lib.Settings().add("imgSize", 200, "default image size used in toPIL() when drawing rdkit molecules"), "chemistry-related settings")
def cropToContentNp(ogIm, pad=10):
    """Crops away the uniform border around the content of an image array.

    "Content" is every pixel whose value differs from the maximum value of the
    image (for arrays with more than 2 dimensions, the mean over axis 0 is
    inspected instead).

    :param ogIm: array of shape (H, W), or with a leading axis, like (C, H, W)
    :param pad: number of extra pixels to keep around the content. The padded
        window is clamped to the image, so content near an edge is kept intact
    :return: the cropped view of ``ogIm``, or ``ogIm`` itself if every pixel
        has the same value (there's nothing to crop to)"""
    dim = len(ogIm.shape); im = ogIm
    if dim > 2: im = im.mean(0)
    coords = np.argwhere(im.max()-im)
    if len(coords) == 0: return ogIm # uniform image, min()/max() below would raise on an empty array
    x_min, y_min = coords.min(axis=0); x_max, y_max = coords.max(axis=0)
    # a negative start index would wrap around to the other side of the array
    # and silently produce an empty/wrong crop, so clamp it to 0
    x0 = max(x_min-pad, 0); y0 = max(y_min-pad, 0)
    x1 = x_max+1+pad; y1 = y_max+1+pad # overshooting the end is fine for slices
    return ogIm[x0:x1, y0:y1] if dim == 2 else ogIm[:, x0:x1, y0:y1]
def cropToContentPIL(im, pad=0):
    """Same as :meth:`cropToContentNp`, but takes in and returns a PIL image.

    :param im: PIL image
    :param pad: number of extra pixels to keep around the content"""
    im = im | toTensor(int) | cli.op().numpy() | cli.aS(cropToContentNp, pad)
    # toTensor() gives (C, H, W), while toImg() wants (H, W, C)
    return torch.from_numpy(im).permute(1, 2, 0) | toImg() if len(im.shape) > 2 else im | toImg()
class toPIL(BaseCli):
    def __init__(self, closeFig=True, crop=True):
        """Converts multiple data types into a PIL image. Example::

            # grabs first image in the current folder
            ls(".") | toPIL().all() | item()
            # converts from tensor/array to image
            torch.randn(100, 200) | toPIL()
            # grabs image, converts to byte stream, and converts back to image
            "abc.jpg" | toPIL() | toBytes() | toPIL()
            # converts paragraphs to image
            ["abc", "def"] | toPIL()
            # converts SMILES string to molecule, then to image
            "c1ccc(C)cc1" | toMol() | toImg()

        You can also save a matplotlib figure by piping in a
        :class:`matplotlib.figure.Figure` object::

            x = np.linspace(0, 4)
            plt.plot(x, x**2)
            plt.gcf() | toPIL()

        .. note::

            If you are working with image tensors, which typically have
            dimensions of (C, H, W), you have to permute it to PIL's (H, W, C)
            first before passing it into this cli. Also it's expected that your
            tensor image ranges from 0-255, and not 0-1. Make sure you
            renormalize it

        :param closeFig: if input is a matplotlib figure, then closes the figure after generating the image
        :param crop: whether to crop white spaces around an image or not. Applies
            to matplotlib figures, rdkit molecules and text"""
        import PIL; self.PIL = PIL; self.closeFig = closeFig; self.crop = crop
    def _typehint(self, inp): return PIL.Image.Image
    def __ror__(self, path) -> "PIL.Image.Image":
        """Converts ``path`` to a PIL image, dispatching on its type: file path
        (str), encoded image (bytes), tensor/array, matplotlib figure, graphviz
        digraph, rdkit molecule, or a list of strings (rendered as text).

        :return: the image, or ``NotImplemented`` for unsupported inputs"""
        crop = cli.aS(cropToContentPIL) if self.crop else cli.iden()
        if isinstance(path, str): return self.PIL.Image.open(os.path.expanduser(path))
        if isinstance(path, bytes): return self.PIL.Image.open(io.BytesIO(path))
        # detach + cpu so that tensors tracking gradients or living on an accelerator work too
        if isinstance(path, torch.Tensor): path = path.detach().cpu().numpy()
        if isinstance(path, np.ndarray): return self.PIL.Image.fromarray(path.astype("uint8"))
        if isinstance(path, mpl.figure.Figure):
            canvas = path.canvas; canvas.draw()
            img = self.PIL.Image.frombytes('RGB', canvas.get_width_height(), canvas.tostring_rgb())
            if self.closeFig: plt.close(path)
            return img | crop
        # graphviz is optional, the name only exists if the import went through
        if hasGraphviz and isinstance(path, graphviz.Digraph):
            import tempfile; a = tempfile.NamedTemporaryFile()
            path.render(a.name, format="jpeg"); fn = f"{a.name}.jpeg"; im = fn | toImg()
            try: os.remove(fn) # best effort cleanup of the rendered file
            except OSError: pass
            return im
        if hasRdkit and isinstance(path, rdkit.Chem.rdchem.Mol):
            sz = settings.chem.imgSize
            return self.__ror__(rdkit.Chem.Draw.MolsToGridImage([path], subImgSize=[sz, sz]).data) | crop
        path = path | cli.deref()
        if len(path) > 0 and isinstance(path[0], str):
            # import the submodules explicitly, `import PIL` alone doesn't load them
            from PIL import ImageDraw, ImageFont
            h = path | cli.shape(0); w = path | cli.shape(0).all() | cli.aS(max)
            image = self.PIL.Image.new("L", ((w+1)*20, (h+1)*60), 255)
            font = ImageFont.truetype(settings.font, 18) if settings.font else None
            ImageDraw.Draw(image).text((20, 20), path | cli.join("\n"), 0, font=font)
            return np.array(image)/255 | (cli.aS(cropToContentNp) if self.crop else cli.iden()) | cli.op()*255 | toImg()
        return NotImplemented
toImg = toPIL
class toRgb(BaseCli):
    def __init__(self):
        """Converts a greyscale/rgb PIL image to an rgb image. Example::

            # reads image file and converts it to rgb
            "a.png" | toPIL() | toRgb()"""
        import PIL
        self.PIL = PIL
    def _typehint(self, inp):
        """Output type hint: same as the input's."""
        return inp
    def __ror__(self, i):
        """Returns ``i`` untouched if its bands are already R, G, B, else pastes
        it onto a fresh RGB image of the same size."""
        alreadyRgb = i.getbands() == ("R", "G", "B")
        if alreadyRgb:
            return i
        canvas = self.PIL.Image.new("RGB", i.size)
        canvas.paste(i)
        return canvas
class toRgba(BaseCli):
    def __init__(self):
        """Converts any PIL image to an rgba image. Example::

            # reads image file and converts it to rgba
            "a.png" | toPIL() | toRgba()"""
        import PIL
        self.PIL = PIL
    def _typehint(self, inp):
        """Output type hint: same as the input's."""
        return inp
    def __ror__(self, i):
        """Returns ``i`` untouched if its bands are already R, G, B, A, else
        pastes it onto a fresh RGBA image of the same size."""
        alreadyRgba = i.getbands() == ("R", "G", "B", "A")
        if alreadyRgba:
            return i
        canvas = self.PIL.Image.new("RGBA", i.size)
        canvas.paste(i)
        return canvas
class toGray(BaseCli):
    def __init__(self):
        """Converts any PIL image to a grayscale image. Example::

            # reads image file and converts it to grayscale
            "a.png" | toPIL() | toGray()"""
        import PIL; self.PIL = PIL
    def _typehint(self, inp): return inp
    def __ror__(self, i):
        """Returns ``i`` untouched if it is already single-band grayscale, else
        a grayscale copy of it."""
        # getbands() gives a tuple, so compare against the 1-tuple ("L",). A plain
        # ("L") is just the string "L" and never compares equal
        if i.getbands() == ("L",): return i
        # import the submodule explicitly, `import PIL` alone doesn't load it
        from PIL import ImageOps
        return ImageOps.grayscale(i)
class toDict(BaseCli):
    def __init__(self, rows=True):
        """Converts 2 Iterators, 1 key, 1 value into a dictionary. Example::

            # returns {1: 3, 2: 4}
            [[1, 3], [2, 4]] | toDict()
            # returns {1: 3, 2: 4}
            [[1, 2], [3, 4]] | toDict(False)

        If ``rows`` is a string, then it will build a dictionary from key-value
        pairs delimited by this string. Each element is cut at the first
        occurrence of the delimiter; elements without it map to an empty
        string. For example::

            ['gene_id "ENSG00000290825.1"',
             'transcript_id "ENST00000456328.2"',
             'gene_type "lncRNA"',
             'gene_name "DDX11L2"',
             'transcript_type "lncRNA"',
             'transcript_name "DDX11L2-202"',
             'level 2',
             'transcript_support_level "1"',
             'tag "basic"',
             'tag "Ensembl_canonical"',
             'havana_transcript "OTTHUMT00000362751.1"'] | toDict(" ")

        That returns::

            {'gene_id': '"ENSG00000290825.1"',
             'transcript_id': '"ENST00000456328.2"',
             'gene_type': '"lncRNA"',
             'gene_name': '"DDX11L2"',
             'transcript_type': '"lncRNA"',
             'transcript_name': '"DDX11L2-202"',
             'level': '2',
             'transcript_support_level': '"1"',
             'tag': '"Ensembl_canonical"',
             'havana_transcript': '"OTTHUMT00000362751.1"'}

        :param rows: if True, reads input in row by row, else reads in list of
            columns. If a non-empty string, it's the key-value delimiter"""
        self.rows = rows
    def __ror__(self, it:Tuple[Iterator[T], Iterator[T]]) -> dict:
        """Builds the dictionary. Later duplicates of a key overwrite earlier ones."""
        r = self.rows
        if r:
            if isinstance(r, str):
                # split on the delimiter that was actually requested, at its first occurrence
                return {_k:_v for _k, _, _v in (x.partition(r) for x in it)}
            return {_k:_v for _k, _v in it}
        return {_k:_v for _k, _v in zip(*it)}
def _toop(toOp, c, force, defaultValue): return cli.apply(toOp, c) | (cli.apply(lambda x: x or defaultValue, c) if force else cli.filt(cli.op() != None, c)) def _toFloat(e) -> Union[float, None]: try: return float(e) except: return None
class toFloat(BaseCli):
    def __init__(self, *columns, mode=2):
        """Converts every row into a float. Example::

            # returns [1, 3, -2.3]
            ["1", "3", "-2.3"] | toFloat() | deref()
            # returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
            [["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()

        With weird rows::

            # returns [[1.0, 'a'], [8.0, 'c']]
            [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
            # returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
            [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, mode=1) | deref()

        This also works well with :class:`torch.Tensor` and :class:`numpy.ndarray`,
        as they will not be broken up into an iterator::

            # returns a numpy array, instead of an iterator
            np.array(range(10)) | toFloat()

        :param columns: if nothing, then will convert each row. If available, then convert all the specified columns
        :param mode: different conversion styles
            - 0: simple ``float()`` function, fastest, but will throw errors if it can't be parsed
            - 1: if there are errors, then replace it with zero
            - 2: if there are errors, then eliminate the row"""
        self.columns = columns; self.mode = mode
    def __ror__(self, it):
        """Applies the conversion to ``it``, honoring ``mode`` both with and
        without explicit columns."""
        columns = self.columns; mode = self.mode
        if len(columns) == 0:
            if isinstance(it, np.ndarray): return it.astype(float)
            if isinstance(it, torch.Tensor): return it.float()
            if mode == 0: return it | cli.apply(float)
            return it | _toop(_toFloat, None, mode == 1, 0.0)
        # mode 0 promises a plain float() that raises on bad input, for columns too
        if mode == 0: return it | cli.init.serial(*(cli.apply(float, c) for c in columns))
        return it | cli.init.serial(*(_toop(_toFloat, c, mode == 1, 0.0) for c in columns))
def _toInt(e) -> Union[int, None]: try: return int(float(e)) except: return None
class toInt(BaseCli):
    def __init__(self, *columns, mode=2):
        """Converts every row into an integer. Example::

            # returns [1, 3, -2]
            ["1", "3", "-2.3"] | toInt() | deref()

        :param columns: if nothing, then will convert each row. If available, then convert all the specified columns
        :param mode: different conversion styles
            - 0: simple ``int()`` function, fastest, but will throw errors if it can't be parsed
            - 1: if there are errors, then replace it with zero
            - 2: if there are errors, then eliminate the row

        See also: :meth:`toFloat`"""
        self.columns = columns; self.mode = mode
    def __ror__(self, it):
        """Applies the conversion to ``it``, honoring ``mode`` both with and
        without explicit columns."""
        columns = self.columns; mode = self.mode
        # The fallback has to be the integer 0: the replacement step evaluates
        # `x or default`, so a float default would also turn every valid 0 into 0.0
        if len(columns) == 0:
            if isinstance(it, np.ndarray): return it.astype(int)
            if isinstance(it, torch.Tensor): return it.int()
            if mode == 0: return it | cli.apply(int)
            return it | _toop(_toInt, None, mode == 1, 0)
        # mode 0 promises a plain int() that raises on bad input, for columns too
        if mode == 0: return it | cli.init.serial(*(cli.apply(int, c) for c in columns))
        return it | cli.init.serial(*(_toop(_toInt, c, mode == 1, 0) for c in columns))
class toBytes(BaseCli):
    def __init__(self, imgType="JPEG"):
        """Converts several object types to bytes. Example::

            # converts string to bytes
            "abc" | toBytes()
            # converts image to bytes, encoded as a JPEG file
            torch.randn(200, 100) | toImg() | toBytes()

        Strings are encoded with ``str.encode()``, PIL images are converted to
        rgb and saved in the ``imgType`` format, and everything else is
        serialized with ``dill``.

        :param imgType: if input is an image then this is the image type. Can change to "PNG" or sth like that"""
        self.imgType = imgType
    def __ror__(self, it):
        """Returns the bytes representation of ``it``."""
        if isinstance(it, str):
            return it.encode()
        if hasPIL and isinstance(it, PIL.Image.Image):
            rgb = it | toRgb()
            buffer = io.BytesIO()
            rgb.save(buffer, format=self.imgType)
            return buffer.getvalue()
        # generic fallback for everything else
        import dill
        return dill.dumps(it)
class toHtml(BaseCli):
    def __init__(self):
        """Converts several object types to an html string. Example::

            # converts PIL image to html <img> tag
            torch.randn(200, 100) | toImg() | toHtml()

        PIL images become an ``<img>`` tag with an embedded base64 PNG, plotly
        figures are rendered with their ``write_html()``, and everything else
        uses ``_repr_html_()``, falling back to ``__repr__()``."""
    def __ror__(self, it):
        """Returns the html representation of ``it``."""
        if hasPIL and isinstance(it, PIL.Image.Image):
            it = it | toBytes(imgType="PNG") | cli.aS(base64.b64encode) | cli.op().decode()
            return f"<img src=\"data:image/png;base64, {it}\" />"
        if hasPlotly and isinstance(it, plotly.graph_objs._figure.Figure):
            buffer = io.StringIO()
            it.write_html(buffer)
            return buffer.getvalue()
        try:
            html = it._repr_html_()
        except:
            # no (working) html repr: use the plain repr instead
            html = it.__repr__()
        return html
# Chemistry helpers, only defined (and exported through __all__) when every
# rdkit import below goes through. Any failure in this block is silently ignored
try:
    from rdkit import Chem
    from rdkit.Chem import Draw
    from rdkit.Chem import AllChem
    from rdkit.Chem.Draw import IPythonConsole
    IPythonConsole.drawOptions.addAtomIndices = True
    __all__ = [*__all__, "toMol", "toSmiles"]
    def toMol():
        """Smiles to molecule. Returns a cli that wraps
        ``Chem.MolFromSmiles``. Example::

            "c1ccc(C)cc1" | toMol()"""
        return cli.aS(Chem.MolFromSmiles)
    def toSmiles():
        """Molecule to smiles. Returns a cli that wraps
        ``Chem.MolToSmiles``. Example::

            "c1ccc(C)cc1" | toMol() | toSmiles()"""
        return cli.aS(Chem.MolToSmiles)
except: pass
import unicodedata
def toAscii():
    """Converts complex unicode text to its base ascii form. Example::

        "hà nội" | toAscii() # returns b"ha noi"

    The text is NFKD-normalized and then encoded to ascii, dropping every
    character that can't be encoded.

    NOTE(review): the result is a ``bytes`` object, not a ``str``; append
    ``.decode()`` downstream if a string is needed.

    Taken from https://stackoverflow.com/questions/2365411/convert-unicode-to-ascii-without-errors-in-python"""
    def _strip(word):
        decomposed = unicodedata.normalize('NFKD', word)
        return decomposed.encode('ascii', 'ignore')
    return cli.aS(_strip)