# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""This module for tools that will likely start the processing stream."""
from typing import Iterator, Union, Any, List
import k1lib, urllib, subprocess, warnings, os, threading, time, math, io, dill, validators
from collections import deque
from k1lib.cli import BaseCli, init; import k1lib.cli as cli
from k1lib.cli.typehint import *
from contextlib import contextmanager
requests = k1lib.dep("requests")
try: import minio; hasMinio = True
except: hasMinio = False
__all__ = ["cat", "catPickle", "splitSeek", "refineSeek", "wget", "ls", "cmd", "walk", "requireCli", "urlPath", "kzip", "kunzip", "unzip"]
settings = k1lib.settings.cli
class NoPartialContent(Exception): pass                                          # NoPartialContent
def getChunk(url:str, sB:int, eB:int, timeout:float, retries:int) -> bytes: # start inclusive, end exclusive # getChunk
    for i in range(retries):                                                     # getChunk
        try: res = requests.get(url, headers={"Range": f"bytes={sB}-{eB-1}"}, timeout=timeout) # getChunk
        except Exception as e:                                                   # getChunk
            if i >= retries-1: raise Exception(f"Can't get file chunk: {e}")     # getChunk
            continue                                                             # getChunk
        if res.status_code != 206: raise NoPartialContent(f"Server doesn't allow partial downloads at this particular url. Status code: {res.status_code}") # getChunk
        return res.content                                                       # getChunk
def getChunks(url:str, sB:int, eB:int, chunkSize=None, chunkTimeout:float=10, chunkRetries:int=10) -> List[bytes]: # getChunks
    """Grabs bytes from sB to eB in chunks"""                                    # getChunks
    chunkSize = chunkSize or settings.cat.chunkSize                              # getChunks
    return range(sB, eB+1) | cli.batched(chunkSize, True) | cli.apply(lambda r: getChunk(url, r.start, r.stop-1, chunkTimeout, chunkRetries)) # getChunks
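# A minimal usage sketch of the helper above (hypothetical url; byte counts are illustrative):
#   blocks = getChunks("https://example.com/big.bin", 0, 1_000_000, chunkSize=100_000)
#   data = b"".join(blocks) # fetches roughly the first megabyte, ~100kB per request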
catSettings = k1lib.Settings().add("chunkSize", 100000, "file reading chunk size for binary+chunk mode. Decrease it to avoid wasting memory and increase it to avoid disk latency") # getChunks
catSettings.add("every", k1lib.Settings().add("text", 1000, "for text mode, will print every n lines").add("binary", 10, "for binary mode, will print every n 100000-byte blocks"), "profiler print frequency") # getChunks
settings.add("cat", catSettings, "inp.cat() settings")                           # getChunks
                                                                                 # getChunks
rfS = k1lib.Settings()                                                           # getChunks
settings.add("RemoteFile", rfS, "inp.RemoteFile() settings, used in cat(), splitSeek() and the like") # getChunks
rfS.add("memoryLimit", 100_000_000, "if the internal cache exceeds this limit (in bytes), and randomAccess is False, then old downloaded chunks will be deleted") # getChunks
rfS.add("timeout", 10, "seconds before terminating the remote request and retrying") # getChunks
rfS.add("retries", 10, "how many times to retry sending the request before giving up") # getChunks
def noPartial(url, *args): # returns True if the server doesn't support partial (Range) downloads # noPartial
    try: return len(getChunk(url, 0, 10, *args)) != 10                           # noPartial
    except NoPartialContent: return True                                         # noPartial
class RemoteFile:                                                                # RemoteFile
    def __init__(self, url, randomAccess=True, blockSize=None, noPartialConfirm=False, timeout:float=None, retries:int=None): # RemoteFile
        """
:param url: url of the remote file
:param randomAccess: is random accessing parts of the file expected? If
    True, then keeps all of the reads in ram internally, else free them
    as soon as possible
:param blockSize: all reads will fetch roughly this amount of bytes"""           # RemoteFile
        self.url = url; self.randomAccess = randomAccess; self.blockSize = blockSize or settings.cat.chunkSize # RemoteFile
        self.noPartialConfirm = noPartialConfirm; self.size = None; self.domain = k1lib.Domain() # RemoteFile
        self.seekPos = 0; self.reads = deque() # List[sB, eB, content]           # RemoteFile
        self._confirmMsgShown = False; self.timeout = timeout or rfS.timeout; self.retries = retries or rfS.retries # RemoteFile
        self._totalReadSize = 0; self.noPartial = noPartial(url, self.timeout, self.retries) # RemoteFile
    def _fetch(self, sB:int, eB:int): # fetches from start to end byte and dumps to internal memory. Inclusive start and end byte # RemoteFile
        if not self.noPartial:                                                   # RemoteFile
            eB = max(eB, min(sB+self.blockSize, len(self)))                      # RemoteFile
            chunk = getChunk(self.url, sB, eB, self.timeout, self.retries)       # RemoteFile
        else:                                                                    # RemoteFile
            if self.noPartialConfirm and not self._confirmMsgShown:              # RemoteFile
                ans = input(f"""Remote file '{self.url}' don't support partial downloads.
Therefore the entire file will be loaded into RAM, which
could be undesireable. Do you want to continue? Y/n: """)                        # RemoteFile
                self._confirmMsgShown = True                                     # RemoteFile
                if ans.lower()[0] != "y": self.reads.append([0, 0, b""]); return # RemoteFile
            sB = 0; chunk = requests.get(self.url).content; eB = len(chunk)      # RemoteFile
        self.reads.append([sB, eB, chunk])                                       # RemoteFile
        self._totalReadSize += len(chunk); self.domain = self.domain + k1lib.Domain([sB, eB]) # RemoteFile
        if not self.randomAccess and self._totalReadSize > rfS.memoryLimit: # deletes old reads # RemoteFile
            sB, eB, chunk = self.reads.popleft()                                 # RemoteFile
            self._totalReadSize -= len(chunk)                                    # RemoteFile
    def _ensureRange(self, sB, eB): # makes sure that all ranges between sB and eB are available # RemoteFile
        missingDomain = k1lib.Domain([max(sB-30, 0), min(eB+30, len(self))]) & -self.domain # RemoteFile
        for sB, eB in missingDomain.ranges: self._fetch(sB, eB)                  # RemoteFile
    def _readChunks(self, sB, eB): # read from sB to eB, but in chunks, to be optimized. inclusive sB, exclusive eB # RemoteFile
        sB = max(min(sB, len(self)), 0); eB = max(min(eB, len(self)), 0); self._ensureRange(sB, eB) # RemoteFile
        return self.reads | cli.filt(~cli.aS(lambda s,e,chunk: e>=sB and s<=eB)) | cli.sort() | ~cli.apply(lambda s,e,chunk: chunk[max(sB-s, 0):len(chunk)+min(eB-e, 0)]) | cli.filt(len) # RemoteFile
    def seek(self, cookie, whence=0):                                            # RemoteFile
        if whence == 0: self.seekPos = cookie                                    # RemoteFile
        elif whence == 1: self.seekPos += cookie                                 # RemoteFile
        elif whence == 2: self.seekPos = len(self) + cookie                      # RemoteFile
        else: raise Exception("Invalid whence")                                  # RemoteFile
        return self.seekPos                                                      # RemoteFile
    def read(self, size, join=True):                                             # RemoteFile
        chunks = self._readChunks(self.seekPos, self.seekPos + size)             # RemoteFile
        self.seekPos += size; return b"".join(chunks) if join else chunks        # RemoteFile
    def readline(self, newLine=True):                                            # RemoteFile
        ans = []; seekPos = self.seekPos                                         # RemoteFile
        try: # SyntaxError is (ab)used here as a cheap way to break out of the nested loops # RemoteFile
            while self.seekPos < len(self):                                      # RemoteFile
                for chunk in self.read(self.blockSize, False):                   # RemoteFile
                    if len(chunk) == 0: raise SyntaxError()                      # RemoteFile
                    ans.append(chunk)                                            # RemoteFile
                    if b"\n" in chunk: raise SyntaxError()                       # RemoteFile
        except SyntaxError: pass                                                 # RemoteFile
        ans = b"".join(ans)                                                      # RemoteFile
        try: n = ans.index(b"\n")                                                # RemoteFile
        except ValueError: n = len(ans) # only happens at end of file            # RemoteFile
        self.seekPos = seekPos + n+1; return (ans[:n+1] if newLine else ans[:n]).decode() # RemoteFile
    def readlines(self, newLine=True):                                           # RemoteFile
        while True:                                                              # RemoteFile
            yield self.readline(newLine)                                         # RemoteFile
            if self.seekPos >= len(self): break                                  # RemoteFile
    def tell(self): return self.seekPos                                          # RemoteFile
    def _getSize(self):                                                          # RemoteFile
        if self.noPartial: self._fetch(0, 10); return self.reads[0][1]           # RemoteFile
        for i in range(self.retries):                                            # RemoteFile
            try: return requests.head(self.url, timeout=self.timeout).headers.items() | cli.apply(cli.op().lower(), 0) | cli.toDict() | cli.op()["content-length"].ab_int() # RemoteFile
            except Exception as e:                                               # RemoteFile
                if i >= self.retries-1: raise Exception(f"Can't get size of remote file: {e}") # RemoteFile
    def __len__(self):                                                           # RemoteFile
        if self.size is None: self.size = self._getSize()                        # RemoteFile
        return self.size                                                         # RemoteFile
    def __repr__(self): return f"<RemoteFile url={self.url} size={k1lib.fmt.size(len(self))}>" # RemoteFile
import zipfile, inspect                                                          # RemoteFile
class ZipWrapper:                                                                # ZipWrapper
    def __init__(self, a, zfn): self.a = a; self.zfn = zfn                       # ZipWrapper
    def __repr__(self):                                                          # ZipWrapper
        a = self.a; s = f" ({round(a.compress_size/a.file_size*100)}%)" if a.file_size > 0 else "" # ZipWrapper
        return f"<Zip subfile name='{a.filename}' {k1lib.fmt.size(a.file_size)} -> {k1lib.fmt.size(a.compress_size)}{s}>" # ZipWrapper
    @property                                                                    # ZipWrapper
    def size(self): return self.a.file_size                                      # ZipWrapper
    @property                                                                    # ZipWrapper
    def compressedSize(self): return self.a.compress_size                        # ZipWrapper
    def _catHandle(self):                                                        # ZipWrapper
        with zipfile.ZipFile(self.zfn) as zipf:                                  # ZipWrapper
            with zipf.open(self.a.filename) as subfile:                          # ZipWrapper
                yield subfile                                                    # ZipWrapper
@contextmanager                                                                  # ZipWrapper
def openFile(fn, text, noPartialConfirm=False): # can be actual file or url      # openFile
    if not isinstance(fn, str):                                                  # openFile
        if hasattr(fn, "_catHandle"): yield from fn._catHandle(); return # custom datatype case # openFile
        else: yield fn; return # file handle case, just return itself            # openFile
    if os.path.exists(fn):                                                       # openFile
        if text:                                                                 # openFile
            with open(fn, "r", settings.cat.chunkSize) as f: yield f             # openFile
        else:                                                                    # openFile
            with open(fn, "rb", settings.cat.chunkSize) as f: yield f            # openFile
    elif validators.url(fn) is True:                                             # openFile
        yield RemoteFile(fn, False, noPartialConfirm=noPartialConfirm)           # openFile
    else: raise FileNotFoundError(f"The file {fn} doesn't seem to exist and/or it's not a valid url") # openFile
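# Sketch: openFile() hands out a uniform handle over local files, urls and custom datatypes:
#   with openFile("some/file.txt", True) as f: print(f.readline())   # local file, text mode
#   with openFile("https://example.com/a.bin", False) as f: ...      # url, yields a RemoteFile
# (the path and url above are hypothetical, for illustration only)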
def _catGenText(fn, sB, eB): # fn for "file name"                                # _catGenText
    try:                                                                         # _catGenText
        if sB == 0 and eB == -1: # fast path without bounds (90-160 MB/s expected) # _catGenText
            with openFile(fn, True) as f:                                        # _catGenText
                line = f.readline()                                              # _catGenText
                if isinstance(line, str): # why put the if outside the loop? Speed reasons. Also, isn't .readline() supposed to return a string? Well, because we're supporting random file handles from custom datatypes, some file handles might return bytes instead # _catGenText
                    while True:                                                  # _catGenText
                        if line == "": return                                    # _catGenText
                        yield line[:-1] if line[-1] == "\n" else line            # _catGenText
                        line = f.readline()                                      # _catGenText
                else:                                                            # _catGenText
                    while True:                                                  # _catGenText
                        line = line.decode()                                     # _catGenText
                        if line == "": return                                    # _catGenText
                        yield line[:-1] if line[-1] == "\n" else line            # _catGenText
                        line = f.readline()                                      # _catGenText
        else: # slow path with bounds (15 MB/s expected). Update: much faster now, expect only 40% slower than the path above # _catGenText
            sB = wrap(fn, sB); eB = wrap(fn, eB)                                 # _catGenText
            with openFile(fn, True) as f:                                        # _catGenText
                f.seek(sB); b = sB # current byte                                # _catGenText
                line = f.readline()                                              # _catGenText
                if isinstance(line, str):                                        # _catGenText
                    while True:                                                  # _catGenText
                        b += len(line)                                           # _catGenText
                        if len(line) == 0: return                                # _catGenText
                        if b > eB: yield line[:len(line)-(b-eB)]; return         # _catGenText
                        yield line[:-1] if line[-1] == "\n" else line            # _catGenText
                        line = f.readline()                                      # _catGenText
                else:                                                            # _catGenText
                    while True:                                                  # _catGenText
                        line = line.decode()                                     # _catGenText
                        b += len(line)                                           # _catGenText
                        if len(line) == 0: return                                # _catGenText
                        if b > eB: yield line[:len(line)-(b-eB)]; return         # _catGenText
                        yield line[:-1] if line[-1] == "\n" else line            # _catGenText
                        line = f.readline()                                      # _catGenText
    except FileNotFoundError: pass                                               # _catGenText
def _catGenBin(fn, sB, eB):                                                      # _catGenBin
    chunkSize = settings.cat.chunkSize; sB = wrap(fn, sB); eB = wrap(fn, eB); nB = eB - sB # number of bytes to read total # _catGenBin
    with openFile(fn, False) as f:                                               # _catGenBin
        f.seek(sB); nChunks = math.ceil(nB / chunkSize); lastChunkSize = nB - chunkSize*(nChunks-1) # _catGenBin
        # had to do this because RemoteFile is actually not thread-safe          # _catGenBin
        # applyF = (lambda f_: cli.apply(f_)) if isinstance(f, RemoteFile) else (lambda f_: cli.applyTh(f_, prefetch=10)) # _catGenBin
        applyF = (lambda f_: cli.apply(f_)) # actually, normal reads are not thread-safe either. Stupid me # _catGenBin
        yield from range(nChunks) | applyF(lambda i: f.read(chunkSize) if i < nChunks-1 else f.read(chunkSize)[:lastChunkSize]) # _catGenBin
def fileLength(fn):                                                              # fileLength
    with openFile(fn, False) as f: return f.seek(0, os.SEEK_END)                 # fileLength
def wrap(fn, b): return b if b >= 0 else b + fileLength(fn) + 1                  # wrap
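# Sketch of the negative-offset convention implemented by wrap(): for a 100-byte file,
#   wrap(fn, 5)  == 5    # non-negative offsets pass through unchanged
#   wrap(fn, -1) == 100  # -1 means "end of file", mirroring Python's slice semantics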
class _cat(BaseCli):                                                             # _cat
    def __init__(self, text, chunks, sB, eB):                                    # _cat
        super().__init__(capture=True)                                           # _cat
        self.text = text; self.chunks = chunks; self.sB = sB; self.eB = eB       # _cat
    def _typehint(self, ignored=None):                                           # _cat
        if self.text: return tIter(str) if self.chunks else tList(str)           # _cat
        else: return tIter(bytes) if self.chunks else bytes                      # _cat
    def __ror__(self, fn:Union[str, "fileHandle"]) -> Union[Iterator[str], bytes]: # _cat
        ser = self.capturedSerial; text = self.text; chunks = self.chunks; sB = self.sB; eB = self.eB # _cat
        if not isinstance(fn, str) and hasattr(fn, "_cat"): # custom datatype, _cat method defined, so it will take control of things # _cat
            kwargs = {"text": text, "chunks": chunks, "sB": sB, "eB": eB}        # _cat
            kwdiff = [a for a, b in [["text",text!=True], ["chunks",chunks!=True if text else chunks!=False], ["sB",sB!=0], ["eB",eB!=-1]] if b] # _cat
            args = inspect.getfullargspec(fn._cat).args[1:]; n = len(args)       # _cat
            s = set(["ser", "kwargs"]); weirdArgs = [a for a in args if a not in s] # _cat
            if len(weirdArgs) > 0: raise Exception(f"Custom datatype `{type(fn)}` has ._cat() method, which expects only `ser` and `kwargs` arguments, but detected these arguments instead: {weirdArgs}. Please fix `{type(fn)}`") # _cat
            def guard():                                                         # _cat
                if kwdiff: raise Exception(f"Custom datatype `{type(fn)}` does not support custom cat() arguments like {kwdiff}") # _cat
            if n == 0: guard(); return fn._cat() | ser                           # _cat
            elif n == 1:                                                         # _cat
                if args[0] == "ser": guard(); return fn._cat(ser)                # _cat
                if args[0] == "kwargs": return fn._cat(kwargs) | ser             # _cat
            elif n == 2:                                                         # _cat
                if args[0] == "ser": return fn._cat(ser, kwargs)                 # _cat
                else: return fn._cat(kwargs, ser)                                # _cat
            else: raise Exception("Unreachable")                                 # _cat
        fn = os.path.expanduser(fn) if isinstance(fn, str) else fn               # _cat
        if text and chunks and k1lib._settings.packages.k1a and isinstance(fn, str) and os.path.exists(fn): # _cat
            return k1lib._k1a.k1a.StrIterCat(fn, sB, eB) | ser # accelerated C version # _cat
        if chunks: return (_catGenText(fn, sB, eB) | ser) if text else (_catGenBin(fn, sB, eB) | ser) # _cat
        sB = wrap(fn, sB); eB = wrap(fn, eB)                                     # _cat
        if text:                                                                 # _cat
            with openFile(fn, True) as f: f.seek(sB); return f.read(eB-sB).splitlines() | ser # _cat
        else:                                                                    # _cat
            with openFile(fn, False) as f: f.seek(sB); return f.read(eB-sB) | ser # _cat
    def _jsF(self, meta):                                                        # _cat
        """Only supports get requests and JS objects that has async ._cat() function""" # _cat
        fIdx = init._jsFAuto(); dataIdx = init._jsDAuto()                        # _cat
        header, _fIdx, _async = k1lib.kast.asyncGuard(self.capturedSerial._jsF(meta)) # _cat
        return f"""\
{header}
const {fIdx} = async ({dataIdx}) => {{
    if (typeof({dataIdx}) === "string") {{
        const res = await fetch({dataIdx}, {{ method: "GET" }})
        if (res.ok) return {'await ' if _async else ''}{_fIdx}(await res.text());
        throw new Error(`Can't fetch ${{{dataIdx}}}: ${{res.status}} - ${{res.statusText}}`)
    }}
    return {'await ' if _async else ''}{_fIdx}(await {dataIdx}._cat());
}}""", fIdx                                                                      # _cat
class Profile(BaseCli):                                                          # Profile
    def __init__(self, text): self.data = []; self.text = text                   # Profile
    def __ror__(self, it):                                                       # Profile
        fmt = k1lib.fmt; chars = 0; beginTime = time.time()                      # Profile
        if self.text:                                                            # Profile
            a, b, c, d, f = k1lib.ConstantPad.multi(5); every = settings.cat.every.text # Profile
            for lines, e in enumerate(it):                                       # Profile
                chars += len(e)                                                  # Profile
                if lines % every == 0: # every 1000 lines, print stuff out       # Profile
                    elapsed = time.time() - beginTime#; self.data.append([lines, chars, elapsed]) # Profile
                    print(f"Current line: {fmt.item(lines) | a} ({fmt.item(lines/elapsed) | b} lines/s), current byte/chars: {fmt.size(chars) | c} ({fmt.size(chars/elapsed) | d}/s), elapsed: {fmt.time(elapsed) | f}                                 ", end="\r") # Profile
                yield e                                                          # Profile
        else:                                                                    # Profile
            a, b, c = k1lib.ConstantPad.multi(3); every = settings.cat.every.binary # Profile
            for i, e in enumerate(it):                                           # Profile
                chars += len(e)                                                  # Profile
                if i % every == 0: # every 10 100000-byte chunks, print stuff out # Profile
                    elapsed = time.time() - beginTime#; self.data.append([chars, elapsed]) # Profile
                    print(f"Current size/chars: {fmt.size(chars) | a} ({fmt.size(chars/elapsed) | b}/s), elapsed: {fmt.time(elapsed) | c}                                 ", end="\r") # Profile
                yield e                                                          # Profile
def cat(fileName:str=None, text:bool=True, chunks:bool=None, profile:bool=False, sB=0, eB=-1): # cat
    """Reads a file line by line.
Example::
    # display first 10 lines of file
    cat("file.txt") | headOut()
    # piping in also works
    "file.txt" | cat() | headOut()
    # read bytes from an image file and dumps it to another file
    cat("img.png", False) | file("img2.png")
If you want to read only specific sections of the file, you can specify the
start (``sB``) and end byte (``eB``) like this::
    "123456\\n89" | file("test/catTest.pth")
    # returns ['3456', '8']
    cat("test/catTest.pth", sB=2, eB=8) | deref()
    settings.cat.chunkSize=3 # just for demonstration, don't do it normally
    # returns [b'123', b'456', b'\\n8']
    cat("test/catTest.pth", text=False, chunks=True, eB=8) | deref()
.. admonition:: Remote files
    You can also read from urls directly, like this::
        cat("https://k1lib.com/latest/") | deref()
    For remote files like this, there are extra settings at :data:`~k1lib.settings`.cli.RemoteFile.
    This will also read the file chunk by chunk if required. If the website doesn't support
    partial downloads, then all of it will be downloaded and stored into ram, which may not be
    desirable. The available settings are:
    - timeout: seconds before killing the existing request
    - retries: try to resend the request for this much before giving up
If you are working with large files and would like to read 1 file from multiple
threads/processes, then you can use this cli in conjunction with :class:`splitSeek`.
If you are dumping multiple pickled objects into a single file, you can read all
of them using :meth:`cat.pickle`, which uses :class:`catPickle` underneath.
This cli has lots of settings at :data:`~k1lib.settings`.cli.cat
See also: :meth:`ls`
.. admonition:: Custom datatype
    It is possible to build objects that can interoperate with this cli,
    like this::
        class custom1:
            def __init__(self, config=None): ...
            def _cat(self): return ["abc", "def"]
        custom1() |  cat()           # returns ["abc", "def"]
        custom1() |  cat() | item()  # returns "abc"
        custom1() | (cat() | item()) # returns "abc"
    When called upon, :meth:`cat` will see that the input is not a simple string, which
    will prompt it to look for ``_cat()`` method of the complex object and execute it.
    By default, if the user specifies any non-default arguments like ``text=False``,
    it will error out because :meth:`cat` does not know how to handle it. Here's how
    to do it right::
        class custom2:
            def __init__(self, config=None): ...
            def _cat(self, kwargs): # default kwargs if user doesn't specify anything else is `{"text": True, "chunks": None, "sB": 0, "eB": -1}`
                if kwargs["text"]: return ["abc", "def"]
                else: return [b"abc", b"def"]
        custom2() |  cat()                     # returns ["abc", "def"]
        custom2() |  cat()           | item()  # returns "abc"
        custom2() | (cat()           | item()) # returns "abc"
        custom2() |  cat(text=False)           # returns [b"abc", b"def"]
        custom2() |  cat(text=False) | item()  # returns b"abc"
    Here, you're saying that your function can handle non-standard arguments, so
    :meth:`cat` will give you all the args. You may support only some arguments
    and completely ignore others, it's all up to you. It might still be worth
    throwing errors to warn users about arguments your custom datatype does not support.
    You can also capture future clis like this::
        class custom3:
            def __init__(self, config=None): ...
            def _cat(self, ser):            # "ser" stands for "serial"
                if len(ser.clis) == 1 and isinstance(ser.clis[0], item):
                    return "123"            # fancy optimization
                return ["abc", "def"] | ser # default "slow" base case
        custom3() |  cat()           # returns ["abc", "def"]
        custom3() |  cat() | item()  # returns "abc", because cat() did not capture any clis
        custom3() | (cat() | item()) # returns "123", which might be desirable, up to you
    This feature is pretty advanced actually, in that you can actually do different
    things based on future processing tasks. Let's say that the captured cli looks like
    ``cut(3) | batched(10) | rItem(3)``. This essentially means "give me elements 30
    through 39 from the 4th column". With this information, you can query your custom
    database for exactly those elements only, while fetching nothing else, which would be
    great for performance.
    You can also outright lie to the user like in the example, where if :class:`~k1lib.cli.utils.item`
    is detected, it will return a completely different version ("123") while it should return
    "abc" instead. The possibilities are endless. As you can probably feel, it's super hard to
    actually utilize this in the right way without breaking something, as you are completely
    responsible for the captured clis. Let's see another example::
        class custom4:
            def __init__(self, config=None): ...
            def _cat(self, ser):
                return ["abc", "def"]
        custom4() |  cat()           # returns ["abc", "def"]
        custom4() |  cat() | item()  # returns "abc"
        custom4() | (cat() | item()) # returns ["abc", "def"]
    Same story as ``custom3``, demonstrating that if you declare that you want to see and
    manipulate ``ser``, you'll be completely responsible for it, and if you don't handle it
    correctly, it will create horrible bugs. You can, of course, have access to ``ser`` and
    ``kwargs`` at the same time::
        class custom5:
            def __init__(self, config=None): ...
            def _cat(self, ser, kwargs): return ["abc", "def"] | ser
    If your custom datatype feels like a block device, and you don't want to rewrite all
    the functionalities in :meth:`cat`, you can implement the method ``_catHandle`` that
    yields the custom file handle instead::
        class custom6:
            def __init__(self, config=None): ...
            def _catHandle(self): yield io.BytesIO(b"abc\\ndef\\n123")
        class custom7:
            def __init__(self, config=None): ...
            def _catHandle(self): yield io.StringIO("abc\\ndef\\n123")
        custom6() | cat(text=False) # returns b'abc\\ndef\\n123'
        custom7() | cat() | deref() # returns ['abc', 'def', '123']
    Remember that you have to yield instead of returning the file handle. This is so that
    you can use ``with`` statements, and if you return the file handle, the file might be
    closed by the time :meth:`cat` decides to use it.
:param fileName: if None, then return a :class:`~k1lib.cli.init.BaseCli`
    that accepts a file name and outputs Iterator[str]
:param text: if True, read text file, else read binary file
:param chunks: if True then reads the file chunk by chunk, else reads the
    entire file. Defaults to True in text mode and False in binary mode
:param profile: whether to profile the file reading rate or not. Can adjust
    printing frequency using `settings.cli.cat.every`
:param sB: "start byte". Specify this if you want to start reading from this byte
:param eB: "end byte", exclusive. Default -1 means end of file"""                # cat
    if chunks is None: chunks = True if text else False # dev note: if default arguments are changed, please also change _cat()'s implementation for custom datatypes # cat
    if profile and not chunks: warnings.warn("Can't profile reading rate when you're trying to read everything at once"); profile = False # cat
    f = _cat(text, chunks, sB, eB)                                               # cat
    if profile: f = f | Profile(text)                                            # cat
    return f if fileName is None else fileName | f                               # cat 
k1lib.settings.cli.cat.add("pickle", k1lib.Settings().add("safetyFactor", 2, "for when using `fn | (cat.pickle() | tail())` operation. Check out more docs at inp.catPickle() class"), "inp.cat.pickle() settings") # cat
class catPickle(BaseCli):                                                        # catPickle
    def __init__(self, pickleModule=None):                                       # catPickle
        """Reads a file as a series of pickled objects.
Example::
    "ab" | aS(dill.dumps) | file("test/catTest.pth")
    "cd" | aS(dill.dumps) >> file("test/catTest.pth") # append to the file
    # returns ["ab", "cd"]
    cat.pickle("test/catTest.pth") | deref()
    # also returns ["ab", "cd"], same style as :class:`cat`
    "test/catTest.pth" | cat.pickle() | deref()
The function ``cat.pickle`` internally uses this class, so don't worry about the discrepancy
in the examples. This relies on the fact that you can continuously dump pickled
objects to a single file and it would still work as usual::
    [1, 2, 3] | aS(dill.dumps) |  file("b.pth")
    [4, 5]    | aS(dill.dumps) >> file("b.pth")
    with open("b.pth", "rb") as f:
        objs = [dill.load(f), dill.load(f)]
    assert objs == [[1, 2, 3], [4, 5]]
.. admonition:: tail() optimization
    If you regularly append pickled objects to files, say you're trying to save
    complex time series data, and you want to grab the most recent data, you
    might do it like ``fn | cat.pickle() | tail(10)`` to get the last 10 objects.
    But normally, this means that you have to read through the entire file in order
    to just read the last bit of it. You can't exactly start close to the end as
    you don't know where the object boundaries are. Luckily, I got fed up with this
    so much that now when you do this: ``fn | (cat.pickle() | tail(10))`` (note the
    parenthesis!), it will only read the end of the file! This optimization is mostly
    airtight, but not entirely.
    How it works is that cat.pickle() tries to unpickle the first object in the file
    and grab the object's size in bytes. This info is used to make guesses as to where
    it should start reading the file from, which is ``endByte - tail.n * object_size * 2``
    That factor ``2`` is the safety factor, which should be a nice tradeoff between
    being airtight and being fast, configurable using ``settings.cli.cat.pickle.safetyFactor = 2``.
    This assumes that your objects are roughly similar in sizes. This of course, might
    be broken by you and thus, you might see only 5 results returned when you asked for
    10. In these scenarios, either increase the safetyFactor, or just don't use the
    optimized version by getting rid of the parenthesis: ``fn | cat.pickle() | tail(10)``
    Also, this optimization has only been tested on :mod:`dill` and :mod:`pickle`.
    :mod:`cloudpickle` has been tested to not work (magic numbers are a little different).
    I can potentially add in a mechanism for you to specify magic numbers for any pickler
    you want, but it feels niche, so I'm going to skip that for now.
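    As a quick worked example: if the first pickled object weighs roughly 1,000
    bytes and you run ``fn | (cat.pickle() | tail(10))`` with the default safety
    factor of 2, reading starts around ``endByte - 10 * 1000 * 2 = endByte - 20,000``,
    instead of at byte 0.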
:param pickleModule: pickle module to use. Python's default is "pickle", but
    I usually use :mod:`dill` because it's more robust"""                        # catPickle
        super().__init__(capture=True); self.pm = pickleModule or dill           # catPickle 
    def __ror__(self, fn):                                                       # catPickle
        pm = self.pm                                                             # catPickle
        def g(sB=0):                                                             # catPickle
            with open(os.path.expanduser(fn), "rb") as f:                        # catPickle
                f.seek(sB)                                                       # catPickle
                try:                                                             # catPickle
                    while True: yield pm.load(f)                                 # catPickle
                except EOFError: pass                                            # catPickle
        if len(self.capturedClis) >= 1:                                          # catPickle
            c0 = self.capturedClis[0]; s = os.path.getsize(fn)                   # catPickle
            if isinstance(c0, cli.tail) and s > 1e6 and isinstance(c0.n, int) and c0.n > 0: # if less than 1MB then it's not worth optimizing! # catPickle
                # figures out the size of the first element to gauge the relative steps we need to take! # catPickle
                with open(os.path.expanduser(fn), "rb") as f: s1 = len(pm.dumps(pm.load(f)))*k1lib.settings.cli.cat.pickle.safetyFactor # multiply by the safety factor, to make sure we've included all the data required by tail()! # catPickle
                return g((fn | cli.splitSeek(c=b"\x80\x04\x95", ws=[s-c0.n*s1, c0.n*s1]))[1] - 1) | self.capturedSerial # then execute the rest of the pipeline # catPickle
        return g() | self.capturedSerial                                         # catPickle  
def _catPickle(fileName=None, pickleModule=dill):                                # _catPickle
    """Reads a file as a series of pickled objects. See :class:`catPickle`"""    # _catPickle
    return catPickle(pickleModule) if fileName is None else fileName | catPickle(pickleModule) # _catPickle
cat.pickle = _catPickle                                                          # _catPickle
class splitSeekRes:                                                              # splitSeekRes
    def __init__(self, fn, res):                                                 # splitSeekRes
        """Return result of splitSeek(). We don't want to return just a list alone, because
we also want to pass along information like the file name"""                     # splitSeekRes
        self.fn = fn; self.res = res                                             # splitSeekRes
    def __getitem__(self, idx): return self.res[idx]                             # splitSeekRes
    def __len__(self): return len(self.res)                                      # splitSeekRes
    def __iter__(self): return iter(self.res)                                    # splitSeekRes
    def __repr__(self): return self.res.__repr__()                               # splitSeekRes
class splitSeek(BaseCli):                                                        # splitSeek
    def __init__(self, n=None, c=b'\n', ws=None):                                # splitSeek
        """Splits a file up into n fragments aligned to the closest character and return the seek points.
Example::
    # preparing the large file
    range(120) | apply(lambda x: f"{x:3}_56789") | file("test/largeFile.txt")
    # returns [0, 30, 70, 110, 150, 190, 230, 270, 300, 340]
    "test/largeFile.txt" | splitSeek(31) | head()
    # returns 32
    "test/largeFile.txt" | splitSeek(31) | shape(0)
    # returns 32, demonstrating you can also pipe file objects in, if that's what you want
    open("test/largeFile.txt") | splitSeek(31) | shape(0)
    # returns [0, 0, 10, 10, 20, 30, 30, 40, 40, 50], notice some segments have zero length
    "test/largeFile.txt" | splitSeek(200) | head()
    # returns [0, 400, 1200], demonstrating that you can split a file up unevenly by weights
    "test/largeFile.txt" | splitSeek(ws=[1, 2]) | deref()
So, the generated file has 120 lines in total. Each line is 10 bytes (9 for
the string, and 1 for the new line character). Splitting the file into 31
fragments will result in 32 seek points (:math:`p_i\quad i\in[1, n+1]`).
Also notice how the returned seek position is not the position of the character
themselves, but the character right after. So if we're splitting by the new line
character, then it will return the next character after the new line. This is
by default because the majority of the time, I just want it to split by lines.
But you might have other ideas on how to use this in mind. Then just subtract 1
from all returned seek points.
You can then use these seek points to read the file in multiple threads/processes
using :meth:`cat`, like this::
    # returns [['  0_56789', '  1_56789', '  2_56789'], ['  3_56789', '  4_56789', '  5_56789', '  6_56789']]
    "test/largeFile.txt" | splitSeek(31) | window(2) | ~apply(lambda sB, eB: cat("test/largeFile.txt", sB=sB, eB=eB-1)) | head(2) | deref()
Because :math:`120/31\\approx4`, most of cat's reads contain 4 lines, but some
has 3 lines. Also notice that the lines smoothly transitions between cat's
reads (``2_56789`` to ``3_56789``), so that's pretty nice. Just like with :meth:`cat`,
this also works with urls::
    "https://example.com" | splitSeek(10)
.. warning::
    You have to really test whether reading the same file from multiple processes is
    going to be really faster or not. If your data is stored in a HDD (aka hard drive,
    with spinning disks), then it will actually slow you down (10x-100x), because the
    disk will have to context switch all the time, and each switch has a 10ms cost.
    You also have to take into account collecting together the results of all processes,
    which can bottleneck the cpu. Read more about concurrency pitfalls at :class:`~k1lib.cli.modifier.applyMp`.
In some scenarios where you want to adjust the seek points even more, like when
you want to parse FASTA genome files, which has blocks of 2/4 lines each like this:
.. code-block:: text
    @FP200005993L1C001R00100000061/2
    TTTTAAACTTGCATTCTTTGGAGATTTGCTGAGTGTTGCTAGAGCTGGGAAACTTTTTTAATGAGATACGTGCATATTTTTCAAATTTACAGATCTTTTTTCACAAAAATAGAAAGTCATAAATGTGAAATGGAAACCTAAACAAGGCAA
    +
    GFEEEDEFGFFFFEFFFFFIFCEEEFFFGFFDFEGEDGFFFGDGFFGDGCE@GGGEEFDFGFGCFDFGGHCHFFFGFFFFFGEFDFFGHGFGEEHGFGEGFGFHFFEGFFFE;GEGEFGGHFFEI=GDAEDIFDDFGHFGEFGFEGGGGF
    @FP200005993L1C001R00100000167/2
    CTGGAATTTGGTATCTTATTGCCAAAGAATCTGTTTTGTGAAACTTGGGATCTCTATTTTAATGTTAATTCTGGTCAGTTGTGCCTAAACTCCATAAAGCAGGGACTATACTGAGGCGTATTCAATCTTCCTTCTTACCAAGGCCAGGAA
    +
    EEFECEDEFFCGFFFFFEEEGEGFEDECCEFEFDFEEFDFEDDFEFEEFDDFFEEFFEEFEFFHEEFEEFEFFDEFFFECF>FFFEFEDFCFFFEGFEDEEGDDFEEFEFGEEBD@EG>EEFFECEEGFEEEFFEDGEEEDE5EBDG:CC
Here, each 4 lines are (title, read, blank, quality). Because by default, this will
only split neatly along new lines, you will have to write extra functions to detect
if a particular seek point is desirable, and if not, either jump forward or backward
using :meth:`splitSeek.forward` and :meth:`splitSeek.backward`. To help with this,
:class:`refineSeek` has some useful methods that you might want to check out.
:param n: how many splits do you want?
:param c: block-boundary character, usually just the new line character
:param ws: weights. If given, the splits length ratios will roughly correspond to this""" # splitSeek
        self.n = n; self.c = c; self.ws = ws                                     # splitSeek
        if ws is None and n is None: raise Exception("Specify at least n or ws for splitSeek to work") # splitSeek 
    @staticmethod                                                                # splitSeek
    def forward(f, i:int, c=b'\n') -> int:                                       # splitSeek
        """Returns char location after the search char, going forward.
Example::
    f = io.BytesIO(b"123\\n456\\n789\\nabc")
    f | splitSeek(2) # returns [0, 4, 15]
    splitSeek.forward(f, 2) # returns 4
    splitSeek.forward(f, 3) # returns 4
    splitSeek.forward(f, 4) # returns 8
:param f: file handle
:param i: current seek point
:param c: block-boundary character"""                                            # splitSeek
        def inner(f):                                                            # splitSeek
            f.seek(i)                                                            # splitSeek
            while True:                                                          # splitSeek
                b = f.tell(); s = f.read(1000); di = s.find(c)                   # splitSeek
                if di > -1: return b + di + 1                                    # splitSeek
                if s == "": return -1                                            # splitSeek
        if isinstance(f, str):                                                   # splitSeek
            with openFile(os.path.expanduser(f), False) as _f: return inner(_f)  # splitSeek
        else: return inner(f)                                                    # splitSeek 
    @staticmethod                                                                # splitSeek
    def backward(f, i:int, c=b'\n') -> int:                                      # splitSeek
        """Returns char location after the search char, going backward.
Example::
    f = io.BytesIO(b"123\\n456\\n789\\nabc")
    f | splitSeek(2) # returns [0, 4, 15]
    splitSeek.backward(f, 5) # returns 4
    splitSeek.backward(f, 4) # returns 4
    splitSeek.backward(f, 3) # returns 0
:param f: file handle
:param i: current seek point
:param c: block-boundary character"""                                            # splitSeek
        def inner(f):                                                            # splitSeek
            mul = 1                                                              # splitSeek
            while True:                                                          # splitSeek
                begin = max(i-1000*mul, 0); end = max(i-1000*(mul-1), 0); mul += 1 # search range # splitSeek
                f.seek(begin); b = f.tell(); s = f.read(end-begin); di = s.rfind(c) # splitSeek
                if di > -1: return b + di + 1                                    # splitSeek
                if b == 0: return 0                                              # splitSeek
        if isinstance(f, str):                                                   # splitSeek
            with openFile(os.path.expanduser(f), False) as _f: return inner(_f)  # splitSeek
        else: return inner(f)                                                    # splitSeek 
    def __ror__(self, fn): # why return splitSeekRes instead of the seek positions as a list directly? Because we want to pass along dependencies like file name to downstream processes like refineSeek # splitSeek
        if isinstance(fn, str): fn = os.path.expanduser(fn)                      # splitSeek
        n = self.n; c = self.c; ws = self.ws                                     # splitSeek
        def func(f): # f is a file-like object                                   # splitSeek
            f.seek(0, os.SEEK_END); end = f.tell()                               # splitSeek
            if ws is None: begins = range(n) | cli.apply(lambda x: int(x*end/n)) # splitSeek
            else: begins = range(end) | cli.splitW(*ws) | cli.apply(lambda x: x.start) # splitSeek
            return [*begins | cli.apply(lambda x: splitSeek.backward(f, x, c)), end] # splitSeek
        if isinstance(fn, str):                                                  # splitSeek
            with openFile(os.path.expanduser(fn), False, True) as f: return splitSeekRes(fn, func(f)) # splitSeek
        else: return splitSeekRes(fn, func(fn))                                  # splitSeek  
class refineSeek(BaseCli):                                                       # refineSeek
    def __init__(self, f=None, window=1):                                        # refineSeek
        """Refines seek positions.
Example::
    # returns list of integers for seek positions
    "abc.txt" | splitSeek(30)
    # returns refined seek positions, such that the line starting at the seek positions starts with "@"
    "abc.txt" | splitSeek(30) | refineSeek(lambda x: x.startswith(b"@"))
    # same thing as above
    "abc.txt" | splitSeek(30) | refineSeek(lambda x: x[0] == b"@"[0])
    # returns refined seek positions, such that 0th line starts with "@" and 2nd line starts with "+". This demonstrates `window` parameter
    "abc.txt" | splitSeek(30) | refineSeek(lambda x: x[0][0] == b"@"[0] and x[2][0] == b"+"[0], 3)
    # same thing as above, demonstrating some builtin refine seek functions
    "abc.txt" | splitSeek(30) | refineSeek.fastq()
:param f: function that returns True if the current line/lines is a valid block boundary
:param window: by default (1), will fetch 1 line and check boundary using ``f(line)``.
    If a value greater than 1 is passed (for example, 3), will fetch 3 lines and check
    boundary using ``f([line1, line2, line3])``
"""                                                                              # refineSeek
        if f is None: f = lambda x: True                                         # refineSeek
        self.f = cli.fastF(f); self.window = window; self.fn = None              # refineSeek 
    def __ror__(self, seeks):                                                    # refineSeek
        f = self.f; window = self.window                                         # refineSeek
        def read(fio, sB:int):                                                   # refineSeek
            fio.seek(sB)                                                         # refineSeek
            if window == 1: return fio.readline()                                # refineSeek
            return list(cli.repeatF(lambda: fio.readline(), window))             # refineSeek
        if len(seeks) <= 2: return seeks                                         # refineSeek
        fn = self.fn or seeks.fn; newSeeks = [seeks[0]]                          # refineSeek
        def process(fio): # uses the handle passed in, instead of reopening the file and shadowing the argument # refineSeek
            for seek in seeks[1:-1]:                                             # refineSeek
                line = read(fio, seek)                                           # refineSeek
                while not f(line): seek = splitSeek.forward(fio, seek); line = read(fio, seek) # refineSeek
                newSeeks.append(seek)                                            # refineSeek
            newSeeks.append(seeks[-1]); return newSeeks                          # refineSeek
        if isinstance(fn, str):                                                  # refineSeek
            with openFile(fn, False) as fio: return process(fio)                 # refineSeek
        else: return process(fn)                                                 # refineSeek
    def injectFn(self, fn):                                                      # refineSeek
        """Injects file name dependency, if this is not used right after :class:`splitSeek`""" # refineSeek
        self.fn = fn; return self                                                # refineSeek 
    @classmethod                                                                 # refineSeek
    def fastq(cls):                                                              # refineSeek
        """Refine fastq file's seek points"""                                    # refineSeek
        return cls(lambda x: x[0][0] == b"@"[0] and x[2][0] == b"+"[0], 3)       # refineSeek  
def wget(url:str, fileName:str=None, mkdir=True):                                # wget
    """Downloads a file. Also returns the file name, in case you want to pipe it
to something else.
:param url: The url of the file
:param fileName: if None, then tries to infer it from the url
:param mkdir: whether to make the directory leading up to the file or not"""     # wget
    if fileName is None: fileName = url.split("/")[-1]                           # wget
    fileName = os.path.expanduser(fileName); dirname = os.path.dirname(fileName) # wget
    if mkdir and dirname: os.makedirs(dirname, exist_ok=True) # guard against bare file names with no directory part # wget
    try: urllib.request.urlretrieve(url, fileName); return fileName              # wget
    except: return None                                                          # wget 
def ls(folder:str=None):                                                         # ls
    """List every file and folder inside the specified folder.
Example::
    # returns List[str]
    ls("/home")
    # same as above
    "/home" | ls()
    # only outputs files, not folders
    ls("/home") | filt(os.path.isfile)
This can handle things that are not plain folders. For example,
it can handle zip file whereby it will list out all the files contained
within a particular .zip::
    ls("abc.zip")
Then, you can use :meth:`cat` as usual, like this::
    ls("abc.zip") | item() | cat()
Pretty nice!
.. admonition:: Custom datatype
    It is possible to build objects that can interoperate with this cli,
    like this::
        class sql:
            def __init__(self): ...
            def _ls(self): return ["something", "here"]
    ls() will identify that what's inputted is not a string, and will try
    to execute the object's "_ls()" method, so you can simply implement
    it in your classes
"""                                                                              # ls
    if folder is None: return _ls()                                              # ls
    else: return folder | _ls()                                                  # ls 
class _ls(BaseCli):                                                              # _ls
    def _typehint(self, ignored=None): return tList(str)                         # _ls
    def __ror__(self, path:str):                                                 # _ls
        if isinstance(path, str):                                                # _ls
            path = os.path.expanduser(path.rstrip(os.sep))                       # _ls
            if os.path.exists(path):                                             # _ls
                if os.path.isfile(path):                                         # _ls
                    if cat(path, False, eB=2) == b"PK": # list subfiles in a zip file # _ls
                        return [ZipWrapper(e, path) for e in zipfile.ZipFile(path).infolist()] # _ls
                    else: raise Exception(f"{path} is a file, not a folder, so can't list child directories") # _ls
                else: return [f"{path}{os.sep}{e}" for e in os.listdir(path)]    # _ls
            else: return []                                                      # _ls
        else: return path._ls()                                                  # _ls
k1lib.settings.cli.add("quiet", False, "whether to mute extra outputs from clis or not") # _ls
newline = b'\n'[0]                                                               # _ls
class lazySt:                                                                    # lazySt
    def __init__(self, st, text:bool):                                           # lazySt
        """Converts byte stream into lazy text/byte stream, with nice __repr__.""" # lazySt
        self.st = st; self.text = text;                                          # lazySt
    def __iter__(self):                                                          # lazySt
        if self.text:                                                            # lazySt
            while True:                                                          # lazySt
                line = self.st.readline()                                        # lazySt
                if len(line) == 0: break                                         # lazySt
                yield line.decode().rstrip("\n")                                 # lazySt
        else:                                                                    # lazySt
            while True:                                                          # lazySt
                line = self.st.readline()                                        # lazySt
                if len(line) == 0: break                                         # lazySt
                yield line                                                       # lazySt
    def __repr__(self): self | cli.stdout(); return ""                           # lazySt
def executeCmd(cmd:str, inp:bytes, text):                                        # executeCmd
    """Runs a command, and returns stdout and stderr streams"""                  # executeCmd
    p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=k1lib.settings.wd) # executeCmd
    if inp is not None:                                                          # executeCmd
        if isinstance(inp, (str, bytes)): p.stdin.write(inp if isinstance(inp, bytes) else inp.encode()) # executeCmd
        else:                                                                    # executeCmd
            for e in inp:                                                        # executeCmd
                if not isinstance(e, (str, bytes)): e = str(e)                   # executeCmd
                if not isinstance(e, bytes): e = e.encode()                      # executeCmd
                p.stdin.write(e); p.stdin.write(b"\n")                           # executeCmd
    p.stdin.close(); return p, lazySt(p.stdout, text), lazySt(p.stderr, text)    # executeCmd
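# A quick usage sketch, illustrative only, of the 2 helpers above; assumes a POSIX `echo` is available: # _executeCmdExample
def _executeCmdExample():                                                        # _executeCmdExample
    p, out, err = executeCmd("echo hi", None, True)                              # _executeCmdExample
    assert list(out) == ["hi"] and list(err) == []                               # _executeCmdExample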
def printStderr(err):                                                            # printStderr
    if not k1lib.settings.cli.quiet:                                             # printStderr
        e, it = err | cli.peek()                                                 # printStderr
        if it != []: it | cli.insert("\nError encountered:\n") | cli.apply(k1lib.fmt.txt.red) | cli.stdout() # printStderr
def requireCli(cliTool:str):                                                     # requireCli
    """Searches for a particular cli tool (eg. "ls"). Throws :class:`ImportError`
if not found, else does nothing. Example::
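    requireCli("ls") # does nothing on most systems; throws ImportError if `ls` is missing""" # requireCli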
    a = cmd(cliTool, mode=0, block=True); None | a # block=True derefs stdout/stderr, so a.err is a plain list # requireCli
    if len(a.err) > 0: raise ImportError(f"""Can't find cli tool {cliTool}. Please install it first.""") # requireCli
class cmd(BaseCli):                                                              # cmd
    def __init__(self, cmd:str, mode:int=1, text=True, block=False): # 0: return (stdout, stderr). 1: return stdout, 2: return stderr # cmd
        """Runs a command, and returns the output line by line. Can pipe in some
inputs. If no inputs then have to pipe in :data:`None`. Example::
    # return detailed list of files
    None | cmd("ls -la")
    # return list of files that ends with "ipynb"
    None | cmd("ls -la") | cmd('grep ipynb$')
It might be tiresome to pipe in :data:`None` all the time. So, you can use ">"
operator to yield values right away::
    # prints out first 10 lines of list of files
    cmd("ls -la") > headOut()
If you're using Jupyter notebook/lab, then if you display a :class:`cmd`
object, it will print out the outputs. So, a single command ``cmd("mkdir")``
displayed at the end of a cell is enough to trigger creating the directory.
Reminder that ">" operator in here sort of has a different meaning to that of
:class:`~k1lib.cli.init.BaseCli`. So you kinda have to becareful about this::
    # returns a serial cli, cmd not executed
    cmd("ls -la") | deref()
    # executes cmd with no input stream and pipes output to deref
    cmd("ls -la") > deref()
    # returns a serial cli
    cmd("ls -la") > grep("txt") > headOut()
    # executes pipeline
    cmd("ls -la") > grep("txt") | headOut()
General advice is, right after a :class:`cmd`, use ">", and use "|" everywhere else.
Let's see a few more exotic examples. File ``a.sh``:
.. code-block:: bash
    #!/bin/bash
    echo 1; sleep 0.5
    echo This message goes to stderr >&2
    echo 2; sleep 0.5
    echo $(</dev/stdin)
    sleep 0.5; echo 3
Examples::
    # returns [b'1\\n', b'2\\n', b'45\\n', b'3\\n'] and prints out the error message
    "45" | cmd("./a.sh", text=False) | deref()
    # returns [b'This message goes to stderr\\n']
    "45" | cmd("./a.sh", mode=2, text=False) | deref()
    # returns [[b'1\\n', b'2\\n', b'45\\n', b'3\\n'], [b'This message goes to stderr\\n']]
    "45" | cmd("./a.sh", mode=0, text=False) | deref()
Performance-wise, stdout and stderr will yield values right away as soon
as the process outputs them, so you get real-time feedback. However, this will
convert the entire input into a :class:`bytes` object, and not feed it bit by
bit lazily, so if you have a humongous input, it might slow you down a little.
Also, because stdout and stderr yield values right away, it means that if you
want the operation to be blocking until finished, you have to consume the output::
    None | cmd("mkdir abc")
    # might fail, because this might get executed before the previous command finishes
    None | cmd("echo a>abc/rg.txt")
    None | cmd("mkdir abc") | ignore()
    # will succeed, because this is guaranteed to execute after the previous command finishes
    None | cmd("echo a>abc/rg.txt")
Settings:
- cli.quiet: if True, won't display errors in mode 1
:param mode: if 0, returns ``(stdout, stderr)``. If 1, returns ``stdout`` and prints
    ``stderr`` if there are any errors. If 2, returns ``stderr``
:param text: whether to decode the outputs into :class:`str` or return raw :class:`bytes`
:param block: whether to wait for the task to finish before returning to Python or not""" # cmd
        super().__init__(); self.cmd = cmd; self.mode = mode                     # cmd
        self.text = text; self.block = block; self.ro = k1lib.RunOnce()          # cmd 
    def _typehint(self, ignored=None):                                           # cmd
        t = tIter(str) if self.text else tIter(bytes)                            # cmd
        if self.mode == 0: return tCollection(t, t)                              # cmd
        return t                                                                 # cmd
    def __ror__(self, it:Union[None, str, bytes, Iterator[Any]]) -> Iterator[Union[str, bytes]]: # cmd
        """Pipes in lines of input, or if there's nothing to
pass, then pass None"""                                                          # cmd
        mode = self.mode # grab this unconditionally, so repeated pipes don't hit an undefined `mode` # cmd
        if not self.ro.done(): self.p, self.out, self.err = executeCmd(self.cmd, it, self.text) # cmd
        if self.block:                                                           # cmd
            self.out = self.out | cli.deref()                                    # cmd
            self.err = self.err | cli.deref()                                    # cmd
        if mode == 0: return (self.out, self.err)                                # cmd
        elif mode == 1:                                                          # cmd
            threading.Thread(target=lambda: printStderr(self.err)).start()       # cmd
            return self.out                                                      # cmd
        elif mode == 2: return self.err                                          # cmd 
    def __gt__(self, it): return None | self | it                                # cmd
    def __repr__(self):                                                          # cmd
        return (None | self).__repr__()                                          # cmd 
class walk(BaseCli):                                                             # walk
    def __init__(self, **kwargs):                                                # walk
        """Recursively get all files inside a dictionary.
Example::
    # prints out first 10 files
    "." | walk() | headOut()"""                                                  # walk
        self.kwargs = kwargs                                                     # walk 
    def __ror__(self, path):                                                     # walk
        return os.walk(path, **self.kwargs) | ~cli.apply(lambda x, y, z: z | cli.apply(lambda e: x + os.sep + e)) | cli.joinStreams() # walk  
def urlPath(base:str, host:bool=True):                                           # urlPath
    """Translates from a url to a file path.
Example::
    base = "~/ssd/some/other/path"
    url = "http://example.com/some/path/to/file.txt"
    url | urlPath(base) # returns "~/ssd/some/other/path/example_com/some/path/to/file.txt"
    url | urlPath(base, False) # returns "~/ssd/some/other/path/some/path/to/file.txt"
:param base: base directory you want the files to be in
:param host: whether to prefix the relative path with the url's hostname (dots
    replaced with underscores)"""                                                # urlPath
    base = base.rstrip("/\\")                                                    # urlPath
    def inner(url):                                                              # urlPath
        p = urllib.parse.urlparse(url)                                           # urlPath
        a = (p.netloc.replace(".", "_") + os.sep) if host else ""                # urlPath
        return f"{base}{os.sep}{a}{p.path.strip('/')}".replace("//", "/")        # urlPath
    return cli.aS(inner)                                                         # urlPath 
import bz2, gzip, zlib                                                           # urlPath
class kzip(BaseCli):                                                             # kzip
    def __init__(self, fmt="gz"):                                                # kzip
        """Incrementally compresses a stream of data using gzip or bzip2.
Example::
    data = range(100) | apply(lambda x: str(x).encode()) | deref() # list of bytes
    data | kzip() | deref() # returns list of bytes
    range(100) | apply(str) | kzip() | deref() # returns list of bytes, demonstrating piping iterator of string works
Quick reminder that if you pass in a bytes iterator, it will be compressed as-is,
but if you pass in a string iterator, a newline character is appended to each
string before compressing. This is more intuitive, and it keeps the style consistent
with :class:`~k1lib.cli.output.file`.
Passing in a single string or bytes object is ok too::
    "0123456789" | kzip()  # returns gz bytes
    b"0123456789" | kzip() # returns gz bytes, exact same pattern as above
Why "kzip" and not just "zip"? Because "zip" is a builtin Python function.
See also: :class:`kunzip`
:param fmt: desired compressed format. Currently only supports "gz" or "bz2"
"""                                                                              # kzip
        fmt = fmt.strip(".")                                                     # kzip
        if fmt == "gzip" or fmt == "gz":                                         # kzip
            self.o = zlib.compressobj(wbits=31)                                  # kzip
        elif fmt == "bzip2" or fmt == "bz2":                                     # kzip
            self.o = bz2.BZ2Compressor()                                         # kzip
        else: raise Exception(f"File type {fmt} not supported. Specify either 'gz' or 'bz2'") # kzip 
    def __ror__(self, it):                                                       # kzip
        # create a fresh compressor on every pipe, so the same kzip() object can be reused # kzip
        o = zlib.compressobj(wbits=31) if self.fmt == "gz" else bz2.BZ2Compressor() # kzip
        def gen():                                                               # kzip
            for e in it:                                                         # kzip
                if isinstance(e, str): e = f"{e}\n".encode()                     # kzip
                res = o.compress(e)                                              # kzip
                if res: yield res                                                # kzip
            try:                                                                 # kzip
                res = o.flush()                                                  # kzip
                if res: yield res                                                # kzip
            except: pass                                                         # kzip
        if isinstance(it, str): it = it.encode()                                 # kzip
        if isinstance(it, bytes): return [o.compress(it), o.flush()] | cli.filt("x") | cli.aS(b"".join) # kzip
        else: return gen()                                                       # kzip  
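# A quick usage sketch, illustrative only: the chunks that kzip() yields         # _kzipExample
# concatenate into a single valid gzip stream:                                   # _kzipExample
def _kzipExample():                                                              # _kzipExample
    chunks = ["hello", "world"] | kzip() | cli.deref()                           # _kzipExample
    assert gzip.decompress(b"".join(chunks)) == b"hello\nworld\n"                # _kzipExample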
class decompressobj: # .gz or .bz2 file                                          # decompressobj
    """
Why not just use zlib.decompressobj() directly? I encountered a strange bug when trying
to decompress CommonCrawl's gzip files, posted on Stack Overflow and got help from none
other than Mark Adler, creator of gzip and zlib. That guy's super active on Stack Overflow.
Anyway, this code here is a modified version of his code. Here's the discussion:
https://stackoverflow.com/questions/76452480/pythons-zlib-doesnt-work-on-commoncrawl-file
"""                                                                              # decompressobj
    def __init__(self, gz=True): self.gz = gz                                    # decompressobj
    def __ror__(self, it):                                                       # decompressobj
        mk = (lambda: zlib.decompressobj(zlib.MAX_WBITS|32)) if self.gz else (lambda: bz2.BZ2Decompressor()) # decompressobj
        o = mk(); left = b''                                                     # decompressobj
        for got in it:                                                           # decompressobj
            yield o.decompress(left + got); left = b''                           # decompressobj
            if o.eof: left = o.unused_data; o = mk() # a new member begins right after the last one ended # decompressobj
            if len(got) == 0 and len(left) == 0: break                           # decompressobj
        while len(left) > 0: # drain members that were wholly contained in the chunks already read # decompressobj
            yield o.decompress(left); left = b''                                 # decompressobj
            if o.eof: left = o.unused_data; o = mk()                             # decompressobj
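# A quick sketch, illustrative only, of the multi-member case described in the  # _decompressobjExample
# docstring above, using 2 concatenated gzip members:                            # _decompressobjExample
def _decompressobjExample():                                                     # _decompressobjExample
    data = gzip.compress(b"hello") + gzip.compress(b"world")                     # _decompressobjExample
    assert b"".join([data] | decompressobj(True)) == b"helloworld"               # _decompressobjExample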
stream_unzip = k1lib.dep("stream_unzip")                                         # decompressobj
class decompressobjZip: # .zip files                                             # decompressobjZip
    def __ror__(self, it):                                                       # decompressobjZip
        # only decompresses the first subfile in the archive, then stops         # decompressobjZip
        for a, b, c in stream_unzip.stream_unzip(it): yield from c; return       # decompressobjZip
class kunzip(BaseCli):                                                           # kunzip
    def __init__(self, text=False):                                              # kunzip
        """Incrementally decompress a stream of data using gzip or bzip2.
Example::
    # returns an iterator of bytes. The cat() call can be shortened to cat("someFile.gz", False, True)
    cat("someFile.gz", text=False, chunks=True) | unzip()
    # incrementally fetches remote file, then incrementally unzips it
    cat("https://example.com/someFile.gz", False, True) | unzip()
    data = range(100) | apply(lambda x: str(x).encode()) | deref() # list of bytes
    data | kzip() | unzip() | deref() # returns original data in list of bytes. May split the bytes at different positions though
How does it know which algorithm to pick to decompress? It looks at the
first few bytes of the file for its magic number. Also, if you're expecting
a text file after decompression, you can do this to get the lines directly::
    # returns iterator of strings
    cat("https://example.com/someFile.gz", False, True) | unzip(True)
One more thing. If you're planning to use this to download whole files, you might
want to tune the chunk size in :data:`~k1lib.settings`.cli.cat.chunkSize, as
raising it might speed things up considerably. Or maybe you should just
use wget instead =))
More examples of the different styles to use this cli::
    ["abc"] | kzip() | unzip(True) | deref() # returns ["abc"]
     "abc"  | kzip() | unzip(True)           # returns  "abc"
     "abc"  | kzip() | unzip()               # returns b"abc"
See also: :class:`kzip`.
.. admonition:: Reading files directly
    The original purpose of this cli is to incrementally decompress a byte stream.
    But a lot of the time, that byte stream comes from a file, and typing out
    ``cat(fn, False, True)`` to generate a byte stream to feed into this cli is
    tedious. So instead, you can pipe in the file name directly and this will just
    unzip it and return the byte stream (or string stream with ``text=True``) to you::
        "abc.gz"  | unzip() # returns byte iterator
        "abc.bz2" | unzip() # same
        "abc.zip" | unzip() # returns byte iterator of the first subfile only. All subsequent subfiles are ignored
.. admonition:: .zip files
    This can also work with subfiles within .zip files, like this::
        "abc.zip" | unzip()                   # unzips the first subfile
        "abc.zip" | ls()                      # lists out all subfiles
        "abc.zip" | ls() | rItem(2) | unzip() # unzips the 3rd subfile
:param text: whether to yield string lines or bytes"""                           # kunzip
        self.text = text                                                         # kunzip 
    def __ror__(self, it):                                                       # kunzip
        def gen(it): # generates a stream of bytes                               # kunzip
            it = iter(it); e = next(it)                                          # kunzip
            if e[:2] == b"\x1f\x8b": o = decompressobj(True) # .gz               # kunzip
            elif e[:3] == b"BZh": o = decompressobj(False)    # .bz2             # kunzip
            elif e[:2] == b"PK": o = decompressobjZip()      # .zip              # kunzip
            else: raise Exception("Can't infer the file type (gz, bz2 or zip) of this file") # kunzip
            yield from [[e], it] | cli.joinStreams() | o                         # kunzip
        single = False                                                           # kunzip
        if isinstance(it, str): return cat(it, False, True) | kunzip(self.text) # special case for reading file names directly # kunzip
        if isinstance(it, ZipWrapper): return it | cat(text=self.text, chunks=True) # special case for .zip subfile # kunzip
        if isinstance(it, bytes): single = True; it = [it]                       # kunzip
        if self.text:                                                            # kunzip
            def gen2(it): # generates a stream of strings                        # kunzip
                last = b"" # last split is probably not the right line boundary, so gotta do this # kunzip
                for e in gen(it):                                                # kunzip
                    e = last + e; splits = e.split(b"\n")                        # kunzip
                    for line in splits[:-1]: yield line.decode()                 # kunzip
                    last = splits[-1]                                            # kunzip
                if len(last) > 0: yield last.decode() # avoids a spurious trailing "" when the data ends with a newline # kunzip
            res = gen2(it)                                                       # kunzip
        else: res = gen(it)                                                      # kunzip
        if not single: return res                                                # kunzip
        else: return "\n".join(res) if self.text else b"".join(res)              # kunzip  
unzip = kunzip                                                                   # kunzip
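# A quick usage sketch, illustrative only: kunzip auto-detects the compression   # _kunzipExample
# format (gz vs bz2) from the stream's magic bytes:                              # _kunzipExample
def _kunzipExample():                                                            # _kunzipExample
    assert b"0123" | kzip("gz") | unzip() == b"0123"                             # _kunzipExample
    assert b"0123" | kzip("bz2") | unzip() == b"0123"                            # _kunzipExample
    assert ["a", "b"] | kzip() | unzip(True) | cli.deref() == ["a", "b"]         # _kunzipExample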