Source code for k1lib.cli.inp

# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""This module for tools that will likely start the processing stream."""
from typing import Iterator, Union, Any
import k1lib, urllib, subprocess, warnings, os, k1lib, threading, time, warnings, math, io, dill, urllib
from k1lib.cli import BaseCli; import k1lib.cli as cli
from k1lib.cli.typehint import *
try: import minio; hasMinio = True
except: hasMinio = False
__all__ = ["cat", "splitSeek", "refineSeek", "curl", "wget", "ls", "cmd", "walk", "requireCli", "urlPath"]
settings = k1lib.settings.cli
catSettings = k1lib.Settings().add("chunkSize", 100000, "file reading chunk size for binary+chunk mode. Decrease it to avoid wasting memory and increase it to avoid disk latency")
catSettings.add("every", k1lib.Settings().add("text", 1000, "for text mode, will print every n lines").add("binary", 10, "for binary mode, will print every n 100000-byte blocks"), "profiler print frequency")
settings.add("cat", catSettings, "inp.cat() settings")
def _catGenText(fn, sB, eB): # fn for "file name"
    try:
        if sB == 0 and eB == -1: # fast path without bounds (90-160 MB/s expected)
            with open(fn) as f:
                while True:
                    line = f.readline()
                    if line == "": return
                    yield line[:-1] if line[-1] == "\n" else line
        else: # slow path with bounds (15 MB/s expected). Update: much faster now, expect only 40% slower than the path above
            sB = wrap(fn, sB); eB = wrap(fn, eB)
            with open(fn) as f:
                f.seek(sB); b = sB # current byte
                while True:
                    line = f.readline(); b += len(line)
                    if line == "": return
                    if b > eB: yield line[:len(line)-(b-eB)]; return
                    yield line[:-1] if line[-1] == "\n" else line
    except FileNotFoundError: pass
def _catGenBin(fn, sB, eB):
    chunkSize = settings.cat.chunkSize; sB = wrap(fn, sB); eB = wrap(fn, eB); nB = eB - sB # number of bytes to read total
    with open(fn, "rb") as f:
        f.seek(sB); nChunks = math.ceil(nB / chunkSize); lastChunkSize = nB - chunkSize*(nChunks-1)
        yield from range(nChunks) | cli.applyTh(lambda i: f.read(chunkSize) if i < nChunks-1 else f.read(chunkSize)[:lastChunkSize], prefetch=10)
def fileLength(fn):
    with open(fn, 'rb') as f: return f.seek(0, os.SEEK_END)
def wrap(fn, b): return b if b >= 0 else b + fileLength(fn) + 1
class _cat(BaseCli):
    def __init__(self, text, chunks, sB, eB): self.text = text; self.chunks = chunks; self.sB = sB; self.eB = eB
    def _typehint(self, ignored=None):
        if self.text: return tIter(str) if self.chunks else tList(str)
        else: return tIter(bytes) if self.chunks else bytes
    def __ror__(self, fn:str) -> Union[Iterator[str], bytes]:
        text = self.text; chunks = self.chunks; sB = self.sB; eB = self.eB
        fn = os.path.expanduser(fn)
        if text and chunks and k1lib._settings.packages.k1a:
            return k1lib._k1a.k1a.StrIterCat(fn, sB, eB)
        if chunks: return _catGenText(fn, sB, eB) if text else _catGenBin(fn, sB, eB)
        sB = wrap(fn, sB); eB = wrap(fn, eB)
        if text:
            with open(fn) as f: f.seek(sB); return f.read(eB-sB).splitlines()
        else:
            with open(fn, "rb") as f: f.seek(sB); return f.read(eB-sB)
class Profile(BaseCli):
    def __init__(self, text): self.data = []; self.text = text
    def __ror__(self, it):
        fmt = k1lib.fmt; chars = 0; beginTime = time.time()
        if self.text:
            a, b, c, d, f = k1lib.ConstantPad.multi(5); every = settings.cat.every.text
            for lines, e in enumerate(it):
                chars += len(e)
                if lines % every == 0: # every 1000 lines, print stuff out
                    elapsed = time.time() - beginTime#; self.data.append([lines, chars, elapsed])
                    print(f"Current line: {fmt.item(lines) | a} ({fmt.item(lines/elapsed) | b} lines/s), current byte/chars: {fmt.size(chars) | c} ({fmt.size(chars/elapsed) | d}/s), elapsed: {fmt.time(elapsed) | f}                                 ", end="\r")
                yield e
        else:
            a, b, c = k1lib.ConstantPad.multi(3); every = settings.cat.every.binary
            for i, e in enumerate(it):
                chars += len(e)
                if i % every == 0: # every 10 100000-byte chunks, print stuff out
                    elapsed = time.time() - beginTime#; self.data.append([chars, elapsed])
                    print(f"Current size/chars: {fmt.size(chars) | a} ({fmt.size(chars/elapsed) | b}/s), elapsed: {fmt.time(elapsed) | c}                                 ", end="\r")
                yield e
[docs]def cat(fileName:str=None, text:bool=True, chunks:bool=None, profile:bool=False, sB=0, eB=-1): """Reads a file line by line. Example:: # display first 10 lines of file cat("file.txt") | headOut() # piping in also works "file.txt" | cat() | headOut() # read bytes from an image file and dumps it to another file cat("img.png", False) | file("img2.png") If you want to read only specific sections of the file, you can specify the start (``sB``) and end byte (``eB``) like this:: "123456\\n89" | file("test/catTest.pth") # returns ['3456', '8'] cat("test/catTest.pth", sB=2, eB=8) | deref() settings.cat.context.chunkSize=3 # just for demonstration, don't do it normally # returns [b'123', b'456', b'\\n8'] cat("test/catTest.pth", text=False, chunks=True, eB=8) | deref() If you are working with large files and would like to read 1 file from multiple threads/processes, then you can use this cli in conjunction with :class:`splitSeek`. If you are dumping multiple pickled objects into a single file, you can read all of them using :meth:`cat.pickle`. This cli has lots of settings at :data:`~k1lib.settings`.cli.cat :param fileName: if None, then return a :class:`~k1lib.cli.init.BaseCli` that accepts a file name and outputs Iterator[str] :param text: if True, read text file, else read binary file :param chunks: if True then reads the file chunk by chunk, else reads the entire file. Defaults to True in text mode and False in binary mode :param profile: whether to profile the file reading rate or not. Can adjust printing frequency using `settings.cli.cat.every` :param sB: "start byte". Specify this if you want to start reading from this byte :param eB: "end byte", exclusive. Default -1 means end of file""" if chunks is None: chunks = True if text else False if profile and not chunks: warnings.warn(f"Can't profile reading rate when you're trying to read everything at once"); profile = False f = _cat(text, chunks, sB, eB) if profile: f = f | Profile(text) return f if fileName is None else fileName | f
def _catPickle(fileName=None, pickleModule=dill): """Reads a file as a series of pickled objects. Example:: "ab" | aS(dill.dumps) | file("test/catTest.pth") "cd" | aS(dill.dumps) >> file("test/catTest.pth") # append to the file # returns ["ab", "cd"] cat.pickle("test/catTest.pth") | deref() # also returns ["ab", "cd"], same style as :class:`cat` "test/catTest.pth" | cat.pickle() | deref() :param fileName: name of the pickled file :param pickleModule: pickle module to use. Python's default is "pickle", but I usually use :mod:`dill` because it's more robust""" def gen(fn): with open(os.path.expanduser(fn), "rb") as f: try: while True: yield dill.load(f) except: pass return cli.aS(gen) if fileName is None else fileName | cli.aS(gen) cat.pickle = _catPickle
[docs]class splitSeek(BaseCli):
[docs] def __init__(self, n=None, c=b'\n', ws=None): """Splits a file up into n fragments aligned to the closest character and return the seek points. Example:: # preparing the large file range(120) | apply(lambda x: f"{x:3}_56789") | file("test/largeFile.txt") # returns [0, 30, 70, 110, 150, 190, 230, 270, 300, 340] "test/largeFile.txt" | splitSeek(31) | head() # returns 32 "test/largeFile.txt" | splitSeek(31) | shape(0) # returns 32, demonstrating you can also pipe file objects in, if that's what you want open("test/largeFile.txt") | splitSeek(31) | shape(0) # returns [0, 0, 10, 10, 20, 30, 30, 40, 40, 50], notice some segments have zero length "test/largeFile.txt" | splitSeek(200) | head() # returns [0, 400, 1200], demonstrating that you can split a file up unevenly by weights "test/largeFile.txt" | splitSeek(ws=[1, 2]) | deref() So, the generated file has 120 lines in total. Each line is 10 bytes (9 for the string, and 1 for the new line character). Splitting the file into 31 fragments will result in 32 seek points (:math:`p_i\quad i\in[1, n+1]`). You can then use these seek points to read the file in multiple threads/processes using :meth:`cat`, like this:: # returns [[' 0_56789', ' 1_56789', ' 2_56789'], [' 3_56789', ' 4_56789', ' 5_56789', ' 6_56789']] "test/largeFile.txt" | splitSeek(31) | splitSeek.window() | ~apply(lambda sB, eB: cat("test/largeFile.txt", sB=sB, eB=eB)) | head(2) | deref() Because :math:`120/31\\approx4`, most of cat's reads contain 4 lines, but some has 3 lines. Also notice that the lines smoothly transitions between cat's reads (``2_56789`` to ``3_56789``), so that's pretty nice. .. warning:: You have to really test whether reading the same file from multiple processes is going to be really faster or not. If your data is stored in a HDD (aka hard drive, with spinning disks), then it will actually slow you down (10x-100x), because the disk will have to context switch all the time, and each switch has a 10ms cost. You also have to take into account collecting together the results of all processes, which can bottleneck the cpu. Read more about concurrency pitfalls at :class:`~k1lib.cli.modifier.applyMp`. In some scenarios where you want to adjust the seek points even more, like when you want to parse FASTA genome files, which has blocks of 2/4 lines each like this: .. code-block:: text @FP200005993L1C001R00100000061/2 TTTTAAACTTGCATTCTTTGGAGATTTGCTGAGTGTTGCTAGAGCTGGGAAACTTTTTTAATGAGATACGTGCATATTTTTCAAATTTACAGATCTTTTTTCACAAAAATAGAAAGTCATAAATGTGAAATGGAAACCTAAACAAGGCAA + GFEEEDEFGFFFFEFFFFFIFCEEEFFFGFFDFEGEDGFFFGDGFFGDGCE@GGGEEFDFGFGCFDFGGHCHFFFGFFFFFGEFDFFGHGFGEEHGFGEGFGFHFFEGFFFE;GEGEFGGHFFEI=GDAEDIFDDFGHFGEFGFEGGGGF @FP200005993L1C001R00100000167/2 CTGGAATTTGGTATCTTATTGCCAAAGAATCTGTTTTGTGAAACTTGGGATCTCTATTTTAATGTTAATTCTGGTCAGTTGTGCCTAAACTCCATAAAGCAGGGACTATACTGAGGCGTATTCAATCTTCCTTCTTACCAAGGCCAGGAA + EEFECEDEFFCGFFFFFEEEGEGFEDECCEFEFDFEEFDFEDDFEFEEFDDFFEEFFEEFEFFHEEFEEFEFFDEFFFECF>FFFEFEDFCFFFEGFEDEEGDDFEEFEFGEEBD@EG>EEFFECEEGFEEEFFEDGEEEDE5EBDG:CC Here, each 4 lines are (title, read, blank, quality). Because by default, this will only split neatly along new lines, you will have to write extra functions to detect if a particular seek point is desirable, and if not, either jump forward or backward using :meth:`splitSeek.forward` and :meth:`splitSeek.backward`. :param n: how many splits do you want? :param c: block-boundary character, usually just the new line character :param ws: weights. If given, the splits length ratios will roughly correspond to this""" self.n = n; self.c = c; self.ws = ws; self.fn = None; self.res = None # file name, result if ws is None and n is None: raise Exception("Specify at least n or ws for splitSeek to work")
[docs] @staticmethod def forward(f, i:int, c=b'\n') -> int: """Returns char location after the search char, going forward. Example:: f = io.BytesIO(b"123\\n456\\n789\\nabc") f | splitSeek(2) # returns [0, 4, 15] splitSeek.forward(f, 2) # returns 4 splitSeek.forward(f, 3) # returns 4 splitSeek.forward(f, 4) # returns 8 :param f: file handle :param i: current seek point :param c: block-boundary character""" def inner(f): f.seek(i) while True: b = f.tell(); s = f.read(1000); di = s.find(c) if di > -1: return b + di + 1 if s == "": return -1 if isinstance(f, str): with open(os.path.expanduser(f), "rb") as _f: return inner(_f) else: return inner(f)
[docs] @staticmethod def backward(f, i:int, c=b'\n') -> int: """Returns char location after the search char, going backward. Example:: f = io.BytesIO(b"123\\n456\\n789\\nabc") f | splitSeek(2) # returns [0, 4, 15] splitSeek.backward(f, 5) # returns 4 splitSeek.backward(f, 4) # returns 4 splitSeek.backward(f, 3) # returns 0 :param f: file handle :param i: current seek point :param c: block-boundary character""" def inner(f): mul = 1 while True: begin = max(i-1000*mul, 0); end = max(i-1000*(mul-1), 0); mul += 1 # search range f.seek(begin); b = f.tell() s = f.read(end-begin); di = s.rfind(c) if di > -1: return b + di + 1 if b == 0: return 0 if isinstance(f, str): with open(os.path.expanduser(f), "rb") as _f: return inner(_f) else: return inner(f)
[docs] def __ror__(self, fn): # why return self instead of the seek positions directly? Because we want to pass along dependencies like file name to downstream processes like refineSeek if isinstance(fn, str): fn = os.path.expanduser(fn) n = self.n; c = self.c; ws = self.ws; self.fn = fn def func(f): f.seek(0, os.SEEK_END); end = f.tell() if ws is None: begins = range(n) | cli.apply(lambda x: int(x*end/n)) else: begins = range(end) | cli.splitW(*ws) | cli.apply(lambda x: x.start) return [*begins | cli.apply(lambda x: splitSeek.backward(f, x, c)), end] if isinstance(fn, str): with open(os.path.expanduser(fn), 'rb') as f: self.res = func(f); return self else: self.res = func(fn); return self
def __or__(self, aft): if self.res is None: return super().__or__(aft) return aft.__ror__(self) def __getitem__(self, idx): return self.res[idx] def __len__(self): return len(self.res) def __iter__(self): return iter(self.res)
[docs] @staticmethod def window(): """Converts input boundaries into segment windows. Example:: # returns [[0, 4], [5, 9], [10, 20]] [0, 5, 10, 20] | splitSeek.window() # example use case "abc.txt" | splitSeek(10) | splitSeek.window() """ def inner(it): it = it | cli.aS(list); ranges = it | cli.window(2) | cli.apply(lambda x: x-1, 1) | cli.deref() ranges[-1][1] += 1; return ranges return cli.aS(inner)
def __repr__(self): return self.res.__repr__()
[docs]class refineSeek(BaseCli):
[docs] def __init__(self, f, window=1): """Refines seek positions. Example:: # returns list of integers for seek positions "abc.txt" | splitSeek(30) # returns refined seek positions, such that the line starting at the seek positions starts with "@" "abc.txt" | splitSeek(30) | refineSeek(lambda x: x.startswith(b"@")) # same thing as above "abc.txt" | splitSeek(30) | refineSeek(lambda x: x[0] == b"@"[0]) # returns refined seek positions, such that 0th line starts with "@" and 2nd line starts with "+". This demonstrates `window` parameter "abc.txt" | splitSeek(30) | refineSeek(lambda x: x[0][0] == b"@"[0] and x[2][0] == b"+"[0], 3) # same thing as above, demonstrating some builtin refine seek functions "abc.txt" | splitSeek(30) | refineSeek.fastq() :param f: function that returns True if the current line/lines is a valid block boundary :param window: by default (1), will fetch 1 line and check boundary using ``f(line)``. If a value greater than 1 is passed (for example, 3), will fetch 3 lines and check boundary using ``f([line1, line2, line3])`` """ self.f = cli.fastF(f); self.window = window; self.fn = None
[docs] def __ror__(self, seeks): f = self.f; window = self.window def read(fio, sB:int): fio.seek(sB) if window == 1: return fio.readline() return list(cli.repeatF(lambda: fio.readline(), window)) if len(seeks) <= 2: return seeks fn = self.fn or seeks.fn; newSeeks = [seeks[0]] def process(fio): with open(fn, "rb") as fio: for seek in seeks[1:-1]: line = read(fio, seek) while not f(line): seek = splitSeek.forward(fio, seek); line = read(fio, seek) newSeeks.append(seek) newSeeks.append(seeks[-1]); return newSeeks if isinstance(fn, str): with open(fn, "rb") as fio: return process(fio) else: return process(fn)
[docs] @classmethod def fastq(cls): """Refine fastq file's seek points""" return cls(lambda x: x[0][0] == b"@"[0] and x[2][0] == b"+"[0], 3)
[docs]def curl(url:str, tries=3) -> Iterator[str]: """Gets file from url. File can't be a binary blob. Example:: # prints out first 10 lines of the website curl("https://k1lib.github.io/") | headOut() :param url: the url to get the contents of :param tries: how many times to retry the request""" for i in range(tries): try: for line in urllib.request.urlopen(url): line = line.decode() if line[-1] == "\n": yield line[:-1] else: yield line except Exception as e: if i == tries-1: raise e
[docs]def wget(url:str, fileName:str=None, mkdir=True): """Downloads a file. Also returns the file name, in case you want to pipe it to something else. :param url: The url of the file :param fileName: if None, then tries to infer it from the url :param mkdir: whether to make the directory leading up to the file or not""" if fileName is None: fileName = url.split("/")[-1] fileName = os.path.expanduser(fileName); dirname = os.path.dirname(fileName) if mkdir: os.makedirs(dirname, exist_ok=True) try: urllib.request.urlretrieve(url, fileName); return fileName except: return None
[docs]def ls(folder:str=None): """List every file and folder inside the specified folder. Example:: # returns List[str] ls("/home") # same as above "/home" | ls() # only outputs files, not folders ls("/home") | filt(os.path.isfile)""" if folder is None: return _ls() else: return folder | _ls()
class _ls(BaseCli): def _typehint(self, ignored=None): return tList(str) def __ror__(self, path:str): path = os.path.expanduser(path.rstrip(os.sep)) return [f"{path}{os.sep}{e}" for e in os.listdir(path)] k1lib.settings.cli.add("quiet", False, "whether to mute extra outputs from clis or not") newline = b'\n'[0] class lazySt: def __init__(self, st, text:bool): """Converts byte stream into lazy text/byte stream, with nice __repr__.""" self.st = st; self.text = text; def __iter__(self): if self.text: while True: line = self.st.readline() if len(line) == 0: break yield line.decode().rstrip("\n") else: while True: line = self.st.readline() if len(line) == 0: break yield line def __repr__(self): self | cli.stdout(); return "" def executeCmd(cmd:str, inp:bytes, text): """Runs a command, and returns stdout and stderr streams""" p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=k1lib.settings.wd) if inp is not None: if isinstance(inp, (str, bytes)): p.stdin.write(inp if isinstance(inp, bytes) else inp.encode()) else: for e in inp: if not isinstance(e, (str, bytes)): e = str(e) if not isinstance(e, bytes): e = e.encode() p.stdin.write(e); p.stdin.write(b"\n") p.stdin.close(); return lazySt(p.stdout, text), lazySt(p.stderr, text) def printStderr(err): if not k1lib.settings.cli.quiet: e, it = err | cli.peek() if it != []: it | cli.insert("\nError encountered:\n") | cli.apply(k1lib.fmt.txt.red) | cli.stdout()
[docs]def requireCli(cliTool:str): """Searches for a particular cli tool (eg. "ls"), throws ImportError if not found, else do nothing""" a = cmd(cliTool); None | a; if len(a.err) > 0: raise ImportError(f"""Can't find cli tool {cliTool}. Please install it first.""")
[docs]class cmd(BaseCli):
[docs] def __init__(self, cmd:str, mode:int=1, text=True, block=False): # 0: return (stdout, stderr). 1: return stdout, 2: return stderr """Runs a command, and returns the output line by line. Can pipe in some inputs. If no inputs then have to pipe in :data:`None`. Example:: # return detailed list of files None | cmd("ls -la") # return list of files that ends with "ipynb" None | cmd("ls -la") | cmd('grep ipynb$') It might be tiresome to pipe in :data:`None` all the time. So, you can use ">" operator to yield values right away:: # prints out first 10 lines of list of files cmd("ls -la") > headOut() If you're using Jupyter notebook/lab, then if you were to display a :class:`cmd` object, it will print out the outputs. So, a single command ``cmd("mkdir")`` displayed at the end of a cell is enough to trigger creating the directory. Reminder that ">" operator in here sort of has a different meaning to that of :class:`~k1lib.cli.init.BaseCli`. So you kinda have to becareful about this:: # returns a serial cli, cmd not executed cmd("ls -la") | deref() # executes cmd with no input stream and pipes output to deref cmd("ls -la") > deref() # returns a serial cli cmd("ls -la") > grep("txt") > headOut() # executes pipeline cmd("ls -la") > grep("txt") | headOut() General advice is, right ater a :class:`cmd`, use ">", and use "|" everywhere else. Let's see a few more exotic examples. File ``a.sh``: .. code-block:: bash #!/bin/bash echo 1; sleep 0.5 echo This message goes to stderr >&2 echo 2; sleep 0.5 echo $(</dev/stdin) sleep 0.5; echo 3 Examples:: # returns [b'1\\n', b'2\\n', b'45\\n', b'3\\n'] and prints out the error message "45" | cmd("./a.sh", text=False) | deref() # returns [b'This message goes to stderr\\n'] "45" | cmd("./a.sh", mode=2, text=False) | deref() # returns [[b'1\\n', b'2\\n', b'45\\n', b'3\\n'], [b'This message goes to stderr\\n']] "45" | cmd("./a.sh", mode=0, text=False) | deref() Performance-wise, stdout and stderr will yield values right away as soon as the process outputs it, so you get real time feedback. However, this will convert the entire input into a :class:`bytes` object, and not feed it bit by bit lazily, so if you have a humongous input, it might slow you down a little. Also, because stdout and stderr yield values right away, it means that if you want the operation to be blocking until finished, you have to consume the output:: None | cmd("mkdir abc") # might fail, because this might get executed before the previous line None | cmd("echo a>abc/rg.txt") None | cmd("mkdir abc") | ignore() # will succeed, because this will be guaranteed to execute after the previous line None | cmd("echo a>abc/rg.txt") Settings: - cli.quiet: if True, won't display errors in mode 1 :param mode: if 0, returns ``(stdout, stderr)``. If 1, returns ``stdout`` and prints ``stderr`` if there are any errors. If 2, returns ``stderr`` :param text: whether to decode the outputs into :class:`str` or return raw :class:`bytes` :param block: whether to wait for the task to finish before returning to Python or not""" super().__init__(); self.cmd = cmd; self.mode = mode self.text = text; self.block = block; self.ro = k1lib.RunOnce()
def _typehint(self, ignored=None): t = tIter(str) if self.text else tIter(bytes) if self.mode == 0: return tCollection(t, t) return t
[docs] def __ror__(self, it:Union[None, str, bytes, Iterator[Any]]) -> Iterator[Union[str, bytes]]: """Pipes in lines of input, or if there's nothing to pass, then pass None""" if not self.ro.done(): self.out, self.err = executeCmd(self.cmd, it, self.text); mode = self.mode if self.block: self.out = self.out | cli.deref() self.err = self.err | cli.deref() if mode == 0: return (self.out, self.err) elif mode == 1: threading.Thread(target=lambda: printStderr(self.err)).start() return self.out elif mode == 2: return self.err
def __gt__(self, it): return None | self | it def __repr__(self): return (None | self).__repr__()
[docs]class walk(BaseCli):
[docs] def __init__(self, **kwargs): """Recursively get all files inside a dictionary. Example:: # prints out first 10 files "." | walk() | headOut()""" self.kwargs = kwargs
[docs] def __ror__(self, path): return os.walk(path, **self.kwargs) | ~cli.apply(lambda x, y, z: z | cli.apply(lambda e: x + os.sep + e)) | cli.joinStreams()
[docs]def urlPath(base:str, host:bool=True): """Translates from a url to a file path. Example:: base = "~/ssd/some/other/path" url = "http://example.com/some/path/to/file.txt" url | urlPath(base) # returns "~/ssd/some/other/path/example_com/some/path/to/file.txt" url | urlPath(base, False) # returns "~/ssd/some/other/path/some/path/to/file.txt" :param base: base directory you want the files to be in""" base = base.rstrip("/\\") def inner(url): p = urllib.parse.urlparse(url) a = (p.netloc.replace(".", "_") + os.sep) if host else "" return f"{base}{os.sep}{a}{p.path.strip('/')}" return cli.aS(inner)