k1lib.cli module

The main idea of this package is to emulate the terminal (hence “cli”, or “command line interface”), but doing all of that inside Python itself. So this bash statement:

cat file.txt | head -5 > headerFile.txt

Turns into this statement:

cat("file.txt") | head(5) > file("headerFile.txt")

You can even integrate with existing shell commands:

ls("~") | cmd("grep so")

Here, “ls” will list out files inside the home directory, then pipes it into regular grep on linux, which is then piped back into Python as a list of strings. So it’s equivalent to this bash statement:

ls | grep so

“cat”, “head”, “file”, “ls” and “cmd” are all classes extended from BaseCli. All of them implements the “reverse or” operation, or __ror__. So essentially, these 2 statements are equivalent:

3 | obj
obj.__ror__(3)

Also, a lot of these tools (like apply and filt) assume that we are operating on a table. So this table:

col1

col2

col3

1

2

3

4

5

6

Is equivalent to this list:

[["col1", "col2", "col3"], [1, 2, 3], [4, 5, 6]]

transpose and mtmS provides more flexible ways to transform a table structure (but usually involves more code).

Also, the expected way to use these tools is to import everything directly into the current environment, like this:

from k1lib.imports import *

If you just want clis without other baggage, you can do this:

from k1lib.cli import *

Because there are a lot of clis, you may sometimes unintentionally overwrite an exposed cli tool. No worries, every tool is also under the cli object, meaning you can use deref() or cli.deref().

Besides operating on string iterators alone, this package can also be extra meta, and operate on streams of strings, or streams of streams of anything. I think this is one of the most powerful concept of the cli workflow. If this interests you, check over this:

All cli tools should work fine with torch.Tensor, numpy.ndarray and pandas.core.series.Series, but k1lib actually modifies Numpy arrays and Pandas series deep down for it to work. This means that you can still do normal bitwise or with a numpy float value, and they work fine in all regression tests that I have, but you might encounter strange bugs. You can disable it manually by changing settings.startup.or_patch. If you chooses to do this, you have to be careful and use these workarounds:

# returns (2, 3, 5), works fine
torch.randn(2, 3, 5) | shape()
# will not work, returns weird numpy array of shape (2, 3, 5)
np.random.randn(2, 3, 5) | shape()
# returns (2, 3, 5), mitigation strategy #1
shape()(np.random.randn(2, 3, 5))
# returns (2, 3, 5), mitigation strategy #2
[np.random.randn(2, 3, 5)] | (item() | shape())

All cli-related settings are at settings.cli.

Where to start?

Core clis include:

Then other important, not necessarily core clis include:

So, start reading over what these do first, as you can pretty much 95% utilize everything the cli workflow has to offer with those alone. Then skim over basic conversions in module conv. While you’re doing that, checkout trace(), for a quite powerful debugging tool.

There are several written tutorials about cli here, and I also made some video tutorials as well, so go check those out.

For every example in the tutorials that you found, you might find it useful to follow the following debugging steps, to see how everything works:

# assume there's this piece of code:
A | B | C | D
# do this instead:
A | deref()
# once you understand it, do this:
A | B | deref()

# assume there's this piece of code:
A | B.all() | C
# do this instead:
A | item() | B | deref()
# once you understand it, you can move on:
A | B.all() | deref()

# assume there's this piece of code:
A | (B & C)
# do this instead:
A | B | deref()

# assume there's this piece of code:
A | (B + C)
# do these instead:
A | deref() | op()[0] | B | deref()
A | deref() | op()[1] | C | dereF()
# there are alternatives to that:
A | item() | B | deref()
A | rows(1) | item() | C | deref()

Finally, you can read over the summary below, see what catches your eye and check that cli out.

Summary

structural

conv

utils

typehint

filt

transpose

toStr

size

tBase

filt

reshape

toTensor

shape

tAny

inSet()

insert

toList

item

tList

contains()

splitW

toSet

iden

tIter

empty

joinStreams

toIter

join

tSet

isNumeric()

joinStreamsRandom

toRange

wrapList

tCollection

instanceOf()

activeSamples

toSum

equals

tExpand

head

table()

toProd

reverse

tNpArray

tail()

batched

toAvg

ignore

tTensor

columns

window

toMean

rateLimit

tListIterSet()

cut

groupBy

toMax

timeLimit

tListSet()

rows

insertColumn

toMin

tab()

tListIter()

intersection

insertIdColumn()

toPIL

indent()

tArrayTypes()

union

expandE

toImg

clipboard

inferType()

unique

unsqueeze()

toRgb

deref

TypeHintException

breakIf

count

toRgba

bindec

tLowest()

mask

permute

toBin

smooth

tCheck

accumulate

toIdx

disassemble()

tOpt

AA_

toDict

tree()

peek

toDictF

lookup

peekF

toFloat

dictFields

repeat

toInt

repeatF()

repeatFrom

oneHot

modifier

init

output

inp

kxml

applyS

BaseCli

stdout

cat()

node

aS

Table

tee

curl()

maxDepth

apply

T()

file

wget()

tags

applyMp

fastF()

pretty

ls()

pretty

parallel

yieldT()

display()

cmd

display

applyTh

serial

headOut()

requireCli()

applySerial

oneToMany

intercept

sort

mtmS

split

sortF

consume

randomize

stagger

op

integrate

nb

grep

kcsv

trace

optimizations

cells()

grep

cat()

trace

dummy()

pretty

grepTemplate

execute

bio module

This is for functions that are actually biology-related

k1lib.cli.bio.go(term: int)[source]

Looks up a GO term

k1lib.cli.bio.quality(log=True)[source]

Get numeric quality of sequence. Example:

# returns [2, 2, 5, 30]
"##&?" | quality() | deref()
Parameters

log – whether to use log scale (0 -> 40), or linear scale (1 -> 0.0001)

k1lib.cli.bio.longFa()[source]

Takes in a fasta file and put each sequence on 1 line. File “gene.fa”:

>AF086833.2 Ebola virus - Mayinga, Zaire, 1976, complete genome
CGGACACACAAAAAGAAAGAAGAATTTTTAGGATC
TTTTGTGTGCGAATAACTATGAGGAAGATTAATAA
>something other gene
CGGACACACAAAAAGAAAGAAGA
TTTTGTGTGCGAATAACTATGAG

Code:

cat("gene.fa") | bio.longFa() | cli.headOut()

Prints out:

>AF086833.2 Ebola virus - Mayinga, Zaire, 1976, complete genome
CGGACACACAAAAAGAAAGAAGAATTTTTAGGATCTTTTGTGTGCGAATAACTATGAGGAAGATTAATAA
>something other gene
CGGACACACAAAAAGAAAGAAGATTTTGTGTGCGAATAACTATGAG
class k1lib.cli.bio.idx(fs: list = [])[source]

Bases: k1lib.cli.init.BaseCli

Indexes files with various formats.

static blast(fileName: Optional[str] = None, dbtype: Optional[str] = None)[source]

Uses makeblastdb to create a blast database from a fasta file. Example:

"file.fa" | bio.idx.blast()
bio.idx.blast("file.fa")
static bwa(fileName: Optional[str] = None)[source]

Uses bwa to index a fasta file. Example:

"file.fa" | bio.idx.bwa()
bio.idx.bwa("file.bwa")
static bam(fileName: Optional[str] = None)[source]

Uses samtools to index a bam file. Example:

"file.bam" | bio.idx.bam()
bio.idx.bam("file.bam")
class k1lib.cli.bio.transcribe(fs: list = [])[source]

Bases: k1lib.cli.init.BaseCli

Transcribes (DNA -> RNA) incoming rows. Example:

# returns "AUCG"
"ATCG" | transcribe()
# returns ["AUCG"]
["ATCG"] | transcribe() | deref()
__ror__(it: Union[Iterator[str], str])[source]
class k1lib.cli.bio.complement(fs: list = [])[source]

Bases: k1lib.cli.init.BaseCli

Get the reverse complement of DNA. Example:

# returns "TAGC"
"ATCG" | bio.complement()
# returns ["TAGC"]
["ATCG"] | bio.complement() | deref()
__ror__(it: Union[Iterator[str], str])[source]
class k1lib.cli.bio.translate(length: int = 0)[source]

Bases: k1lib.cli.init.BaseCli

__init__(length: int = 0)[source]

Translates incoming rows.

Parameters

length – 0 for short (L), 1 for med (Leu), 2 for long (Leucine)

__ror__(it: Iterator[str])[source]
class k1lib.cli.bio.medAa(fs: list = [])[source]

Bases: k1lib.cli.init.BaseCli

Converts short aa sequence to medium one

__ror__(it: Iterator[str])[source]
class k1lib.cli.bio.longAa(fs: list = [])[source]

Bases: k1lib.cli.init.BaseCli

Converts short aa sequence to long one

__ror__(it: Iterator[str])[source]

cif module

All tools related to cif file format that describes protein structures. Expected to use behind the “cif” module name, like this:

from k1lib.imports import *
cif.cat("abc.cif")
k1lib.cli.cif.tables()[source]

Loads table info. Example:

"abc.cif" | cif.tables()

Result is a dictionary of table name -> dict(). That inner dictionary maps from column name to a list of elements. All columns should have the same number of rows.

conv module

This is for all short utilities that converts from 1 data type to another. They might feel they have different styles, as toFloat converts object iterator to float iterator, while toPIL converts single image url to single PIL image, whereas toSum converts float iterator into a single float value.

The general convention is, if the intended operation sounds simple (convert to floats, strings, types, …), then most likely it will convert iterator to iterator, as you can always use the function directly if you only want to apply it on 1 object.

If it sounds complicated (convert to PIL image, tensor, …) then most likely it will convert object to object. Lastly, there are some that just feels right to input an iterator and output a single object (like getting max, min, std, mean values).

class k1lib.cli.conv.toStr(column: Optional[int] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(column: Optional[int] = None)[source]

Converts every line to a string. Example:

# returns ['2', 'a']
[2, "a"] | toStr() | deref()
# returns [[2, 'a'], [3, '5']]
assert [[2, "a"], [3, 5]] | toStr(1) | deref()
__ror__(it: Iterator[str])[source]
class k1lib.cli.conv.toTensor(dtype=torch.float32)[source]

Bases: k1lib.cli.init.BaseCli

__init__(dtype=torch.float32)[source]

Converts generator to torch.Tensor. Essentially torch.tensor(list(it)).

Also checks if input is a PIL Image. If yes, turn it into a torch.Tensor and return.

__ror__(it: Iterator[float])torch.Tensor[source]
class k1lib.cli.conv.toList[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Converts generator to list. list would do the same, but this is just to maintain the style

__ror__(it: Iterator[Any])List[Any][source]
class k1lib.cli.conv.toSet[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Converts generator to set. set would do the same, but this is just to maintain the style

__ror__(it: Iterator[T])Set[T][source]
class k1lib.cli.conv.toIter[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Converts object to iterator. iter() would do the same, but this is just to maintain the style

__ror__(it: List[T])Iterator[T][source]
class k1lib.cli.conv.toRange[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Returns iter(range(len(it))), effectively

__ror__(it: Iterator[Any])Iterator[int][source]
class k1lib.cli.conv.toSum[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Calculates the sum of list of numbers. Can pipe in torch.Tensor or numpy.ndarray. Example:

# returns 45
range(10) | toSum()
__ror__(it: Iterator[float])[source]
class k1lib.cli.conv.toProd[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Calculates the product of a list of numbers. Can pipe in torch.Tensor or numpy.ndarray. Example:

# returns 362880
range(1,10) | toProd()
__ror__(it)[source]
class k1lib.cli.conv.toAvg[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Calculates average of list of numbers. Can pipe in torch.Tensor or numpy.ndarray. Example:

# returns 4.5
range(10) | toAvg()
# returns nan
[] | toAvg()
__ror__(it: Iterator[float])[source]
k1lib.cli.conv.toMean

alias of k1lib.cli.conv.toAvg

class k1lib.cli.conv.toMax[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Calculates the max of a bunch of numbers. Can pipe in torch.Tensor or numpy.ndarray. Example:

# returns 6
[2, 5, 6, 1, 2] | toMax()
__ror__(it: Iterator[float])float[source]
class k1lib.cli.conv.toMin[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Calculates the min of a bunch of numbers. Can pipe in torch.Tensor or numpy.ndarray. Example:

# returns 1
[2, 5, 6, 1, 2] | toMin()
__ror__(it: Iterator[float])float[source]
class k1lib.cli.conv.toPIL[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Converts a path to a PIL image. Example:

ls(".") | toPIL().all() | item() # get first image
__ror__(path)PIL.Image.Image[source]
k1lib.cli.conv.toImg

alias of k1lib.cli.conv.toPIL

class k1lib.cli.conv.toRgb[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Converts greyscale/rgb PIL image to rgb image. Example:

# reads image file and converts it to rgb
"a.png" | toPIL() | toRgb()
__ror__(i)[source]
class k1lib.cli.conv.toRgba[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Converts random PIL image to rgba image. Example:

# reads image file and converts it to rgba
"a.png" | toPIL() | toRgba()
__ror__(i)[source]
class k1lib.cli.conv.toBin[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Converts integer to binary string. Example:

# returns "101"
5 | toBin()
__ror__(it)[source]
class k1lib.cli.conv.toIdx(chars: str)[source]

Bases: k1lib.cli.init.BaseCli

__init__(chars: str)[source]

Get index of characters according to a reference. Example:

# returns [1, 4, 4, 8]
"#&&*" | toIdx("!#$%&'()*+") | deref()
__ror__(it)[source]
class k1lib.cli.conv.toDict[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Converts 2 Iterators, 1 key, 1 value into a dictionary. Example:

# returns {1: 3, 2: 4}
[[1, 2], [3, 4]] | toDict()
__ror__(it: Tuple[Iterator[T], Iterator[T]])dict[source]
class k1lib.cli.conv.toDictF(keyF: Optional[Callable[[Any], str]] = None, valueF: Optional[Callable[[Any], Any]] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(keyF: Optional[Callable[[Any], str]] = None, valueF: Optional[Callable[[Any], Any]] = None)[source]

Transform an incoming stream into a dict using a function for values. Example:

names = ["wanda", "vision", "loki", "mobius"]
names | toDictF(valueF=lambda s: len(s)) # will return {"wanda": 5, "vision": 6, ...}
names | toDictF(lambda s: s.title(), lambda s: len(s)) # will return {"Wanda": 5, "Vision": 6, ...}
__ror__(keys: Iterator[Any])Dict[Any, Any][source]
class k1lib.cli.conv.toFloat(*columns, mode=2)[source]

Bases: k1lib.cli.init.BaseCli

__init__(*columns, mode=2)[source]

Converts every row into a float. Example:

# returns [1, 3, -2.3]
["1", "3", "-2.3"] | toFloat() | deref()
# returns [[1.0, 'a'], [2.3, 'b'], [8.0, 'c']]
[["1", "a"], ["2.3", "b"], [8, "c"]] | toFloat(0) | deref()

With weird rows:

# returns [[1.0, 'a'], [8.0, 'c']]
[["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0) | deref()
# returns [[1.0, 'a'], [0.0, 'b'], [8.0, 'c']]
[["1", "a"], ["c", "b"], [8, "c"]] | toFloat(0, force=True) | deref()

This also works well with torch.Tensor and numpy.ndarray, as they will not be broken up into an iterator:

# returns a numpy array, instead of an iterator
np.array(range(10)) | toFloat()
Parameters
  • columns – if nothing, then will convert each row. If available, then convert all the specified columns

  • mode – different conversion styles - 0: simple float() function, fastest, but will throw errors if it can’t be parsed - 1: if there are errors, then replace it with zero - 2: if there are errors, then eliminate the row

__ror__(it)[source]
class k1lib.cli.conv.toInt(*columns, mode=2)[source]

Bases: k1lib.cli.init.BaseCli

__init__(*columns, mode=2)[source]

Converts every row into an integer. Example:

# returns [1, 3, -2]
["1", "3", "-2.3"] | toInt() | deref()
Parameters
  • columns – if nothing, then will convert each row. If available, then convert all the specified columns

  • mode – different conversion styles - 0: simple float() function, fastest, but will throw errors if it can’t be parsed - 1: if there are errors, then replace it with zero - 2: if there are errors, then eliminate the row

See also: toFloat()

__ror__(it)[source]

mgi module

All tools related to the MGI database. Expected to use behind the “mgi” module name, like this:

from k1lib.imports import *
["SOD1", "AMPK"] | mgi.batch()
class k1lib.cli.mgi.batch(headless=True)[source]

Bases: k1lib.cli.init.BaseCli

Queries MGI database, convert list of genes to MGI ids

__init__(headless=True)[source]
Parameters

headless – whether to run this operation headless, or actually display the browser

__ror__(it: List[str])[source]

filt module

This is for functions that cuts out specific parts of the table

class k1lib.cli.filt.filt(predicate: Callable[[T], bool], column: Optional[int] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(predicate: Callable[[T], bool], column: Optional[int] = None)[source]

Filters out lines. Examples:

# returns [2, 6]
[2, 3, 5, 6] | filt(lambda x: x%2 == 0) | deref()
# returns [3, 5]
[2, 3, 5, 6] | ~filt(lambda x: x%2 == 0) | deref()
# returns [[2, 'a'], [6, 'c']]
[[2, "a"], [3, "b"], [5, "a"], [6, "c"]] | filt(lambda x: x%2 == 0, 0) | deref()

You can also pass in op, for extra intuitiveness:

# returns [2, 6]
[2, 3, 5, 6] | filt(op() % 2 == 0) | deref()
# returns ['abc', 'a12']
["abc", "def", "a12"] | filt(op().startswith("a")) | deref()
# returns [3, 4, 5, 6, 7, 8, 9]
range(100) | filt(3 <= op() < 10) | deref()

If you need more extensive filtering capabilities, check out grep

Parameters

column

  • if integer, then predicate(row[column])

  • if None, then predicate(row)

__ror__(it: Iterator[T])Iterator[T][source]
__invert__()[source]

Negate the condition

k1lib.cli.filt.inSet(values: Set[Any], column: Optional[int] = None)k1lib.cli.filt.filt[source]

Filters out lines that is not in the specified set. Example:

# returns [2, 3]
range(5) | inSet([2, 8, 3]) | deref()
# returns [0, 1, 4]
range(5) | ~inSet([2, 8, 3]) | deref()

You can also use op like this, so you don’t have to remember this cli:

# returns [2, 3]
range(5) | filt(op() in [2, 8, 3]) | deref()
# returns [0, 1, 4]
range(5) | ~filt(op() in [2, 8, 3]) | deref()

However, this feature is very experimental

k1lib.cli.filt.contains(s: str, column: Optional[int] = None)k1lib.cli.filt.filt[source]

Filters out lines that don’t contain the specified substring. Sort of similar to grep, but this is simpler, and can be inverted. Example:

# returns ['abcd', '2bcr']
["abcd", "0123", "2bcr"] | contains("bc") | deref()

You can also use op like this:

# returns ['abcd', '2bcr']
["abcd", "0123", "2bcr"] | filt("bc" in op()) | deref()
class k1lib.cli.filt.empty(reverse=False)[source]

Bases: k1lib.cli.init.BaseCli

__init__(reverse=False)[source]

Filters out streams that is not empty. Almost always used inverted, but “empty” is a short, sweet name that’s easy to remember. Example:

# returns [[1, 2], ['a']]
[[], [1, 2], [], ["a"]] | ~empty() | deref()
Parameters

reverse – not intended to be used by the end user. Do ~empty() instead.

__ror__(streams: Iterator[Iterator[T]])Iterator[Iterator[T]][source]
__invert__()[source]
k1lib.cli.filt.isNumeric(column: Optional[int] = None)k1lib.cli.filt.filt[source]

Filters out a line if that column is not a number. Example:

# returns [0, 2, '3']
[0, 2, "3", "a"] | isNumeric() | deref()
k1lib.cli.filt.instanceOf(cls: Union[type, Tuple[type]], column: Optional[int] = None)k1lib.cli.filt.filt[source]

Filters out lines that is not an instance of the given type. Example:

# returns [2]
[2, 2.3, "a"] | instanceOf(int) | deref()
# returns [2, 2.3]
[2, 2.3, "a"] | instanceOf((int, float)) | deref()
class k1lib.cli.filt.head(n=10)[source]

Bases: k1lib.cli.init.BaseCli

__init__(n=10)[source]

Only outputs first n lines. You can also negate it (like ~head(5)), which then only outputs after first n lines. Examples:

"abcde" | head(2) | deref() # returns ["a", "b"]
"abcde" | ~head(2) | deref() # returns ["c", "d", "e"]
"0123456" | head(-3) | deref() # returns ['0', '1', '2', '3']
"0123456" | ~head(-3) | deref() # returns ['4', '5', '6']
"012" | head(None) | deref() # returns ['0', '1', '2']
"012" | ~head(None) | deref() # returns []

You can also pass in fractional head:

range(20) | head(0.25) | deref() # returns [0, 1, 2, 3, 4], or the first 25% of samples

Also works well and fast with numpy.ndarray, torch.Tensor and other sliceable types:

# returns (10,)
np.linspace(1, 3) | head(10) | shape()
__ror__(it: Iterator[T])Iterator[T][source]
__invert__()[source]
k1lib.cli.filt.tail(n: int = 10)[source]

Basically an inverted head. Examples:

range(10) | tail(3) | deref() # returns [7, 8, 9]
class k1lib.cli.filt.columns(*columns: List[int])[source]

Bases: k1lib.cli.init.BaseCli

__init__(*columns: List[int])[source]

Cuts out specific columns, sliceable. Examples:

["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
["0123456789"] | cut(2) | deref() # returns ['2']
["0123456789"] | cut(5, 8) | deref() # returns [['5', '8']]
["0123456789"] | ~cut()[:7:2] | deref() # returns [['1', '3', '5', '7', '8', '9']]

If you’re selecting only 1 column, then Iterator[T] will be returned, not Table[T].

__ror__(it: k1lib.cli.init.Table[T])k1lib.cli.init.Table[T][source]
__invert__()[source]
k1lib.cli.filt.cut

alias of k1lib.cli.filt.columns

class k1lib.cli.filt.rows(*rows: List[int])[source]

Bases: k1lib.cli.init.BaseCli

__init__(*rows: List[int])[source]

Cuts out specific rows. Space complexity O(1) as a list is not constructed (unless you’re using some really weird slices).

Parameters

rows – ints for the row indices

Example:

"0123456789" | rows(2) | toList() # returns ["2"]
"0123456789" | rows(5, 8) | toList() # returns ["5", "8"]
"0123456789" | rows()[2:5] | toList() # returns ["2", "3", "4"]
"0123456789" | ~rows()[2:5] | toList() # returns ["0", "1", "5", "6", "7", "8", "9"]
"0123456789" | ~rows()[:7:2] | toList() # returns ['1', '3', '5', '7', '8', '9']
"0123456789" | rows()[:-4] | toList() # returns ['0', '1', '2', '3', '4', '5']
"0123456789" | ~rows()[:-4] | toList() # returns ['6', '7', '8', '9']
__invert__()[source]
__ror__(it: Iterator[str])[source]
class k1lib.cli.filt.intersection[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Returns the intersection of multiple streams. Example:

# returns set([2, 4, 5])
[[1, 2, 3, 4, 5], [7, 2, 4, 6, 5]] | intersection()
__ror__(its: Iterator[Iterator[Any]])Set[Any][source]
class k1lib.cli.filt.union[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Returns the union of multiple streams. Example:

# returns {0, 1, 2, 10, 11, 12, 13, 14}
[range(3), range(10, 15)] | union()
__ror__(its: Iterator[Iterator[Any]])Set[Any][source]
class k1lib.cli.filt.unique(column: int)[source]

Bases: k1lib.cli.init.BaseCli

__init__(column: int)[source]

Filters out non-unique row elements. Example:

# returns [[1, "a"], [2, "a"]]
[[1, "a"], [2, "a"], [1, "b"]] | unique(0) | deref()
Parameters

column – doesn’t have the default case of None, because you can always use k1lib.cli.conv.toSet

__ror__(it: k1lib.cli.init.Table[T])k1lib.cli.init.Table[T][source]
class k1lib.cli.filt.breakIf(f)[source]

Bases: k1lib.cli.init.BaseCli

__init__(f)[source]

Breaks the input iterator if a condition is met. Example:

# returns [0, 1, 2, 3, 4, 5]
[*range(10), 2, 3] | breakIf(lambda x: x > 5) | deref()
__ror__(it: Iterator[T])Iterator[T][source]
class k1lib.cli.filt.mask(mask: Iterator[bool])[source]

Bases: k1lib.cli.init.BaseCli

__init__(mask: Iterator[bool])[source]

Masks the input stream. Example:

# returns [0, 1, 3]
range(5) | mask([True, True, False, True, False]) | deref()
# returns torch.tensor([0, 1, 3])
torch.tensor(range(5)) | mask([True, True, False, True, False])
__ror__(it)[source]

gb module

All tools related to GenBank file format. Expected to use behind the “gb” module name, like this:

from k1lib.imports import *
cat("abc.gb") | gb.feats()
class k1lib.cli.gb.feats(fs: list = [])[source]

Bases: k1lib.cli.init.BaseCli

Fetches features, each on a separate stream. Example:

cat("a.gb") | gb.feats()

Output example:

[['     source          1..248956422',
  '                     /organism="Homo sapiens"',
  '                     /mol_type="genomic DNA"',
  '                     /db_xref="taxon:9606"',
  '                     /chromosome="1"'],
 ['     gene            11874..14409',
  '                     /gene="DDX11L1"',
  '                     /note="DEAD/H-box helicase 11 like 1 (pseudogene); Derived',
  '                     by automated computational analysis using gene prediction',
  '                     method: BestRefSeq."',
  '                     /pseudo',
  '                     /db_xref="GeneID:100287102"',
  '                     /db_xref="HGNC:HGNC:37102"']]
__ror__(it)[source]
static filt(*terms: str)k1lib.cli.init.BaseCli[source]

Filters for specific terms in all the features texts. If there are multiple terms, then filters for first term, then second, then third, so the term’s order might matter to you. Example:

[['     source          1..248956422',
  '                     /organism="Homo sapiens"',
  '                     /mol_type="genomic DNA"',
  '                     /db_xref="taxon:9606"',
  '                     /chromosome="1"'],
 ['     gene            11874..14409',
  '                     /gene="DDX11L1"',
  '                     /note="DEAD/H-box helicase 11 like 1 (pseudogene); Derived',
  '                     by automated computational analysis using gene prediction',
  '                     method: BestRefSeq."',
  '                     /pseudo',
  '                     /db_xref="GeneID:100287102"',
  '                     /db_xref="HGNC:HGNC:37102"']] | gb.feats.filt("mol_type")

Output:

[['     source          1..248956422',
  '                     /organism="Homo sapiens"',
  '                     /mol_type="genomic DNA"',
  '                     /db_xref="taxon:9606"',
  '                     /chromosome="1"']]
static root()k1lib.cli.init.BaseCli[source]

Gets root (top most unnamed tag) of a feature. Example:

['     misc_RNA        complement(join(14362..14829,14970..15038,15796..15947,',
 '                     16607..16765,16858..17055,17233..17368,17606..17742,',
 '                     17915..18061,18268..18366,24738..24891,29321..29370))',
 '                     /gene="WASH7P"',
 '                     /gene_synonym="FAM39F; WASH5P"',
 '                     /product="WASP family homolog 7, pseudogene"',
 '                     /note="Derived by automated computational analysis using',
 '                     gene prediction method: BestRefSeq."',
 '                     /pseudo',
 '                     /transcript_id="NR_024540.1"',
 '                     /db_xref="GeneID:653635"',
 '                     /db_xref="HGNC:HGNC:38034"'] | feats.root()

Output:

['misc_RNA',
 ['complement(join(14362..14829,14970..15038,15796..15947,',
  '16607..16765,16858..17055,17233..17368,17606..17742,',
  '17915..18061,18268..18366,24738..24891,29321..29370))']]
static tags(*tags: List[str])k1lib.cli.init.BaseCli[source]

Grabs a list of tags. Example:

s = ['     misc_RNA        complement(join(14362..14829,14970..15038,15796..15947,',
     '                     16607..16765,16858..17055,17233..17368,17606..17742,',
     '                     17915..18061,18268..18366,24738..24891,29321..29370))',
     '                     /gene="WASH7P"',
     '                     /gene_synonym="FAM39F; WASH5P"',
     '                     /product="WASP family homolog 7, pseudogene"',
     '                     /note="Derived by automated computational analysis using',
     '                     gene prediction method: BestRefSeq."',
     '                     /pseudo',
     '                     /transcript_id="NR_024540.1"',
     '                     /db_xref="GeneID:653635"',
     '                     /db_xref="HGNC:HGNC:38034"']
s | feats.tags()

Output:

[['gene', 'WASH7P'],
 ['gene_synonym', 'FAM39F; WASH5P'],
 ['product', 'WASP family homolog 7, pseudogene'],
 ['note',
  'Derived by automated computational analysis using gene prediction method: BestRefSeq.'],
 ['pseudo', ''],
 ['transcript_id', 'NR_024540.1'],
 ['db_xref', 'GeneID:653635'],
 ['db_xref', 'HGNC:HGNC:38034']]

With filters:

# returns [['gene', 'WASH7P'], ['db_xref', 'HGNC:HGNC:38034'], ['organism', '']]
s | feats.tags("gene", "db_xref", "organism")
class k1lib.cli.gb.origin(fs: list = [])[source]

Bases: k1lib.cli.init.BaseCli

Return the origin section of the genbank file. Example:

# returns single fasta string
cat("a.gb") | gb.origin()
__ror__(it)[source]

grep module

class k1lib.cli.grep.grep(pattern: Union[str, Callable[[Any], bool]], before: int = 0, after: int = 0, N: int = inf, sep: bool = False, col: Optional[int] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(pattern: Union[str, Callable[[Any], bool]], before: int = 0, after: int = 0, N: int = inf, sep: bool = False, col: Optional[int] = None)[source]

Find lines that has the specified pattern. Example:

# returns ['d', 'd']
"abcde12d34" | grep("d") | deref()
# returns ['c', 'd', '2', 'd'], 2 sections of ['c', 'd'] and ['2', 'd']
"abcde12d34" | grep("d", 1) | deref()
# returns ['c', 'd']
"abcde12d34" | grep("d", 1, N=1) | deref()
# returns ['d', 'e', 'd', '3', '4'], 2 sections of ['d', 'e'] and ['d', '3', '4']
"abcde12d34" | grep("d", 0, 3).till("e") | deref()
# returns [['0', '1', '2'], ['3', '1', '4']]
"0123145" | grep("1", 2, 1, sep=True) | deref()

You can also separate out the sections:

# returns [['c', 'd'], ['2', 'd']]
"abcde12d34" | grep("d", 1, sep=True) | deref()
# returns [['c', 'd']]
"abcde12d34" | grep("d", 1, N=1, sep=True) | deref()
# returns [['1', '2', '3'], ['1', '4', '5']]
"0123145" | grep("1", sep=True).till() | deref()

You can also put in predicates instead of regex patterns:

# returns ['d', 'd']
"abcde12d34" | grep(lambda x: x == "d") | deref()
# also returns ['d', 'd']
"abcde12d34" | filt(lambda x: x == "d") | deref()
# returns ['d', 'e', 'd', '3', '4']
"abcde12d34" | grep(lambda x: x == "d").till(lambda x: x == "e") | deref()

The first scenario looks like a regular filter function, already implemented by filt, but grep brings in more clustering features for the price of reduced execution speed. So for simple scenarios it’s advised that you use filt.

See also: groupBy

Also, there’s a whole tutorial devoted to just this cli

Parameters
  • pattern – regex pattern to search for in a line

  • before – lines before the hit. Outputs independent lines

  • after – lines after the hit. Outputs independent lines

  • N – max sections to output

  • sep – whether to separate out the sections as lists

  • col – searches for pattern in a specific column

till(pattern: Optional[Union[str, Callable[[Any], bool]]] = None)[source]

Greps until some other pattern appear. Inclusive, so you might want to trim the last line. Example:

# returns ['5', '6', '7', '8'], includes last item
range(10) | join("") | grep("5").till("8") | deref()
# returns ['d', 'e', 'd', '3', '4']
"abcde12d34" | grep("d").till("e") | deref()
# returns ['d', 'e']
"abcde12d34" | grep("d", N=1).till("e") | deref()

If initial pattern and till pattern are the same, then you don’t have use this method at all. Instead, do something like this:

# returns ['1', '2', '3']
"0123145" | grep("1", after=1e9, N=1) | deref()
__ror__(it: Iterator[str])Iterator[str][source]
class k1lib.cli.grep.grepTemplate(pattern: str, template: str)[source]

Bases: k1lib.cli.init.BaseCli

__init__(pattern: str, template: str)[source]

Searches over all lines, pick out the match, and expands it to the templateand yields

__ror__(it: Iterator[str])[source]

init module

class k1lib.cli.init.BaseCli(fs: list = [])[source]

Bases: object

A base class for all the cli stuff. You can definitely create new cli tools that have the same feel without extending from this class, but advanced stream operations (like +, &, .all(), |) won’t work.

At the moment, you don’t have to call super().__init__() and super().__ror__(), as __init__’s only job right now is to solidify any op passed to it, and __ror__ does nothing.

__init__(fs: list = [])[source]

Not expected to be instantiated by the end user.

fs param

Expected to use it like this:

class A(BaseCli):
    def __init__(self, f):
        fs = [f]; super().__init__(fs); self.f = fs[0]

Where f is some (potentially exotic) function. This will replace f with a “normal” function that’s executable. See source code of filt for an example of why this is useful. Currently, it will:

  • Replace with last recorded 4 in op(), if f is True, because Python does not allow returning complex objects from __contains__ method

  • Solidifies every op.

hint(_hint: k1lib.cli.typehint.tBase)[source]

Specifies output type hint.

property hasHint
__and__(cli: k1lib.cli.init.BaseCli)k1lib.cli.init.oneToMany[source]

Duplicates input stream to multiple joined clis. Example:

# returns [[5], [0, 1, 2, 3, 4]]
range(5) | (shape() & iden()) | deref()

Kinda like apply. There’re just multiple ways of doing this. This I think, is more intuitive, and apply is more for lambdas and columns mode. Performances are pretty much identical.

__add__(cli: k1lib.cli.init.BaseCli)k1lib.cli.init.mtmS[source]

Parallel pass multiple streams to multiple clis. Example:

# returns [8, 15]
[2, 3] | ((op() * 4) + (op() * 5)) | deref()
all(n: int = 1)k1lib.cli.init.BaseCli[source]

Applies this cli to all incoming streams. Example:

# returns (3,)
torch.randn(3, 4) | toMean().all() | shape()
# returns (3, 4)
torch.randn(3, 4, 5) | toMean().all(2) | shape()
Parameters

n – how many times should I chain .all()?

__or__(cli)k1lib.cli.init.serial[source]

Joins clis end-to-end. Example:

c = apply(op() ** 2) | deref()
# returns [0, 1, 4, 9, 16]
range(5) | c
__ror__(it)[source]
f()k1lib.cli.init.Table[k1lib.cli.init.Table[int]][source]

Creates a normal function \(f(x)\) which is equivalent to x | self.

__lt__(it)[source]

Backup pipe symbol >, purely for style, so that you can do something like this:

range(4) > file("a.txt")
__call__(it, *args)[source]

Another way to do it | cli. If multiple arguments are fed, then the argument list is passed to cli instead of just the first element. Example:

@applyS
def f(it):
    return it
f(2) # returns 2
f(2, 3) # returns [2, 3]
k1lib.cli.init.yieldT

Object often used as a sentinel, or an identifying token in lots of clis, including that can be yielded in a stream to ignore this stream for the moment in joinStreamsRandom, deref, tCheck and tOpt

k1lib.cli.init.fastF(c, x=None)[source]

Tries to figure out what’s going on, is it a normal function, or an applyS, or a BaseCli, etc., and return a really fast function for execution. Example:

# both returns 16, fastF returns "lambda x: x**2", so it's really fast
fastF(op()**2)(4)
fastF(applyS(lambda x: x**2))(4)

At the moment, parameter x does nothing, but potentially in the future, you can pass in an example input to the cli, so that this returns an optimized, C compiled version.

Parameters

x – sample data for the cli

class k1lib.cli.init.serial(*clis: List[k1lib.cli.init.BaseCli])[source]

Bases: k1lib.cli.init.BaseCli

__init__(*clis: List[k1lib.cli.init.BaseCli])[source]

Merges clis into 1, feeding end to end. Used in chaining clis together without a prime iterator. Meaning, without this, stuff like this fails to run:

[1, 2] | a() | b() # runs
c = a() | b(); [1, 2] | c # doesn't run if this class doesn't exist
__ror__(it: Iterator[Any])Iterator[Any][source]
class k1lib.cli.init.oneToMany(*clis: List[k1lib.cli.init.BaseCli])[source]

Bases: k1lib.cli.init.BaseCli

__init__(*clis: List[k1lib.cli.init.BaseCli])[source]

Duplicates 1 stream into multiple streams, each for a cli in the list. Used in the “a & b” joining operator. See also: BaseCli.__and__()

__ror__(it: Iterator[Any])Iterator[Iterator[Any]][source]
class k1lib.cli.init.mtmS(*clis: List[k1lib.cli.init.BaseCli])[source]

Bases: k1lib.cli.init.BaseCli

__init__(*clis: List[k1lib.cli.init.BaseCli])[source]

Applies multiple streams to multiple clis independently. Used in the “a + b” joining operator. See also: BaseCli.__add__().

Weird name is actually a shorthand for “many to many specific”.

__ror__(its: Iterator[Any])Iterator[Any][source]
static f(f, i: int, n: int = 100)[source]

Convenience method, so that this:

mtmS(iden(), op()**2, iden(), iden(), iden())
# also the same as this btw:
(iden() + op()**2 + iden() + iden() + iden())

is the same as this:

mtmS.f(op()**2, 1, 5)

Example:

# returns [5, 36, 7, 8, 9]
range(5, 10) | mtmS.f(op()**2, 1, 5) | deref()
Parameters
  • i – where should I put the function?

  • n – how many clis in total? Defaulted to 100

inp module

This module for tools that will likely start the processing stream.

k1lib.cli.inp.cat(fileName: Optional[str] = None, text: bool = True, _all=False)[source]

Reads a file line by line. Example:

# display first 10 lines of file
cat("file.txt") | headOut()
# piping in also works
"file.txt" | cat() | headOut()
# recommended to insert a `tOpt()` in the middle and `yieldT` in the end, to do lots of optimizations
"file.txt" | tOpt() | cat() | headOut() | yieldT

# rename file
cat("img.png", False) | file("img2.png")
Parameters
  • fileName – if None, then return a BaseCli that accepts a file name and outputs Iterator[str]

  • text – if True, read text file, else read binary file

  • _all – if True, read entire file at once, instead of reading line-by-line. Faster, but uses more memory. Only works with text mode, binary mode always read the entire file

k1lib.cli.inp.curl(url: str)Iterator[str][source]

Gets file from url. File can’t be a binary blob. Example:

# prints out first 10 lines of the website
curl("https://k1lib.github.io/") | headOut()
k1lib.cli.inp.wget(url: str, fileName: Optional[str] = None)[source]

Downloads a file. Also returns the file name, in case you want to pipe it to something else.

Parameters
  • url – The url of the file

  • fileName – if None, then tries to infer it from the url

k1lib.cli.inp.ls(folder: Optional[str] = None)[source]

List every file and folder inside the specified folder. Example:

# returns List[str]
ls("/home")
# same as above
"/home" | ls()
# only outputs files, not folders
ls("/home") | filt(os.path.isfile)
class k1lib.cli.inp.cmd(cmd: str, mode: int = 1, text=True, block=False)[source]

Bases: k1lib.cli.init.BaseCli

__init__(cmd: str, mode: int = 1, text=True, block=False)[source]

Runs a command, and returns the output line by line. Can pipe in some inputs. If no inputs then have to pipe in None. Example:

# return detailed list of files
None | cmd("ls -la")
# return list of files that ends with "ipynb"
None | cmd("ls -la") | cmd('grep ipynb$')

It might be tiresome to pipe in None all the time. So, you can use “>” operator to yield values right away:

# prints out first 10 lines of list of files
cmd("ls -la") > headOut()

If you’re using Jupyter notebook/lab, then if you were to display a cmd object, it will print out the outputs. So, a single command cmd("mkdir") displayed at the end of a cell is enough to trigger creating the directory.

Reminder that “>” operator in here sort of has a different meaning to that of BaseCli. So you kinda have to becareful about this:

# returns a serial cli, cmd not executed
cmd("ls -la") | deref()
# executes cmd with no input stream and pipes output to deref
cmd("ls -la") > deref()
# returns a serial cli
cmd("ls -la") > grep("txt") > headOut()
# executes pipeline
cmd("ls -la") > grep("txt") | headOut()

General advice is, right ater a cmd, use “>”, and use “|” everywhere else.

Let’s see a few more exotic examples. File a.sh:

#!/bin/bash

echo 1; sleep 0.5
echo This message goes to stderr >&2
echo 2; sleep 0.5
echo $(</dev/stdin)
sleep 0.5; echo 3

Examples:

# returns [b'1', b'2', b'45', b'3'] and prints out the error message
"45" | cmd("./a.sh", text=False) | deref()
# returns [b'This message goes to stderr']
"45" | cmd("./a.sh", mode=2, text=False) | deref()
# returns [[b'1', b'2', b'45', b'3'], [b'This message goes to stderr']]
"45" | cmd("./a.sh", mode=0, text=False) | deref()

Performance-wise, stdout and stderr will yield values right away as soon as the process outputs it, so you get real time feedback. However, this will convert the entire input into a bytes object, and not feed it bit by bit lazily, so if you have a humongous input, it might slow you down a little.

Also, because stdout and stderr yield values right away, it means that if you want the operation to be blocking until finished, you have to consume the output:

None | cmd("mkdir abc")
# might fail, because this might get executed before the previous line
None | cmd("echo a>abc/rg.txt")

None | cmd("mkdir abc") | ignore()
# will succeed, because this will be guaranteed to execute after the previous line
None | cmd("echo a>abc/rg.txt")

Settings: - cli.quiet: if True, won’t display errors in mode 1

Parameters
  • mode – if 0, returns (stdout, stderr). If 1, returns stdout and prints stderr if there are any errors. If 2, returns stderr

  • text – whether to decode the outputs into str or return raw bytes

  • block – whether to wait for the task to finish before returning to Python or not

__ror__(it: Union[None, str, bytes, Iterator[Any]])Iterator[Union[str, bytes]][source]

Pipes in lines of input, or if there’s nothing to pass, then pass None

k1lib.cli.inp.requireCli(cliTool: str)[source]

Searches for a particular cli tool (eg. “ls”), throws ImportError if not found, else do nothing

kcsv module

All tools related to csv file format. Expected to use behind the “kcsv” module name, like this:

from k1lib.imports import *
kcsv.cat("file.csv") | display()
k1lib.cli.kcsv.cat(file: Optional[str] = None)k1lib.cli.init.Table[str][source]

Opens a csv file, and turns them into nice row elements

kxml module

All tools related to xml file format. Expected to use behind the “kxml” module name, like this:

from k1lib.imports import *
cat("abc.xml") | kxml.node() | kxml.display()
class k1lib.cli.kxml.node(fs: list = [])[source]

Bases: k1lib.cli.init.BaseCli

Turns lines into a single node. Example:

s = """
<html>
    <head>
        <style></style>
    </head>
    <body>
        <div></div>
    </body>
</html>"""
# returns root node
s | kxml.node()
# same thing as above, demonstrating you can pipe in list of strings
s.split("\n") | kxml.node()
__ror__(it: Iterator[str])xml.etree.ElementTree.Element[source]
class k1lib.cli.kxml.maxDepth(depth: Optional[int] = None, copy: bool = True)[source]

Bases: k1lib.cli.init.BaseCli

__init__(depth: Optional[int] = None, copy: bool = True)[source]

Filters out too deep nodes. Example:

# returns root node, but prunes children deeper than the specified depth
s | kxml.node() | kxml.maxDepth()
Parameters
  • depth – max depth to include in

  • copy – whether to limit the nodes itself, or limit a copy

__ror__(node: xml.etree.ElementTree.Element)xml.etree.ElementTree.Element[source]
class k1lib.cli.kxml.tags(*tags: List[str], nested=False)[source]

Bases: k1lib.cli.init.BaseCli

__init__(*tags: List[str], nested=False)[source]

Finds all tags that have a particular name.. Example:

s = """
<EXPERIMENT_PACKAGE_SET>
  <EXPERIMENT_PACKAGE>
    <EXPERIMENT_PACKAGE/>
    <Pool/>
    <RUN_SET/>
  </EXPERIMENT_PACKAGE>
  <EXPERIMENT_PACKAGE>
    <Pool/>
    <RUN_SET/>
  </EXPERIMENT_PACKAGE>
</EXPERIMENT_PACKAGE_SET>"""

# returns a list of "Pool" tags (with 2 elements) that are 2 levels deep
s | kxml.node() | kxml.tags("Pool") | toList()
# returns list with 2 tags
s | kxml.node() | kxml.tags("EXPERIMENT_PACKAGE")
# returns list with 3 tags
s | kxml.node() | kxml.tags("EXPERIMENT_PACKAGE", nested=True)
Parameters

nested – whether to search for “div” tag inside of another “div” tag

__ror__(node: xml.etree.ElementTree.Element)Iterator[xml.etree.ElementTree.Element][source]
class k1lib.cli.kxml.pretty(indent: Optional[str] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(indent: Optional[str] = None)[source]

Converts the element into a list of xml strings, and make them pretty. Example:

# prints out the element
s | kxml.node() | kxml.pretty() | stdout()
__ror__(it: xml.etree.ElementTree.Element)Iterator[str][source]
class k1lib.cli.kxml.display(depth: int = 3, lines: int = 20)[source]

Bases: k1lib.cli.init.BaseCli

__init__(depth: int = 3, lines: int = 20)[source]

Convenience method for getting head, make it pretty and print it out. Example:

# prints out the element
s | kxml.node() | kxml.display()
Parameters
  • depth – prune tags deeper than the specified depth. Put “None” to not prune at all

  • lines – max number of lines to print out. Put “None” if you want to display everything

__ror__(it: xml.etree.ElementTree.Element, lines=10)[source]

modifier module

This is for quick modifiers, think of them as changing formats

class k1lib.cli.modifier.applyS(f: Callable[[T], T], *args, **kwargs)[source]

Bases: k1lib.cli.init.BaseCli

__init__(f: Callable[[T], T], *args, **kwargs)[source]

Like apply, but much simpler, just operating on the entire input object, essentially. The “S” stands for “single”. There’s also an alias shorthand for this called aS. Example:

# returns 5
3 | aS(lambda x: x+2)

Like apply, you can also use this as a decorator like this:

@aS
def f(x):
    return x+2
# returns 5
3 | f

This also decorates the returned object so that it has same qualname, docstring and whatnot.

Parameters
  • f – the function to be executed

  • kwargs – other keyword arguments to pass to the function, together with args

__ror__(it: T)T[source]
__invert__()[source]

Configures it so that it expand the arguments out. Example:

# returns 5
[2, 3] | ~aS(lambda x, y: x + y)

def f(x, y, a=4):
    return x*y + a
# returns 10
[2, 3] | ~aS(f)
# returns 11
[2, 3] | ~aS(f, a=5)
k1lib.cli.modifier.aS

alias of k1lib.cli.modifier.applyS

class k1lib.cli.modifier.apply(f: Callable[[T], T], column: Optional[int] = None, cache: int = 0)[source]

Bases: k1lib.cli.init.BaseCli

__init__(f: Callable[[T], T], column: Optional[int] = None, cache: int = 0)[source]

Applies a function f to every line. Example:

# returns [0, 1, 4, 9, 16]
range(5) | apply(lambda x: x**2) | deref()
# returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]]
torch.ones(2, 3) | apply(lambda x: x+2, 0) | deref()

You can also use this as a decorator, like this:

@apply
def f(x):
    return x**2
# returns [0, 1, 4, 9, 16]
range(5) | f | deref()

You can also add a cache, like this:

def calc(i): time.sleep(0.5); return i**2
# takes 2.5s
range(5) | repeatFrom(2) | apply(calc, cache=10) | deref()
# takes 5s
range(5) | repeatFrom(2) | apply(calc) | deref()
Parameters
  • column – if not None, then applies the function to that column only

  • cache – if specified, then caches this much number of values

__ror__(it: Iterator[str])[source]
__invert__()[source]

Same mechanism as in applyS, it expands the arguments out. Just for convenience really. Example:

# returns [10, 12, 14, 16, 18]
[range(5), range(10, 15)] | transpose() | ~apply(lambda x, y: x+y) | deref()
class k1lib.cli.modifier.applyMp(f: Callable[[T], T], prefetch: Optional[int] = None, timeout: float = 8, utilization: float = 0.8, bs: int = 1, **kwargs)[source]

Bases: k1lib.cli.init.BaseCli

__init__(f: Callable[[T], T], prefetch: Optional[int] = None, timeout: float = 8, utilization: float = 0.8, bs: int = 1, **kwargs)[source]

Like apply, but execute f(row) of each row in multiple processes. Example:

# returns [3, 2]
["abc", "de"] | applyMp(lambda s: len(s)) | deref()
# returns [5, 6, 9]
range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref()

# returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work
someList = [1, 2, 3]
["abc", "de"] | applyMp(lambda s: someList) | deref()

Internally, this will continuously spawn new jobs up until 80% of all CPU cores are utilized. On posix systems, the default multiprocessing start method is fork(). This sort of means that all the variables in memory will be copied over. This might be expensive (might also not, with copy-on-write), so you might have to think about that. On windows and macos, the default start method is spawn, meaning each child process is a completely new interpreter, so you have to pass in all required variables and reimport every dependencies. Read more at https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

If you don’t wish to schedule all jobs at once, you can specify a prefetch amount, and it will only schedule that much jobs ahead of time. Example:

range(10000) | applyMp(lambda x: x**2)    | head() | deref() # 700ms
range(10000) | applyMp(lambda x: x**2, 5) | head() | deref() # 300ms

# demonstrating there're no huge penalties even if we want all results at the same time
range(10000) | applyMp(lambda x: x**2)    | deref() # 900ms
range(10000) | applyMp(lambda x: x**2, 5) | deref() # 1000ms

The first line will schedule all jobs at once, and thus will require more RAM and compute power, even though we discard most of the results anyway (the head cli). The second line only schedules 5 jobs ahead of time, and thus will be extremely more efficient if you don’t need all results right away.

Note

Remember that every BaseCli is also a function, meaning that you can do stuff like:

# returns [['ab', 'ac']]
[["ab", "cd", "ac"]] | applyMp(filt(op().startswith("a")) | deref()) | deref()

Also remember that the return result of f should be serializable, meaning it should not be a generator. That’s why in the example above, there’s a deref() inside f. You should also convert PyTorch tensors into Numpy arrays

Most of the time, you would probably want to specify bs to something bigger than 1 (may be 32 or sth like that). This will executes f multiple times in a single job, instead of executing f only once per job. Should reduce overhead of process creation dramatically.

If you encounter strange errors not seen on apply, you can try to clear all pools (using clearPools()), to terminate all child processes and thus free resources. On earlier versions, you have to do this manually before exiting, but now applyMp is much more robust.

Also, you should not immediately assume that applyMp will always be faster than apply. Remember that applyMp will create new processes, serialize and transfer data to them, execute it, then transfer data back. If your code transfers a lot of data back and forth (compared to the amount of computation done), or the child processes don’t have a lot of stuff to do before returning, it may very well be a lot slower than apply.

There’s a potential loophole here that can make your code faster. Because the main process is forked (at least on linux), every variable is still there, even the big ones. So, you can potentially do something like this:

bigData = [] # 1B items in the list
# summing up all items together. No input data transfers (because it's forked instead)
range(1_000_000_000) | batched(100) | applyMp(lambda r: r | apply(lambda i: bigData[i]) | toSum()) | toSum()
Parameters
  • prefetch – if not specified, schedules all jobs at the same time. If specified, schedules jobs so that there’ll only be a specified amount of jobs, and will only schedule more if results are actually being used.

  • timeout – seconds to wait for job before raising an error

  • utilization – how many percent cores are we running? 0 for no cores, 1 for all the cores. Defaulted to 0.8

  • bs – if specified, groups bs number of transforms into 1 job to be more efficient.

  • kwargs – extra arguments to be passed to the function. args not included as there’re a couple of options you can pass for this cli.

__ror__(it: Iterator[T])Iterator[T][source]
static clearPools()[source]

Terminate all existing pools. Do this before restarting/quitting the script/notebook to make sure all resources (like GPU) are freed. Update: you probably won’t have to call this manually anymore since version 0.9, but if you run into problems, try doing this.

static pools()[source]

Get set of all pools. Meant for debugging purposes only.

k1lib.cli.modifier.parallel

alias of k1lib.cli.modifier.applyMp

class k1lib.cli.modifier.applyTh(f, prefetch: int = 2, timeout: float = 5, bs: int = 1)[source]

Bases: k1lib.cli.init.BaseCli

__init__(f, prefetch: int = 2, timeout: float = 5, bs: int = 1)[source]

Kinda like the same as applyMp, but executes f on multiple threads, instead of on multiple processes. Advantages:

  • Relatively low overhead for thread creation

  • Fast, if f is io-bound

  • Does not have to serialize and deserialize the result, meaning iterators can be exchanged

Disadvantages:

  • Still has thread creation overhead, so it’s still recommended to specify bs

  • Is slow if f has to obtain the GIL to be able to do anything

All examples from applyMp should work perfectly here.

__ror__(it)[source]
class k1lib.cli.modifier.applySerial(f, includeFirst=False)[source]

Bases: k1lib.cli.init.BaseCli

__init__(f, includeFirst=False)[source]

Applies a function repeatedly. First yields input iterator x. Then yields f(x), then f(f(x)), then f(f(f(x))) and so on. Example:

# returns [4, 8, 16, 32, 64]
2 | applySerial(op()*2) | head(5) | deref()

If the result of your operation is an iterator, you might want to deref it, like this:

rs = iter(range(8)) | applySerial(rows()[::2])
# returns [0, 2, 4, 6]
next(rs) | deref()
# returns []. This is because all the elements are taken by the previous deref()
next(rs) | deref()
# returns [[10, -6], [4, 16], [20, -12]]
[2, 8] | ~applySerial(lambda a, b: (a + b, a - b)) | head(3) | deref()

rs = iter(range(8)) | applySerial(rows()[::2] | deref())
# returns [0, 2, 4, 6]
next(rs)
# returns [0, 4]
next(rs)
# returns [0]
next(rs)
Parameters
  • f – function to apply repeatedly

  • includeFirst – whether to include the raw input value or not

__ror__(it)[source]
__invert__()[source]
class k1lib.cli.modifier.sort(column: int = 0, numeric=True, reverse=False)[source]

Bases: k1lib.cli.init.BaseCli

__init__(column: int = 0, numeric=True, reverse=False)[source]

Sorts all lines based on a specific column. Example:

# returns [[5, 'a'], [1, 'b']]
[[1, "b"], [5, "a"]] | ~sort(0) | deref()
# returns [[2, 3]]
[[1, "b"], [5, "a"], [2, 3]] | ~sort(1) | deref()
# errors out, as you can't really compare str with int
[[1, "b"], [2, 3], [5, "a"]] | sort(1, False) | deref()
# returns [-1, 2, 3, 5, 8]
[2, 5, 3, -1, 8] | sort(None) | deref()
Parameters
  • column – if None, sort rows based on themselves and not an element

  • numeric – whether to convert column to float

  • reverse – False for smaller to bigger, True for bigger to smaller. Use __invert__() to quickly reverse the order instead of using this param

__ror__(it: Iterator[str])[source]
__invert__()[source]

Creates a clone that has the opposite sort order

class k1lib.cli.modifier.sortF(f: Callable[[T], float], column: Optional[int] = None, reverse=False)[source]

Bases: k1lib.cli.init.BaseCli

__init__(f: Callable[[T], float], column: Optional[int] = None, reverse=False)[source]

Sorts rows using a function. Example:

# returns ['a', 'aa', 'aaa', 'aaaa', 'aaaaa']
["a", "aaa", "aaaaa", "aa", "aaaa"] | sortF(lambda r: len(r)) | deref()
# returns ['aaaaa', 'aaaa', 'aaa', 'aa', 'a']
["a", "aaa", "aaaaa", "aa", "aaaa"] | ~sortF(lambda r: len(r)) | deref()
__ror__(it: Iterator[T])Iterator[T][source]
__invert__()k1lib.cli.modifier.sortF[source]
class k1lib.cli.modifier.consume(f: Union[k1lib.cli.init.BaseCli, Callable[[T], None]])[source]

Bases: k1lib.cli.init.BaseCli

__init__(f: Union[k1lib.cli.init.BaseCli, Callable[[T], None]])[source]

Consumes the iterator in a side stream. Returns the iterator. Kinda like the bash command tee. Example:

# prints "0\n1\n2" and returns [0, 1, 2]
range(3) | consume(headOut()) | toList()
# prints "range(0, 3)" and returns [0, 1, 2]
range(3) | consume(lambda it: print(it)) | toList()

This is useful whenever you want to mutate something, but don’t want to include the function result into the main stream.

See also: tee

__ror__(it: T)T[source]
class k1lib.cli.modifier.randomize(bs=100, seed=None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(bs=100, seed=None)[source]

Randomize input stream. In order to be efficient, this does not convert the input iterator to a giant list and yield random values from that. Instead, this fetches bs items at a time, randomizes them, returns and fetch another bs items. If you want to do the giant list, then just pass in float("inf"), or None. Example:

# returns [0, 1, 2, 3, 4], effectively no randomize at all
range(5) | randomize(1) | deref()
# returns something like this: [1, 0, 2, 3, 5, 4, 6, 8, 7, 9]. You can clearly see the batches
range(10) | randomize(3) | deref()
# returns something like this: [7, 0, 5, 2, 4, 9, 6, 3, 1, 8]
range(10) | randomize(float("inf")) | deref()
# same as above
range(10) | randomize(None) | deref()
# returns True, as the seed is the same
range(10) | randomize(seed=4) | deref() == range(10) | randomize(seed=4) | deref()
__ror__(it: Iterator[T])Iterator[T][source]
class k1lib.cli.modifier.stagger(every: int)[source]

Bases: k1lib.cli.init.BaseCli

__init__(every: int)[source]

Staggers input stream into multiple stream “windows” placed serially. Best explained with an example:

o = range(10) | stagger(3)
o | deref() # returns [0, 1, 2], 1st "window"
o | deref() # returns [3, 4, 5], 2nd "window"
o | deref() # returns [6, 7, 8]
o | deref() # returns [9]
o | deref() # returns []

This might be useful when you’re constructing a data loader:

dataset = [range(20), range(30, 50)] | transpose()
dl = dataset | batched(3) | (transpose() | toTensor()).all() | stagger(4)
for epoch in range(3):
    for xb, yb in dl: # looping over a window
        print(epoch)
        # then something like: model(xb)

The above code will print 6 lines. 4 of them is “0” (because we stagger every 4 batches), and xb’s shape’ will be (3,) (because we batched every 3 samples).

You should also keep in mind that this doesn’t really change the property of the stream itself. Essentially, treat these pairs of statement as being the same thing:

o = range(11, 100)

# both returns 11
o | stagger(20) | item()
o | item()

# both returns [11, 12, ..., 20]
o | head(10) | deref()
o | stagger(20) | head(10) | deref()

Lastly, multiple iterators might be getting values from the same stream window, meaning:

o = range(11, 100) | stagger(10)
it1 = iter(o); it2 = iter(o)
next(it1) # returns 11
next(it2) # returns 12

This may or may not be desirable. Also this should be obvious, but I want to mention this in case it’s not clear to you.

__ror__(it: Iterator[T])k1lib.cli.modifier.StaggeredStream[source]
static tv(every: int, ratio: float = 0.8)[source]

Convenience method to quickly stagger train and valid datasets. Example:

# returns [[16], [4]]
[range(100)]*2 | stagger.tv(20) | shape().all() | deref()
class k1lib.cli.modifier.op[source]

Bases: k1lib._baseClasses.Absorber, k1lib.cli.init.BaseCli

__init__()[source]

Absorbs operations done on it and applies it on the stream. Based on Absorber. Example:

t = torch.tensor([[1, 2, 3], [4, 5, 6.0]])
# returns [torch.tensor([[4., 5., 6., 7., 8., 9.]])]
[t] | (op() + 3).view(1, -1).all() | deref()

Basically, you can treat op() as the input tensor. Tbh, you can do the same thing with this:

[t] | applyS(lambda t: (t+3).view(-1, 1)).all() | deref()

But that’s kinda long and may not be obvious. This can be surprisingly resilient, as you can still combine with other cli tools as usual, for example:

# returns [2, 3], demonstrating "&" operator
torch.randn(2, 3) | (op().shape & iden()) | deref() | item()

a = torch.tensor([[1, 2, 3], [7, 8, 9]])
# returns torch.tensor([4, 5, 6]), demonstrating "+" operator for clis and not clis
(a | op() + 3 + iden() | item() == torch.tensor([4, 5, 6])).all()

# returns [[3], [3]], demonstrating .all() and "|" serial chaining
torch.randn(2, 3) | (op().shape.all() | deref())

# returns [[8, 18], [9, 19]], demonstrating you can treat `op()` as a regular function
[range(10), range(10, 20)] | transpose() | filt(op() > 7, 0) | deref()

# returns [3, 4, 5, 6, 7, 8, 9], demonstrating bounds comparison
range(100) | filt(3 <= op() < 10) | deref()

This can only deal with simple operations only. For complex operations, resort to the longer version applyS(lambda x: ...) instead!

There are also operations that are difficult to achieve, like len(op()), as Python is expecting an integer output, so op() can’t exactly take over. Instead, you have to use applyS, or do op().ab_len(). Get a list of all of these special operations in the source of Absorber.

Performance-wise, in most cases, there are no degradation, so don’t worry about it. Everything is pretty much on par with native lambdas:

n = 10_000_000
# takes 1.48s
for i in range(n): i**2
# takes 1.89s, 1.28x worse than for loop
range(n) | apply(lambda x: x**2) | ignore()
# takes 1.86s, 1.26x worse than for loop
range(n) | apply(op()**2) | ignore()
# takes 1.86s
range(n) | (op()**2).all() | ignore()

More complex operations still retains the same speeds, as there’s a JIT compiler embedded in:

# takes 2.15s
for i in range(n): (i**2-3)*0.1
# takes 2.53s, 1.18x worse than for loop
range(n) | apply(lambda x: (x**2-3)*0.1) | ignore()
# takes 2.46s, 1.14x worse than for loop
range(n) | apply((op()**2-3)*0.1) | ignore()

Reserved operations that are not absorbed are:

  • all

  • __ror__ (__or__ still works!)

  • ab_solidify

  • op_hint

static solidify(f)[source]

Static equivalent of a.ab_solidify(). Example:

f = op()**2
f = op.solidify(f)

If f is not an op, then just return it without doing anything to it

__ror__(it)[source]
op_hint(_hint)[source]

Specify output type hint

class k1lib.cli.modifier.integrate(dt=1)[source]

Bases: k1lib.cli.init.BaseCli

__init__(dt=1)[source]

Integrates the input. Example:

# returns [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
range(10) | integrate() | deref()
# returns [0, 2, 6, 12, 20, 30, 42, 56, 72, 90]
range(10) | integrate(2) | deref()
Parameters

dt – Optional small step

__ror__(it)[source]

nb module

This is for everything related to ipython notebooks. Expected to use behind the “nb” module name, like this:

from k1lib.imports import *
nb.execute("file.ipynb")
k1lib.cli.nb.cells(fileName, outputs=False)[source]

Gets simplified notebook cells from file source, including fields cell_type and source only. Example:

nb.cells("file.ipynb")
class k1lib.cli.nb.pretty(magics: bool = False, whitelist: List[str] = [], blacklist: List[str] = [])[source]

Bases: k1lib.cli.init.BaseCli

__init__(magics: bool = False, whitelist: List[str] = [], blacklist: List[str] = [])[source]

Makes the cells prettier. Cell 1 in file.ipynb:

#notest, export
a = 3

Cell 2 in file.ipynb:

b = 6

Code:

# only cell 2 gets chosen
nb.cells("file.ipynb") | nb.pretty(blacklist=["notest"])
# only cell 1 gets chosen
nb.cells("file.ipynb") | nb.pretty(whitelist=["export"])
Parameters
  • magics – if False, then if detected magics (‘!’, ‘%%’ symbols), then remove that line in cell’s source

  • whitelist – every cell that doesn’t have any of these properties will be filtered out

  • blacklist – every cell that has any of these properties will be filtered out

__ror__(cells)[source]
class k1lib.cli.nb.execute(fileName=None, _globals: Optional[dict] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(fileName=None, _globals: Optional[dict] = None)[source]

Executes cells. Example:

nb.cells("file.ipynb") | nb.execute("nb.ipynb")

Most of the time, you’d want to pass cells through pretty first, to make sure everything is nice and clean

Parameters
  • fileName – not actually used to read the file. If specified, then changes the current working directory to that of the file

  • _globals – optional dict of global variables

__ror__(cells)[source]

output module

For operations that feel like the termination

class k1lib.cli.output.stdout[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Prints out all lines. If not iterable, then print out the input raw. Example:

# prints out "0\n1\n2"
range(3) | stdout()
# same as above, but (maybe?) more familiar
range(3) > stdout()

This is rarely used alone. It’s more common to use headOut() for list of items, and display() for tables.

__ror__(it: Iterator[str])[source]
class k1lib.cli.output.tee(f=<function <lambda>>, s=None, every=1)[source]

Bases: k1lib.cli.init.BaseCli

__init__(f=<function <lambda>>, s=None, every=1)[source]

Like the Linux tee command, this prints the elements to another specified stream, while yielding the elements. Example:

# prints "0\n1\n2\n3\n4\n" and returns [0, 1, 4, 9, 16]
range(5) | tee() | apply(op() ** 2) | deref()

See also: consume

Parameters
  • f – element transform function. Defaults to just adding a new line at the end

  • s – stream to write to. Defaults to sys.stdout

  • every – only prints out 1 line in every lines, to limit print rate

__ror__(it)[source]
cr()[source]

Tee, but replaces the previous line. “cr” stands for carriage return. Example:

# prints "4" and returns [0, 1, 4, 9, 16]. Does print all the numbers in the middle, but is overriden
range(5) | tee().cr() | apply(op() ** 2) | deref()
crt()[source]

Like tee.cr(), but includes an elapsed time text at the end. Example:

range(5) | tee().cr() | apply(op() ** 2) | deref()
class k1lib.cli.output.file(fileName: Optional[str] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(fileName: Optional[str] = None)[source]

Opens a new file for writing. This will iterate through the iterator fed to it and put each element on a separate line. Example:

# writes "0\n1\n2\n" to file
range(3) | file("test/f.txt")
# same as above, but (maybe?) more familiar
range(3) > file("text/f.txt")
# returns ['0', '1', '2']
cat("folder/f.txt") | deref()

If the input is a string, then it will just put the string into the file and does not iterate through the string:

# writes "some text\n123" to file, default iterator mode like above
["some text", "123"] | file("test/f.txt")
# same as above, but this is a special case when it detects you're piping in a string
"some text\n123" | file("test/f.txt")

If the input is a bytes object, then it will open the file in binary mode and dumps the bytes in:

# writes bytes to file
b'5643' | file("test/a.bin")
# returns ['5643']
cat("test/a.bin") | deref()

You can create temporary files on the fly by not specifying a file name:

# creates temporary file
url = range(3) > file()
# returns ['0', '1', '2']
cat(url) | deref()

This can be especially useful when integrating with shell scripts that wants to read in a file:

seq1 = "CCAAACCCCCCCTCCCCCGCTTC"
seq2 = "CCAAACCCCCCCCTCCCCCCGCTTC"
# use "needle" program to locally align 2 sequences
None | cmd(f"needle {seq1 > file()} {seq2 > file()} -filter")

You can also append to file with the “>>” operator:

url = range(3) > file()
# appended to file
range(10, 13) >> file(url)
# returns ['0', '1', '2', '10', '11', '12']
cat(url) | deref()
Parameters

fileName – if not specified, create new temporary file and returns the url when pipes into it

__ror__(it: Iterator[str])None[source]
property name

File name of this file

class k1lib.cli.output.pretty(delim='')[source]

Bases: k1lib.cli.init.BaseCli

__init__(delim='')[source]

Pretty prints a table. Not really used directly. Example:

# These 2 statements are pretty much the same
[range(10), range(10)] | head(5) | pretty() > stdout()
[range(10), range(10)] | display()
__ror__(it: k1lib.cli.init.Table[Any])Iterator[str][source]
k1lib.cli.output.display(lines: int = 10)[source]

Convenience method for displaying a table. Pretty much equivalent to head() | pretty() | stdout().

See also: pretty

k1lib.cli.output.headOut(lines: int = 10)[source]

Convenience method for head() | stdout()

class k1lib.cli.output.intercept(raiseError: bool = True)[source]

Bases: k1lib.cli.init.BaseCli

__init__(raiseError: bool = True)[source]

Intercept flow at a particular point, analyze the object piped in, and raises error to stop flow. Example:

3 | intercept()
Parameters

raiseError – whether to raise error when executed or not.

__ror__(s)[source]
class k1lib.cli.output.split(n=10, baseFolder='/tmp')[source]

Bases: k1lib.cli.init.BaseCli

__init__(n=10, baseFolder='/tmp')[source]

Splits a large file into multiple fragments, and returns the path to those files. Example:

# returns a list of 20 files
"big-file.csv" | split(20)

This uses the underlying split linux cli tool. This also means that it’s not guaranteed to work on macos or windows.

Overtime, there will be lots of split files after a session, so be sure to clean them up to reduce disk size:

split.clear()
Parameters
  • n – Number of files to split into

  • baseFolder – Base folder where all the splitted files are

__ror__(file)[source]
static clear(baseFolder='/tmp')[source]

Clears all splitted temporary files.

sam module

This is for functions that are .sam or .bam related

k1lib.cli.sam.cat(bamFile: Optional[str] = None, header: bool = True)[source]

Get sam file outputs from bam file. Example:

sam.cat("file.bam") | display()
"file.bam" | sam.cat(header=False) | display()
Parameters

header – whether to include headers or not

class k1lib.cli.sam.header(long=True)[source]

Bases: k1lib.cli.init.BaseCli

__init__(long=True)[source]

Adds a header to the table. Example:

sam.cat("file.bam") | sam.header() | display()

You can change the header labels like this:

settings.cli.sam.header.long = ["Query template name", ...]
Parameters

long – whether to use a long descriptive header, or a short one

__ror__(it)[source]
class k1lib.cli.sam.flag(f=None)[source]

Bases: k1lib.cli.utils.bindec

__init__(f=None)[source]

Decodes flags attribute. Example:

# returns ['PAIRED', 'UNMAP']
5 | flag()
# returns 'PAIRED, UNMAP'
5 | flag(cli.join(", "))

You’ll mostly use this in this format:

sam.cat("file.bam", False) | apply(sam.flag(), 1) | display()

You can change the flag labels like this:

settings.cli.sam.flags = ["paired", ...]
Parameters

f – transform function fed into bindec, defaulted to join(“, “)

structural module

This is for functions that sort of changes the table structure in a dramatic way. They’re the core transformations

class k1lib.cli.structural.joinStreamsRandom[source]
__init__()[source]

Join multiple streams randomly. If any streams runs out, then quits. If any stream yields yieldT, then just ignores that result and continue. Could be useful in active learning. Example:

# could return [0, 1, 10, 2, 11, 12, 13, ...], with max length 20, typical length 18
[range(0, 10), range(10, 20)] | joinStreamsRandom() | deref()

stream2 = [[-5, yieldT, -4, -3], yieldT | repeat()] | joinStreams()
# could return [-5, -4, 0, -3, 1, 2, 3, 4, 5, 6], demonstrating yieldT
[range(7), stream2] | joinStreamsRandom() | deref()
__ror__(streams: Iterator[Iterator[T]])Iterator[T][source]
class k1lib.cli.structural.transpose(dim1: int = 0, dim2: int = 1, fill=None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(dim1: int = 0, dim2: int = 1, fill=None)[source]

Join multiple columns and loop through all rows. Aka transpose. Example:

# returns [[1, 4], [2, 5], [3, 6]]
[[1, 2, 3], [4, 5, 6]] | transpose() | deref()
# returns [[1, 4], [2, 5], [3, 6], [0, 7]]
[[1, 2, 3], [4, 5, 6, 7]] | transpose(fill=0) | deref()

Multidimensional transpose works just like torch.transpose() too:

# returns (2, 7, 5, 3), but detected Tensor, so it will use builtin :meth:`torch.transpose`
torch.randn(2, 3, 5, 7) | transpose(3, 1) | shape()
# also returns (2, 7, 5, 3), but actually does every required computation. Can be slow if shape is huge
torch.randn(2, 3, 5, 7) | deref(igT=False) | transpose(3, 1) | shape()

Can also work with numpy arrays:

# returns (5, 3, 2)
np.random.randn(2, 3, 5) | transpose(0, 2) | op().shape

Be careful with infinite streams, as transposing stream of shape (inf, 5) will hang this operation! Either don’t do it, or temporarily limit all infinite streams like this:

with settings.cli.context(inf=21):
    # returns (3, 21)
    [2, 1, 3] | repeat() | transpose() | shape()

Also be careful with empty streams, as you might not get any results at all:

# returns [], as the last stream has no elements
[[1, 2], [3, 4], []] | transpose() | deref()
# returns [[1, 3, 0], [2, 4, 0]]
[[1, 2], [3, 4], []] | transpose(fill=0) | deref()
Parameters

fill – if not None, then will try to zip longest with this fill value

__ror__(it: Iterator[Iterator[T]])k1lib.cli.init.Table[T][source]
static fill(fill='', dim1: int = 0, dim2: int = 1)[source]

Convenience method to fill in missing elements of a table. Example:

# returns [[1, 2, 3], [4, 5, 0]]
[[1, 2, 3], [4, 5]] | transpose.fill(0) | deref()
# also returns [[1, 2, 3], [4, 5, 0]], demonstrating how it works underneath
[[1, 2, 3], [4, 5]] | transpose(fill=0) | transpose(fill=0) | deref()
static wrap(f, dim1: int = 0, dim2: int = 1, fill=None)[source]

Wraps f around 2 transpose, can be useful in combination with k1lib.cli.init.mtmS. Example:

# returns [[1, 4, 3, 4], [8, 81, 10, 11]]
[range(1, 5), range(8, 12)] | transpose.wrap(mtmS.f(apply(op()**2), 1)) | deref()
# also returns [[1, 4, 3, 4], [8, 81, 10, 11]], demonstrating the typical way to do this
[range(1, 5), range(8, 12)] | apply(op()**2, 1) | deref()

The example given is sort of to demonstrate this only. Most of the time, just use apply with columns instead. But sometimes you need direct access to a column, so this is how you can do it.

class k1lib.cli.structural.reshape(*dims)[source]

Bases: k1lib.cli.init.BaseCli

__init__(*dims)[source]

Reshapes the input stream into the desired shape. Example:

# returns [[0, 1, 2], [3, 4, 5]]
range(6) | reshape(2, 3) | deref()
# returns [[0, 1], [2, 3], [4, 5]]
range(6) | reshape(3, 2) | deref()
# returns [[0, 1], [2, 3], [4, 5]], stopped early
range(100) | reshape(3, 2) | deref()
# returns [[0, 1, 2], [3, 4, 5]], can leave out first dimension
range(6) | reshape(-1, 3) | deref()
# returns [[0, 1, 2]], won't include 2nd element, as it ran out of elements
range(5) | reshape(-1, 3) | deref()
# throws error, as it ran out of elements and can't fulfill the request
range(6) | reshape(3, 3) | deref()

Unlike torch.reshape(), the input piped into this has to be a simple iterator. If you have a complex data structure with multiple dimensions, turn that into a simple iterator with joinStreams first, like this:

# returns [[[0, 1, 2]], [[3, 4, 5]]]
[[[0], [1]], [[2], [3]], [[4], [5]]] | joinStreams(2) | reshape(2, 1, 3) | deref()
__ror__(it)[source]
class k1lib.cli.structural.insert(*elements, begin=True)[source]

Bases: k1lib.cli.init.BaseCli

__init__(*elements, begin=True)[source]

Join element into list. Example:

# returns [5, 2, 6, 8]
[5, [2, 6, 8]] | insert() | deref()
# returns [5, 2, 6, 8]
[2, 6, 8] | insert(5) | deref()
# returns [2, 6, 8, 5]
[2, 6, 8] | insert(5, begin=False) | deref()

# returns [[3, 1], 2, 6, 8]
[2, 6, 8] | insert([3, 1]) | deref()
# returns [[3, 1], 2, 6, 8]
[2, 6, 8] | ~insert(3, 1) | deref()
# returns [[3, 1], 2, 6, 8]
[[3, 1], [2, 6, 8]] | ~insert() | deref()
Parameters

element – the element to insert. If None, then takes the input [e, […]], else takes the input […] as usual

__ror__(it: Tuple[T, Iterator[T]])Iterator[T][source]
__invert__()[source]
class k1lib.cli.structural.splitW(*weights: List[float])[source]

Bases: k1lib.cli.init.BaseCli

__init__(*weights: List[float])[source]

Splits elements into multiple weighted lists. If no weights are provided, then automatically defaults to [0.8, 0.2]. Example:

# returns [[0, 1, 2, 3, 4, 5, 6, 7], [8, 9]]
range(10) | splitW(0.8, 0.2) | deref()
# same as the above
range(10) | splitW() | deref()
__ror__(it)[source]
class k1lib.cli.structural.joinStreams(dims=1)[source]

Bases: k1lib.cli.init.BaseCli

__init__(dims=1)[source]

Joins multiple streams. Example:

# returns [1, 2, 3, 4, 5]
[[1, 2, 3], [4, 5]] | joinStreams() | deref()
# returns [[0, 1], [2], [3, 4, 5], [6, 7, 8], [], [9, 10]]
[[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams() | deref()
# returns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[[[0, 1], [2], [3, 4, 5]], [[6, 7, 8], [], [9, 10]]] | joinStreams(2) | deref()

Sometimes, you may want to impose some dimensional structure after joining all streams together, which reshape does.

__ror__(streams: Iterator[Iterator[T]])Iterator[T][source]
class k1lib.cli.structural.activeSamples(limit: int = 100, p: float = 0.95)[source]

Bases: k1lib.cli.init.BaseCli

__init__(limit: int = 100, p: float = 0.95)[source]

Yields active learning samples. Example:

o = activeSamples()
ds = range(10) # normal dataset
ds = [o, ds] | joinStreamsRandom() # dataset with active learning capability
next(ds) # returns 0
next(ds) # returns 1
next(ds) # returns 2
o.append(20)
next(ds) # can return     3     or 20
next(ds) # can return (4 or 20) or 4

So the point of this is to be a generator of samples. You can define your dataset as a mix of active learning samples and standard samples. Whenever there’s a data point that you want to focus on, you can add it to o and it will eventially yield it.

Warning

It might not be a good idea to set param limit to higher numbers than 100. This is because, the network might still not understand a wrong sample after being shown multiple times, and will keep adding that wrong sample back in, distracting it from other samples, and reduce network’s accuracy after removing active learning from it.

If limit is low enough (from my testing, 30-100 should be fine), then old wrong samples will be kicked out, allowing for a fresh stream of wrong samples coming in, and preventing the problem above. If you found that removing active learning makes the accuracy drops dramatically, then try decreasing the limit.

Parameters
  • limit – max number of active samples. Discards samples if number of samples is over this.

  • p – probability of actually adding the samples in

append(item)[source]

Adds 1 sample.

extend(items)[source]

Adds multiple samples.

k1lib.cli.structural.table(delim: Optional[str] = None)[source]

Basically op().split(delim).all(). This exists because this is used quite a lot in bioinformatics. Example:

# returns [['a', 'bd'], ['1', '2', '3']]
["a|bd", "1|2|3"] | table("|") | deref()
class k1lib.cli.structural.batched(bs=32, includeLast=False)[source]

Bases: k1lib.cli.init.BaseCli

__init__(bs=32, includeLast=False)[source]

Batches the input stream. Example:

# returns [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
range(11) | batched(3) | deref()
# returns [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
range(11) | batched(3, True) | deref()
# returns [[0, 1, 2, 3, 4]]
range(5) | batched(float("inf"), True) | deref()
# returns []
range(5) | batched(float("inf"), False) | deref()

Can work well and fast with torch.Tensor and numpy.ndarray:

# both returns torch.Tensor of shape (2, 3, 4, 5)
torch.randn(6, 4, 5) | batched(3)
torch.randn(7, 4, 5) | batched(3)

Also, if input is a range, then to save time, a bunch of other ranges will be returned, instead of a bunch of lists, for performance:

# returns [range(0, 3), range(3, 6), range(6, 9)]
range(11) | batched(3) | toList()
__ror__(it)[source]
class k1lib.cli.structural.window(n, newList=False)[source]

Bases: k1lib.cli.init.BaseCli

__init__(n, newList=False)[source]

Slides window of size n forward and yields the windows. Example:

# returns [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
range(5) | window(3) | deref()

If you are doing strange transformations to the result, like transposing it, then it might complain that the internal deque (double-ended queue) mutated during iteration. In that case, then set newList to True. It’s not True by default because multiple lists will be created, all of which needs memory allocation, which will be slower:

# takes 15ms
range(100000) | window(100) | ignore()
# takes 48ms, because of allocating lists
range(100000) | window(100) | ignore()
__ror__(it)[source]
class k1lib.cli.structural.groupBy(column: int)[source]

Bases: k1lib.cli.init.BaseCli

__init__(column: int)[source]

Groups table by some column. Example:

[[2.3, 5],
 [3.4, 2],
 [4.5, 2],
 [5.6, 5],
 [6.7, 1]] | groupBy(1) | deref()

This returns:

[[[2.3, 5],
  [5.6, 5]],
 [[3.4, 2],
  [4.5, 2]],
 [[6.7, 1]]]

Should have O(n log(n)) time complexity

See also: grep

Parameters

column – which column to group by

__ror__(it)[source]
class k1lib.cli.structural.insertColumn(column: List[T], begin=True, fill='')[source]

Bases: k1lib.cli.init.BaseCli

__init__(column: List[T], begin=True, fill='')[source]

Inserts a column at beginning or end. Example:

# returns [['a', 1, 2], ['b', 3, 4]]
[[1, 2], [3, 4]] | insertColumn(["a", "b"]) | deref()
# returns [[1, 2, 'a'], [3, 4, 'b']]
[[1, 2], [3, 4]] | insertColumn(["a", "b"], begin=False) | deref()
__ror__(it)[source]
k1lib.cli.structural.insertIdColumn(table=False, begin=True, fill='')[source]

Inserts an id column at the beginning (or end). Example:

# returns [[0, 'a', 2], [1, 'b', 4]]
[["a", 2], ["b", 4]] | insertIdColumn(True) | deref()
# returns [[0, 'a'], [1, 'b']]
"ab" | insertIdColumn()
Parameters

table – if False, then insert column to an Iterator[str], else treat input as a full fledged table

class k1lib.cli.structural.expandE(f: Callable[[T], List[T]], column: int)[source]

Bases: k1lib.cli.init.BaseCli

__init__(f: Callable[[T], List[T]], column: int)[source]

Expands table element to multiple columns. Example:

# returns [['abc', 3, -2], ['de', 2, -5]]
[["abc", -2], ["de", -5]] | expandE(lambda e: (e, len(e)), 0) | deref()
Parameters

f – Function that transforms 1 row element to multiple elements

__ror__(it)[source]
k1lib.cli.structural.unsqueeze(dim: int = 0)[source]

Unsqueeze input iterator. Example:

t = [[1, 2], [3, 4], [5, 6]]
# returns (3, 2)
t | shape()
# returns (1, 3, 2)
t | unsqueeze(0) | shape()
# returns (3, 1, 2)
t | unsqueeze(1) | shape()
# returns (3, 2, 1)
t | unsqueeze(2) | shape()

Behind the scenes, it’s really just wrapList().all(dim), but the “unsqueeze” name is a lot more familiar. Also note that the inverse operation “squeeze” is sort of item().all(dim), if you’re sure that this is desirable:

t = [[1, 2], [3, 4], [5, 6]]
# returns (3, 2)
t | unsqueeze(1) | item().all(1) | shape()
class k1lib.cli.structural.count[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Finds unique elements and returns a table with [frequency, value, percent] columns. Example:

# returns [[1, 'a', '33%'], [2, 'b', '67%']]
['a', 'b', 'b'] | count() | deref()
__ror__(it: Iterator[str])[source]
static join()[source]

Joins multiple counts together. Example:

# returns [[2, 'a', '33%'], [4, 'b', '67%']]
['a', 'b', 'b'] | repeat(2) | applyMp(count() | deref()) | count.join() | deref()

This is useful when you want to get the count of a really long list/iterator using multiple cores

class k1lib.cli.structural.permute(*permutations: List[int])[source]

Bases: k1lib.cli.init.BaseCli

__init__(*permutations: List[int])[source]

Permutes the columns. Acts kinda like torch.Tensor.permute(). Example:

# returns [['b', 'a'], ['d', 'c']]
["ab", "cd"] | permute(1, 0) | deref()
__ror__(it: Iterator[str])[source]
class k1lib.cli.structural.accumulate(columnIdx: int = 0, avg=False)[source]

Bases: k1lib.cli.init.BaseCli

__init__(columnIdx: int = 0, avg=False)[source]

Groups lines that have the same row[columnIdx], and add together all other columns, assuming they’re numbers. Example:

# returns [['a', 10.5, 9.5, 14.5], ['b', 1.1, 2.2, 3.3]]
[["a", 1.1, 2.2, 3.4],
 ["a", 1.1, 2.2, 7.8],
 ["a", 8.3, 5.1, 3.3],
 ["b", 1.1, 2.2, 3.3]] | accumulate(0) | deref()
Parameters
  • columnIdx – common column index to accumulate

  • avg – calculate average values instead of sum

__ror__(it: Iterator[str])[source]
class k1lib.cli.structural.AA_(*idxs: List[int], wraps=False)[source]

Bases: k1lib.cli.init.BaseCli

__init__(*idxs: List[int], wraps=False)[source]

Returns 2 streams, one that has the selected element, and the other the rest. Example:

# returns [5, [1, 6, 3, 7]]
[1, 5, 6, 3, 7] | AA_(1)
# returns [[5, [1, 6, 3, 7]]]
[1, 5, 6, 3, 7] | AA_(1, wraps=True)

You can also put multiple indexes through:

# returns [[1, [5, 6]], [6, [1, 5]]]
[1, 5, 6] | AA_(0, 2)

If you don’t specify anything, then all indexes will be sliced:

# returns [[1, [5, 6]], [5, [1, 6]], [6, [1, 5]]]
[1, 5, 6] | AA_()

As for why the strange name, think of this operation as “AĀ”. In statistics, say you have a set “A”, then “not A” is commonly written as A with an overline “Ā”. So “AA_” represents “AĀ”, and that it first returns the selection A.

Parameters

wraps – if True, then the first example will return [[5, [1, 6, 3, 7]]] instead, so that A has the same signature as Ā

__ror__(it: List[Any])List[List[List[Any]]][source]
class k1lib.cli.structural.peek[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Returns (firstRow, iterator). This sort of peaks at the first row, to potentially gain some insights about the internal formats. The returned iterator is not tampered. Example:

e, it = iter([[1, 2, 3], [1, 2]]) | peek()
print(e) # prints "[1, 2, 3]"
s = 0
for e in it: s += len(e)
print(s) # prints "5", or length of 2 lists

You kinda have to be careful about handling the firstRow, because you might inadvertently alter the iterator:

e, it = iter([iter(range(3)), range(4), range(2)]) | peek()
e = list(e) # e is [0, 1, 2]
list(next(it)) # supposed to be the same as `e`, but is [] instead

The example happens because you have already consumed all elements of the first row, and thus there aren’t any left when you try to call next(it).

__ror__(it: Iterator[T])Tuple[T, Iterator[T]][source]
class k1lib.cli.structural.peekF(f: Union[k1lib.cli.init.BaseCli, Callable[[T], T]])[source]

Bases: k1lib.cli.init.BaseCli

__init__(f: Union[k1lib.cli.init.BaseCli, Callable[[T], T]])[source]

Similar to peek, but will execute f(row) and return the input Iterator, which is not tampered. Example:

it = lambda: iter([[1, 2, 3], [1, 2]])
# prints "[1, 2, 3]" and returns [[1, 2, 3], [1, 2]]
it() | peekF(lambda x: print(x)) | deref()
# prints "1\n2\n3"
it() | peekF(headOut()) | deref()
__ror__(it: Iterator[T])Iterator[T][source]
class k1lib.cli.structural.repeat(limit: Optional[int] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(limit: Optional[int] = None)[source]

Yields a specified amount of the passed in object. If you intend to pass in an iterator, then make a list out of it first, as second copy of iterator probably won’t work as you will have used it the first time. Example:

# returns [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
[1, 2, 3] | repeat(3) | toList()
Parameters

repeat – if None, then repeats indefinitely

__ror__(o: T)Iterator[T][source]
k1lib.cli.structural.repeatF(f, limit: Optional[int] = None, **kwargs)[source]

Yields a specified amount generated by a specified function. Example:

# returns [4, 4, 4]
repeatF(lambda: 4, 3) | toList()
# returns 10
repeatF(lambda: 4) | head() | shape(0)

f = lambda a: a+2
# returns [8, 8, 8]
repeatF(f, 3, a=6) | toList()
Parameters
  • limit – if None, then repeats indefinitely

  • kwargs – extra keyword arguments that you can pass into the function

See also: repeatFrom

class k1lib.cli.structural.repeatFrom(limit: Optional[int] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(limit: Optional[int] = None)[source]

Yields from a list. If runs out of elements, then do it again for limit times. Example:

# returns [1, 2, 3, 1, 2]
[1, 2, 3] | repeatFrom() | head(5) | deref()
# returns [1, 2, 3, 1, 2, 3]
[1, 2, 3] | repeatFrom(2) | deref()
Parameters

limit – if None, then repeats indefinitely

__ror__(it: Iterator[T])Iterator[T][source]
class k1lib.cli.structural.oneHot(col, n: int = 0, group: Optional[str] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(col, n: int = 0, group: Optional[str] = None)[source]

One-hot encode some column in a table. Example:

a = [
 [1, 2, "A"],
 [3, 4, "B"],
 [5, 6, "C"]]
b = [
 [7, 8, "A"],
 [9, 10, "B"],
 [11, 12, "B"]]
[*a, *b] | oneHot(2) | deref()
[*a, *b] | oneHot(2, 3, "abcd") | deref()

Last 2 statements both return this:

[[1, 2, 1, 0, 0],
 [3, 4, 0, 1, 0],
 [5, 6, 0, 0, 1],
 [7, 8, 1, 0, 0],
 [9, 10, 0, 1, 0],
 [11, 12, 0, 1, 0]]

The natural way to do this is to use with without n and group parameters. But sometimes, your one hot encoding is spreaded across multiple datasets in multiple dataloaders, and so the order and length of the encoding might not be the same, which will mess up your training process.

That’s why, you can specify group, which will share encoding information across all oneHot clis that have the same group name. If you choose to do this then you have to also specify what’s the size of the encoding, because the cli can’t really infer the size when it potentially has not seen all the data right?

Parameters
  • col – which column one hot encode and expand into

  • n – (optional) total number of different elements

  • group – (optional) group name

__ror__(it)[source]

trace module

class k1lib.cli.trace.trace(f=<k1lib.cli.utils.size object>, maxDepth=inf)[source]

Bases: k1lib.cli.trace._trace

last = None

Last instantiated trace object. Access this to view the previous (possibly nested) trace.

__init__(f=<k1lib.cli.utils.size object>, maxDepth=inf)[source]

Traces out how the data stream is transformed through complex cli tools. Example:

# returns [1, 4, 9, 16], normal command
range(1, 5) | apply(lambda x: x**2) | deref()
# traced command, will display how the shapes evolve through cli tools
range(1, 5) | trace() | apply(lambda x: x**2) | deref()

There’re a lot more instructions and code examples over the tutorial section. Go check it out!

This also works well with tOpt, and will actually display inferred type data in the graph:

range(5) | tOpt() | trace() | apply(op()**2)
Parameters

f – function to display the data stream. Defaulted to shape, and to iden if is None.

utils module

This is for all short and random quality-of-life utilities.

class k1lib.cli.utils.size(idx=None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(idx=None)[source]

Returns number of rows and columns in the input. Example:

# returns (3, 2)
[[2, 3], [4, 5, 6], [3]] | size()
# returns 3
[[2, 3], [4, 5, 6], [3]] | size(0)
# returns 2
[[2, 3], [4, 5, 6], [3]] | size(1)
# returns (2, 0)
[[], [2, 3]] | size()
# returns (3,)
[2, 3, 5] | size()
# returns 3
[2, 3, 5] | size(0)
# returns (3, 2, 2)
[[[2, 1], [0, 6, 7]], 3, 5] | size()
# returns (1, 3)
["abc"] | size()
# returns (1, 2, 3)
[torch.randn(2, 3)] | size()
# returns (2, 3, 5)
size()(np.random.randn(2, 3, 5))

There’s also lengths, which is sort of a simplified/faster version of this, but only use it if you are sure that len(it) can be called.

If encounter PyTorch tensors or Numpy arrays, then this will just get the shape instead of actually looping over them.

Parameters

idx – if idx is None return (rows, columns). If 0 or 1, then rows or columns

__ror__(it: Iterator[str])[source]
k1lib.cli.utils.shape

alias of k1lib.cli.utils.size

class k1lib.cli.utils.item(amt: int = 1, fill=<object object>)[source]

Bases: k1lib.cli.init.BaseCli

__init__(amt: int = 1, fill=<object object>)[source]

Returns the first row. Example:

# returns 0
iter(range(5)) | item()
# returns torch.Size([5])
torch.randn(3,4,5) | item(2) | shape()
# returns 3
[] | item(fill=3)
Parameters
  • amt – how many times do you want to call item() back to back?

  • fill – if iterator length is 0, return this

__ror__(it: Iterator[str])[source]
class k1lib.cli.utils.iden[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Yields whatever the input is. Useful for multiple streams. Example:

# returns range(5)
range(5) | iden()
__ror__(it: Iterator[Any])[source]
class k1lib.cli.utils.join(delim: Optional[str] = None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(delim: Optional[str] = None)[source]

Merges all strings into 1, with delim in the middle. Basically str.join(). Example:

# returns '2\na'
[2, "a"] | join("\n")
__ror__(it: Iterator[str])[source]
class k1lib.cli.utils.wrapList[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Wraps inputs inside a list. There’s a more advanced cli tool built from this, which is unsqueeze().

__ror__(it: T)List[T][source]
class k1lib.cli.utils.equals[source]

Bases: object

__init__()[source]

Checks if all incoming columns/streams are identical

__ror__(streams: Iterator[Iterator[str]])[source]
class k1lib.cli.utils.reverse[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Reverses incoming list. Example:

# returns [3, 5, 2]
[2, 5, 3] | reverse() | deref()
__ror__(it: Iterator[str])List[str][source]
class k1lib.cli.utils.ignore[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Just loops through everything, ignoring the output. Example:

# will just return an iterator, and not print anything
[2, 3] | apply(lambda x: print(x))
# will prints "2\n3"
[2, 3] | apply(lambda x: print(x)) | ignore()
__ror__(it: Iterator[Any])[source]
class k1lib.cli.utils.rateLimit(f, delay=0.1)[source]

Bases: k1lib.cli.init.BaseCli

__init__(f, delay=0.1)[source]

Limits the execution flow rate upon a condition. Example:

s = 0; semaphore = 0
def heavyAsyncOperation(i):
    global semaphore, s
    semaphore += 1
    s += i; time.sleep(1)
    semaphore -= 1; return i**2

# returns (20,), takes 1s to run
range(20) | applyTh(heavyAsyncOperation, 100) | shape()
# returns (20,), takes 4s to run (20/5 = 4)
range(20) | rateLimit(lambda: semaphore < 5) | applyTh(heavyAsyncOperation, 100) | shape()

The first test case is not rate-limited, so it will run all 20 threads at the same time, and all of them will finish after 1 second.

The second test case is rate-limited, so that there can only be 5 concurrently executing threads because of the semaphore count check. Therefore this takes around 4 seconds to run.

Parameters
  • f – checking function. Should return true if execution is allowed

  • delay – delay in seconds between calling f()

__ror__(it)[source]
static cpu(maxUtilization=90)[source]

Limits flow rate when cpu utilization is more than a specified percentage amount. Needs to install the package psutil to actually work. Example:

# returns [0, 1, 4, 9, 16]
range(5) | rateLimit.cpu() | apply(op()**2) | deref()
class k1lib.cli.utils.timeLimit(t)[source]

Bases: k1lib.cli.init.BaseCli

__init__(t)[source]

Caps the flow after a specified amount of time has passed. Example:

# returns 20, or roughly close to that
repeatF(lambda: time.sleep(0.1)) | timeLimit(2) | shape(0)
__ror__(it)[source]
k1lib.cli.utils.tab(pad: str = '    ')[source]

Indents incoming string iterator. Example:

# prints out indented 0 to 9
range(10) | tab() | headOut()
k1lib.cli.utils.indent(pad: str = '    ')

Indents incoming string iterator. Example:

# prints out indented 0 to 9
range(10) | tab() | headOut()
class k1lib.cli.utils.clipboard[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Saves the input to clipboard. Example:

# copies "abc" into the clipboard. Just use Ctrl+V to paste as usual
"abc" | clipboard()
__ror__(s)[source]
class k1lib.cli.utils.deref(maxDepth=inf, igT=True)[source]

Bases: k1lib.cli.init.BaseCli

__init__(maxDepth=inf, igT=True)[source]

Recursively converts any iterator into a list. Example:

# returns something like "<range_iterator at 0x7fa8c52ca870>"
iter(range(5))
# returns [0, 1, 2, 3, 4]
iter(range(5)) | deref()
# returns [2, 3], yieldT stops things early
[2, 3, yieldT, 6] | deref()

You can also specify a maxDepth:

# returns something like "<list_iterator at 0x7f810cf0fdc0>"
iter([range(3)]) | deref(0)
# returns [range(3)]
iter([range(3)]) | deref(1)
# returns [[0, 1, 2]]
iter([range(3)]) | deref(2)

There are a few classes/types that are considered atomic, and deref will never try to iterate over it. If you wish to change it, do something like:

settings.cli.atomic.deref = (int, float, ...)
Parameters
  • maxDepth – maximum depth to dereference. Starts at 0 for not doing anything at all

  • igT – short for “ignore tensor”. If True, then don’t loop over torch.Tensor and numpy.ndarray internals

__ror__(it: Iterator[T])List[T][source]
__invert__()k1lib.cli.init.BaseCli[source]

Returns a BaseCli that makes everything an iterator. Not entirely sure when this comes in handy, but it’s there.

class k1lib.cli.utils.bindec(cats: List[Any], f=None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(cats: List[Any], f=None)[source]

Binary decodes the input. Example:

# returns ['a', 'c']
5 | bindec("abcdef")
# returns 'a,c'
5 | bindec("abcdef", join(","))
Parameters
  • cats – categories

  • f – transformation function of the selected elements. Defaulted to toList, but others like join is useful too

__ror__(it)[source]
class k1lib.cli.utils.smooth(consecutives=None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(consecutives=None)[source]

Smoothes out the input stream. Literally just a shortcut for:

batched(consecutives) | toMean().all()

Example:

# returns [4.5, 14.5, 24.5]
range(30) | smooth(10) | deref()

Smoothing over torch.Tensor or numpy.ndarray will be much faster, and produce high dimensional results:

# returns torch.Tensor with shape (2, 3, 4)
torch.randn(10, 3, 4) | smooth(4)

The default consecutive value is in settings.cli.smooth. This is useful if you are smoothing over multiple lists at the same time, like this:

# can change a single smooth value temporarily here, and all sequences will be smoothed in the same way
with settings.cli.context(smooth=5):
    x = list(np.linspace(-2, 2, 50))
    y = x | apply(op()**2) | deref()
    plt.plot(x | smooth() | deref(), y | smooth() | deref())
Parameters

consecutives – if not defined, then used the value inside settings.cli.smooth

__ror__(it)[source]
k1lib.cli.utils.disassemble(f=None)[source]

Disassembles anything piped into it. Normal usage:

def f(a, b):
    return a**2 + b
# both of these print out disassembled info
f | disassemble()
disassemble(f)

# you can pass in lambdas
disassemble(lambda x: x + 3)

# or even raw code
"lambda x: x + 3" | disassemble()
k1lib.cli.utils.tree(fL=10, dL=10, depth=inf, ff: Callable[[str], bool] = <function <lambda>>, df: Callable[[str], bool] = <function <lambda>>)[source]

Recursively gets all files and folders. Output format might be a bit strange, so this is mainly for visualization. Example:

"." | tree() | deref()
Parameters
  • fL – max number of file per directory included in output

  • dL – max number of child directories per directory included in output

  • depth – explore depth

  • ff – optional file filter function

  • df – optional directory filter function

class k1lib.cli.utils.lookup(d: dict, col: Optional[int] = None, fill=None)[source]

Bases: k1lib.cli.init.BaseCli

__init__(d: dict, col: Optional[int] = None, fill=None)[source]

Looks up items from a dictionary/object. Example:

d = {"a": 3, "b": 5, "c": 52}
# returns [3, 5, 52, 52, 3]
"abcca" | lookup(d) | deref()

# returns [[0, 3], [1, 5], [2, 52], [3, 52], [4, 3]]
[range(5), "abcca"] | transpose() | lookup(d, 1) | deref()
Parameters
  • d – any object that can be sliced with the inputs

  • col – if None, lookup on each row, else lookup a specific column only

  • fill – if None, throws error if looked up element is not available, else returns the fill value

__ror__(it)[source]
class k1lib.cli.utils.dictFields(*fields, default='')[source]

Bases: k1lib.cli.init.BaseCli

__init__(*fields, default='')[source]

Grab a bunch of dictionary fields. Example:

# returns [3, 1, '']
{"a": 1, "b": 2, "c": 3} | dictFields("c", "a", "d")
__ror__(d)[source]

typehint module

Lots of type hint mechanisms to be used by the LLVM optimizer

class k1lib.cli.typehint.tBase(child=<class 'NoneType'>)[source]

Bases: object

check(v)[source]

Checks whether a specific object adhears to this type hint or not. Returns yieldT if object does not adhere. If it does, then return the object.

Note that in the case that the object is actually an iterator, it will return a new iterator containing all elements from the old iterator.

item()[source]

Gets the child type of this type. Basically what’s the type if it were to go through item. Example:

# returns tTensor(torch.float32, 2)
tTensor(torch.float32, 3).item()
expand(n)List[k1lib.cli.typehint.tBase][source]

Expands the type to a list with n elements. Example:

# returns [int, int, int, int]
tList(int).expand(4)
# returns [int, float, float, str]
tCollection(int, tExpand(float), str).expand(4)
class k1lib.cli.typehint.tAny[source]

Bases: k1lib.cli.typehint.tBase

check(v)[source]

Checks whether a specific object adhears to this type hint or not. Returns yieldT if object does not adhere. If it does, then return the object.

Note that in the case that the object is actually an iterator, it will return a new iterator containing all elements from the old iterator.

item()[source]

Gets the child type of this type. Basically what’s the type if it were to go through item. Example:

# returns tTensor(torch.float32, 2)
tTensor(torch.float32, 3).item()
class k1lib.cli.typehint.tList(child=<class 'NoneType'>)[source]

Bases: k1lib.cli.typehint.tBase

check(v)[source]

Checks whether a specific object adhears to this type hint or not. Returns yieldT if object does not adhere. If it does, then return the object.

Note that in the case that the object is actually an iterator, it will return a new iterator containing all elements from the old iterator.

class k1lib.cli.typehint.tIter(child=<class 'NoneType'>)[source]

Bases: k1lib.cli.typehint.tBase

check(v)[source]

Checks whether a specific object adhears to this type hint or not. Returns yieldT if object does not adhere. If it does, then return the object.

Note that in the case that the object is actually an iterator, it will return a new iterator containing all elements from the old iterator.

class k1lib.cli.typehint.tSet(child=<class 'NoneType'>)[source]

Bases: k1lib.cli.typehint.tBase

check(v)[source]

Checks whether a specific object adhears to this type hint or not. Returns yieldT if object does not adhere. If it does, then return the object.

Note that in the case that the object is actually an iterator, it will return a new iterator containing all elements from the old iterator.

class k1lib.cli.typehint.tCollection(*children)[source]

Bases: k1lib.cli.typehint.tBase

__init__(*children)[source]

Fixed-length collection of things. Let’s say you want a tuple with 5 values:

a = [3, [2, 3], "e", 2.0, b'3']

Then, this would be represented like this:

tCollection(int, tList(int), str, float, bytes)

This also works in conjunction with tExpand, like this:

a = [3, [2, 3], "e", 2.0, 3.0]
tCollection(int, tList(int), str, tExpand(float))
check(v)[source]

Checks whether a specific object adhears to this type hint or not. Returns yieldT if object does not adhere. If it does, then return the object.

Note that in the case that the object is actually an iterator, it will return a new iterator containing all elements from the old iterator.

reduce()[source]

Tries to reduce tCollection(int, int) to tIter(int) if possible

item()[source]

Gets the child type of this type. Basically what’s the type if it were to go through item. Example:

# returns tTensor(torch.float32, 2)
tTensor(torch.float32, 3).item()
expand(n: int)List[k1lib.cli.typehint.tBase][source]

Expands out this collection so that it has a specified length

class k1lib.cli.typehint.tExpand(child)[source]

Bases: k1lib.cli.typehint.tBase

__init__(child)[source]

Supplement to tCollection

check(v)[source]

Checks whether a specific object adhears to this type hint or not. Returns yieldT if object does not adhere. If it does, then return the object.

Note that in the case that the object is actually an iterator, it will return a new iterator containing all elements from the old iterator.

class k1lib.cli.typehint.tNpArray(child=None, rank=None)[source]

Bases: k1lib.cli.typehint.tBase

__init__(child=None, rank=None)[source]

Numpy array type. Example:

# returns np.array([2, 3])
tNpArray(np.int64, 1).check(np.array([2, 3]))
Parameters
  • child – the dtype of the array

  • rank – the rank/dimension of the array

check(v)[source]

Checks whether a specific object adhears to this type hint or not. Returns yieldT if object does not adhere. If it does, then return the object.

Note that in the case that the object is actually an iterator, it will return a new iterator containing all elements from the old iterator.

item()[source]

Gets the child type of this type. Basically what’s the type if it were to go through item. Example:

# returns tTensor(torch.float32, 2)
tTensor(torch.float32, 3).item()
expand(n)[source]

Expands the type to a list with n elements. Example:

# returns [int, int, int, int]
tList(int).expand(4)
# returns [int, float, float, str]
tCollection(int, tExpand(float), str).expand(4)
class k1lib.cli.typehint.tTensor(child=None, rank=None)[source]

Bases: k1lib.cli.typehint.tBase

__init__(child=None, rank=None)[source]

PyTorch tensor type. Example:

# returns torch.tensor([2.0, 3.0])
tTensor(torch.float32, 1).check(torch.tensor([2.0, 3.0]))
Parameters
  • child – the dtype of the array

  • rank – the rank/dimension of the tensor

check(v)[source]

Checks whether a specific object adhears to this type hint or not. Returns yieldT if object does not adhere. If it does, then return the object.

Note that in the case that the object is actually an iterator, it will return a new iterator containing all elements from the old iterator.

item()[source]

Gets the child type of this type. Basically what’s the type if it were to go through item. Example:

# returns tTensor(torch.float32, 2)
tTensor(torch.float32, 3).item()
expand(n)[source]

Expands the type to a list with n elements. Example:

# returns [int, int, int, int]
tList(int).expand(4)
# returns [int, float, float, str]
tCollection(int, tExpand(float), str).expand(4)
k1lib.cli.typehint.inferType(o)[source]

Tries to infer the type of the input. Example:

# returns tList(int)
inferType(range(10))
# returns tTensor(torch.float32, 2)
inferType(torch.randn(2, 3))
exception k1lib.cli.typehint.TypeHintException[source]

Bases: Exception

k1lib.cli.typehint.tLowest(*ts)[source]

Grabs the lowest possible shared type of all the example types. Example:

# returns tIter(float)
tLowest(tIter(float), tList(int))
class k1lib.cli.typehint.tCheck[source]

Bases: k1lib.cli.init.BaseCli

__init__()[source]

Tool similar to trace to check whether all type hint outputs of all clis are good or not. Example:

assert range(1, 3) | tCheck() | item() | op()*2 == 2

Mainly used in cli unit tests. Return type of statement will be tCheck, which might be undesirable, so you can pipe it to yieldT like this:

# returns tCheck object
range(1, 3) | tCheck() | item() | op()*2
# returns number "2"
range(1, 3) | tCheck() | item() | op()*2 | yieldT
__ror__(v)[source]
class k1lib.cli.typehint.tOpt[source]

Bases: k1lib.cli.init.BaseCli

n = 10
__init__()[source]

Optimizes clis. Let’s say you have something like this:

range(1000) | toList() | head() | deref()

For whatever reason you forgot that you’ve dereferenced everything in the middle, although you’re only using 10 first elements, so the code can’t be lazy anymore. You can apply optimizations to it like this:

range(1000) | tOpt() | toList() | head() | deref()

This will effectively turn it into this:

range(1000) | tOpt() | head() | deref()

Checkout the llvm optimizer tutorial <llvm.html> for a more in-depth explanation of this

More over, this combines nicely with trace like this:

range(5) | tOpt() | trace() | apply(op()**2) | deref()
static addPass(p, klasses: List[k1lib.cli.init.BaseCli] = [], abstractness=3)[source]

Adds an optimization pass that acts upon multiple clis in series. Example:

# cs: list of clis, ts: list of input type hints, 1 for each cli
def o1(cs:List[BaseCli], ts:List[tBase], metadata={}):
    return [cs[1], cs[0]] # reorder the clis
tOpt.addPass(o1, [toList, head], 3)

Here, we’re declaring an optimization pass o1. You will be given a list of cli objects, the cli’s input type hints and some extra metadata. If you can optimize it, then you should return a list of new clis, else you should return None

Also, abstractness has varying number of legal values: - 1-5: generic optimizations - 6-10: analysis passes. Passes must not return anything

Higher abstraction optimizations will be called first, and then lower abstraction optimizations will be called later. So, the idea is, just like LLVM, you can do some analysis which will compute metadata that you can use in your optimization passes, which will return optimized clis if it can.

Within optimization passes, you can prioritize optimizations that look at the global picture first, before breaking the code up into tiny fragments with more detailed optimizations, at which point it’s hard to look at the global picture.

Parameters
  • p – the optimization pass

  • klasses – list of cli classes in series that will trigger the pass

  • abstractness – how abstract is this optimization

static clearPasses()[source]

Clears all passes

property out
property optCli

Grabs the optimized cli. Example:

# returns optimized cli
(range(5) | tOpt() | apply(op()**2) | deref()).optCli
# you can also do it like this:
range(5) | tOpt() | apply(op()**2) | deref() | tOpt.optCli
__ror__(it)[source]

optimizations module

This is for optimizing the hell out of cli tools. Optimizations that focus around a specific cli should be included close to their definitions, so this is for optimizations that unusually span multiple clis, and serve as examples of how to create optimization passes.

See over the LLVM optimizer tutorial for more background.

k1lib.cli.optimizations.dummy()[source]

Does nothing. Only here so that you can read the source code

others module

This is for pretty random clis that’s scattered everywhere.

k1lib.cli.others.crissCross()[source]

Like the monkey-patched function torch.crissCross(). Example:

# returns another Tensor
[torch.randn(3, 3), torch.randn(3)] | crissCross()

Elsewhere in the library

There might still be more cli tools scattered around the library. These are pretty rare, quite dynamic and most likely a cool extra feature, not a core functionality, so not worth it/can’t mention it here. Anyway, execute this:

cli.scatteredClis()

to get a list of them.