# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
"""
This is for advanced machine learning models or complex data structures and algorithms, presented in a digestible interface
"""
from typing import Callable, Union, List, overload, Iterator, Any, Set, Tuple
from k1lib.cli.init import BaseCli, fastF
import k1lib.cli as cli; import k1lib, os, math
from k1lib.cli.typehint import *
import numpy as np; from collections import deque
try: import torch; hasTorch = True
except: hasTorch = False
__all__ = ["embed", "complete", "kmeans", "tsne", "bloom"]
# Settings namespace of this module, registered below as ``k1lib.settings.cli.models``
settings = k1lib.Settings()
settings.add(
    "cuda", None,
    "whether to run the models on the GPU or not. True for GPU, False for CPU. None (default) for GPU if available, else CPU",
)
k1lib.settings.cli.add("models", settings, "settings related to k1lib.cli.models")
# Holder for the memoized GPU availability probe; starts out as None
_cuda = k1lib.Wrapper(None)
def cuda() -> bool:
    """Internal function to figure out whether the models should run on the GPU or not.
An explicit ``settings.cuda`` value (True/False) takes precedence. Only when it is
left at ``None`` is the hardware probed, and that probe result is memoized in ``_cuda``
so that it runs at most once.

:return: True to run on the GPU, False to run on the CPU"""
    forced = settings.cuda
    if forced is not None: return bool(forced) # user explicitly picked a device
    if _cuda() is None:
        # hasTorch guards against `torch` being undefined when its import failed
        _cuda.value = bool(hasTorch and torch.cuda.is_available() and torch.cuda.device_count() >= 1)
    return _cuda()
# Optional dependency obtained through k1lib.dep, plus one cache of loaded encoders per device
sentence_transformers = k1lib.dep("sentence_transformers")
embed_models_cpu = {}
embed_models_cuda = {}
def embed_models():
    """Returns the encoder cache that matches the device ``cuda()`` selects"""
    if cuda(): return embed_models_cuda
    return embed_models_cpu
settings.add("embed", k1lib.Settings()
    .add("model", "all-MiniLM-L6-v2", "what model to choose from `SentenceTransformer` library")
    .add("bs", 512, "batch size to feed the model. For all-MiniLM-L6-v2, it seems to be able to deal with anything. I've tried 10k batch and it's still doing good"))
def embed_model():
    """Returns the ``encode`` function of the model named by ``settings.embed.model``.
The model is loaded the first time it is requested and its bound ``encode`` method
is kept in the per-device cache returned by ``embed_models()``. According to the
original author, the returned function takes in List[str] and gives back an
np.ndarray of shape (N, F)."""
    name = settings.embed.model
    cache = embed_models()
    if name in cache: return cache[name] # already loaded for this device
    encoder = sentence_transformers.SentenceTransformer(name)
    if cuda(): encoder = encoder.cuda()
    cache[name] = encoder.encode
    return cache[name]
class embed(BaseCli):
    def __init__(self, norm=True):
        """Gets an embedding vector for every sentence piped into this.
Example::

    # returns (384,)
    "abc" | embed() | shape()
    # returns (2, 384). Don't worry that this is less performant, as behind the scenes, it will automatically batch lines together (in batches of ``settings.cli.models.embed.bs``) before passing them through the model
    ["abc", "def"] | embed().all() | shape()

There are several settings you can set::

    settings.cli.models.embed.model = "msmarco-distilbert-base-v4" # specifies model used in this function
    settings.cli.models.cuda = False # tells the system to only use the CPU to run the models

The model is looked up when this object is constructed, so change the settings
before creating the cli.

I'd suggest picking "all-MiniLM-L6-v2" for general purpose tasks, and "msmarco-distilbert-base-v4"
for document-lookup style applications.

:param norm: whether to normalize the output embeddings or not"""
        self.model = embed_model()
        if norm: self.normF = lambda x: (x - (x | cli.toMean())) / (x | cli.toStd())
        else: self.normF = lambda x: x # identity, leaves the raw embeddings untouched
    def __ror__(self, it):
        """Embeds a single string, or every string of an iterable, then applies the normalization"""
        # a lone string is wrapped into a 1-element batch and unwrapped afterwards
        if isinstance(it, str): return self.normF(self.model([it])[0])
        return self.normF(self.model(list(it)))
    def _all_opt(self, it:List[str]):
        """Batched variant: splits the stream into chunks of ``settings.embed.bs``, embeds
each chunk with :meth:`__ror__` and joins the results back into one stream.
Presumably this is the hook used by ``.all()`` - confirm against BaseCli."""
        return it | cli.batched(settings.embed.bs, True) | cli.apply(self.__ror__) | cli.joinStreams()
# Optional dependency obtained through k1lib.dep, plus one cache of loaded text generators per device
transformers = k1lib.dep("transformers")
generic_models_cpu = {}
generic_models_cuda = {}
def generic_models():
    """Returns the text generator cache that matches the device ``cuda()`` selects"""
    if cuda(): return generic_models_cuda
    return generic_models_cpu
settings.add("generic", k1lib.Settings()
    .add("model", "google/flan-t5-xl", "what model to choose from `transformers` library")
    .add("bs", 16, "batch size to feed the model. For flan-t5-xl, 16 seems to be the sweet spot for 24GB VRAM (RTX 3090/4090). Decrease it if you don't have as much VRAM"))
def generic_model(maxTokens=100):
    """Returns a function that takes in an iterable of prompts and yields the generated strings.
The tokenizer and model named by ``settings.generic.model`` are loaded once per device
and cached in ``generic_models()``. The generation config is built on every call of
the returned function, so ``maxTokens`` is honored even when the model itself comes
from the cache (previously the value used when the model was first loaded stuck forever).

:param maxTokens: maximum number of new tokens to generate for each prompt
:return: function ``f(it)`` yielding one decoded string (special tokens included) per prompt
:raises Exception: if the configured model is not a "google/flan-t5-" model"""
    modelName = settings.generic.model
    cache = generic_models()
    if modelName not in cache:
        if not modelName.startswith("google/flan-t5-"):
            raise Exception("Currently, only Google T5 Flan models are supported")
        useCuda = cuda()
        loadKwargs = {"device_map": "auto"} if useCuda else {}
        tokenizer = transformers.T5Tokenizer.from_pretrained(modelName, **loadKwargs)
        model = transformers.T5ForConditionalGeneration.from_pretrained(modelName, **loadKwargs)
        def inner(it, maxTokens=100):
            conf = transformers.GenerationConfig(max_new_tokens=maxTokens)
            inputs = tokenizer(list(it), return_tensors="pt", padding=True).input_ids
            if useCuda: inputs = inputs.cuda()
            return (tokenizer.decode(line) for line in model.generate(inputs, conf))
        # still callable with a single argument, like the entries stored before this change
        cache[modelName] = inner
    generate = cache[modelName]
    return lambda it: generate(it, maxTokens)
class complete(BaseCli):
    def __init__(self, prompt:str=""):
        """Uses a LLM to autocomplete something.
Example::

    # returns "4". In case you're living in a cage, these LLMs are not entirely math savants. They sure understand English though
    "What is 2 + 6?" | complete()
    # returns ["4", "4"]
    ["What is 2 + 6?", "What is 8 + 2?"] | complete().all() | deref()

Can change model type by doing ``settings.cli.models.generic.model = "google/flan-t5-xl"``.
The model is looked up when this object is constructed, so change the settings
before creating the cli.

:param prompt: if specified, will inject this bit of text after all of the inputs.
    Can be something like "Please translate the above paragraph to German"
"""
        self.model = generic_model(); self.prompt = prompt
    def __ror__(self, it):
        """Completes a single string (returns a string), or every string of an
iterable (returns an iterable of strings)"""
        arrMode = not isinstance(it, str); prompt = self.prompt
        lines = list(it) if arrMode else [it]
        # every input gets the instruction appended, separated by blank lines
        lines = lines | cli.apply(lambda x: f"{x}\n\n\n{prompt}: ") | cli.deref()
        # strip the special tokens that the model leaves in the decoded text
        ans = self.model(lines) | cli.apply(lambda x: x.replace("<pad>", "").replace("<unk>", "").replace("</s>", "").strip())
        if arrMode: return ans
        return ans | cli.item()
    def _all_opt(self, it:List[str]):
        """Batched variant: splits the stream into chunks of ``settings.generic.bs``, completes
each chunk with :meth:`__ror__` and joins the results back into one stream.
Presumably this is the hook used by ``.all()`` - confirm against BaseCli."""
        return it | cli.batched(settings.generic.bs, True) | cli.apply(self.__ror__) | cli.joinStreams()
# scikit-learn submodules used by the clustering code below, obtained through k1lib.dep
# (presumably resolved lazily so that sklearn stays an optional dependency - confirm against k1lib.dep)
skclus = k1lib.dep("sklearn.cluster")
skpre = k1lib.dep("sklearn.preprocessing")
skmet = k1lib.dep("sklearn.metrics")
def refine(fea, a, b, kwargs, timeout=1):
    """Scans up to 10 log-spaced candidate cluster counts between ``a`` and ``b``,
fits k-means for each and scores the fit with the silhouette score.
The scan stops early when a fit fails or times out, or when a score drops
below half of the best score seen so far.

:param fea: feature matrix to cluster
:param a: smallest candidate number of clusters
:param b: largest candidate number of clusters
:param kwargs: extra keyword arguments for ``sklearn.cluster.KMeans``, overriding the defaults used here
:param timeout: time limit handed to ``cli.applyTh`` for a single fit
:return: tuple of (candidate with the highest score, last candidate tried)"""
    scores = []; topScore = float("-inf")
    # torch.loglinspace is not part of stock PyTorch; presumably patched in by k1lib - confirm
    for k in torch.loglinspace(a, b, 10).numpy().astype(int) | cli.aS(np.unique):
        km = skclus.KMeans(n_clusters=k, **{**{"init": "k-means++", "n_init": 10, "max_iter": 30}, **kwargs})
        try: [fea] | cli.applyTh(km.fit, timeout=timeout) | cli.ignore()
        # give up on larger k when a fit fails, but let KeyboardInterrupt/SystemExit through
        except Exception: break
        score = skmet.silhouette_score(fea, km.labels_)
        topScore = max(topScore, score); scores.append([k, score])
        if score*2 < topScore: break # quality collapsed, larger k won't help
    # NOTE(review): if the very first fit fails, `scores` is empty at this point - confirm how cli.item() behaves then
    return scores | ~cli.sort(1) | cli.cut(0) | cli.item(), k
def findCenters(fea, c, kwargs, timeout=1):
    """Runs the final k-means fit, guessing the number of clusters first if needed.
When ``c`` is None, the search range [a, b] is narrowed repeatedly with
:func:`refine` until it is at most 3 wide, and the best candidate found
in the last round is used.

:param fea: feature matrix to cluster
:param c: number of clusters, or None to search for a good value
:param kwargs: extra keyword arguments for ``sklearn.cluster.KMeans``. They override
    the defaults of the final fit too (they used to be dropped there)
:param timeout: time limit for each trial fit during the search
:return: tuple of (cluster centers, labels) of the final fit"""
    if c is None:
        a = 2; b = len(fea)
        while True:
            c, b = refine(fea, a, b, kwargs, timeout)
            # pull both ends of the range towards the best candidate
            a = round(a*0.2 + c*0.8); b = round(b*0.5 + c*0.5)
            if b - a <= 3: break
    km = skclus.KMeans(n_clusters=c, **{**{"init": "k-means++", "n_init": 10, "max_iter": 100}, **kwargs})
    km.fit(fea); return km.cluster_centers_, km.labels_
class kmeans(BaseCli):
    def __init__(self, k=None, mode=1, timeout=1, **kwargs):
        """Do k-means clustering, returning the cluster centers.
Example::

    features, true_labels = sklearn.datasets.make_blobs(n_samples=1_000, centers=5, cluster_std=0.5)
    centers = features | kmeans();
    centers | shape() # likely return (5, 2)
    # plotting things out
    plt.plot(*features.T, ".")
    plt.plot(*centers.T,  ".")

.. image:: ../images/kmeans.png

:param k: if specified, will use this k value. Else tries to guess what the best value is
:param mode: mode 0 (returns [cluster centers, labels]), mode 1 (returns cluster centers only), mode 2 (returns labels only)
:param timeout: internally will try kmeans for up to this number of seconds only.
    Will kill the job if it's taking longer
:param kwargs: keyword arguments will be passed into sklearn.cluster.KMeans directly.
    Some interesting parameters include ``init``, ``n_init``, ``max_iter``
"""
        self.k = k; self.mode = mode; self.timeout = timeout; self.kwargs = kwargs
    def __ror__(self, it):
        """Standardizes the features, clusters them, and returns the centers (mapped
back to the original feature scale) and/or the labels, depending on ``mode``"""
        scaler = skpre.StandardScaler(); fea = scaler.fit_transform(it); mode = self.mode
        centers, labels = findCenters(fea, self.k, self.kwargs, self.timeout)
        # clustering ran on standardized features, so undo the scaling on the centers
        centers = scaler.inverse_transform(centers)
        if mode == 0: return [centers, labels]
        if mode == 1: return centers
        return labels
skmani = k1lib.dep("sklearn.manifold")
class tsne(BaseCli):
    def __init__(self, n=2, **kwargs):
        """Transforms feature vectors of shape (N, F) down to (N, n), with n defaulting
to 2 for easy plotting.
Example::

    from sklearn.datasets import make_blobs
    features, true_labels = make_blobs(n_samples=1_000, n_features=5, centers=5, cluster_std=0.2)
    features | shape()          # returns (1000, 5)
    features | tsne() | shape() # returns (1000, 2)
    # plotting things out that has nice colors and whatnot
    features | tsne() & kmeans(5, 2) | ~aS(lambda xy,c: plt.scatter(*xy.T,c=c))

.. image:: ../images/tsne.png

:param n: number of output components (aka size of feature vector)
:param kwargs: other keyword arguments passed into ``sklearn.manifold.TSNE``"""
        self.n = n; self.kwargs = kwargs
    def __ror__(self, it):
        """Runs t-SNE on the incoming features and returns the embedded points"""
        # anything that is not already an array type gets materialized into a numpy array
        if not isinstance(it, k1lib.settings.cli.arrayTypes): it = np.array(list(it))
        return skmani.TSNE(n_components=self.n, **self.kwargs).fit_transform(it)
# Settings for bloom filters, nested as ``settings.bloom.scalable.{capacity, growth}``
settings.add(
    "bloom",
    k1lib.Settings().add(
        "scalable",
        k1lib.Settings()
            .add("capacity", 1000, "initial filter's capacity")
            .add("growth", 4, "how fast does the filter's capacity grow over time when the capacity is reached"),
        "settings for when you don't declare the bloom's capacity ahead of time",
    ),
    "bloom filter settings",
)
# Optional dependency, declared together with its pip package name and project urls
pybloom_live = k1lib.dep(
    "pybloom_live",
    "pybloom-live",
    "https://github.com/joseph-fox/python-bloomfilter, https://pypi.org/project/pybloom-live/",
)
class bloom(BaseCli):
    def __init__(self, n:int=None, p:float=0.1, overflow:bool=False):
        """Creates a bloom filter.
Example::

    bf = ["raptor", "skylake", "merlin", "twinscan nxt", "sapphire rapids"] | bloom()
    "raptor"       in bf # returns True
    "twinscan nxt" in bf # returns True
    "twin"         in bf # most likely returns False, small chance returns True

This also allows distributed computing quite easily::

    bf = range(10) | applyMp(lambda i: cat(f"file-{i}.txt") | bloom()) | bloom.join()

This code assumes that you have 10 files filled with text with file names "file-0.txt"
to "file-9.txt" and you want to check whether a string exists or not in all of those files.
All filters being joined have to be of the same kind: either all scalable, or all
fixed-capacity with a common capacity.

.. admonition:: Scalable bloom filter

    It's possible to leave the filter's capacity empty, which will create an initial filter
    with capacity 1000. When that capacity is reached, it will expand the filter to a capacity
    of 4000. Then 16000, and so on. This can be tweaked in the settings::

        settings.cli.models.bloom.scalable.capacity = 2000 # sets filter's default initial capacity
        settings.cli.models.bloom.scalable.growth = 2      # sets filter's growth factor when it runs out of space

    Because fundamentally, bloom filters can't grow, internally, this will create multiple bloom
    filters with increasing capacity, and whenever you search for a term, it will have to search
    through multiple filters to get the answer. So even though you can leave the capacity empty,
    it will degrade performance a little bit, which might be undesirable.

:param n: number of elements to be stored, aka filter's capacity
:param p: false positive probability (put 0.1 for 10% false positive, 0.01 for 1%)
:param overflow: if True, allows append more elements than the capacity of the filter, else (default) don't allow it"""
        self.n = n; self.p = p; self.overflow = overflow
    def __ror__(self, it):
        """Adds every element of the incoming iterable to a new filter and returns that filter"""
        n = self.n; p = self.p
        if n is None: # capacity unknown, use a filter that grows on demand
            conf = settings.bloom.scalable
            bf = pybloom_live.ScalableBloomFilter(conf.capacity, p, conf.growth)
        else: bf = pybloom_live.BloomFilter(n, p)
        if self.overflow:
            for e in it:
                try: bf.add(e)
                # filter is at capacity: reset the element counter so that it accepts more elements
                except IndexError: bf.count = 0; bf.add(e)
        else:
            for e in it: bf.add(e)
        return bf
    @staticmethod
    def join():
        """Returns a cli that merges an iterable of bloom filters into a single one.

:raises IndexError: if there are no filters to join
:raises Exception: if fixed-capacity and scalable filters are mixed"""
        def inner(bfs):
            bfs = list(bfs)
            if not bfs: raise IndexError("bloom.join() received no bloom filters to join")
            if len({type(b) for b in bfs}) > 1: raise Exception("Can't join normal filters and scalable filters together. Please specify a common capacity to all of the bloom filters")
            if type(bfs[0]) is pybloom_live.BloomFilter: # my implementation, which should be a little faster than builtin .union()
                b = bfs[0].copy() # work on a copy so that the first input filter is left untouched
                for b_ in bfs[1:]: b.bitarray |= b_.bitarray
                return b
            else: # has to use complex .union() in this case
                b = bfs[0]
                for b_ in bfs[1:]: b = b.union(b_)
                return b
        return cli.aS(inner)
# Best-effort patch so that fixed-capacity bloom filters can be piped into cli tools
# with `|`. Skipped silently when pybloom_live is missing or can't be patched.
try:
    import pybloom_live # rebinds the module-level name to the real module when it is installed
    @k1lib.patch(pybloom_live.pybloom.BloomFilter)
    def __or__(self, other):
        """Pipes this filter into ``other`` if it's a cli, else returns the union of both filters"""
        if isinstance(other, BaseCli): return other.__ror__(self)
        return self.union(other)
# still best-effort, but no longer swallows KeyboardInterrupt/SystemExit like a bare except
except Exception: pass