# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for all short utilities that convert from one data type to another. They
might feel like they have different styles, as :class:`toFloat` converts an object iterator to
a float iterator, while :class:`toPIL` converts a single image url to a single PIL image,
whereas :class:`toSum` converts a float iterator into a single float value.
The general convention is: if the intended operation sounds simple (convert to floats,
strings, types, ...), then most likely it will convert iterator to iterator, as you
can always use the function directly if you only want to apply it to 1 object.
If it sounds complicated (convert to PIL image, tensor, ...) then most likely it will
convert object to object. Lastly, there are some that just feel right taking in
an iterator and outputting a single object (like getting max, min, std, mean values)."""
__all__ = ["toTensor", "toRange", "toList",
"toSum", "toProd", "toAvg", "toMean", "toStd", "toMax", "toMin", "toPIL", "toImg",
"toRgb", "toRgba", "toGray", "toDict",
"toFloat", "toInt", "toBytes", "toHtml", "toAscii", "toHash", "toCsv"]
import re, k1lib, math, os, numpy as np, io, base64, unicodedata
from k1lib.cli.init import BaseCli, T, yieldT; import k1lib.cli as cli
from k1lib.cli.typehint import *; import matplotlib as mpl; import matplotlib.pyplot as plt
from collections import deque, defaultdict; from typing import Iterator, Any, List, Set, Tuple, Dict, Callable, Union
settings = k1lib.settings.cli
try: import PIL; hasPIL = True
except: hasPIL = False
try: import torch; hasTorch = True
except: torch = k1lib.dep("torch"); hasTorch = False
try: import rdkit; hasRdkit = True
except: hasRdkit = False
try: import graphviz; hasGraphviz = True
except: hasGraphviz = False
try: import plotly; import plotly.express as px; hasPlotly = True
except: hasPlotly = False
class toTensor(BaseCli): # toTensor
def __init__(self, dtype=None): # toTensor
"""Converts a generator to a :class:`torch.Tensor`. Essentially
``torch.tensor(list(it))``. Default dtype is float32.
Also checks whether the input is a PIL Image. If it is, turns it into a :class:`torch.Tensor`
and returns that.""" # toTensor
self.dtype = dtype or torch.float32 # toTensor
def __ror__(self, it:Iterator[float]) -> "torch.Tensor": # toTensor
try: # toTensor
import PIL; pic=it # toTensor
if isinstance(pic, PIL.Image.Image): # stolen from torchvision ToTensor transform # toTensor
mode_to_nptype = {'I': np.int32, 'I;16': np.int16, 'F': np.float32} # toTensor
img = torch.from_numpy(np.array(pic, mode_to_nptype.get(pic.mode, np.uint8), copy=True)) # toTensor
if pic.mode == '1': img = 255 * img # toTensor
img = img.view(pic.size[1], pic.size[0], len(pic.getbands())) # toTensor
return img.permute((2, 0, 1)).contiguous().to(self.dtype) # put it from HWC to CHW format # toTensor
except: pass # toTensor
if isinstance(it, np.ndarray): return torch.tensor(it).to(self.dtype) # toTensor
return torch.tensor(list(it)).to(self.dtype) # toTensor
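# Usage sketch for toTensor (illustrative only; assumes torch and PIL are installed):
#   [[1, 2], [3, 4]] | toTensor()          # 2x2 float32 tensor
#   [1, 2, 3] | toTensor(torch.int64)      # integer tensor, custom dtype
#   "abc.jpg" | toPIL() | toTensor()       # PIL image -> (C, H, W) float32 tensor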
class toList(BaseCli): # this still exists because some LLVM optimizations are done on it, and it's too much effort to change that at the moment # toList
def __init__(self): # toList
"""Converts generator to list.
Example::
# returns [0, 1, 2, 3, 4]
range(5) | toList()
# returns [0, 1, 2, 3, 4]
range(5) | aS(list)
So this cli is sort of outdated. It still works fine, nothing wrong
with it, but just do ``aS(list)`` instead. It's not removed to
avoid breaking old projects.""" # toList
super().__init__() # toList
# toList
def _typehint(self, inp): # toList
if isinstance(inp, tListIterSet): return tList(inp.child) # toList
if isinstance(inp, tCollection): return inp # toList
return tList(tAny()) # toList
def __ror__(self, it:Iterator[Any]) -> List[Any]: return list(it) # toList
def _toRange(it): # _toRange
for i, _ in enumerate(it): yield i # _toRange
class toRange(BaseCli): # toRange
def __init__(self): # toRange
"""Returns iter(range(len(it))), effectively.
Example::
# returns [0, 1, 2]
[3, 2, 5] | toRange() | deref()""" # toRange
super().__init__() # toRange
def __ror__(self, it:Iterator[Any]) -> Iterator[int]: # toRange
try: return range(len(it)) # toRange
except: return _toRange(it) # toRange
tOpt.addPass(lambda cs, ts, _: [cs[0]], [toRange, toRange]) # toRange
settings.add("arrayTypes", (torch.Tensor, np.ndarray) if hasTorch else (np.ndarray,), "default array types used to accelerate clis") # toRange
def genericTypeHint(inp): # genericTypeHint
if isinstance(inp, tListIterSet): return inp.child # genericTypeHint
if isinstance(inp, tCollection): return inp.children[0] # genericTypeHint
if isinstance(inp, tArrayTypes): return inp.child # genericTypeHint
return tAny() # genericTypeHint
class toSum(BaseCli): # toSum
def __init__(self): # toSum
"""Calculates the sum of a list of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`.
Example::
# returns 45
range(10) | toSum()""" # toSum
super().__init__() # toSum
def _all_array_opt(self, it, level): # toSum
bm = np if isinstance(it, np.ndarray) else (torch if hasTorch and isinstance(it, torch.Tensor) else None) # toSum
return NotImplemented if bm is None else bm.sum(it, tuple(range(level, len(it.shape)))) # toSum
def _typehint(self, inp): return genericTypeHint(inp) # toSum
def __ror__(self, it:Iterator[float]): # toSum
if isinstance(it, settings.arrayTypes): return it.sum() # toSum
return sum(it) # toSum
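# Sketch: array inputs listed in settings.arrayTypes are reduced natively instead of being iterated over (illustrative):
#   np.arange(6).reshape(2, 3) | toSum()   # returns 15, computed via ndarray.sum()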
class toProd(BaseCli): # toProd
def __init__(self): # toProd
"""Calculates the product of a list of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`.
Example::
# returns 362880
range(1,10) | toProd()""" # toProd
super().__init__() # toProd
def _all_array_opt(self, it, level): # toProd
if isinstance(it, np.ndarray): return np.prod(it, tuple(range(level, len(it.shape)))) # toProd
elif hasTorch and isinstance(it, torch.Tensor): # toProd
for i in range(level, len(it.shape)): it = torch.prod(it, level) # toProd
return it # toProd
return NotImplemented # toProd
def _typehint(self, inp): return genericTypeHint(inp) # toProd
def __ror__(self, it): # toProd
if isinstance(it, settings.arrayTypes): return it.prod() # toProd
else: return math.prod(it) # toProd
class toAvg(BaseCli): # toAvg
def __init__(self): # toAvg
"""Calculates the average of a list of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`.
Example::
# returns 4.5
range(10) | toAvg()
# returns nan
[] | toAvg()""" # toAvg
super().__init__() # toAvg
def _all_array_opt(self, it, level): # toAvg
bm = np if isinstance(it, np.ndarray) else (torch if hasTorch and isinstance(it, torch.Tensor) else None) # toAvg
return NotImplemented if bm is None else bm.mean(it, tuple(range(level, len(it.shape)))) # toAvg
def _typehint(self, inp): # toAvg
i = None # toAvg
if isinstance(inp, tListIterSet): i = inp.child # toAvg
if isinstance(inp, tCollection): i = inp.children[0] # toAvg
if isinstance(inp, tArrayTypes): i = inp.child # toAvg
if i is not None: return float if i == int else i # toAvg
return tAny() # toAvg
def __ror__(self, it:Iterator[float]): # toAvg
if isinstance(it, settings.arrayTypes): return it.mean() # toAvg
s = 0; i = -1 # toAvg
for i, v in enumerate(it): s += v # toAvg
i += 1 # toAvg
if not k1lib.settings.cli.strict and i == 0: return float("nan") # toAvg
return s / i # toAvg
if hasTorch: # toAvg
torchVer = int(torch.__version__.split(".")[0]) # toAvg
if torchVer >= 2: # toAvg
def torchStd(it, ddof, dim=None): return torch.std(it, dim, correction=ddof) # toAvg
else: # toAvg
def torchStd(it, ddof, dim=None): # toAvg
if ddof == 0: return torch.std(it, dim, unbiased=False) # toAvg
if ddof == 1: return torch.std(it, dim, unbiased=True) # toAvg
raise Exception(f"Please install PyTorch 2, as version 1 don't support correction factor of {ddof}") # toAvg
else: # toAvg
def torchStd(it, ddof): raise Exception("PyTorch not installed") # toAvg
class toStd(BaseCli): # toStd
def __init__(self, ddof:int=0): # toStd
"""Calculates the standard deviation of a list of numbers. Can pipe in :class:`torch.Tensor`
or :class:`numpy.ndarray` to be faster. Example::
# returns 2.8722813232690143
range(10) | toStd()
# returns nan
[] | toStd()
:param ddof: "delta degree of freedom". The divisor used in calculations is ``N - ddof``""" # toStd
self.ddof = ddof # toStd
def _all_array_opt(self, it, level): # toStd
n = len(it.shape); ddof = self.ddof; dim = tuple(range(level, n)) # toStd
if isinstance(it, np.ndarray): return np.std(it, ddof=ddof, axis=dim) # toStd
elif hasTorch and isinstance(it, torch.Tensor): return torchStd(it, ddof, dim) # toStd
return NotImplemented # toStd
def __ror__(self, it): # toStd
ddof = self.ddof # toStd
if isinstance(it, settings.arrayTypes): # toStd
if isinstance(it, np.ndarray): return np.std(it, ddof=ddof) # toStd
elif hasTorch and isinstance(it, torch.Tensor): return torchStd(it, ddof) # toStd
return np.std(np.array(list(it)), ddof=ddof) # toStd
toMean = toAvg # toStd
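# Sketch of the ddof parameter (population vs sample standard deviation), assuming numpy semantics (illustrative):
#   [1, 2, 3, 4] | toStd()        # divisor N,   returns ~1.118
#   [1, 2, 3, 4] | toStd(ddof=1)  # divisor N-1, returns ~1.291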
class toMax(BaseCli): # toMax
def __init__(self): # toMax
"""Calculates the max of a bunch of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`.
Example::
# returns 6
[2, 5, 6, 1, 2] | toMax()""" # toMax
super().__init__() # toMax
def _all_array_opt(self, it, level): # toMax
if isinstance(it, np.ndarray): return np.max(it, tuple(range(level, len(it.shape)))) # toMax
elif hasTorch and isinstance(it, torch.Tensor): # toMax
for i in range(level, len(it.shape)): it = torch.max(it, level)[0] # toMax
return it # toMax
return NotImplemented # toMax
def __ror__(self, it:Iterator[float]) -> float: # toMax
if isinstance(it, settings.arrayTypes): return it.max() # toMax
return max(it) # toMax
class toMin(BaseCli): # toMin
def __init__(self): # toMin
"""Calculates the min of a bunch of numbers. Can pipe in :class:`torch.Tensor` or :class:`numpy.ndarray`.
Example::
# returns 1
[2, 5, 6, 1, 2] | toMin()""" # toMin
super().__init__() # toMin
def _all_array_opt(self, it, level): # toMin
if isinstance(it, np.ndarray): return np.min(it, tuple(range(level, len(it.shape)))) # toMin
elif hasTorch and isinstance(it, torch.Tensor): # toMin
for i in range(level, len(it.shape)): it = torch.min(it, level)[0] # toMin
return it # toMin
return NotImplemented # toMin
def __ror__(self, it:Iterator[float]) -> float: # toMin
if isinstance(it, settings.arrayTypes): return it.min() # toMin
return min(it) # toMin
settings.add("font", None, "default font file. Best to use .ttf files, used by toPIL()") # toMin
settings.add("chem", k1lib.Settings().add("imgSize", 200, "default image size used in toPIL() when drawing rdkit molecules"), "chemistry-related settings") # toMin
def cropToContentNp(ogIm, pad=10): # cropToContentNp
dim = len(ogIm.shape); im = ogIm # cropToContentNp
if dim > 2: im = im.mean(0) # cropToContentNp
coords = np.argwhere(im.max()-im); x_min, y_min = coords.min(axis=0); x_max, y_max = coords.max(axis=0) # cropToContentNp
return ogIm[x_min-pad:x_max+1+pad, y_min-pad:y_max+1+pad] if dim == 2 else ogIm[:,x_min-pad:x_max+1+pad, y_min-pad:y_max+1+pad] # cropToContentNp
def cropToContentPIL(im, pad=0): # cropToContentPIL
im = im | toTensor(int) | cli.op().numpy() | cli.aS(cropToContentNp, pad) # cropToContentPIL
return torch.from_numpy(im).permute(1, 2, 0) | toImg() if len(im.shape) > 2 else im | toImg() # cropToContentPIL
class toPIL(BaseCli): # toPIL
def __init__(self, closeFig=True, crop=True): # toPIL
"""Converts multiple data types into a PIL image.
Example::
# grabs first image in the current folder
ls(".") | toPIL().all() | item()
# converts from tensor/array to image
torch.randn(100, 200) | toPIL()
# grabs image, converts to byte stream, and converts back to image
"abc.jpg" | toPIL() | toBytes() | toPIL()
# converts paragraphs to image
["abc", "def"] | toPIL()
# converts SMILES string to molecule, then to image
"c1ccc(C)cc1" | toMol() | toImg()
You can also save a matplotlib figure by piping in a :class:`matplotlib.figure.Figure` object::
x = np.linspace(0, 4)
plt.plot(x, x**2)
plt.gcf() | toPIL()
.. note::
If you are working with image tensors, which typically have
dimensions of (C, H, W), you have to permute them to PIL's (H, W, C)
format first before passing them into this cli.
Also, it's expected that
your tensor image ranges from 0-255, and not 0-1. Make sure you
renormalize it accordingly.
:param closeFig: if input is a matplotlib figure, then closes the figure after generating the image
:param crop: whether to crop white spaces around an image or not""" # toPIL
import PIL; self.PIL = PIL; self.closeFig = closeFig; self.crop = crop # toPIL
def _typehint(self, inp): # toPIL
return PIL.Image.Image # toPIL
def __ror__(self, path) -> "PIL.Image.Image": # toPIL
if isinstance(path, str): # toPIL
return self.PIL.Image.open(os.path.expanduser(path)) # toPIL
if isinstance(path, bytes): # toPIL
return self.PIL.Image.open(io.BytesIO(path)) # toPIL
if hasTorch and isinstance(path, torch.Tensor): path = path.numpy() # toPIL
if isinstance(path, np.ndarray): # toPIL
return self.PIL.Image.fromarray(path.astype("uint8")) # toPIL
if isinstance(path, mpl.figure.Figure): # toPIL
canvas = path.canvas; canvas.draw() # toPIL
img = self.PIL.Image.frombytes('RGB', canvas.get_width_height(), canvas.tostring_rgb()) # toPIL
if self.closeFig: plt.close(path) # toPIL
return img | cli.aS(cropToContentPIL) # toPIL
if hasGraphviz and isinstance(path, graphviz.Digraph): # toPIL
import tempfile; a = tempfile.NamedTemporaryFile() # toPIL
path.render(a.name, format="jpeg"); # toPIL
fn = f"{a.name}.jpeg"; im = fn | toImg() # toPIL
try: os.remove(fn) # toPIL
except: pass # toPIL
return im # toPIL
if hasRdkit and isinstance(path, rdkit.Chem.rdchem.Mol): # toPIL
sz = settings.chem.imgSize # toPIL
return self.__ror__(rdkit.Chem.Draw.MolsToGridImage([path], subImgSize=[sz, sz]).data) | (cli.aS(cropToContentPIL) if self.crop else cli.iden()) # toPIL
path = path | cli.deref() # toPIL
if len(path) > 0 and isinstance(path[0], str): # toPIL
from PIL import ImageDraw # toPIL
h = path | cli.shape(0); w = path | cli.shape(0).all() | cli.aS(max) # toPIL
image = self.PIL.Image.new("L", ((w+1)*20, (h+1)*60), 255) # toPIL
font = PIL.ImageFont.truetype(settings.font, 18) if settings.font else None # toPIL
ImageDraw.Draw(image).text((20, 20), path | cli.join("\n"), 0, font=font) # toPIL
return np.array(image)/255 | (cli.aS(cropToContentNp) if self.crop else cli.iden()) | cli.op()*255 | toImg() # toPIL
return NotImplemented # toPIL
toImg = toPIL # toPIL
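# Sketch of the (C, H, W) note above: permute to (H, W, C) and scale to 0-255 before converting
# (illustrative only; assumes torch is installed):
#   t = torch.randint(0, 256, (3, 100, 200))   # a typical channels-first image tensor
#   t.permute(1, 2, 0) | toImg()               # HWC layout -> PIL image
#   (torch.rand(100, 200) * 255) | toImg()     # 0-1 values rescaled to 0-255 -> grayscale PIL image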
class toRgb(BaseCli): # toRgb
def __init__(self): # toRgb
"""Converts greyscale/rgb PIL image to rgb image.
Example::
# reads image file and converts it to rgb
"a.png" | toPIL() | toRgb()""" # toRgb
import PIL; self.PIL = PIL # toRgb
def _typehint(self, inp): return inp # toRgb
def __ror__(self, i): # toRgb
if i.getbands() == ("R", "G", "B"): return i # toRgb
rgbI = self.PIL.Image.new("RGB", i.size) # toRgb
rgbI.paste(i); return rgbI # toRgb
class toRgba(BaseCli): # toRgba
def __init__(self): # toRgba
"""Converts any PIL image to an rgba image.
Example::
# reads image file and converts it to rgba
"a.png" | toPIL() | toRgba()""" # toRgba
import PIL; self.PIL = PIL # toRgba
def _typehint(self, inp): return inp # toRgba
def __ror__(self, i): # toRgba
if i.getbands() == ("R", "G", "B", "A"): return i # toRgba
rgbI = self.PIL.Image.new("RGBA", i.size) # toRgba
rgbI.paste(i); return rgbI # toRgba
class toGray(BaseCli): # toGray
def __init__(self): # toGray
"""Converts any PIL image to a grayscale image.
Example::
# reads image file and converts it to grayscale
"a.png" | toPIL() | toGray()""" # toGray
import PIL; self.PIL = PIL # toGray
def _typehint(self, inp): return inp # toGray
def __ror__(self, i): # toGray
if i.getbands() == ("L",): return i # toGray
return self.PIL.ImageOps.grayscale(i) # toGray
class toDict(BaseCli): # toDict
def __init__(self, rows=True, f=None): # toDict
"""Converts an iterator of (key, value) rows, or a pair of iterators (keys and values), into a dictionary.
Example::
# returns {1: 3, 2: 4}
[[1, 3], [2, 4]] | toDict()
# returns {1: 3, 2: 4}
[[1, 2], [3, 4]] | toDict(False)
If ``rows`` is a string, then it will build a dictionary from key-value
pairs delimited by this character. For example::
['gene_id "ENSG00000290825.1"',
'transcript_id "ENST00000456328.2"',
'gene_type "lncRNA"',
'gene_name "DDX11L2"',
'transcript_type "lncRNA"',
'transcript_name "DDX11L2-202"',
'level 2',
'transcript_support_level "1"',
'tag "basic"',
'tag "Ensembl_canonical"',
'havana_transcript "OTTHUMT00000362751.1"'] | toDict(" ")
That returns::
{'gene_id': '"ENSG00000290825.1"',
'transcript_id': '"ENST00000456328.2"',
'gene_type': '"lncRNA"',
'gene_name': '"DDX11L2"',
'transcript_type': '"lncRNA"',
'transcript_name': '"DDX11L2-202"',
'level': '2',
'transcript_support_level': '"1"',
'tag': '"Ensembl_canonical"',
'havana_transcript': '"OTTHUMT00000362751.1"'}
:param rows: if True, reads the input in row by row (each row being a (key, value) pair), else reads
it in as a list of 2 columns (keys and values)
:param f: if specified, returns a defaultdict that uses this function as its default factory""" # toDict
self.rows = rows # toDict
if f is not None: self.f = lambda d: defaultdict(f, d) # toDict
else: self.f = lambda x: x # toDict
def __ror__(self, it:Tuple[Iterator[T], Iterator[T]]) -> dict: # toDict
r = self.rows; f = self.f # toDict
if r: # toDict
if isinstance(r, str): return it | cli.apply(cli.aS(lambda x: x.split(" ")) | cli.head(1).split() | cli.item() + cli.join(" ")) | toDict() # toDict
return f({_k:_v for _k, _v in it}) # toDict
return f({_k:_v for _k, _v in zip(*it)}) # toDict
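# Sketch of the f parameter: the result is a defaultdict, so missing keys fall back to f() (illustrative):
#   d = [[1, 3], [2, 4]] | toDict(f=int)
#   d[1]    # returns 3
#   d[99]   # returns 0, since int() is the default factory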
def _toop(toOp, c, force, defaultValue): # _toop
return cli.apply(toOp, c) | (cli.apply(lambda x: x or defaultValue, c) if force else cli.filt(cli.op() != None, c)) # _toop
def _toFloat(e) -> Union[float, None]: # _toFloat
try: return float(e) # _toFloat
except: return None # _toFloat
class toFloat(BaseCli): # toFloat
def __init__(self, *columns, mode=2): # toFloat
"""Converts every row into a float. Example::
# returns [1, 3, -2.3]
["1", "3", "-2.3"] | toFloat() | deref()
# returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
[["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()
With weird rows::
# returns [[1.0, 'a'], [8.0, 'c']]
[["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
# returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
[["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, force=True) | deref()
This also works well with :class:`torch.Tensor` and :class:`numpy.ndarray`,
as they will not be broken up into an iterator::
# returns a numpy array, instead of an iterator
np.array(range(10)) | toFloat()
:param columns: if nothing, then will convert each row. If available, then
convert all the specified columns
:param mode: different conversion styles
- 0: simple ``float()`` function, fastest, but will throw errors if it can't be parsed
- 1: if there are errors, then replace it with zero
- 2: if there are errors, then eliminate the row""" # toFloat
self.columns = columns; self.mode = mode; # toFloat
def __ror__(self, it): # toFloat
columns = self.columns; mode = self.mode # toFloat
if len(columns) == 0: # toFloat
if isinstance(it, np.ndarray): return it.astype(float) # toFloat
if hasTorch and isinstance(it, torch.Tensor): return it.float() # toFloat
if mode == 0: return it | cli.apply(float) # toFloat
return it | _toop(_toFloat, None, mode == 1, 0.0) # toFloat
else: return it | cli.init.serial(*(_toop(_toFloat, c, mode == 1, 0.0) for c in columns)) # toFloat
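# Side-by-side sketch of the 3 modes on the same malformed input (illustrative):
#   data = ["1", "oops", "3"]
#   data | toFloat(mode=0) | deref()   # raises ValueError when it hits "oops"
#   data | toFloat(mode=1) | deref()   # returns [1.0, 0.0, 3.0], bad value replaced with 0
#   data | toFloat(mode=2) | deref()   # returns [1.0, 3.0], bad row dropped (the default)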
def _toInt(e) -> Union[int, None]: # _toInt
try: return int(float(e)) # _toInt
except: return None # _toInt
class toInt(BaseCli): # toInt
def __init__(self, *columns, mode=2): # toInt
"""Converts every row into an integer. Example::
# returns [1, 3, -2]
["1", "3", "-2.3"] | toInt() | deref()
:param columns: if nothing, then will convert each row. If available, then
convert all the specified columns
:param mode: different conversion styles
- 0: simple ``int()`` function, fastest, but will throw errors if it can't be parsed
- 1: if there are errors, then replace it with zero
- 2: if there are errors, then eliminate the row
See also: :meth:`toFloat`""" # toInt
self.columns = columns; self.mode = mode; # toInt
def __ror__(self, it): # toInt
columns = self.columns; mode = self.mode # toInt
if len(columns) == 0: # toInt
if isinstance(it, np.ndarray): return it.astype(int) # toInt
if hasTorch and isinstance(it, torch.Tensor): return it.int() # toInt
if mode == 0: return it | cli.apply(int) # toInt
return it | _toop(_toInt, None, mode == 1, 0) # toInt
else: return it | cli.init.serial(*(_toop(_toInt, c, mode == 1, 0) for c in columns)) # toInt
class toBytes(BaseCli): # toBytes
def __init__(self, imgType="JPEG"): # toBytes
"""Converts several object types to bytes.
Example::
# converts string to bytes
"abc" | toBytes()
# converts image to JPEG bytes
torch.randn(200, 100) | toImg() | toBytes()
.. admonition:: Custom datatype
It is possible to build objects that can interoperate with this cli,
like this::
class custom1:
def __init__(self, config=None): ...
def _toBytes(self): return b"abc"
custom1() | toBytes() # returns b"abc"
When called upon, :class:`toBytes` will detect that the input has the ``_toBytes``
method, which will prompt it to execute that method of the complex object. Of
course, this means that you can return anything, not necessarily bytes, but to
maintain intuitiveness, you should return either bytes or iterator of bytes
:param imgType: if the input is an image, then this is the image format. Can
be changed to "PNG" or something similar""" # toBytes
self.imgType = imgType # toBytes
def __ror__(self, it): # toBytes
if isinstance(it, str): return it.encode() # toBytes
if hasPIL: # toBytes
if isinstance(it, PIL.Image.Image): # toBytes
it = it | toRgb(); buffered = io.BytesIO() # toBytes
it.save(buffered, format=self.imgType); return buffered.getvalue() # toBytes
if hasattr(it, "_toBytes"): return it._toBytes() # toBytes
import dill; return dill.dumps(it) # toBytes
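# Round-trip sketch: arbitrary objects fall through to dill, so they can be loaded back (assumes dill is installed):
#   bs = {"a": 1} | toBytes()      # dill-pickled bytes
#   import dill; dill.loads(bs)    # returns {'a': 1} again
#   "hello" | toBytes()            # returns b"hello" (plain utf-8 encoding for strings)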
class toHtml(BaseCli): # toHtml
def __init__(self): # toHtml
"""Converts several object types to an HTML string.
Example::
# converts PIL image to html <img> tag
torch.randn(200, 100) | toImg() | toHtml()
""" # toHtml
pass # toHtml
def __ror__(self, it): # toHtml
if hasPIL: # toHtml
if isinstance(it, PIL.Image.Image): # toHtml
it = it | toBytes(imgType="PNG") | cli.aS(base64.b64encode) | cli.op().decode() # toHtml
return f"<img src=\"data:image/png;base64, {it}\" />" # toHtml
if hasPlotly: # toHtml
if isinstance(it, plotly.graph_objs._figure.Figure): # toHtml
out = io.StringIO(); it.write_html(out); out.seek(0); return out.read() # toHtml
try: return it._repr_html_() # toHtml
except: return it.__repr__() # toHtml
try: # toHtml
from rdkit import Chem # toHtml
from rdkit.Chem import Draw # toHtml
from rdkit.Chem import AllChem # toHtml
from rdkit.Chem.Draw import IPythonConsole # toHtml
IPythonConsole.drawOptions.addAtomIndices = True # toHtml
__all__ = [*__all__, "toMol", "toSmiles"] # toHtml
def toMol(): # toHtml
"""Smiles to molecule.
Example::
"c1ccc(C)cc1" | toMol()""" # toHtml
return cli.aS(Chem.MolFromSmiles) # toHtml
def toSmiles(): # toHtml
"""Molecule to smiles.
Example::
"c1ccc(C)cc1" | toMol() | toSmiles()""" # toHtml
return cli.aS(Chem.MolToSmiles) # toHtml
except: pass # toHtml
import unicodedata, hashlib # toHtml
def toAscii(): # toAscii
"""Converts complex unicode text to its base ascii form.
Example::
"hà nội" | toAscii() # returns "ha noi"
Taken from https://stackoverflow.com/questions/2365411/convert-unicode-to-ascii-without-errors-in-python""" # toAscii
return cli.aS(lambda word: unicodedata.normalize('NFKD', word).encode('ascii', 'ignore')) # toAscii
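# Sketch: toAscii() returns ascii bytes, so decode if a str is needed (illustrative):
#   "hà nội" | toAscii() | aS(lambda b: b.decode())   # returns "ha noi"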
def toHash() -> str: # toHash
"""Converts some string into some hash string.
Example::
"abc" | toHash() # returns 'gASVJAAAAAAAAABDILp4Fr+PAc/qQUFA3l2uIiOwA2Gjlhd6nLQQ/2HyABWtlC4='
Why not just use the builtin function ``hash("abc")``? Because it generates different
hashes for different interpreter sessions, and that breaks many of my applications that
need the hash value to stay constant forever.""" # toHash
def hashF(msg:str) -> str: m = hashlib.sha256(); m.update(f"{msg}".encode()); return k1lib.encode(m.digest()) # toHash
return cli.aS(hashF) # toHash
import csv; pd = k1lib.dep("pandas") # toHash
class toCsv(BaseCli): # toCsv
def __init__(self, allSheets=False): # toCsv
"""Converts a csv file name into a table.
Example::
"abc.csv" | toCsv() # returns table of values
"def.xlsx" | toCsv() # returns table of values in the first sheet
"def.xlsx" | toCsv(True) # returns List[Sheet name (str), table of values]
:param allSheets: if input is an Excel sheet, whether to read in all sheets or
just the first sheet. No effect if input is a normal csv file""" # toCsv
self.allSheets = allSheets # toCsv
def __ror__(self, fn:str): # toCsv
fn = os.path.expanduser(fn) # toCsv
if fn.endswith(".xls") or fn.endswith(".xlsx"): # toCsv
if self.allSheets: return [[k, v.values] for k,v in pd.read_excel(fn, sheet_name=None).items()] # toCsv
else: return pd.read_excel(fn).values # toCsv
def gen(): # toCsv
with open(fn) as f: yield from csv.reader(f) # toCsv
return gen() # toCsv
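# Usage sketch, assuming "abc.csv" exists locally (hypothetical file):
#   "abc.csv" | toCsv() | display()            # rows come back as lists of strings, header row included
#   "abc.csv" | toCsv() | cut(0, 2) | deref()  # pick out specific columns downstream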