# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
This is for all short utilities that convert from one data type to another. They
might feel like they have different styles, as :class:`toFloat` converts an object
iterator to a float iterator, while :class:`toPIL` converts a single image url to a
single PIL image, whereas :class:`toSum` converts a float iterator into a single
float value.
The general convention is: if the intended operation sounds simple (convert to floats,
strings, types, ...), then most likely it will convert iterator to iterator, as you
can always use the function directly if you only want to apply it to one object.
If it sounds complicated (convert to PIL image, tensor, ...), then most likely it will
convert object to object. Lastly, there are some where it just feels right to input
an iterator and output a single object (like getting max, min, std, mean values)."""
__all__ = ["toTensor", "toRange", "toList",
"toSum", "toProd", "toAvg", "toMean", "toMax", "toMin", "toPIL", "toImg",
"toRgb", "toRgba", "toGray", "toDict",
"toFloat", "toInt", "toBytes", "toHtml", "toAscii"]
import re, k1lib, math, os, numpy as np, io, base64, unicodedata
from k1lib.cli.init import BaseCli, Table, Row, T, yieldT; import k1lib.cli as cli
from k1lib.cli.typehint import *; import matplotlib as mpl; import matplotlib.pyplot as plt
from collections import deque; from typing import Iterator, Any, List, Set, Tuple, Dict, Callable, Union
settings = k1lib.settings.cli
try: import PIL; hasPIL = True
except: hasPIL = False
try: import torch; hasTorch = True
except: torch = k1lib.Object().withAutoDeclare(lambda: type("RandomClass", (object, ), {})); hasTorch = False
try: import rdkit; hasRdkit = True
except: hasRdkit = False
try: import graphviz; hasGraphviz = True
except: hasGraphviz = False
try: import plotly; import plotly.express as px; hasPlotly = True
except: hasPlotly = False
class toTensor(BaseCli):
    def __init__(self, dtype=torch.float32):
        """Converts generator to :class:`torch.Tensor`. Essentially
        ``torch.tensor(list(it))``.

        Also checks if input is a PIL Image. If yes, turn it into a
        :class:`torch.Tensor` laid out as (channels, height, width) and return.

        :param dtype: data type the resulting tensor is cast to"""
        self.dtype = dtype
    def __ror__(self, it:Iterator[float]) -> torch.Tensor:
        """Converts a PIL image, a numpy array or any iterable into a tensor of ``self.dtype``."""
        # import the submodule explicitly: a bare ``import PIL`` does not guarantee PIL.Image is loaded
        try: from PIL import Image
        except ImportError: Image = None # Pillow is not installed, so the input can't be a PIL image
        if Image is not None and isinstance(it, Image.Image): # adapted from torchvision's ToTensor transform
            pic = it
            mode_to_nptype = {'I': np.int32, 'I;16': np.int16, 'F': np.float32}
            img = torch.from_numpy(np.array(pic, mode_to_nptype.get(pic.mode, np.uint8), copy=True))
            if pic.mode == '1': img = 255 * img # scale 1-bit images up to the 0-255 range
            img = img.view(pic.size[1], pic.size[0], len(pic.getbands()))
            # conversion errors now propagate instead of being swallowed and retried as a plain iterable
            return img.permute((2, 0, 1)).contiguous().to(self.dtype) # put it from HWC to CHW format
        if isinstance(it, np.ndarray): return torch.tensor(it).to(self.dtype)
        return torch.tensor(list(it)).to(self.dtype)
class toList(BaseCli): # kept around because some LLVM optimizations still target this cli
    def __init__(self):
        """Materializes the incoming iterator into a list.
        Example::

            # returns [0, 1, 2, 3, 4]
            range(5) | toList()
            # returns [0, 1, 2, 3, 4]
            range(5) | aS(list)

        This cli is somewhat outdated. It works fine, but ``aS(list)`` does
        the same thing. It is kept so that old projects don't break."""
        super().__init__()
    def _typehint(self, inp):
        """Type hint of the output: a list of whatever the input contains."""
        if isinstance(inp, tListIterSet): return tList(inp.child)
        return inp if isinstance(inp, tCollection) else tList(tAny())
    def __ror__(self, it:Iterator[Any]) -> List[Any]:
        return [*it]
def _toRange(it):
for i, _ in enumerate(it): yield i
class toRange(BaseCli):
    def __init__(self):
        """Returns iter(range(len(it))), effectively.
        Example::

            # returns [0, 1, 2]
            [3, 2, 5] | toRange() | deref()"""
        super().__init__()
    def __ror__(self, it:Iterator[Any]) -> Iterator[int]:
        """Returns ``range(len(it))`` when the input has a usable length, else a lazy index generator."""
        try: return range(len(it))
        # inputs without a usable length (generators, ...) are counted lazily instead.
        # ``Exception`` rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
        except Exception: return _toRange(it)
# NOTE(review): presumably registers an optimizer pass that collapses two consecutive toRange clis into just the first one - confirm against tOpt.addPass
tOpt.addPass(lambda cs, ts, _: [cs[0]], [toRange, toRange])
# types for which toSum/toProd/toAvg/toMax/toMin below call the object's own method (.sum(), .prod(), ...) instead of iterating over it
settings.add("arrayTypes", (torch.Tensor, np.ndarray), "default array types used to accelerate clis")
def genericTypeHint(inp):
    """Picks the element type hint out of a container-like type hint.

    :param inp: type hint of the incoming data
    :return: ``inp.child`` for list/iterator/set and array hints, ``inp.children[0]``
        for collection hints, and ``tAny()`` for everything else"""
    extractors = (
        (tListIterSet, lambda hint: hint.child),
        (tCollection, lambda hint: hint.children[0]),
        (tArrayTypes, lambda hint: hint.child),
    )
    for hintType, extract in extractors: # checked in this order, first match wins
        if isinstance(inp, hintType): return extract(inp)
    return tAny()
class toSum(BaseCli):
    def __init__(self):
        """Adds up a bunch of numbers. Also accepts :class:`torch.Tensor` or :class:`numpy.ndarray`.
        Example::

            # returns 45
            range(10) | toSum()"""
        super().__init__()
    def _typehint(self, inp): return genericTypeHint(inp)
    def __ror__(self, it:Iterator[float]):
        # array types know how to sum themselves, everything else goes through builtin sum()
        return it.sum() if isinstance(it, settings.arrayTypes) else sum(it)
class toProd(BaseCli):
    def __init__(self):
        """Multiplies a bunch of numbers together. Also accepts :class:`torch.Tensor` or :class:`numpy.ndarray`.
        Example::

            # returns 362880
            range(1,10) | toProd()"""
        super().__init__()
    def _typehint(self, inp): return genericTypeHint(inp)
    def __ror__(self, it):
        # array types use their own .prod(), everything else goes through math.prod()
        if not isinstance(it, settings.arrayTypes): return math.prod(it)
        return it.prod()
class toAvg(BaseCli):
    def __init__(self):
        """Averages a bunch of numbers. Also accepts :class:`torch.Tensor` or :class:`numpy.ndarray`.
        Example::

            # returns 4.5
            range(10) | toAvg()
            # returns nan
            [] | toAvg()"""
        super().__init__()
    def _typehint(self, inp):
        """Element type of the input, with ``int`` promoted to ``float``; ``tAny()`` if unknown."""
        elem = None
        # all three checks run on purpose: a later match overrides an earlier one
        if isinstance(inp, tListIterSet): elem = inp.child
        if isinstance(inp, tCollection): elem = inp.children[0]
        if isinstance(inp, tArrayTypes): elem = inp.child
        if elem is None: return tAny()
        return float if elem == int else elem
    def __ror__(self, it:Iterator[float]):
        if isinstance(it, settings.arrayTypes): return it.mean()
        total = 0; count = 0
        for value in it:
            total += value
            count += 1
        # empty input gives nan, unless strict mode is on, in which case the division below raises
        if not k1lib.settings.cli.strict and count == 0: return float("nan")
        return total / count
toMean = toAvg
class toMax(BaseCli):
    def __init__(self):
        """Finds the largest of a bunch of numbers. Also accepts :class:`torch.Tensor` or :class:`numpy.ndarray`.
        Example::

            # returns 6
            [2, 5, 6, 1, 2] | toMax()"""
        super().__init__()
    def __ror__(self, it:Iterator[float]) -> float:
        # array types use their own .max(), everything else goes through builtin max()
        return it.max() if isinstance(it, settings.arrayTypes) else max(it)
class toMin(BaseCli):
    def __init__(self):
        """Finds the smallest of a bunch of numbers. Also accepts :class:`torch.Tensor` or :class:`numpy.ndarray`.
        Example::

            # returns 1
            [2, 5, 6, 1, 2] | toMin()"""
        super().__init__()
    def __ror__(self, it:Iterator[float]) -> float:
        # array types use their own .min(), everything else goes through builtin min()
        return it.min() if isinstance(it, settings.arrayTypes) else min(it)
# font file handed to PIL.ImageFont.truetype() when toPIL() renders a list of strings; when unset (None), no explicit font is passed to the drawing call
settings.add("font", None, "default font file. Best to use .ttf files, used by toPIL()")
# chem.imgSize is used as both entries of ``subImgSize`` when toPIL() draws an rdkit molecule
settings.add("chem", k1lib.Settings().add("imgSize", 200, "default image size used in toPIL() when drawing rdkit molecules"), "chemistry-related settings")
def cropToContentNp(ogIm, pad=10):
    """Crops a 2D (H, W) or channel-first 3D (C, H, W) array down to its content.

    "Content" is every pixel that differs from the image's maximum value, so the
    brightest value is treated as background. For 3D input, the decision is made
    on the image averaged over the first axis.

    :param ogIm: numpy array with 2 or 3 dimensions
    :param pad: number of background pixels to keep around the content. The crop
        window is clipped to the image borders
    :return: cropped view of ``ogIm``, or ``ogIm`` itself if it has no content at
        all (every pixel equals the maximum)"""
    dim = len(ogIm.shape); im = ogIm
    if dim > 2: im = im.mean(0)
    coords = np.argwhere(im.max()-im) # (row, col) of every non-background pixel
    if len(coords) == 0: return ogIm # uniform image: nothing to crop to
    rMin, cMin = coords.min(axis=0); rMax, cMax = coords.max(axis=0)
    # clamp the lower bounds at 0: a negative slice start would wrap around to the
    # other side of the image and yield a wrong (usually empty) crop
    r0 = max(rMin-pad, 0); c0 = max(cMin-pad, 0)
    r1 = rMax+1+pad; c1 = cMax+1+pad # upper bounds beyond the image are clipped by slicing itself
    return ogIm[r0:r1, c0:c1] if dim == 2 else ogIm[:, r0:r1, c0:c1]
def cropToContentPIL(im, pad=0):
    """Crops a PIL image to its content and returns the result as a new PIL image.

    The image is turned into an integer tensor, cropped as a numpy array by
    :func:`cropToContentNp`, then converted back to an image.

    :param im: PIL image
    :param pad: number of background pixels to keep around the content"""
    cropped = cropToContentNp((im | toTensor(int)).numpy(), pad)
    if len(cropped.shape) > 2: # move the first axis to the end before converting back
        cropped = torch.from_numpy(cropped).permute(1, 2, 0)
    return cropped | toImg()
class toPIL(BaseCli):
    def __init__(self, closeFig=True, crop=True):
        """Converts multiple data types into a PIL image.
        Example::

            # grabs first image in the current folder
            ls(".") | toPIL().all() | item()
            # converts from tensor/array to image
            torch.randn(100, 200) | toPIL()
            # grabs image, converts to byte stream, and converts back to image
            "abc.jpg" | toPIL() | toBytes() | toPIL()
            # converts paragraphs to image
            ["abc", "def"] | toPIL()
            # converts SMILES string to molecule, then to image
            "c1ccc(C)cc1" | toMol() | toImg()

        You can also save a matplotlib figure by piping in a :class:`matplotlib.figure.Figure` object::

            x = np.linspace(0, 4)
            plt.plot(x, x**2)
            plt.gcf() | toPIL()

        .. note::

            If you are working with image tensors, which typically have
            dimensions of (C, H, W), you have to permute them to PIL's (H, W, C)
            first before passing them into this cli.

            Also, it's expected that your tensor image ranges from 0-255, and
            not 0-1. Make sure you renormalize it.

        :param closeFig: if input is a matplotlib figure, then closes the figure after generating the image
        :param crop: whether to crop white spaces around an image or not. Applies to
            matplotlib figures, rdkit molecules and rendered text"""
        import PIL, PIL.Image # make sure the Image submodule is actually loaded
        self.PIL = PIL; self.closeFig = closeFig; self.crop = crop
    def _typehint(self, inp):
        return self.PIL.Image.Image
    def __ror__(self, path) -> "PIL.Image.Image":
        """Dispatches on the input type and returns a PIL image.

        Returns ``NotImplemented`` for inputs that match none of the supported types."""
        if isinstance(path, str):
            return self.PIL.Image.open(os.path.expanduser(path))
        if isinstance(path, bytes):
            return self.PIL.Image.open(io.BytesIO(path))
        # detach + move to cpu first, so tensors on GPU or tracking gradients convert too
        if isinstance(path, torch.Tensor): path = path.detach().cpu().numpy()
        if isinstance(path, np.ndarray):
            return self.PIL.Image.fromarray(path.astype("uint8"))
        if isinstance(path, mpl.figure.Figure):
            canvas = path.canvas; canvas.draw()
            img = self.PIL.Image.frombytes('RGB', canvas.get_width_height(), canvas.tostring_rgb())
            if self.closeFig: plt.close(path)
            if self.crop: return img | cli.aS(cropToContentPIL)
            return img
        # guarded by hasGraphviz: the name ``graphviz`` does not exist if the import at the top failed
        if hasGraphviz and isinstance(path, graphviz.Digraph):
            import tempfile
            with tempfile.NamedTemporaryFile() as a: # only used to obtain a unique file name
                path.render(a.name, format="jpeg")
                fn = f"{a.name}.jpeg"; im = fn | toImg()
                try: os.remove(fn)
                except OSError: pass # best-effort cleanup of the rendered file
            return im
        if hasRdkit and isinstance(path, rdkit.Chem.rdchem.Mol):
            sz = settings.chem.imgSize
            return self.__ror__(rdkit.Chem.Draw.MolsToGridImage([path], subImgSize=[sz, sz]).data) | (cli.aS(cropToContentPIL) if self.crop else cli.iden())
        path = path | cli.deref()
        if len(path) > 0 and isinstance(path[0], str): # list of strings: render them as lines of text
            from PIL import ImageDraw, ImageFont
            h = path | cli.shape(0); w = path | cli.shape(0).all() | cli.aS(max) # number of lines, longest line length
            image = self.PIL.Image.new("L", ((w+1)*20, (h+1)*60), 255) # white canvas, generously sized
            font = ImageFont.truetype(settings.font, 18) if settings.font else None
            ImageDraw.Draw(image).text((20, 20), path | cli.join("\n"), 0, font=font)
            return np.array(image)/255 | (cli.aS(cropToContentNp) if self.crop else cli.iden()) | cli.op()*255 | toImg()
        return NotImplemented
toImg = toPIL
class toRgb(BaseCli):
    def __init__(self):
        """Turns a PIL image of another mode (grayscale, ...) into an rgb image.
        Example::

            # reads image file and converts it to rgb
            "a.png" | toPIL() | toRgb()"""
        import PIL; self.PIL = PIL
    def _typehint(self, inp): return inp
    def __ror__(self, i):
        if i.getbands() != ("R", "G", "B"):
            # paste the image onto a fresh RGB canvas of the same size
            canvas = self.PIL.Image.new("RGB", i.size)
            canvas.paste(i)
            return canvas
        return i # already rgb, handed back untouched
class toRgba(BaseCli):
    def __init__(self):
        """Turns a PIL image of another mode into an rgba image.
        Example::

            # reads image file and converts it to rgba
            "a.png" | toPIL() | toRgba()"""
        import PIL; self.PIL = PIL
    def _typehint(self, inp): return inp
    def __ror__(self, i):
        if i.getbands() != ("R", "G", "B", "A"):
            # paste the image onto a fresh RGBA canvas of the same size
            canvas = self.PIL.Image.new("RGBA", i.size)
            canvas.paste(i)
            return canvas
        return i # already rgba, handed back untouched
class toGray(BaseCli):
    def __init__(self):
        """Converts random PIL image to a grayscale image.
        Example::

            # reads image file and converts it to grayscale
            "a.png" | toPIL() | toGray()"""
        import PIL, PIL.ImageOps # ImageOps is a submodule that a bare ``import PIL`` does not load
        self.PIL = PIL
    def _typehint(self, inp): return inp
    def __ror__(self, i):
        """Returns ``i`` itself if it is already single-band "L", else a grayscale version."""
        # getbands() returns a tuple, so compare against ("L",) - ("L") is just the string "L"
        if i.getbands() == ("L",): return i
        return self.PIL.ImageOps.grayscale(i)
class toDict(BaseCli):
    def __init__(self, rows=True):
        """Converts 2 Iterators, 1 key, 1 value into a dictionary.
        Example::

            # returns {1: 3, 2: 4}
            [[1, 3], [2, 4]] | toDict()
            # returns {1: 3, 2: 4}
            [[1, 2], [3, 4]] | toDict(False)

        If ``rows`` is a string, then it will build a dictionary from key-value
        pairs delimited by this string. Each line is split at the first occurrence
        of the delimiter; a line without the delimiter maps to an empty string.
        For example::

            ['gene_id "ENSG00000290825.1"',
             'transcript_id "ENST00000456328.2"',
             'gene_type "lncRNA"',
             'gene_name "DDX11L2"',
             'transcript_type "lncRNA"',
             'transcript_name "DDX11L2-202"',
             'level 2',
             'transcript_support_level "1"',
             'tag "basic"',
             'tag "Ensembl_canonical"',
             'havana_transcript "OTTHUMT00000362751.1"'] | toDict(" ")

        That returns::

            {'gene_id': '"ENSG00000290825.1"',
             'transcript_id': '"ENST00000456328.2"',
             'gene_type': '"lncRNA"',
             'gene_name': '"DDX11L2"',
             'transcript_type': '"lncRNA"',
             'transcript_name': '"DDX11L2-202"',
             'level': '2',
             'transcript_support_level': '"1"',
             'tag': '"Ensembl_canonical"',
             'havana_transcript': '"OTTHUMT00000362751.1"'}

        Later duplicate keys overwrite earlier ones.

        :param rows: if True, reads input in row by row, else reads
            in list of columns. If a non-empty string, it's the delimiter
            between key and value of every input line"""
        self.rows = rows
    def __ror__(self, it:Tuple[Iterator[T], Iterator[T]]) -> dict:
        r = self.rows
        if r:
            if isinstance(r, str):
                # split on the delimiter that was actually passed in, not a hard-coded space
                return {_k:_v for _k, _, _v in (line.partition(r) for line in it)}
            return {_k:_v for _k, _v in it}
        return {_k:_v for _k, _v in zip(*it)}
def _toop(toOp, c, force, defaultValue):
    """Builds the conversion pipeline shared by :class:`toFloat` and :class:`toInt`.

    :param toOp: conversion function that returns None when it can't convert a value
    :param c: column to operate on, or None to operate on each element directly
    :param force: if True, failed conversions are replaced with ``defaultValue``,
        else the offending elements/rows are filtered out
    :param defaultValue: replacement for failed conversions when ``force`` is True"""
    if force:
        # only None marks a failed conversion; ``x or defaultValue`` would also clobber valid falsy results
        cleanup = cli.apply(lambda x: defaultValue if x is None else x, c)
    else:
        cleanup = cli.filt(cli.op() != None, c) # ``!=`` builds an op expression here, so it can't be written as ``is not``
    return cli.apply(toOp, c) | cleanup
def _toFloat(e) -> Union[float, None]:
try: return float(e)
except: return None
class toFloat(BaseCli):
    def __init__(self, *columns, mode=2):
        """Converts every row into a float. Example::

            # returns [1, 3, -2.3]
            ["1", "3", "-2.3"] | toFloat() | deref()
            # returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
            [["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()

        With weird rows::

            # returns [[1.0, 'a'], [8.0, 'c']]
            [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
            # returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
            [["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, mode=1) | deref()

        This also works well with :class:`torch.Tensor` and :class:`numpy.ndarray`,
        as they will not be broken up into an iterator::

            # returns a numpy array, instead of an iterator
            np.array(range(10)) | toFloat()

        :param columns: if nothing, then will convert each row. If available, then
            convert all the specified columns
        :param mode: different conversion styles

            - 0: simple ``float()`` function, fastest, but will throw errors if it can't be parsed
            - 1: if there are errors, then replace it with zero
            - 2: if there are errors, then eliminate the row"""
        self.columns = columns; self.mode = mode
    def __ror__(self, it):
        columns = self.columns; mode = self.mode
        if len(columns) == 0:
            if isinstance(it, np.ndarray): return it.astype(float)
            if isinstance(it, torch.Tensor): return it.float()
            if mode == 0: return it | cli.apply(float)
            return it | _toop(_toFloat, None, mode == 1, 0.0)
        # mode 0 is honored for column conversions too, as documented
        if mode == 0: return it | cli.init.serial(*(cli.apply(float, c) for c in columns))
        return it | cli.init.serial(*(_toop(_toFloat, c, mode == 1, 0.0) for c in columns))
def _toInt(e) -> Union[int, None]:
try: return int(float(e))
except: return None
class toInt(BaseCli):
    def __init__(self, *columns, mode=2):
        """Converts every row into an integer. Example::

            # returns [1, 3, -2]
            ["1", "3", "-2.3"] | toInt() | deref()

        :param columns: if nothing, then will convert each row. If available, then
            convert all the specified columns
        :param mode: different conversion styles

            - 0: simple ``int()`` function, fastest, but will throw errors if it can't be parsed
            - 1: if there are errors, then replace it with zero
            - 2: if there are errors, then eliminate the row

        See also: :meth:`toFloat`"""
        self.columns = columns; self.mode = mode
    def __ror__(self, it):
        columns = self.columns; mode = self.mode
        if len(columns) == 0:
            if isinstance(it, np.ndarray): return it.astype(int)
            if isinstance(it, torch.Tensor): return it.int()
            if mode == 0: return it | cli.apply(int)
            return it | _toop(_toInt, None, mode == 1, 0) # integer zero, so the output stays all-int
        # mode 0 is honored for column conversions too, as documented
        if mode == 0: return it | cli.init.serial(*(cli.apply(int, c) for c in columns))
        return it | cli.init.serial(*(_toop(_toInt, c, mode == 1, 0) for c in columns))
class toBytes(BaseCli):
    def __init__(self, imgType="JPEG"):
        """Turns several kinds of objects into bytes.
        Example::

            # converts string to bytes
            "abc" | toBytes()
            # converts image to the bytes of its encoded (JPEG by default) form
            torch.randn(200, 100) | toImg() | toBytes()

        Strings are encoded, PIL images are converted to rgb and saved in the
        requested format, and everything else is serialized with ``dill``.

        :param imgType: if input is an image then this is the image type. Can
            change to "PNG" or sth like that"""
        self.imgType = imgType
    def __ror__(self, it):
        if isinstance(it, str): return it.encode()
        if hasPIL and isinstance(it, PIL.Image.Image):
            stream = io.BytesIO()
            (it | toRgb()).save(stream, format=self.imgType)
            return stream.getvalue()
        import dill # fallback for arbitrary objects
        return dill.dumps(it)
class toHtml(BaseCli):
    def __init__(self):
        """Converts several object types to html.
        Example::

            # converts PIL image to html <img> tag
            torch.randn(200, 100) | toImg() | toHtml()

        PIL images become an ``<img>`` tag with the PNG data embedded as base64,
        plotly figures are written out with their own ``write_html()``, and any
        other object falls back to its ``_repr_html_()`` or, failing that, its
        ``__repr__()``."""
        pass
    def __ror__(self, it):
        if hasPIL:
            if isinstance(it, PIL.Image.Image):
                it = it | toBytes(imgType="PNG") | cli.aS(base64.b64encode) | cli.op().decode()
                return f"<img src=\"data:image/png;base64, {it}\" />"
        if hasPlotly:
            if isinstance(it, plotly.graph_objs._figure.Figure):
                out = io.StringIO(); it.write_html(out); out.seek(0); return out.read()
        try: return it._repr_html_()
        # ``Exception`` rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
        except Exception: return it.__repr__()
# optional rdkit support: toMol/toSmiles only exist (and are only exported) if all of these imports succeed
try:
    from rdkit import Chem
    from rdkit.Chem import Draw
    from rdkit.Chem import AllChem
    from rdkit.Chem.Draw import IPythonConsole
    IPythonConsole.drawOptions.addAtomIndices = True
    __all__ = [*__all__, "toMol", "toSmiles"]
    def toMol():
        """Smiles to molecule.
        Example::

            "c1ccc(C)cc1" | toMol()"""
        return cli.aS(Chem.MolFromSmiles)
    def toSmiles():
        """Molecule to smiles.
        Example::

            "c1ccc(C)cc1" | toMol() | toSmiles()"""
        return cli.aS(Chem.MolToSmiles)
# best-effort: any failure while setting up rdkit leaves the module usable without these clis.
# ``Exception`` rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
except Exception: pass
import unicodedata
def toAscii():
    """Converts complex unicode text to its base ascii form.
    Example::

        "hà nội" | toAscii() # returns "ha noi"

    The text is decomposed with NFKD normalization, then every character that
    has no ascii representation (combining accents, ...) is dropped.

    Taken from https://stackoverflow.com/questions/2365411/convert-unicode-to-ascii-without-errors-in-python"""
    # decode back at the end, so a string is returned as documented, rather than a bytes object
    return cli.aS(lambda word: unicodedata.normalize('NFKD', word).encode('ascii', 'ignore').decode('ascii'))