Source code for k1lib.cli.inp

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""This module for tools that will likely start the processing stream."""
from typing import Iterator, Union, Any
import k1lib, urllib, subprocess, warnings, os, k1lib, threading, time, warnings, math, io, dill
from k1lib.cli import BaseCli; import k1lib.cli as cli
from k1lib.cli.typehint import *
__all__ = ["cat", "splitSeek", "curl", "wget", "ls", "cmd", "walk", "requireCli"]
settings = k1lib.settings.cli
catSettings = k1lib.Settings().add("chunkSize", 100000, "file reading chunk size for binary+chunk mode. Decrease it to avoid wasting memory and increase it to avoid disk latency")
catSettings.add("every", k1lib.Settings().add("text", 1000, "for text mode, will print every n lines").add("binary", 10, "for binary mode, will print every n 100000-byte blocks"), "profiler print frequency")
settings.add("cat", catSettings, "inp.cat() settings")
def _catGenText(fn, sB, eB): # fn for "file name"
    if sB == 0 and eB == -1: # fast path without bounds (90-160 MB/s expected)
        with open(fn) as f:
            while True:
                line = f.readline()
                if line == "": return
                yield line[:-1] if line[-1] == "\n" else line
    else: # slow path with bounds (15 MB/s expected)
        sB = wrap(fn, sB); eB = wrap(fn, eB)
        with open(fn) as f:
            f.seek(sB); b = sB # current byte
            while True:
                line = f.readline()
                if line == "": return
                if f.tell() > eB: yield line[:len(line)-(f.tell()-eB)]; return
                if line[-1] == "\n": yield line[:-1]
                else: yield line
def _catGenBin(fn, sB, eB):
    chunkSize = settings.cat.chunkSize; sB = wrap(fn, sB); eB = wrap(fn, eB); nB = eB - sB # number of bytes to read total
    with open(fn, "rb") as f:
        f.seek(sB); nChunks = math.ceil(nB / chunkSize); lastChunkSize = nB - chunkSize*(nChunks-1)
        yield from range(nChunks) | cli.applyTh(lambda i: f.read(chunkSize) if i < nChunks-1 else f.read(chunkSize)[:lastChunkSize], prefetch=10)
def fileLength(fn):
    with open(fn, 'rb') as f: return f.seek(0, os.SEEK_END)
def wrap(fn, b): return b if b >= 0 else b + fileLength(fn) + 1
class _cat(BaseCli):
    def __init__(self, text, chunks, sB, eB): self.text = text; self.chunks = chunks; self.sB = sB; self.eB = eB
    def _typehint(self, ignored=None):
        if self.text: return tIter(str) if self.chunks else tList(str)
        else: return tIter(bytes) if self.chunks else bytes
    def __ror__(self, fn:str) -> Union[Iterator[str], bytes]:
        text = self.text; chunks = self.chunks; sB = self.sB; eB = self.eB
        fn = os.path.expanduser(fn)
        if text and chunks and k1lib._settings.packages.k1a and sB == 0 and eB == -1:
            return k1lib._k1a.k1a.StrIterCat(fn) # if there's a need to accelerate this if bounds are specified then I'll add it to k1a. Right now doesn't seem to be worth it
        if chunks: return _catGenText(fn, sB, eB) if text else _catGenBin(fn, sB, eB)
        sB = wrap(fn, sB); eB = wrap(fn, eB)
        if text:
            with open(fn) as f: f.seek(sB); return f.read(eB-sB).splitlines()
        else:
            with open(fn, "rb") as f: f.seek(sB); return f.read(eB-sB)
class Profile(BaseCli):
    def __init__(self, text): self.data = []; self.text = text
    def __ror__(self, it):
        fmt = k1lib.fmt; chars = 0; beginTime = time.time()
        if self.text:
            a, b, c, d, f = k1lib.ConstantPad.multi(5); every = settings.cat.every.text
            for lines, e in enumerate(it):
                chars += len(e)
                if lines % every == 0: # every 1000 lines, print stuff out
                    elapsed = time.time() - beginTime#; self.data.append([lines, chars, elapsed])
                    print(f"Current line: {fmt.item(lines) | a} ({fmt.item(lines/elapsed) | b} lines/s), current byte/chars: {fmt.size(chars) | c} ({fmt.size(chars/elapsed) | d}/s), elapsed: {fmt.time(elapsed) | f}                                 ", end="\r")
                yield e
        else:
            a, b, c = k1lib.ConstantPad.multi(3); every = settings.cat.every.binary
            for i, e in enumerate(it):
                chars += len(e)
                if i % every == 0: # every 10 100000-byte chunks, print stuff out
                    elapsed = time.time() - beginTime#; self.data.append([chars, elapsed])
                    print(f"Current size/chars: {fmt.size(chars) | a} ({fmt.size(chars/elapsed) | b}/s), elapsed: {fmt.time(elapsed) | c}                                 ", end="\r")
                yield e
[docs]def cat(fileName:str=None, text:bool=True, chunks:bool=None, profile:bool=False, sB=0, eB=-1): """Reads a file line by line. Example:: # display first 10 lines of file cat("file.txt") | headOut() # piping in also works "file.txt" | cat() | headOut() # read bytes from an image file and dumps it to another file cat("img.png", False) | file("img2.png") If you want to read only specific sections of the file, you can specify the start (``sB``) and end byte (``eB``) like this:: "123456\\n89" | file("test/catTest.pth") # returns ['3456', '8'] cat("test/catTest.pth", sB=2, eB=8) | deref() settings.cat.context.chunkSize=3 # just for demonstration, don't do it normally # returns [b'123', b'456', b'\\n8'] cat("test/catTest.pth", text=False, chunks=True, eB=8) | deref() If you are working with large files and would like to read 1 file from multiple threads/processes, then you can use this cli in conjunction with :class:`splitSeek`. If you are dumping multiple pickled objects into a single file, you can read all of them using :meth:`cat.pickle`. This cli has lots of settings at :data:`~k1lib.settings`.cli.cat :param fileName: if None, then return a :class:`~k1lib.cli.init.BaseCli` that accepts a file name and outputs Iterator[str] :param text: if True, read text file, else read binary file :param chunks: if True then reads the file chunk by chunk, else reads the entire file. Defaults to True in text mode and False in binary mode :param profile: whether to profile the file reading rate or not. Can adjust printing frequency using `settings.cli.cat.every` :param sB: "start byte". Specify this if you want to start reading from this byte :param eB: "end byte", inclusive. Default -1 means end of file""" if chunks is None: chunks = True if text else False if profile and not chunks: warnings.warn(f"Can't profile reading rate when you're trying to read everything at once"); profile = False f = _cat(text, chunks, sB, eB) if profile: f = f | Profile(text) return f if fileName is None else fileName | f
def _catPickle(fileName=None, pickleModule=dill): """Reads a file as a series of pickled objects. Example:: "ab" | aS(dill.dumps) | file("test/catTest.pth") "cd" | aS(dill.dumps) >> file("test/catTest.pth") # append to the file # returns ["ab", "cd"] cat.pickle("test/catTest.pth") | deref() # also returns ["ab", "cd"], same style as :class:`cat` "test/catTest.pth" | cat.pickle() | deref() :param fileName: name of the pickled file :param pickleModule: pickle module to use. Python's default is "pickle", but I usually use :mod:`dill` because it's more robust""" def gen(fn): with open(os.path.expanduser(fn), "rb") as f: try: while True: yield dill.load(f) except: pass return cli.aS(gen) if fileName is None else fileName | cli.aS(gen) cat.pickle = _catPickle
[docs]class splitSeek(BaseCli):
[docs] def __init__(self, n, c=b'\n'): """Splits a file up into n fragments aligned to the closest character and return the seek points. Example:: # preparing the large file range(120) | apply(lambda x: f"{x:3}_56789") | file("test/largeFile.txt") # returns [0, 30, 70, 110, 150, 190, 230, 270, 300, 340] "test/largeFile.txt" | splitSeek(31) | head() # returns 32 "test/largeFile.txt" | splitSeek(31) | shape(0) # returns 32, demonstrating you can also pipe file objects in, if that's what you want open("test/largeFile.txt") | splitSeek(31) | shape(0) # returns [0, 0, 10, 10, 20, 30, 30, 40, 40, 50], notice some segments have zero length "test/largeFile.txt" | splitSeek(200) | head() So, the generated file has 120 lines in total. Each line is 10 bytes (9 for the string, and 1 for the new line character). Splitting the file into 31 fragments will result in 32 seek points (:math:`p_i\quad i\in[1, n+1]`). You can then use these seek points to read the file in multiple threads/processes using :meth:`cat`, like this:: # returns [[' 0_56789', ' 1_56789', ' 2_56789'], [' 3_56789', ' 4_56789', ' 5_56789', ' 6_56789']] "test/largeFile.txt" | splitSeek(31) | window(2) | ~apply(lambda sB, eB: cat("test/largeFile.txt", sB=sB, eB=eB-1)) | head(2) | deref() Because :math:`120/31\\approx4`, most of cat's reads contain 4 lines, but some has 3 lines. Also notice that the lines smoothly transitions between cat's reads (``2_56789`` to ``3_56789``), so that's pretty nice. .. warning:: You have to really test whether reading the same file from multiple processes is going to be really faster or not. If your data is stored in a HDD (aka hard drive, with spinning disks), then it will actually slow you down (10x-100x), because the disk will have to context switch all the time, and each switch has a 10ms cost. You also have to take into account collecting together the results of all processes, which can bottleneck the cpu. Read more about concurrency pitfalls at :class:`~k1lib.cli.modifier.applyMp`. In some scenarios where you want to adjust the seek points even more, like when you want to parse FASTA genome files, which has blocks of 2/4 lines each like this: .. code-block:: text @FP200005993L1C001R00100000061/2 TTTTAAACTTGCATTCTTTGGAGATTTGCTGAGTGTTGCTAGAGCTGGGAAACTTTTTTAATGAGATACGTGCATATTTTTCAAATTTACAGATCTTTTTTCACAAAAATAGAAAGTCATAAATGTGAAATGGAAACCTAAACAAGGCAA + GFEEEDEFGFFFFEFFFFFIFCEEEFFFGFFDFEGEDGFFFGDGFFGDGCE@GGGEEFDFGFGCFDFGGHCHFFFGFFFFFGEFDFFGHGFGEEHGFGEGFGFHFFEGFFFE;GEGEFGGHFFEI=GDAEDIFDDFGHFGEFGFEGGGGF @FP200005993L1C001R00100000167/2 CTGGAATTTGGTATCTTATTGCCAAAGAATCTGTTTTGTGAAACTTGGGATCTCTATTTTAATGTTAATTCTGGTCAGTTGTGCCTAAACTCCATAAAGCAGGGACTATACTGAGGCGTATTCAATCTTCCTTCTTACCAAGGCCAGGAA + EEFECEDEFFCGFFFFFEEEGEGFEDECCEFEFDFEEFDFEDDFEFEEFDDFFEEFFEEFEFFHEEFEEFEFFDEFFFECF>FFFEFEDFCFFFEGFEDEEGDDFEEFEFGEEBD@EG>EEFFECEEGFEEEFFEDGEEEDE5EBDG:CC Here, each 4 lines are (title, read, blank, quality). Because by default, this will only split neatly along new lines, you will have to write extra functions to detect if a particular seek point is desirable, and if not, either jump forward or backward using :meth:`splitSeek.forward` and :meth:`splitSeek.backward`. :param n: how many splits do you want? :param c: block-boundary character, usually just the new line character""" self.n = n; self.c = c
[docs] @staticmethod def forward(f, i:int, c=b'\n') -> int: """Returns char location after the search char, going forward. Example:: f = io.BytesIO(b"123\\n456\\n789\\nabc") f | splitSeek(2) # returns [0, 4, 15] splitSeek.forward(f, 2) # returns 4 splitSeek.forward(f, 3) # returns 4 splitSeek.forward(f, 4) # returns 8 :param f: file handle :param i: current seek point :param c: block-boundary character""" def inner(f): f.seek(i) while True: b = f.tell(); s = f.read(1000); di = s.find(c) if di > -1: return b + di + 1 if s == "": return -1 if isinstance(f, str): with open(os.path.expanduser(f), "rb") as _f: return inner(_f) else: return inner(f)
[docs] @staticmethod def backward(f, i:int, c=b'\n') -> int: """Returns char location after the search char, going backward. Example:: f = io.BytesIO(b"123\\n456\\n789\\nabc") f | splitSeek(2) # returns [0, 4, 15] splitSeek.backward(f, 5) # returns 4 splitSeek.backward(f, 4) # returns 4 splitSeek.backward(f, 3) # returns 0 :param f: file handle :param i: current seek point :param c: block-boundary character""" def inner(f): mul = 1 while True: begin = max(i-1000*mul, 0); end = max(i-1000*(mul-1), 0); mul += 1 # search range f.seek(begin); b = f.tell() s = f.read(end-begin); di = s.rfind(c) if di > -1: return b + di + 1 if b == 0: return 0 if isinstance(f, str): with open(os.path.expanduser(f), "rb") as _f: return inner(_f) else: return inner(f)
[docs] def __ror__(self, fn): n = self.n; c = self.c def func(f): f.seek(0, os.SEEK_END); end = f.tell(); return [*range(n) | cli.apply(lambda x: int(x*end/n)) | cli.apply(lambda x: splitSeek.backward(f, x, c)), end] if isinstance(fn, str): with open(os.path.expanduser(fn), 'rb') as f: return func(f) else: return func(fn)
[docs]def curl(url:str) -> Iterator[str]: """Gets file from url. File can't be a binary blob. Example:: # prints out first 10 lines of the website curl("https://k1lib.github.io/") | headOut()""" for line in urllib.request.urlopen(url): line = line.decode() if line[-1] == "\n": yield line[:-1] else: yield line
[docs]def wget(url:str, fileName:str=None): """Downloads a file. Also returns the file name, in case you want to pipe it to something else. :param url: The url of the file :param fileName: if None, then tries to infer it from the url""" if fileName is None: fileName = url.split("/")[-1] urllib.request.urlretrieve(url, fileName) return fileName
[docs]def ls(folder:str=None): """List every file and folder inside the specified folder. Example:: # returns List[str] ls("/home") # same as above "/home" | ls() # only outputs files, not folders ls("/home") | filt(os.path.isfile)""" if folder is None: return _ls() else: return folder | _ls()
class _ls(BaseCli): def _typehint(self, ignored=None): return tList(str) def __ror__(self, folder:str): folder = os.path.expanduser(folder.rstrip(os.sep)) return [f"{folder}{os.sep}{e}" for e in os.listdir(folder)] k1lib.settings.cli.add("quiet", False, "whether to mute extra outputs from clis or not") newline = b'\n'[0] class lazySt: def __init__(self, st, text:bool): """Converts byte stream into lazy text/byte stream, with nice __repr__.""" self.st = st; self.text = text; def __iter__(self): if self.text: while True: line = self.st.readline() if len(line) == 0: break yield line.decode().rstrip("\n") else: while True: line = self.st.readline() if len(line) == 0: break yield line def __repr__(self): self | cli.stdout(); return "" def executeCmd(cmd:str, inp:bytes, text): """Runs a command, and returns stdout and stderr streams""" p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=k1lib.settings.wd) if inp is not None: if isinstance(inp, (str, bytes)): p.stdin.write(inp if isinstance(inp, bytes) else inp.encode()) else: for e in inp: if not isinstance(e, (str, bytes)): e = str(e) if not isinstance(e, bytes): e = e.encode() p.stdin.write(e); p.stdin.write(b"\n") p.stdin.close(); return lazySt(p.stdout, text), lazySt(p.stderr, text) def printStderr(err): if not k1lib.settings.cli.quiet: e, it = err | cli.peek() if it != []: it | cli.insert("\nError encountered:\n") | cli.apply(k1lib.fmt.txt.red) | cli.stdout()
[docs]def requireCli(cliTool:str): """Searches for a particular cli tool (eg. "ls"), throws ImportError if not found, else do nothing""" a = cmd(cliTool); None | a; if len(a.err) > 0: raise ImportError(f"""Can't find cli tool {cliTool}. Please install it first.""")
[docs]class cmd(BaseCli):
[docs] def __init__(self, cmd:str, mode:int=1, text=True, block=False): # 0: return (stdout, stderr). 1: return stdout, 2: return stderr """Runs a command, and returns the output line by line. Can pipe in some inputs. If no inputs then have to pipe in :data:`None`. Example:: # return detailed list of files None | cmd("ls -la") # return list of files that ends with "ipynb" None | cmd("ls -la") | cmd('grep ipynb$') It might be tiresome to pipe in :data:`None` all the time. So, you can use ">" operator to yield values right away:: # prints out first 10 lines of list of files cmd("ls -la") > headOut() If you're using Jupyter notebook/lab, then if you were to display a :class:`cmd` object, it will print out the outputs. So, a single command ``cmd("mkdir")`` displayed at the end of a cell is enough to trigger creating the directory. Reminder that ">" operator in here sort of has a different meaning to that of :class:`~k1lib.cli.init.BaseCli`. So you kinda have to becareful about this:: # returns a serial cli, cmd not executed cmd("ls -la") | deref() # executes cmd with no input stream and pipes output to deref cmd("ls -la") > deref() # returns a serial cli cmd("ls -la") > grep("txt") > headOut() # executes pipeline cmd("ls -la") > grep("txt") | headOut() General advice is, right ater a :class:`cmd`, use ">", and use "|" everywhere else. Let's see a few more exotic examples. File ``a.sh``: .. code-block:: bash #!/bin/bash echo 1; sleep 0.5 echo This message goes to stderr >&2 echo 2; sleep 0.5 echo $(</dev/stdin) sleep 0.5; echo 3 Examples:: # returns [b'1\\n', b'2\\n', b'45\\n', b'3\\n'] and prints out the error message "45" | cmd("./a.sh", text=False) | deref() # returns [b'This message goes to stderr\\n'] "45" | cmd("./a.sh", mode=2, text=False) | deref() # returns [[b'1\\n', b'2\\n', b'45\\n', b'3\\n'], [b'This message goes to stderr\\n']] "45" | cmd("./a.sh", mode=0, text=False) | deref() Performance-wise, stdout and stderr will yield values right away as soon as the process outputs it, so you get real time feedback. However, this will convert the entire input into a :class:`bytes` object, and not feed it bit by bit lazily, so if you have a humongous input, it might slow you down a little. Also, because stdout and stderr yield values right away, it means that if you want the operation to be blocking until finished, you have to consume the output:: None | cmd("mkdir abc") # might fail, because this might get executed before the previous line None | cmd("echo a>abc/rg.txt") None | cmd("mkdir abc") | ignore() # will succeed, because this will be guaranteed to execute after the previous line None | cmd("echo a>abc/rg.txt") Settings: - cli.quiet: if True, won't display errors in mode 1 :param mode: if 0, returns ``(stdout, stderr)``. If 1, returns ``stdout`` and prints ``stderr`` if there are any errors. If 2, returns ``stderr`` :param text: whether to decode the outputs into :class:`str` or return raw :class:`bytes` :param block: whether to wait for the task to finish before returning to Python or not""" super().__init__(); self.cmd = cmd; self.mode = mode self.text = text; self.block = block; self.ro = k1lib.RunOnce()
def _typehint(self, ignored=None): t = tIter(str) if self.text else tIter(bytes) if self.mode == 0: return tCollection(t, t) return t
[docs] def __ror__(self, it:Union[None, str, bytes, Iterator[Any]]) -> Iterator[Union[str, bytes]]: """Pipes in lines of input, or if there's nothing to pass, then pass None""" if not self.ro.done(): self.out, self.err = executeCmd(self.cmd, it, self.text); mode = self.mode if self.block: self.out = self.out | cli.deref() self.err = self.err | cli.deref() if mode == 0: return (self.out, self.err) elif mode == 1: threading.Thread(target=lambda: printStderr(self.err)).start() return self.out elif mode == 2: return self.err
def __gt__(self, it): return None | self | it def __repr__(self): return (None | self).__repr__()
[docs]class walk(BaseCli):
[docs] def __init__(self, **kwargs): """Recursively get all files inside a dictionary. Example:: # prints out first 10 files "." | walk() | headOut()""" self.kwargs = kwargs
[docs] def __ror__(self, path): return os.walk(path, **self.kwargs) | ~cli.apply(lambda x, y, z: z | cli.apply(lambda e: x + os.sep + e)) | cli.joinStreams()