# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
For operations that feel like the termination
"""
from collections import defaultdict
from typing import Iterator, Any
from k1lib.cli.init import BaseCli, Table
import torch, numbers, numpy as np, k1lib, tempfile, os, sys, time, shutil
from k1lib import cli
__all__ = ["stdout", "tee", "file", "pretty", "display", "headOut",
"intercept", "split"]
settings = k1lib.settings.cli
[docs]class stdout(BaseCli):
[docs] def __init__(self):
"""Prints out all lines. If not iterable, then print out the input
raw. Example::
# prints out "0\\n1\\n2"
range(3) | stdout()
# same as above, but (maybe?) more familiar
range(3) > stdout()"""
super().__init__()
[docs] def __ror__(self, it:Iterator[str]):
try:
it = iter(it)
for line in it: print(line)
except TypeError: print(it)
[docs]class tee(BaseCli):
[docs] def __init__(self, f=(lambda s: f"{s}\n"), s=None):
"""Like the Linux ``tee`` command, this prints the elements to another
specified stream, while yielding the elements. Example::
# prints "0\\n1\\n2\\n3\\n4\\n" and returns [0, 1, 4, 9, 16]
range(5) | tee() | apply(op() ** 2) | deref()
:param f: element transform function. Defaults to just adding a new
line at the end
:param s: stream to write to. Defaults to :attr:`sys.stdout`"""
self.s = s or sys.stdout; self.f = f
[docs] def __ror__(self, it):
s = self.s; f = self.f
for e in it:
print(f(e), end="", file=s)
yield e
[docs] @staticmethod
def cr(f=lambda x: x):
"""Tee, but replaces the previous line. "cr" stands for carriage return.
Example::
# prints "4" and returns [0, 1, 4, 9, 16]. Does print all the numbers in the middle, but is overriden
range(5) | tee.cr() | apply(op() ** 2) | deref()"""
return tee(lambda s: f"\r{f(s)}")
[docs] @staticmethod
def crt(f=lambda x: x):
"""Like :meth:`tee.cr`, but includes an elapsed time text at the end"""
beginTime = time.time()
return tee(lambda s: f"\r{f(s)}, {int(time.time() - beginTime)}s elapsed")
[docs]class file(BaseCli):
[docs] def __init__(self, fileName:str=None, text:bool=True):
"""Opens a new file for writing.
Example::
# writes "0\\n1\\n2\\n" to file
range(3) | file("test/f.txt")
# same as above, but (maybe?) more familiar
range(3) > file("text/f.txt")
# returns ['0', '1', '2']
cat("folder/f.txt") | deref()
# writes bytes to file
b'5643' | file("test/a.bin", False)
# returns ['5643']
cat("test/a.bin") | deref()
You can create temporary files on the fly::
# creates temporary file
url = range(3) > file()
# returns ['0', '1', '2']
cat(url) | deref()
This can be especially useful when integrating with shell scripts that wants to
read in a file::
seq1 = "CCAAACCCCCCCTCCCCCGCTTC"
seq2 = "CCAAACCCCCCCCTCCCCCCGCTTC"
# use "needle" program to locally align 2 sequences
None | cmd(f"needle {[seq1] > file()} {[seq2] > file()} -filter")
You can also append to file with the ">>" operator::
url = range(3) > file()
# appended to file
range(10, 13) >> file(url)
# returns ['0', '1', '2', '10', '11', '12']
cat(url) | deref()
:param fileName: if not specified, create new temporary file and returns the url
when pipes into it
:param text: if True, accepts Iterator[str], and prints out each string on a
new line. Else accepts bytes and write in 1 go."""
super().__init__(); self.fileName = fileName; self.text = text
self.append = False # whether to append to file rather than erasing it
[docs] def __ror__(self, it:Iterator[str]) -> None:
super().__ror__(it); fileName = self.fileName
if fileName is None:
f = tempfile.NamedTemporaryFile()
fileName = f.name; f.close()
fileName = os.path.expanduser(fileName)
if self.text:
with open(fileName, "a" if self.append else "w") as f:
for line in it: f.write(f"{line}\n")
else:
with open(fileName, "ab" if self.append else "wb") as f: f.write(it)
return fileName
def __rrshift__(self, it):
self.append = True
return self.__ror__(it)
@property
def name(self):
"""File name of this :class:`file`"""
return self.fileName
[docs]class pretty(BaseCli):
[docs] def __init__(self, delim=""):
"""Pretty prints a table. Not really used directly.
Example::
# These 2 statements are pretty much the same
[range(10), range(10)] | head(5) | pretty() > stdout()
[range(10), range(10)] | display()"""
self.delim = delim
[docs] def __ror__(self, it:Table[Any]) -> Iterator[str]:
table = []; widths = defaultdict(lambda: 0)
for row in it:
_row = []
for i, e in enumerate(row):
e = f"{e}"; _row.append(e)
widths[i] = max(len(e), widths[i])
table.append(_row)
for row in table:
yield self.delim.join(e.rstrip(" ").ljust(w+3) for w, e in zip(widths.values(), row))
[docs]def display(lines:int=10):
"""Convenience method for displaying a table.
Pretty much equivalent to ``head() | pretty() | stdout()``.
See also: :class:`pretty`"""
f = pretty() | stdout()
if lines is None: return f
else: return cli.head(lines) | f
[docs]def headOut(lines:int=10):
"""Convenience method for head() | stdout()"""
if lines is None: return stdout()
else: return cli.head(lines) | stdout()
[docs]class intercept(BaseCli):
[docs] def __init__(self, raiseError:bool=True):
"""Intercept flow at a particular point, analyze the object piped in, and
raises error to stop flow. Example::
3 | intercept()
:param raiseError: whether to raise error when executed or not."""
self.raiseError = raiseError
[docs] def __ror__(self, s):
print(type(s))
if isinstance(s, (numbers.Number, str, bool)):
print(k1lib.tab(f"{s}"))
elif isinstance(s, (tuple, list)):
print(k1lib.tab(f"Length: {len(s)}"))
for e in s: print(k1lib.tab(f"- {type(e)}"))
elif isinstance(s, settings.arrayTypes):
print(k1lib.tab(f"Shape: {s.shape}"))
if s.numel() < 1000:
print(k1lib.tab(f"{s}"))
if self.raiseError: raise RuntimeError("intercepted")
return s
a = k1lib.AutoIncrement()
[docs]class split(BaseCli):
[docs] def __init__(self, n=10, baseFolder="/tmp"):
"""Splits a large file into multiple fragments, and returns the
path to those files. Example::
# returns a list of 20 files
"big-file.csv" | split(20)
This uses the underlying ``split`` linux cli tool. This also means that
it's not guaranteed to work on macos or windows.
Overtime, there will be lots of split files after a session, so be sure
to clean them up to reduce disk size::
split.clear()
:param n: Number of files to split into
:param baseFolder: Base folder where all the splitted files are"""
baseFolder = os.path.expanduser(baseFolder); self.n = n
self.folder = f"{baseFolder}/k1lib_split/{int(time.time())}_{a()}"
try: shutil.rmtree(self.folder)
except: pass
[docs] def __ror__(self, file):
os.makedirs(self.folder, exist_ok=True)
if not isinstance(file, str): raise RuntimeError("Not a file name!")
None | cli.cmd(f"split -n l/{self.n} \"{os.path.expanduser(file)}\" \"{self.folder}/pr-\"") | cli.ignore()
return [f"{self.folder}/{f}" for f in os.listdir(self.folder)]
[docs] @staticmethod
def clear(baseFolder="/tmp"):
"""Clears all splitted temporary files."""
baseFolder = os.path.expanduser(baseFolder)
try: shutil.rmtree(f"{baseFolder}/k1lib_split")
except: pass