# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for functions that are actually biology-related
"""
from k1lib.cli.init import BaseCli
import k1lib; import k1lib.cli as cli
import os; from functools import partial
from typing import Iterator, Union
__all__ = ["go", "quality", "longFa", "idx",
           "transcribe", "complement", "translate", "medAa", "longAa"]
settings = k1lib.Settings()
k1lib.settings.cli.add("bio", settings, "from k1lib.cli.bio module");
def _patchDir(term, s, p):                                                       # _patchDir
    if p != None: p = os.path.abspath(os.path.expanduser(p))                     # _patchDir
    s.__dict__[term] = p                                                         # _patchDir
settings.add("blast", None, "location of BLAST database", partial(_patchDir, "blast")) # _patchDir
settings.add("go", None, "location of gene ontology file (.obo)", partial(_patchDir, "go")) # _patchDir
settings.add("so", None, "location of sequence ontology file", partial(_patchDir, "so")); # _patchDir
settings.add("lookupImgs", True, "sort of niche. Whether to auto looks up extra gene ontology relationship images") # _patchDir
[docs]def go(term:int):                                                                # go
    """Looks up a GO term"""                                                     # go
    if settings.go is None and not os.path.exists("go.obo"):                     # go
        answer = input("""No gene ontology obo file specified! You can:
- Specify the file using `settings.cli.bio.go = '/some/folder/go.obo'`
- Download this automatically to file `go.obo`
You want to download this automatically? (y/n) """)                              # go
        if answer.lower().startswith("y"):                                       # go
            url = "http://current.geneontology.org/ontology/go.obo"              # go
            print(f"Downloading from {url}...      ", end="")                    # go
            cli.wget(url); print("Finished!")                                    # go
        else: return print("Aborted")                                            # go
    file = settings.go or "go.obo"; term = f"{term}".rjust(7, "0")               # go
    cli.cat(file) | cli.grep(f"id: GO:{term}", 0, 10) > cli.stdout()             # go
    print(f"https://www.ebi.ac.uk/QuickGO/GTerm?id=GO:{term}")                   # go
    if settings.lookupImgs:                                                      # go
        class Repr:                                                              # go
            def _repr_html_(self):                                               # go
                return f"""<img src="http://amigo.geneontology.org/visualize?mode=amigo&term_data_type=string&format=png&inline=false&term_data=GO%3A{term}" />""" # go
        return Repr()                                                            # go 
settings.add("phred", """!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJ""", "Phred quality score") # go
class toIdx(BaseCli):                                                            # toIdx
    def __init__(self, chars:str): self.chars = {v:k for k, v in enumerate(chars)} # toIdx
    def __ror__(self, it):                                                       # toIdx
        chars = self.chars                                                       # toIdx
        for e in it: yield chars[e]                                              # toIdx
[docs]def quality(log=True):                                                           # quality
    """Get numeric quality of sequence.
Example::
    # returns [2, 2, 5, 30]
    "##&?" | quality() | deref()
:param log: whether to use log scale (0 -> 40), or linear scale (1 -> 0.0001)""" # quality
    if log: return toIdx(settings.phred)                                         # quality
    else: return toIdx(settings.phred) | cli.apply(lambda x: 10**(-x/10))        # quality 
[docs]def longFa():                                                                    # longFa
    """Takes in a fasta file and put each sequence on 1 line.
File "gene.fa"::
    >AF086833.2 Ebola virus - Mayinga, Zaire, 1976, complete genome
    CGGACACACAAAAAGAAAGAAGAATTTTTAGGATC
    TTTTGTGTGCGAATAACTATGAGGAAGATTAATAA
    >something other gene
    CGGACACACAAAAAGAAAGAAGA
    TTTTGTGTGCGAATAACTATGAG
Code::
    cat("gene.fa") | bio.longFa() | cli.headOut()
Prints out::
    >AF086833.2 Ebola virus - Mayinga, Zaire, 1976, complete genome
    CGGACACACAAAAAGAAAGAAGAATTTTTAGGATCTTTTGTGTGCGAATAACTATGAGGAAGATTAATAA
    >something other gene
    CGGACACACAAAAAGAAAGAAGATTTTGTGTGCGAATAACTATGAG"""                            # longFa
    return cli.grep("^>", sep=True).till() | (cli.item() & (~cli.head(1) | cli.join(""))).all() | cli.joinStreams() # longFa 
def _fileWithoutExt(f): return ".".join(f.split(".")[:-1])                       # _fileWithoutExt
[docs]class idx(BaseCli):                                                              # idx
    """Indexes files with various formats."""                                    # idx
[docs]    @staticmethod                                                                # idx
    def blast(fileName:str=None, dbtype:str=None):                               # idx
        """Uses ``makeblastdb`` to create a blast database from a fasta file.
Example::
    "file.fa" | bio.idx.blast()
    bio.idx.blast("file.fa")"""                                                  # idx
        f = cli.applyS(lambda fileName: None | cli.cmd(f"makeblastdb -dbtype {dbtype or 'nucl'} -in {fileName} -out {_fileWithoutExt(fileName)}")) # idx
        return f if fileName is None else f(fileName)                            # idx 
[docs]    @staticmethod                                                                # idx
    def bwa(fileName:str=None):                                                  # idx
        """Uses ``bwa`` to index a fasta file.
Example::
    "file.fa" | bio.idx.bwa()
    bio.idx.bwa("file.bwa")"""                                                   # idx
        f = cli.applyS(lambda fileName: None | cli.cmd(f"bwa index {fileName}")) # idx
        return f if fileName is None else f(fileName)                            # idx 
[docs]    @staticmethod                                                                # idx
    def bam(fileName:str=None):                                                  # idx
        """Uses ``samtools`` to index a bam file.
Example::
    "file.bam" | bio.idx.bam()
    bio.idx.bam("file.bam")"""                                                   # idx
        f = cli.applyS(lambda fileName: None | cli.cmd(f"samtools index {fileName}")) # idx
        return f if fileName is None else f(fileName)                            # idx  
[docs]class transcribe(BaseCli):                                                       # transcribe
    """Transcribes (DNA -> RNA) incoming rows.
Example::
    # returns "AUCG"
    "ATCG" | transcribe()
    # returns ["AUCG"]
    ["ATCG"] | transcribe() | deref()"""                                         # transcribe
[docs]    def __ror__(self, it:Union[Iterator[str], str]):                             # transcribe
        if isinstance(it, str): return [it] | self | cli.item()                  # transcribe
        return (line.upper().replace("T", "U") for line in it)                   # transcribe  
[docs]class complement(BaseCli):                                                       # complement
    """Get the reverse complement of DNA.
Example::
    # returns "TAGC"
    "ATCG" | bio.complement()
    # returns ["TAGC"]
    ["ATCG"] | bio.complement() | deref()"""                                     # complement
[docs]    def __ror__(self, it:Union[Iterator[str], str]):                             # complement
        if isinstance(it, str): return [it] | self | cli.item()                  # complement
        return (line.upper().replace("A", "0").replace("T", "A").replace("0", "T").upper().replace("C", "0").replace("G", "C").replace("0", "G") for line in it) # complement  
ntAa = {"UUU": "F", "UUC": "F", "UUA": "L", "UUG": "L",                          # complement
        "UCU": "S", "UCC": "S", "UCA": "S", "UCG": "S",                          # complement
        "UAU": "Y", "UAC": "Y", "UAA": "*", "UAG": "*",                          # complement
        "UGU": "C", "UGC": "C", "UGA": "*", "UGG": "W",                          # complement
                                                                                 # complement
        "CUU": "L", "CUC": "L", "CUA": "L", "CUG": "L",                          # complement
        "CCU": "P", "CCC": "P", "CCA": "P", "CCG": "P",                          # complement
        "CAU": "H", "CAC": "H", "CAA": "Q", "CAG": "Q",                          # complement
        "CGU": "R", "CGC": "R", "CGA": "R", "CGG": "R",                          # complement
                                                                                 # complement
        "AUU": "I", "AUC": "I", "AUA": "I", "AUG": "M",                          # complement
        "ACU": "T", "ACC": "T", "ACA": "T", "ACG": "T",                          # complement
        "AAU": "N", "AAC": "N", "AAA": "K", "AAG": "K",                          # complement
        "AGU": "S", "AGC": "S", "AGA": "R", "AGG": "R",                          # complement
                                                                                 # complement
        "GUU": "V", "GUC": "V", "GUA": "V", "GUG": "V",                          # complement
        "GCU": "A", "GCC": "A", "GCA": "A", "GCG": "A",                          # complement
        "GAU": "D", "GAC": "D", "GAA": "E", "GAG": "E",                          # complement
        "GGU": "G", "GGC": "G", "GGA": "G", "GGG": "G"}                          # complement
_shortAa = {v:v for v in ntAa.values()}                                          # complement
_medAa = {                                                                       # complement
    "F": "Phe", "L": "Leu", "I": "Ile", "M": "Met", "V": "Val",                  # complement
    "S": "Ser", "P": "Pro", "T": "Thr", "A": "Ala", "Y": "Tyr",                  # complement
    "*": "Stop", "H": "His", "Q": "Gln", "N": "Asn", "K": "Lys",                 # complement
    "D": "Asp", "E": "Glu", "C": "Cys", "W": "Trp", "R": "Arg",                  # complement
    "G": "Gly", "U": "Sec", "?": "?"                                             # complement
}                                                                                # complement
_longAa = {                                                                      # complement
    "F": "Phenylalanine", "L": "Leucine", "I": "Isoleucine", "M": "Methionine", "V": "Valine", # complement
    "S": "Serine", "P": "Proline", "T": "Threonine", "A": "Alanine", "Y": "Tyrosine", # complement
    "*": "Stop", "H": "Histidine", "Q": "Glutamine", "N": "Asparagine", "K": "Lysine", # complement
    "D": "AsparticAcid", "E": "GlutamicAcid", "C": "Cysteine", "W": "Tryptophan", "R": "Arginine", # complement
    "G": "Glycine", "U": "Selenocysteine", "?": "?"                              # complement
}                                                                                # complement
[docs]class translate(BaseCli):                                                        # translate
[docs]    def __init__(self, length:int=0):                                            # translate
        """Translates incoming rows.
:param length: 0 for short (L), 1 for med (Leu), 2 for long (Leucine)"""         # translate
        super().__init__(); self.delim = "" if length == 0 else " "              # translate
        self.dict = [_shortAa, _medAa, _longAa][length]                          # translate 
[docs]    def __ror__(self, it:Iterator[str]):                                         # translate
        super().__ror__(it)                                                      # translate
        if isinstance(it, str): it = [it]                                        # translate
        it = it | transcribe()                                                   # translate
        for line in it:                                                          # translate
            line = line.replace(" ", "")                                         # translate
            answer = ""; n = len(line)                                           # translate
            for i in range(0, n - n % 3, 3):                                     # translate
                codon = line[i:i+3].upper()                                      # translate
                answer += (self.dict[ntAa[codon]] if codon in ntAa else "?") + self.delim # translate
            yield answer                                                         # translate  
[docs]class medAa(BaseCli):                                                            # medAa
    """Converts short aa sequence to medium one"""                               # medAa
[docs]    def __ror__(self, it:Iterator[str]):                                         # medAa
        if isinstance(it, str): it = [it]                                        # medAa
        for line in it:                                                          # medAa
            yield " ".join(_medAa[c] for c in line)                              # medAa  
[docs]class longAa(BaseCli):                                                           # longAa
    """Converts short aa sequence to long one"""                                 # longAa
[docs]    def __ror__(self, it:Iterator[str]):                                         # longAa
        if isinstance(it, str): it = [it]                                        # longAa
        for line in it:                                                          # longAa
            yield " ".join(_longAa[c] for c in line)                             # longAa